What if an exception is thrown by the observer in OnNext?

    Question

  • Hi,

    The Rx Framework is very interesting and I sense that it will enable a whole new set of architectural opportunities.
    I have a question about implementing IObservables correctly.
    I looked at how Observable.Return is implemented, and the code is equivalent to the following:

    int myValue = 42;
    Observable.Create(delegate(IObserver<int> observer)
    {
        observer.OnNext(myValue);
        observer.OnCompleted();
        return () => { };
    });
    Now my question is about exceptions. Obviously, if the subscriber throws an exception in OnNext, OnCompleted will never be called, and the Observable will never correctly notify (normal or exceptional) completion. So my question is: whose responsibility is it to catch the exception? It seems to be the observer's responsibility not to throw, but as you can see, it is very easy for a client to break functionality by throwing an exception in OnNext, and that doesn't seem right.
    I know the ultimate answer is "It depends on the implementation" but I think there should be some guidelines about that so that developers know what to expect.

    Thanks
    Flavien
    Thursday, January 21, 2010 2:56 PM

All replies

  • Hi Flavien,

    If an error is thrown by an observer it appears to go unhandled.  This is the correct behavior, IMO.  Most likely the error will be in user code, in which case you'd want the exception to be known immediately and you'd want the application to crash if it goes unhandled.

    Take the following program for example:

    using System;
    using System.Linq;
    
    namespace ReactiveProgrammingConsole
    {
      class Program
      {
        static void Main()
        {
          var ob = Observable.Return(42);
    
          using (ob.Subscribe(
            onNext: _ => { throw new Exception("Error!"); },
            onError: Console.WriteLine, 
            onCompleted: () => { Console.WriteLine("Completed"); }))
          {
            Console.ReadKey();
          }
        }
      }
    }
    


    In the code above my onNext function throws an exception.  The application stops running at this point - it never gets to onError or onCompleted.

    - Dave
    http://davesexton.com/blog
    Thursday, January 21, 2010 3:31 PM
  • I think the framework should enable us to catch from within the observable.

    If I compare this to the events domain, when we simply raise an event by invoking it, a subscriber's exception propagates to the publisher's frame, and subsequent subscribers do not get a chance to handle the event. However, we have the ability to extract from the event's delegate its invocation list, and invoke each handler manually inside a try-catch block. This way we can give an equal chance for all the subscribers to handle the event.

    The important thing here is that we have options. We can ignore the exception and crash our program (fail-fast), we can catch and handle it (but leave later subscribers in the dark), or we can iterate the subscriber invocation list manually.
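
    The options above can be sketched with a plain .NET event. This is a minimal illustration only: the Publisher type, its Changed event, and the method names are hypothetical, and collecting the exceptions into a list is just one assumed way of reporting them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher used only for illustration.
public class Publisher
{
    public event EventHandler<EventArgs> Changed;

    // Plain invocation: the first handler that throws stops the multicast
    // and the exception propagates to the caller (fail-fast unless caught).
    public void RaiseFailFast()
    {
        var handler = Changed;
        if (handler != null) handler(this, EventArgs.Empty);
    }

    // Manual iteration over the invocation list: every handler gets a chance
    // to run, and the exceptions are collected instead of propagated.
    public IList<Exception> RaiseIsolated()
    {
        var errors = new List<Exception>();
        var handler = Changed;
        if (handler == null) return errors;

        foreach (EventHandler<EventArgs> h in handler.GetInvocationList())
        {
            try { h(this, EventArgs.Empty); }
            catch (Exception ex) { errors.Add(ex); }
        }
        return errors;
    }
}

public static class InvocationListDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var p = new Publisher();
        p.Changed += (s, e) => log.Add("first");
        p.Changed += (s, e) => { throw new InvalidOperationException("faulty"); };
        p.Changed += (s, e) => log.Add("third");

        // Catching at the call site: the third handler never runs.
        try { p.RaiseFailFast(); }
        catch (InvalidOperationException) { log.Add("caught"); }

        // Iterating manually: the third handler still runs.
        var errors = p.RaiseIsolated();
        log.Add("errors=" + errors.Count);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

    Whether continuing after a failed handler is safe is a separate question; the sketch only shows that the mechanism exists for events.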

    I believe the Rx framework should enable us to choose from these options.

    One more thing: I tried the latest version (1.0.2521.104) with such a scenario, and received weird results:

     

    var subject = new Subject<int>();
    
    subject.Subscribe(Console.WriteLine);
    subject.Subscribe(i => { throw new ApplicationException(); });
    subject.Subscribe(Console.WriteLine);
    
    try
    {
      subject.OnNext(1);
      subject.OnNext(2);
    }
    catch
    {
    }
    subject.OnNext(3);

    Result:

     

    1
    3
    1
    3

     

    Can someone please explain the result?

    I expected it to be {1,3,3} or {1,1,3,3}. I have no idea why the second '1' is appearing where it does.


    Omer Mor
    Thursday, June 03, 2010 7:14 PM
  • Hi Omer,

    > Can someone please explain the result?

    Maybe it has something to do with the internal queuing mechanism.  Regardless, I don't think it's part of the Subject<T> contract that observers will always be invoked in the order that they subscribe.

    > [snip] However, we have the ability to extract from the event's delegate its invocation list, and
    > invoke each handler manually inside a try-catch block. [snip]

    That's a very strange way of writing events though.  What does the catch block do with an exception about which it knows nothing, thrown by any number of handlers about which it also knows nothing?

    E.g., What if the exception thrown by the first handler has the following semantics: the entire application must fail now otherwise the entire database will be deleted on the next piece of code that executes, because all invariants are broken and the application cannot recover.  Then, your OnEvent method (perhaps after logging the exception?) goes ahead and invokes the next event handler as if nothing has happened, of course inadvertently deleting your entire production database.

    The key here is that events are an abstraction for which any number of handlers, performing any number of tasks, can register for notifications.  Exceptions in this context are still exceptions.  They should either be handled very near to the faulty code or they should be caught and logged at the highest level of the application, but they should not be "handled" at all by unrelated intermediate code.

    See these related discussions:

    What are the OnError semantics?
    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/6bcf4737-721f-4f1a-96cc-a60f19b51610/

    Question about Catch and exceptions
    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/c4a11173-a4ab-4353-8575-62ba12b5e777

    About the contracts in RX (exceptions and blocking)
    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/ee5dba5d-eea9-4d85-8f58-c2e1c71ef33a

    - Dave


    http://davesexton.com/blog
    Thursday, June 03, 2010 9:30 PM
  • I understand your reasoning.

    However, I think of event invocation semantics as different from method call semantics. While the latter should conceptually be treated with caution using proper exception handling, the former is usually regarded as an asynchronous notification that should not be affected by its subscribers. Even more so, different subscribers should not affect one another.

    Suppose you're building a system that supports extensions/plugins. When you raise events, you should be able to defend yourself from a faulty plugin. Also you probably should keep raising your event to other subscribers. A proper exception handling in that case could be to remove the faulty plugin. In the Rx world this could translate to disposing the subscription, and possibly calling the subscriber's OnError before doing so.

    I know that best practice suggests not handling what you don't know how to handle. But even best practices don't account for all cases, and a framework is something that should enable you to support edge cases as well.


    Omer Mor
    Thursday, June 03, 2010 9:51 PM
  • Hi Omer,

    I like your plug-in example; however, I still disagree with how you would handle exceptions in these cases.

    If a plug-in throws an exception it could very well be that invariants in the program are broken and other plug-ins will fail to do their jobs correctly.  For example, what if plug-in A depends on plug-in B, both are notified of a shell event or subscribe to a particular shell observable, plug-in A is allowed to handle the notification first and it throws.  Now you would allow plug-in B to go ahead and handle the notification as if the exception never happened - this could be devastating for user data.

    The reasonable approach to handling this situation, IMO, is not to "remove the faulty plug-in" [and continue processing as normal].  The correct approach is for the application to fail fast and for the faulty plug-in vendor to fix their code so that it doesn't throw.  Applications that provide plug-ins should also provide a way to start in "safe-mode", without any plug-ins being loaded, so that end-users may "remove the faulty plug-in" at will.

    - Dave


    http://davesexton.com/blog
    Thursday, June 03, 2010 11:05 PM
  • > A proper exception handling in that case could be to remove the faulty plugin.

    You can always achieve that by creating your own "safe" subject that wraps observers as you please.

    > I don't think it's part of the Subject<T> contract that observers will always be invoked in the order that they subscribe

    That is correct.

    Friday, June 04, 2010 2:51 AM
  • Dave: Your suggestion is valid, but it is also debatable. My point is that a framework, while allowed to be opinionated, should not be closed to other approaches. The default handling could be done as you believe is right, but it better give us hooks to override it (like the exception sink someone suggested in a different thread).

    head.in.the.box: You're probably right, but it's a bit too much work. I can't simply wrap a regular subject and try-catch On*() because I can't know which subscriber threw, and I can't continue signaling the rest of the subscribers. It would probably have to account for all scheduling and threading scenarios. I believe the framework should have built-in support for this.

    Is there a place to vote for suggested features?

     


    Omer Mor
    Friday, June 04, 2010 5:14 AM
  • Hi Omer,

    > The default handling could be done as you believe is right, but it better give us hooks to override it.

    There are ways to change the default exception handling, as Erik pointed out, and as Andreas showed in the thread you're probably referring to with his custom combinator.  It's just that these other approaches aren't built-in as first-class citizens; I suspect because then developers would be tempted to actually use them ;)

    > I can't simply wrap a regular subject and try-catch On*() [snip]

    Don't wrap a regular subject.  I believe Erik's idea is to implement ISubject<T> yourself and implement the OnNext method so that it catches exceptions and continues calling OnNext on the remainder of the internal list of observers.  This is analogous to your example of manually iterating over the invocation list of a delegate in an On* method.
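
    A rough sketch of that idea follows. It is not the Rx implementation: ContinueOnErrorSubject, ActionObserver and the fault callback are hypothetical names, and the class implements the BCL interfaces IObservable<T> and IObserver<T> directly (the pair that ISubject<T> combines) so that it compiles without the Rx assemblies. It also assumes that faults are simply reported to a callback, and it omits terminal-state handling after OnError/OnCompleted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subject: OnNext catches each observer's exception and keeps going.
public sealed class ContinueOnErrorSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private readonly Action<Exception> onObserverFault;

    public ContinueOnErrorSubject(Action<Exception> onObserverFault)
    {
        this.onObserverFault = onObserverFault;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate) observers.Add(observer);
        return new Unsubscriber(this, observer);
    }

    public void OnNext(T value)
    {
        // Iterate over a snapshot so observers may unsubscribe while notified.
        foreach (var o in Snapshot())
        {
            try { o.OnNext(value); }
            catch (Exception ex) { onObserverFault(ex); }
        }
    }

    public void OnError(Exception error) { foreach (var o in Snapshot()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in Snapshot()) o.OnCompleted(); }

    private IObserver<T>[] Snapshot() { lock (gate) return observers.ToArray(); }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly ContinueOnErrorSubject<T> parent;
        private readonly IObserver<T> observer;

        public Unsubscriber(ContinueOnErrorSubject<T> parent, IObserver<T> observer)
        {
            this.parent = parent;
            this.observer = observer;
        }

        public void Dispose() { lock (parent.gate) parent.observers.Remove(observer); }
    }
}

// Hypothetical helper so the demo does not depend on Rx's Subscribe overloads.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ContinueOnErrorDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new ContinueOnErrorSubject<int>(ex => log.Add("fault"));

        subject.Subscribe(new ActionObserver<int>(i => log.Add("a:" + i)));
        subject.Subscribe(new ActionObserver<int>(i => { throw new InvalidOperationException(); }));
        subject.Subscribe(new ActionObserver<int>(i => log.Add("c:" + i)));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

    In this sketch observers are notified in subscription order, but that is a property of the sketch and not a guarantee made by Subject<T>.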

    - Dave


    http://davesexton.com/blog
    Friday, June 04, 2010 5:29 AM
  • > implement ISubject<T> yourself

    Ultimately, yes. But I think much of what Omer wants can be done by wrapping the subscribers.

    The current behavior is similar to multicast delegates (http://msdn.microsoft.com/en-us/library/system.multicastdelegate.aspx). I think "extract from the event's delegate its invocation list, and invoke each handler manually inside a try-catch block" is not the right thing either, because you still have no guarantee that a failing handler actually behaves properly. The only reasonable assumption is that they can throw and you deal with that on the next level.

     

    Friday, June 04, 2010 2:39 PM
  • head.in.the.box is right.

    I actually achieved that by having a SafeObserver wrapper around an observer, which also acts as an Observable of exceptions. When an exception is raised in it, it will notify about it.

    Then I wrote a SafeSubject that wraps each subscriber with a SafeObserver inside the Subscribe method, and subscribes itself to the observer's errors.

    When an error is observed in the subject, it could remove the faulty observer and get on with it.
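
    The original implementation was not posted, so the following is only a guess at its general shape under simplifying assumptions: the SafeObserver here reports failures through a callback instead of being an observable of exceptions, the subject is not thread-safe, and every member name (Faults, Remove, Subscription, ActionObserver) is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Wraps an observer and reports any exception it throws instead of propagating it.
public sealed class SafeObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private readonly Action<SafeObserver<T>, Exception> faulted;

    public SafeObserver(IObserver<T> inner, Action<SafeObserver<T>, Exception> faulted)
    {
        this.inner = inner;
        this.faulted = faulted;
    }

    public void OnNext(T value) { Guard(() => inner.OnNext(value)); }
    public void OnError(Exception error) { Guard(() => inner.OnError(error)); }
    public void OnCompleted() { Guard(inner.OnCompleted); }

    private void Guard(Action call)
    {
        try { call(); }
        catch (Exception ex) { faulted(this, ex); }
    }
}

// Wraps every subscriber in a SafeObserver and drops it after its first fault.
public sealed class SafeSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<SafeObserver<T>> observers = new List<SafeObserver<T>>();
    public readonly List<Exception> Faults = new List<Exception>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var safe = new SafeObserver<T>(observer, Remove);
        observers.Add(safe);
        return new Subscription(() => observers.Remove(safe));
    }

    private void Remove(SafeObserver<T> faulty, Exception ex)
    {
        Faults.Add(ex);
        observers.Remove(faulty);
    }

    // ToArray gives a snapshot, so removing a faulty observer mid-loop is safe.
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action dispose;
        public Subscription(Action dispose) { this.dispose = dispose; }
        public void Dispose() { dispose(); }
    }
}

public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SafeSubjectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new SafeSubject<int>();

        subject.Subscribe(new ActionObserver<int>(i => log.Add("a:" + i)));
        subject.Subscribe(new ActionObserver<int>(i => { throw new InvalidOperationException(); }));
        subject.Subscribe(new ActionObserver<int>(i => log.Add("c:" + i)));

        subject.OnNext(1);   // the faulty observer throws once and is removed
        subject.OnNext(2);
        subject.OnNext(3);

        log.Add("faults=" + subject.Faults.Count);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```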

    I know it's not a best practice. But I still believe some systems need to be immune to unknown/unsafe/hostile/etc. observers.

    I'm sorry such an implementation is not a first-class citizen in the Rx framework.

    I think that the MulticastDelegate got it right: the default best-practice behavior is to invoke all delegates in order, and fail when the first one fails. But it still provides a first-class alternative: the GetInvocationList method.

     


    Omer Mor
    Friday, June 04, 2010 10:36 PM