Transforming OnNext notifications into OnError notifications

  • Question

  • Hi Guys,

    I'm surprised there is not an easy way to transform an OnNext notification into an error.

    Is there a reason why you have not provided this functionality? Or have I just missed it somehow?

    E.g.:

    static class Ex
    {
      public static IObservable<TResult> SelectOrError<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)
      {
        return Observable.CreateWithDisposable<TResult>(o =>
        {
          return source.Subscribe(i =>
          {
            TResult result = default(TResult);
            Exception error = null;
            try
            {
              result = selector(i);
            }
            catch (Exception ex)
            {
              error = ex;
            }
            if (error != null) o.OnError(error);
            else o.OnNext(result);
          }, o.OnError, o.OnCompleted);
        });
      }
    }
    
    class Program
    {
      static void Main()
      {
        Observable.Range(1,10).SelectOrError(s =>
        {
          if (s > 3)
            throw new Exception("testing");
          return s * 2;
        }).Subscribe(Console.WriteLine, ex => Console.WriteLine("Oops!"));
      }
    }

    Thanks,
    James 

    Thursday, April 22, 2010 11:27 PM

Answers

  • Hi James,

    What is SelectOrError supposed to be doing differently than what Select already does?

    Take the following code for example:

    var xs = Observable.Range(1, 10).Select(s =>
     {
      if (s > 3)
       throw new Exception("testing");
      return s * 2;
     });
    
    using (xs.Subscribe(Console.WriteLine, ex => Console.WriteLine("Oops!")))
    {
     Console.ReadKey();
    }

    It produces the following console output:

    2
    4
    6
    Oops!

    The above output is identical to the output of your SelectOrError method.  As a matter of fact, the contract for exceptions in Rx is such that an exception generated by an observable must be pushed to OnError, and that's exactly how Select behaves.

    Another way of converting OnNext notifications into OnError notifications is via Observable.Throw.

    var xs = Observable.Range(1, 10);
    
    var ys = from x in xs
             from y in (x > 3)
               ? Observable.Throw<int>(new Exception("testing"))
               : Observable.Return(x)
             select y * 2;
    
    using (ys.Subscribe(Console.WriteLine, ex => Console.WriteLine("Oops!")))
    {
     Console.ReadKey();
    }

    The code above produces the same output as before.

    - Dave


    http://davesexton.com/blog
    Friday, April 23, 2010 2:27 AM

All replies

  • Thanks Dave. You are correct :)

    Friday, April 23, 2010 8:17 AM
  • Your approach is interesting, however isn't it quite inefficient?

    from y in (x > 3)
    ? Observable.Throw<int>(new Exception("testing"))
    : Observable.Return(x)

    I'm thinking something like this might be nice.

    public static IObservable<T> ThrowIf<T>(this IObservable<T> source, Func<T,bool> predicate, Func<T, Exception> throwEx)

    Friday, April 23, 2010 8:19 AM
  • Hi James,

    Well I can see how wrapping up that behavior in ThrowIf might be useful, even though it's only two lines of code.  But perhaps it should be called ThrowWhere instead?

    Anyway, my use of the conditional operator with SelectMany doesn't seem inefficient to me.  For one thing, throwing an exception is more costly than simply returning one, so I'd assume that your original example is actually much less efficient than mine.  Granted, SelectMany creates a new observable for each value of x, but SelectMany is quite common in Rx queries, and Return is pretty efficient as it is.  Falling back to imperative code within a lambda is probably a micro-optimization that won't yield any noticeable performance gain anyway.
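
    For reference, the operator under discussion can be sketched as a thin wrapper over the SelectMany query shown earlier. This is only a sketch: the ThrowWhere name and signature follow the suggestion in this thread, the ObservableThrowExtensions class name is made up for illustration, and the using directive assumes a current System.Reactive package rather than the 2010 build used elsewhere in this thread.

```csharp
using System;
using System.Reactive.Linq; // assumption: current System.Reactive NuGet package

static class ObservableThrowExtensions // hypothetical name, for illustration only
{
    // Forwards each value unchanged unless the predicate matches, in which case
    // the sequence ends with the exception built from that value.
    public static IObservable<T> ThrowWhere<T>(
        this IObservable<T> source,
        Func<T, bool> predicate,
        Func<T, Exception> createException)
    {
        return source.SelectMany(x => predicate(x)
            ? Observable.Throw<T>(createException(x))
            : Observable.Return(x));
    }
}

static class Program
{
    static void Main()
    {
        // Values up to 5 pass through; the first value above 5 becomes an error.
        Observable.Range(1, 10)
            .ThrowWhere(i => i > 5, i => new Exception("oops"))
            .Subscribe(Console.WriteLine, ex => Console.WriteLine(ex.Message));
    }
}
```

    The body really is just the conditional operator plus SelectMany, so the cost per value is the same as in the query form: one short-lived inner observable per notification.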

    Here's a benchmark of the performance between Select and SelectMany:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
     class SelectManyPerformanceLab
     {
      static void Main()
      {
       var selectTimes = new List<TimeSpan>();
       var selectManyTimes = new List<TimeSpan>();
    
       var xs = Observable.Range(1, 100);
    
       var select =
        from x in xs
        select x;
    
       var selectMany =
        from x in xs
        from y in Observable.Return(x)
        select y;
    
       const int iterations = 1000;
    
       for (int i = 0; i < iterations; i++)
       {
        if (i % 100 == 0)
         Console.WriteLine("Iteration: {0} of {1}", i, iterations);
    
        IObservable<int> strategy =
         (i % 2 == 0) ? select : selectMany;
    
        IList<TimeSpan> log =
         (i % 2 == 0) ? selectTimes : selectManyTimes;
    
        if (i < 10)
         Run(strategy);
        else
         log.Add(Run(strategy));
       }
    
       Console.WriteLine("Select: {0}", Average(selectTimes));
       Console.WriteLine("SelectMany: {0}", Average(selectManyTimes));
      }
    
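      // Static so that it can be called from the static Main method.
      private static TimeSpan Run(IObservable<int> source)
      {
       // Not started yet, so the first sample recorded below is zero.
       var watch = new Stopwatch();
       var times = new List<TimeSpan>();
    
       source.Run(_ =>
       {
        times.Add(watch.Elapsed);
        watch.Restart();
       });
    
       return Average(times);
      }
    
      // Mean of the given samples, computed on ticks.
      private static TimeSpan Average(ICollection<TimeSpan> times)
      {
       return times.Aggregate(
        TimeSpan.Zero,
        (acc, time) => acc + time,
        acc => TimeSpan.FromTicks(acc.Ticks / times.Count));
      }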
     }
    }
    

    Here's the output on my single-processor desktop (Xeon 3.4 GHz, x86, Windows 7 Ultimate, 2 GB RAM):

    Iteration: 0 of 1000
    Iteration: 100 of 1000
    Iteration: 200 of 1000
    Iteration: 300 of 1000
    Iteration: 400 of 1000
    Iteration: 500 of 1000
    Iteration: 600 of 1000
    Iteration: 700 of 1000
    Iteration: 800 of 1000
    Iteration: 900 of 1000
    Select: 00:00:00.0000185
    SelectMany: 00:00:00.0000612

    - Dave


    http://davesexton.com/blog
    Friday, April 23, 2010 3:25 PM
  • Hi Dave,

    ThrowWhere is a better name.

    Sorry, I wasn't suggesting that the .Select(i => throw new Exception()) approach was efficient ;) I was more thinking about the Observable.Return(x). It just means every notification yielded will be wrapped in a new observable.

    Thanks for the stats. Here are the results on my machine (Xeon X5560 @ 2.80 GHz, 4 Core, 3.23GB RAM, WinXP)

    Select: 00:00:00.0000520
    SelectMany: 00:00:00.0004156

    Don't know why it's so much slower. The company I work for runs some pretty crazy software on my machine.

    If you are interested here is ThrowWhere.

    void Main()
    {
        var obs = Observable.Range(1,10)
                            .ThrowWhere(i => i > 5, i => new Exception("oops"));

        obs.Subscribe(i => i.Dump(), ex => ex.Message.Dump());
       
    }

    static class Ex
    {
        public static IObservable<T> ThrowWhere<T>(this IObservable<T> source, Func<T,bool> predicate, Func<T, Exception> createEx)
        {
            return Observable.CreateWithDisposable<T>(o => source.Subscribe(t =>
                {
                    if (predicate(t))
                    {
                        var ex = createEx(t);
                        o.OnError(ex);
                    }
                    else
                    {
                        o.OnNext(t);
                    }
                }, o.OnError, o.OnCompleted));
        }
    }
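
    One thing the version above leaves open is what happens when predicate or createEx itself throws: nothing in the code routes that exception to o.OnError. A possible variant, reusing the try/catch pattern from SelectOrError at the top of the thread, is sketched below. It assumes a current System.Reactive package, where (as far as I know) Observable.Create takes the place of CreateWithDisposable; the GuardedEx class name is hypothetical.

```csharp
using System;
using System.Reactive.Linq; // assumption: current System.Reactive NuGet package

static class GuardedEx // hypothetical name, for illustration only
{
    public static IObservable<T> ThrowWhere<T>(
        this IObservable<T> source,
        Func<T, bool> predicate,
        Func<T, Exception> createEx)
    {
        return Observable.Create<T>(o => source.Subscribe(t =>
        {
            Exception error = null;
            try
            {
                if (predicate(t))
                    error = createEx(t);
            }
            catch (Exception ex)
            {
                // A failure inside either delegate is reported like any other error.
                error = ex;
            }

            if (error != null) o.OnError(error);
            else o.OnNext(t);
        }, o.OnError, o.OnCompleted));
    }
}

static class Program
{
    static void Main()
    {
        // The predicate itself fails on 4; the subscriber sees that as an error.
        Observable.Range(1, 10)
            .ThrowWhere(
                i => { if (i == 4) throw new InvalidOperationException("predicate failed"); return false; },
                i => new Exception("unused"))
            .Subscribe(Console.WriteLine, ex => Console.WriteLine(ex.Message));
    }
}
```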

    Friday, April 23, 2010 4:10 PM
  • Hi James,

    Thanks for the code.

    Try running the benchmark in Release mode (w/optimizations enabled) and without the debugger attached.

    Edit: I should've also mentioned that I ran the test on .NET 4.0 RC.
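
    If it helps, a check along these lines could be run at the top of the benchmark's Main to flag either condition before any timing starts. It is a rough sketch: BenchmarkEnvironment and GetWarnings are hypothetical names, and the DEBUG symbol is only a proxy for "optimizations disabled", since the two settings can be changed independently.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

static class BenchmarkEnvironment // hypothetical helper, not part of the benchmark above
{
    // Collects reasons why timings from the current process may be unrepresentative.
    public static IList<string> GetWarnings()
    {
        var warnings = new List<string>();
#if DEBUG
        warnings.Add("Compiled with the DEBUG symbol; build in Release mode for timing runs.");
#endif
        if (Debugger.IsAttached)
            warnings.Add("A debugger is attached; start the program without debugging.");
        return warnings;
    }
}

static class Program
{
    static void Main()
    {
        foreach (var warning in BenchmarkEnvironment.GetWarnings())
            Console.WriteLine("Warning: " + warning);
    }
}
```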

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Friday, April 23, 2010 6:58 PM Mentioned .NET 4.0 RC
    Friday, April 23, 2010 6:47 PM