none
About the contracts in RX (exceptions and blocking)

    Question

  • I watched the following video from Wes about the non expressed contracts in IObservable and IObserver: http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/.

    I have a couple of question though.

    Wes says that one of the contracts is that Subscribe is not blocking. This is not quite true, if I subscribe this way, the code will block on Subscribe:

    var timer = Observable.Interval(Scheduler.Now, TimeSpan.FromSeconds(1));
    timer.Subscribe(i => Console.WriteLine(i));
    And it looks to me that the purpose of the scheduler Now is to provide blocking observables. Is it a bug in the library, or is the contract wrong?

    The other question I have is related to exceptions, Wes says that:
    - Subscribe is not allowed to throw
    - If an observer passed to the observable throws (for instance in OnNext), the exception shouldn't be caught and should bubble up

    It means that if OnNext throws, we let the exception bubble up, but then Subscribe throws as well, which is not allowed, can you clarify?

    Thanks

    Flavien
    Friday, January 22, 2010 3:23 PM

Answers

  • This is a great discussion!  I love when we talk about interesting semantics.  These are exactly the questions that we spend the most time discussing within our team.

    Here is what I meant:

    Rx operators do not catch exceptions that occur in a call to OnNext, OnError, or OnCompleted.  This is because we expect that (1) the observer implementor knows best how to handle those exceptions and we can't do anything reasonable with them and (2) if an exception occurs then we want that to bubble out and not be handled by Rx.

    In Rx, if the Subscribe method throws then we shouldn't catch it because (1) we can't reasonably repair the computation, (2) this error should not be in the stream but in the ambient context, and (3) we want the exception to occur.  So when I say that it Subscribe shouldn't throw, I don't mean that it is impossible for Subscribe to throw but that a conforming implementation wouldn't throw.  If an implementation does throw then we attempt to let the exception bubble out because something terribly wrong has happened.

    Subscribe is non-blocking.  Actually, this is still more or less true with the trampoline scheduler, but will be less true going forward.  We have gone back on forth on this one for a long time, but I think we have settled (you will see it clearly in upcoming releases) on allowing both blocking and non-blocking subscribes.  I think that generally blocking subscribes are not a good idea and usually quite dangerous, but there are some valid cases where they could be used (Observable.Return comes to mind).  The problem with a blocking subscribe is that it prevents unsubscribing.  Note that this isn't as bad of a situation with the Now scheduler because it uses a trampoline.

    In regards to unhandled exceptions and Tasks, that looks like a bug.  Thanks for reporting it and it will be fixed.
    Monday, January 25, 2010 6:49 PM

All replies

  • It means that if OnNext throws, we let the exception bubble up, but then Subscribe throws as well, which is not allowed, can you clarify?

    Thanks

    Flavien

    You catch the exception in Subscribe ... using the onError Action parameter. Or use another operator in the chain (i.e. Catch or OnErrorResumeNext).
    • Edited by Richard Hein Friday, January 22, 2010 3:28 PM You can use operators to catch exceptions
    Friday, January 22, 2010 3:26 PM
  • I watched the following video from Wes about the non expressed contracts in IObservable and IObserver: http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/.

    I have a couple of question though.

    Wes says that one of the contracts is that Subscribe is not blocking. This is not quite true, if I subscribe this way, the code will block on Subscribe:

    var timer = Observable.Interval(Scheduler.Now, TimeSpan.FromSeconds(1));
    
    timer.Subscribe(i => Console.WriteLine(i));
    
    
    And it looks to me that the purpose of the scheduler Now is to provide blocking observables. Is it a bug in the library, or is the contract wrong?

    That video was posted Nov. 25th, but the IScheduler changes didn't happen until later.  Check out this video with updated information about using ISchedulers.  I think that'll clear things up.  The Default Scheduler is the LaterScheduler, btw.
    Friday, January 22, 2010 3:48 PM
  • That video was posted Nov. 25th, but the IScheduler changes didn't happen until later.  Check out thisvideo with updated information about using ISchedulers.  I think that'll clear things up.  The Default Scheduler is the LaterScheduler, btw.

    That is absolutely correct.
    Friday, January 22, 2010 4:01 PM
    Owner
  • It means that if OnNext throws, we let the exception bubble up, but then Subscribe throws as well, which is not allowed, can you clarify?

    Thanks

    Flavien

    You catch the exception in Subscribe ... using the onError Action parameter. Or use another operator in the chain (i.e. Catch or OnErrorResumeNext).

    I don't think so, look at how Observable.Return is implemented with Reflector, they don't catch any exception that can be thrown by OnNext, the exception will bubble up and be thrown by Subscribe.
    Friday, January 22, 2010 4:03 PM
  • "I don't think so, look at how Observable.Return is implemented with Reflector, they don't catch any exception that can be thrown by OnNext, the exception will bubble up and be thrown by Subscribe."

    If I do this, Subscribe never gets called:

     static void Main(string[] args)
            {
                try
                {
                    var throwObs = Observable.Create<string>(o =>
                    {
                        Observable.Return(ThrowAnException());
                        return () => { };
                    });
    
                    throwObs.Subscribe(
                        next => Console.WriteLine(next),
                        ex => Console.WriteLine("Exception thrown: {0}", ex),
                        () => Console.WriteLine("Done."));
    
                   
                } catch {
                   
                }
                Console.ReadKey();
            }
    
            private static string ThrowAnException()
            {
                throw new NotImplementedException();
            }

     Is this the kind of scenario you meant?

    Friday, January 22, 2010 4:27 PM
  • No, this isn't exactly what I'm talking about. I'm talking about:

    var throwObs = Observable.Create<string>(o =>
    {
        o.OnNext("Hello");
        return () => { };
    });
    
    throwObs.Subscribe(
        next => ThrowAnException(),
        ex => Console.WriteLine("Exception thrown: {0}", ex),
        () => Console.WriteLine("Done."));
    Wes says we should let the exception thrown by OnNext bubble up, but he also says that Subscribe shouldn't throw, and in this case, it does, so it wasn't clear to me what he meant.

    But I've looked at how the operator converting an IEnumerable to IObservable has been implemented, and it looks like exceptions thrown by the code for enumerating (the code of the operator) is wrapped in try/catch, but the calls to OnNext/OnCompleted/OnError aren't. So it looks like the rule is don't catch exceptions thrown by OnNext/OnCompleted/OnError, but catch everything else your observable is throwing (and notify it through OnError).
    Friday, January 22, 2010 4:51 PM
  • Hi,

    > Wes says we should let the exception thrown by OnNext bubble up, but he also says
    > that Subscribe shouldn't throw, and in this case, it does, so it wasn't clear to me what he meant.


    No, Subscribe isn't throwing - it's just not catching the exception.  (It may be re-throwing, but that's different.)

    > But I've looked at how the operator converting an IEnumerable to IObservable has been implemented [snip]

    I believe you're correct.  The contract is that Subscribe will not throw; instead, it passes exceptions to observers.  However, if an observer throws, then it's not caught and the application crashes.

    - Dave
    http://davesexton.com/blog
    Friday, January 22, 2010 6:07 PM
  • I think he meant don't handle it in the OnNext, OnCompleted or OnError (in the case of OnError, you handle the exception but don't rethrow, or don't throw a new one) actions, maybe I'm wrong ... but I thought this was the idea:

            static void Main(string[] args)
            {
                try
                {
                    var throwObs = Observable.Create<string>(o =>
                    {
                        try
                        {
                            o.OnNext("Hello");
                        }
                        catch (Exception ex) // ok
                        {
                            o.OnError(ex);
                        }
                        return () => { };
                    });
    
                    throwObs.Subscribe(
                        next => {
                            try
                            {
                                ThrowAnException();
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine("Exception handled in OnNext ... you shouldn't do this.");
                            }
                        },
                        ex => Console.WriteLine("Exception handled in OnError: {0} ... ok", ex),
                        () => Console.WriteLine("Done."));
    
    
                   
                } catch(Exception ex) {
                    Console.WriteLine("Exception handled in catch block: {0} ... ok", ex);
                }
                Console.ReadKey();
            }
    I'm open to correction of course.
    • Edited by Richard Hein Friday, January 22, 2010 6:17 PM clarification
    Friday, January 22, 2010 6:14 PM
  • Well, that's actually what I am wondering as well, but from what I can see, in the implementation of Subscribe for Observable.Return doesn't catch exceptions thrown by OnNext, so I guess the exception handling should be made in OnNext in the observer rather than in Subscribe.

    If you think about it it works with the duality with IEnumerable, if the code within the foreach loop throws, you never get the end of the enumeration. Same here, if you throw in OnNext, you screw up your observer, and it is no longer able to work as expected.
    Friday, January 22, 2010 6:25 PM
  • Hi,

    I think the duality of exceptions between IEnumerable and IObservable is two-fold.  For IEnumerable, an exception can be thrown in the call to MoveNext or it can be thrown within the foreach block.  For IObservable, an exception can be thrown while generating the next value or within the IObserver.OnNext, OnError or OnCompleted methods.

    MoveNext   = Generate next observable value
    foreach      = IObserver

    IEnumerable does not provide any special mechanism for when MoveNext throws because it's synchronous and Exceptions automatically propagate up the call stack, unless caught.  Thus error handling in IEnumerable appears to be simpler.  An exception from MoveNext and an exception within a foreach block can both be handled the same way by placing a try...catch outside of the foreach.  But you can also handle an exception inside of the foreach by placing another try...catch block; however, this inner try...catch won't catch an exception thrown by MoveNext.

    The duality, I believe, is that when an IObservable throws while generating the next value, the registered observer will be notified via the OnError method - similar to how MoveNext can throw and you can catch the exception in an outer try...catch block.  But if the observer itself throws an exception then it will not be passed to the observer.  You can catch the exception if you'd like by placing a try...catch block around the body of your OnNext method - similar to how you can catch an exception thrown by the foreach block by placing a try...catch block within the foreach block.

    - Dave
    http://davesexton.com/blog
    Friday, January 22, 2010 8:06 PM
  • Hi guys,

    I am a little bit confused about the advise to not catch exceptions in OnNext() handlers. Where will those exceptions finally end? Especially in the case of asynchronous methods (e.g. Observable.ToAsync())? I am not sure but I had frequently programm terminations that were generated by thrown exceptions in the finalizers of Task objects and I could imagine that there are two ways to get them:
    1. By not binding an OnError() handler which effectively converts OnError(e) to throw e.
    2. By throwing an exception in the OnNext() handler.

    I think in a lot of situations it is very important to add try-catch blocks to ensure that your application continues to run. Imagine a server app that will be terminated as soon as the garbage collector finalized the task item because somebody forgot to write the try-catch block...

    Maybe somebody from the Rx team could clarify the advise a little bit?

    Demo code:
    // evil demo 1:
    Observable.Start(() => { Thread.Sleep(100); throw new Exception("I am evil!"); })
    .Subscribe(_ => { });   // intentionally left out onError handler
    Thread.Sleep(1000);
    GC.Collect();
    
    // evil demo 2:
    Observable.Start(() => { Thread.Sleep(100); return 1; })
        .Subscribe(i =>
            {
                if (i == 1)
                    throw new Exception("One stinks.");
            }
        );
    Thread.Sleep(1000);
    GC.Collect();
    The two demos should generate the following exception:

    "Unhandled Exception: System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. ---> System.Exception: I am evil!"

    If you do not get them the first time try running the examples multiple times (at least 5 times).

    If I remove the Sleep(100) from the "evil demo 1" it becomes VERY strange and in most cases the original exception is thrown. I guess this might be a race-condition because the implementation of AsyncSubject registers its own onError handler to rethrow the error as soon as somebody subscribes to the async subject (e.g. as it is returned by Observable.Start()).

    I think the Rx team should consider changing the way in which exceptions in OnNext() and in the Subscribe() extension methods without onError handler are processed. Maybe it would be possible to specify a sink for this kind of exceptions...

    Andreas
    Sunday, January 24, 2010 2:57 PM
  • Demo code:
    // evil demo 1:
    
    Observable.Start(() => { Thread.Sleep(100); throw new Exception("I am evil!"); })
    
    .Subscribe(_ => { });   // intentionally left out onError handler
    
    Thread.Sleep(1000);
    
    GC.Collect();
    
    
    
    // evil demo 2:
    
    Observable.Start(() => { Thread.Sleep(100); return 1; })
    
        .Subscribe(i =>
    
            {
    
                if (i == 1)
    
                    throw new Exception("One stinks.");
    
            }
    
        );
    
    Thread.Sleep(1000);
    
    GC.Collect();
    
    
    The two demos should generate the following exception:

    "Unhandled Exception: System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. ---> System.Exception: I am evil!"


    I only get that AggregateException on finalization with the 4.0 bits, do you get it in 3.5?  It is a bug in one or the other I think.
    • Edited by Richard Hein Sunday, January 24, 2010 3:35 PM remove extra quote
    Sunday, January 24, 2010 3:34 PM
  • Hi Richard,

    I tested my code with 4.0. I just retargeted to 3.5 and here the exception text looks a little bit diffrent:

    Unhandled Exception: System.AggregateException: TaskExceptionHolder_UnhandledException ---> System.Exception: I am evil!
    Unhandled Exception: System.AggregateException: TaskExceptionHolder_UnhandledException ---> System.Exception: One stinks.

    But overall I see the same behavior.

    Richard: Do you know what normally should happen in these cases or how those exceptions could be handled?

    Andreas
    Sunday, January 24, 2010 3:51 PM
  • This is the stack information of the exception:

    Unhandled Exception: System.AggregateException: TaskExceptionHolder_UnhandledException ---> System.Exception: I am evil!

    Server stack trace:
       at RxTests.RxTestApp.<Main>b__0() in C:\Code\Prototyping\RxTests\RxTests\Program.cs:line 410
       at System.Linq.Observable.<>c__DisplayClassc2.<>c__DisplayClassc4.<ToAsync>b__c1()

    Exception rethrown at [0]:
       at System.ObservableExtensions.<Subscribe>b__3[TSource](Exception exception)
       at System.Collections.Generic.AnonymousObserver`1.Error(Exception exception)
       at System.Collections.Generic.AbstractObserver`1.OnError(Exception exception)
       at System.Collections.Generic.AsyncSubject`1.OnError(Exception exception)
       at System.Linq.Observable.<>c__DisplayClassc2.<>c__DisplayClassc4.<ToAsync>b__c1()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.Execute()
       --- End of inner exception stack trace ---
       at System.Threading.Tasks.TaskExceptionHolder.Finalize()
    ---> (Inner Exception #0) System.Exception: I am evil!

    Server stack trace:
       at RxTests.RxTestApp.<Main>b__0() in C:\Code\Prototyping\RxTests\RxTests\Program.cs:line 410
       at System.Linq.Observable.<>c__DisplayClassc2.<>c__DisplayClassc4.<ToAsync>b__c1()

    Exception rethrown at [0]:
       at System.ObservableExtensions.<Subscribe>b__3[TSource](Exception exception)
       at System.Collections.Generic.AnonymousObserver`1.Error(Exception exception)
       at System.Collections.Generic.AbstractObserver`1.OnError(Exception exception)

       at System.Collections.Generic.AsyncSubject`1.OnError(Exception exception)
       at System.Linq.Observable.<>c__DisplayClassc2.<>c__DisplayClassc4.<ToAsync>b__c1()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.Execute()<---

    Sunday, January 24, 2010 4:00 PM
  • Does an unhandled exception in a Task causes the AppDomain to unload like it used to do with .NET 2.0/3.5 and the thread pool?

    Anyway I looked at the code for a bunch of methods creating observables in RX, and none of them handles exceptions thrown by OnNext/OnError/OnCompleted. It seems to be done on purpose, because the rest of the code in the same methods does handle exceptions. Take a look at Select:

     return new AnonymousObservable<TResult>(delegate (IObserver<TResult> observer) {
            return base.source.Subscribe<TSource>(delegate (TSource x) {
                TResult local;
                try
                {
                    local = base.CS$<>8__locals2db.selector(x);
                }
                catch (Exception exception)
                {
                    if (exception.IsCritical())
                    {
                        throw;
                    }
                    base.observer.OnError(exception);
                    return;
                }
                base.observer.OnNext(local);
            }, new Action<Exception>(observer.OnError), new Action(observer.OnCompleted));
        });
    The first part calling the selector does catch exceptions, but the part that calls OnNext is intentionally left out of the try/catch. Plus this is consistent with what Wes says in the video.

    Now I agree I would like to hear what's the rationale behind this decision from the authors of the RX Framework.
    Sunday, January 24, 2010 6:46 PM
  • Hi,

    I agree that there may be a bug in Rx with regards to unhandled Task exceptions.  In my opinion, Rx should treat exceptions in two ways, depending upon the source:

    1. Exceptions thrown by IObservables (i.e., while producing values) should be passed to the OnError method of subscribers.  Furthermore, the default OnError behavior should be to throw exceptions immediately, regardless of thread or synchronization context.  This will crash the application.  It's possible to override the default OnError behavior when subscribing by supplying an onError action.  Thus if any subscriber chooses not to handle exceptions then they will be thrown automatically, regardless of whether other subscribers have chosen to override the default onError action.  This is important: any time you register an IObserver you are essentially invoking a method, even though the invocation is in reverse (push vs. pull).  If an exception is thrown by that method, the application should crash to protect user data and to provide valuable diagnostic information.  If any exception goes unnoticed it could have a devastating effect on the application, unbeknownst to end-users and devs.
    2. Exceptions thrown by IObservers (i.e., while observing values) should be thrown immediately.  They must not be caught by any mechanism of Rx.  If an exception is originating from your own code, then you are responsible for deciding whether the exception is a bug that must be fixed, or whether the exception is an unavoidable side-effect of some internal API, in which case you can catch the exception by wrapping your onNext method within a try...catch block.  Then you can handle the exception gracefully.

    Unhandled Task exceptions appear to happen because Rx does not properly implement #1 for all possible scenarios.

    - Dave


    http://davesexton.com/blog
    Sunday, January 24, 2010 6:52 PM
  • Hi Dave,

    I think the two rules are currently implemented in the way you describe them (which leads to the exception rethrow in the Task's finalizer).

    I guess the idea behind rule #2 is that the generator should place a try-catch block around the OnNext() call if it wants to handle exceptions. But in a couple of cases Rx objects itself call OnNext() and there is no easy way to handle those exceptions... only by adding try-catch blocks to every onNext handler when subscribing...

    Especially if you subscribe to the same observable in many different places it would be convenient to have a single point (an exception sink) where you could register an exception handler for observer exceptions.

    BTW: Does somebody know if exceptions are passed back to the generator when they are thrown on a different thread (e.g. when using Observable.ObserveOn())? Or is the value passing in such cases completely asynchronous which means that every exception will certainly lead to program termination because even the generator has no way of catching the exception? That would be another point for adding a oberserver exception sink mechanism. ;)

    Andreas
    Monday, January 25, 2010 4:17 PM
  • Hi Andreas,

    > I guess the idea behind rule #2 is that the generator should place a try-catch block around the OnNext() call if it wants to handle exceptions.

    No, the point is that the generator shouldn't even attempt to catch or handle any exceptions when calling OnNext, OnError and OnCompleted.

    I think this actually might be a bug with the Catch operator.  I'll test it again later to see, but if I remember correctly it suppresses errors that originate from OnNext, which is really bad.


    > But in a couple of cases Rx objects itself call OnNext() and there is no easy way to handle those exceptions...
    > only by adding try-catch blocks to every onNext handler when subscribing...


    That's exactly the point :)

    It's your code when you subscribe, so it's your responsibility to handle exceptions - or not.  The default preference is to not handle exceptions.  That means that in most of your OnNext functions you won't want to wrap them in try...catch blocks.  If anything, you'll want to use try...finally blocks to ensure that state is cleaned up if an exception gets thrown.

    > Especially if you subscribe to the same observable in many different places it would be convenient to have a single
    > point (an exception sink) where you could register an exception handler for observer exceptions


    I disagree.  I think gobal exception handling can be done in several different ways already depending upon the type of the application; e.g., ASP.NET use Global.asax, WinForms and WPF use the platform error events and AppDomain.UnhandledException.

    If an exception occurs in an observer, you should handle it in the observer.  Most likely it's a bug, so "handle" means fix the bug, not catch the exception.

    If an exception occurs in an observable, you should first check to see whether it's a bug.  If so, then clearly you'll want to fix it.  If it's not a bug, or if it's just not fixable (e.g., it's not originating from your code) then you can handle it using the Catch operator or by registering onError functions.


    - Dave


    http://davesexton.com/blog
    Monday, January 25, 2010 5:13 PM
  • This is a great discussion!  I love when we talk about interesting semantics.  These are exactly the questions that we spend the most time discussing within our team.

    Here is what I meant:

    Rx operators do not catch exceptions that occur in a call to OnNext, OnError, or OnCompleted.  This is because we expect that (1) the observer implementor knows best how to handle those exceptions and we can't do anything reasonable with them and (2) if an exception occurs then we want that to bubble out and not be handled by Rx.

    In Rx, if the Subscribe method throws then we shouldn't catch it because (1) we can't reasonably repair the computation, (2) this error should not be in the stream but in the ambient context, and (3) we want the exception to occur.  So when I say that it Subscribe shouldn't throw, I don't mean that it is impossible for Subscribe to throw but that a conforming implementation wouldn't throw.  If an implementation does throw then we attempt to let the exception bubble out because something terribly wrong has happened.

    Subscribe is non-blocking.  Actually, this is still more or less true with the trampoline scheduler, but will be less true going forward.  We have gone back on forth on this one for a long time, but I think we have settled (you will see it clearly in upcoming releases) on allowing both blocking and non-blocking subscribes.  I think that generally blocking subscribes are not a good idea and usually quite dangerous, but there are some valid cases where they could be used (Observable.Return comes to mind).  The problem with a blocking subscribe is that it prevents unsubscribing.  Note that this isn't as bad of a situation with the Now scheduler because it uses a trampoline.

    In regards to unhandled exceptions and Tasks, that looks like a bug.  Thanks for reporting it and it will be fixed.
    Monday, January 25, 2010 6:49 PM
  • If an exception occurs in an observer, you should handle it in the observer.  Most likely it's a bug, so "handle" means fix the bug, not catch the exception.

    If an exception occurs in an observable, you should first check to see whether it's a bug.  If so, then clearly you'll want to fix it.  If it's not a bug, or if it's just not fixable (e.g., it's not originating from your code) then you can handle it using the Catch operator or by registering onError functions.

    Not every exception is a bug. For example I am using Rx to write a tcp server application that works indireclty with sockets (using streams). If the remote host unexpectedly terminates the connection or if a different network error occurs exception are generated. I think Rx was designed especially for the cloud-computing age .. as soon as you start working with distributed restful webservices everything can throw anytime...

    Of course you can leave it up to every developer to write his/her own exception handling code for onNext blocsk. I just think that exceptions happen quite frequently in an onNext handlers in real life applications and that a proper handling of those exceptions is absolutely mandatory - you just can't afford recycling your Azure worker roles every five minutes because you did not handle an exception.

    In my code I have for example about 20 different places at which I register short completion handlers for asynchronous output operations using a subscription to an observable that notifies me about the completion of that action .. in all cases I have wrapped the onNext handler with a try-catch block logging the exception and disconnecting the underlying socket. I really was astonished at how many places I had to add exception handling code in order to properly trap any possible socket or database failure... this is currently for me one of the most annoying parts of using Rx because exception handling is just sooo much easier with synchronous operations where you simply surround an inner dispatcher code with your last-resort try-catch block and everything is fine. In Rx this single outer block becomes distributed to many different locations .. making it very easy to miss one!

    Also I am pretty sure that most developers using Rx are not aware that not passing an onError handler to one of the Subscribe() extension methods will result in an unhandled exception when the generator calls OnError()... Maybe this changes once a detailed documentation is available that explictely explains this.

    Monday, January 25, 2010 6:52 PM
  • For my project I will try to handle errors of the different subscribed observers using the following custom extension, which allows me to place a central last-resort exception handler:

    public static IObservable<T> CatchObserverError<T>(this IObservable<T> source, Action<Notification<T>, Exception> handler)
    {
        return Observable.CreateWithDisposable<T>(o =>
        {
            return source.Subscribe(
                n => { 
                    try { o.OnNext(n); } 
                    catch (Exception ex) 
                    { handler(new Notification<T>.OnNext(n), ex); } 
                },
                e => { 
                    try { o.OnError(e); } 
                    catch (Exception ex) 
                    { handler(new Notification<T>.OnError(e), ex); }
                },
                () => { 
                    try { o.OnCompleted(); } 
                    catch (Exception ex) 
                    { handler(new Notification<T>.OnCompleted(), ex); }
                }
            );
        });
    }
    
    Monday, January 25, 2010 8:24 PM
  • Hi Andreas,

    Thanks for the sample contribution. You could generalize your operator slightly by making the exception into a generic parameter constraint to Exception such that you can filter on specic exception types (See the Catch operator).

    Thanks,


    Jeffrey
    Monday, January 25, 2010 9:56 PM
  • Hi Jeffrey,

    great idea. Actually I did not know that the generic type could be used to filter exceptions - I thought if I would specify a derived-from-constraint the generic code would work as if the type specified in the constraint had been used (e.g. in catch() handlers)... but it really works. :)

    Here is the updated code:

    public static IObservable<TResult> CatchObserverError<TResult>(this IObservable<TResult> source, Action<Exception> handler)
    {
        return CatchObserverError<TResult, Exception>(source, handler);
    }
    
    public static IObservable<TResult> CatchObserverError<TResult, TException>(this IObservable<TResult> source, Action<TException> handler)
        where TException : Exception
    {
        return Observable.CreateWithDisposable<TResult>(o =>
            source.Subscribe(
                n => { try { o.OnNext(n); } catch (TException ex) { handler(ex); } },
                e => { try { o.OnError(e); } catch (TException ex) { handler(ex); } },
                () => { try { o.OnCompleted(); } catch (TException ex) { handler(ex); } }
            )
        );
    }
    
    public static IObservable<TResult> CatchObserverError<TResult, TException>(this IObservable<TResult> source, Action<Notification<TResult>, TException> handler)
        where TException : Exception
    {
        return Observable.CreateWithDisposable<TResult>(o =>
            source.Subscribe(
                n =>
                {
                    try { o.OnNext(n); }
                    catch (TException ex)
                    { handler(new Notification<TResult>.OnNext(n), ex); }
                },
                e =>
                {
                    try { o.OnError(e); }
                    catch (TException ex)
                    { handler(new Notification<TResult>.OnError(e), ex); }
                },
                () =>
                {
                    try { o.OnCompleted(); }
                    catch (TException ex)
                    { handler(new Notification<TResult>.OnCompleted(), ex); }
                }
            )
        );
    }

    Monday, January 25, 2010 11:14 PM
  • Hi Andreas,

    > Not every exception is a bug.

    Agreed, and that's certainly not what I meant when you quoted me :)


    > For example I am using Rx to write a tcp server application that works indirectly with sockets (using streams). [snip]

    I'll assume then that you're referring to making a socket observable, or at least some facade over sockets.  In that case, when the socket throws an exception it should be sent to observers via OnError, but only when the exception is thrown because the socket closed unexpectedly, not because it closed naturally (it may throw in either case).  This is because an observable socket falls into category #1 from one of my previous responses.  Socket exceptions are part of the observable code that generates notifications, and the observable must push error notifications via OnError.  An exception that closes the socket naturally, on the other hand, could be caught and pushed to observers via OnCompleted, for example.

    My problem is not with observables throwing errors.  Clearly Rx has an appropriate mechanism to support this: OnError.

    My problem is when observers throw errors, and either Rx or user code blindly catches them.  Why would I want to use your code sample to catch all exceptions for an undefined number of observers, all of different types and behaviors, including potential side-effects that might even cause execution to leave user code and enter third-party libraries or unamanged Win32 APIs?  How could I possibly write any code in one place that will handle all potential exceptions gracefully?  (That's rhetorical - the answer is almost always that I can't :).  Using code such as your example is the same as wrapping methods in:

    try { ... some stuff ... }
    catch (Exception ex) { }

    That's bad practice and is not recommended.
    http://msdn.microsoft.com/en-us/library/ms229005(VS.100).aspx


    > Of course you can leave it up to every developer to write his/her own exception handling code for onNext blocsk [snip]

    Absolutely!  That's our responsibility now when using IEnumerable as well.  If code in your foreach block throws, you can choose how best to handle the exception: fix the bug, or if it's not a bug, catch an exception of a specific type and handle it gracefully (how depends upon the context of the app.)


    > [snip] you just can't afford recycling your Azure worker roles every five minutes because you did not handle an exception .

    If you have exceptions being thrown every 5 minutes, in any application, then you have bigger problems to worry about than how best to create some mechanism that ignores them ;)


    > I really was astonished at how many places I had to add exception handling code in order to properly
    > trap any possible socket or database failure...


    This is the essence of where we differ in opinion on proper exception handling.  I would never wrap my network or database code in local catch blocks to trap "any possible failure" and then let the application slug on as if exceptions in networking components don't matter.  This is true, IMO, regardless of whether the code is reacting or interacting.  I don't care if I'm pulling from a NetworkStream or reacting to an ExecuteCompleted observable event.


    I've had this very conversation many times - I'm sure we're never going to agree on this.  But the fact remains that this is how Rx works at the moment - it's up to developers to worry about exceptions in their own code.  Rx doesn't make any attempt at suppressing valuable diagnostic information from exceptions that originate in observer code.  And for observable code, you must opt-in to catching exceptions or they will be thrown for you.  That's wonderful because it's exactly how interactive, imperative code works.


    > Also I am pretty sure that most developers using Rx are not aware that not passing an onError
    > handler to one of the Subscribe() extension methods will result in an unhandled exception when the generator calls OnError()...


    Well they'll find out when they get their first exception :).  And that's the point.  If a dev discovers this, their next move should not be to automatically go drop onError functions everywhere - and certainly not to put try...catch blocks around all of their onNext functions.  The next step for the developer is to identify the bugs and fix them, because chances are it was buggy code that caused the exception in the first place.

    - Dave
    http://davesexton.com/blog
    Tuesday, January 26, 2010 9:55 AM
  • Hi Dave,

    I surely do not want to start an argument with you about exception handling.

    > And for observable code, you must opt-in to catching exceptions or they will be thrown for you. 
    > That's wonderful because it's exactly how interactive, imperative code works.

    The big difference between interactive code and reactive code is that an exception thrown in an interactive application can easily be caught by an outer catch handler even in the most complex code. In rx there is no such outer handler anymore when OnNext handlers are executing .. e.g. if the control logic is broken into little pieces that react to incoming OnNext calls that are executing on the thread-pool and you want to have effectively the same errro handling as before you simply have to wrap each OnNext handler with a try-block .. otherwise your app will terminate as soon as the Task objects finalizer detects that nobody handled the exception. That it might be possible to narrow the type of caught exceptions down and not always catch each and every exception may be true...

    For finding bugs in your code I absolutely see you point that by default no exception should be ignored or catched. Actually since Rx uses the new Task objects of System.Threading.Tasks which catch all store them to be processed by a monitoring thread it is quite hard to debug code ... if the program terminates without garbage collection the finalizers are not executed and you will not even hear about the exception. Maybe this will be changed in future Rx releases.

    Andreas
    Tuesday, January 26, 2010 11:25 AM
  • Hi Andreas,

    There's a new spin on the discussion now - Rx.  It's fresh again :D

    I get your point about not being able to use an outer try...catch for multiple observers in as easy a manner as in interactive code.  It's the value of it that I can't help but doubt.  That said, your extension method is actually a nice implementation and I'm sure it can be useful in some very specific scenarios.

    The Task finalizer throwing I think is actually a bug in Rx, as Wes mentioned in this thread.  It's not a bug in TPL though - I'm pretty sure I remember reading about that behavior on their blog and it made sense.  However, Rx shouldn't be allowing exceptions to get that far away from user code.  I think we both can agree about that :)

    - Dave
    http://davesexton.com/blog
    Tuesday, January 26, 2010 12:56 PM