Issue casting DispatcherScheduler.Instance to IScheduler

  • Tuesday, July 31, 2012 10:34 AM
     
     

    Hi, 

    I have a seemingly obscure issue here that appears to lie with Rx, but it does affect the behaviour of our application in error-handling situations. It took a while to figure out the actual cause, but I have now isolated it to when DispatcherScheduler.Instance is cast to an IScheduler for use in an ObserveOn. 

    This issue was reproduced on both Rx Main and Experimental 1.x in Silverlight 4: 

        public partial class MainPage : UserControl
        {
            public MainPage()
            {
                InitializeComponent();

                Loaded += OnLoaded;
            }

            private Subject<string> subject;

            private void OnLoaded(object sender, RoutedEventArgs e)
            {
                subject = new Subject<string>();

                subject
                    .ObserveOn(DispatcherScheduler.Instance)
                    .Subscribe(
                        s =>
                        {
                            if (s == "ErrorInSubscribe")
                                throw new Exception("Error in Subscribe");

                            AddToListBox("Observable", s);
                        }, 
                        ex => AddToListBox("Observable Error", ex.ToString()));
            }

            private void Publish(string message)
            {
                OutputListBox.Items.Add(string.Format("Publishing '{0}'", message)); 
                subject.OnNext(message);    
            }

            private void AddToListBox(string name, string s)
            {
                OutputListBox.Items.Add(string.Format("{0}-->{1}", name, s));
            }
            
            private void Message_Click(object sender, RoutedEventArgs e)
            {
                Publish("Message");
            }

            private void ErrorInSubscribe_Click(object sender, RoutedEventArgs e)
            {
                Publish("ErrorInSubscribe");
            }
        }

    I also have a root UnhandledExceptionHandler that displays the exception from the "ErrorInSubscribe" click in a ChildWindow. After this error is displayed, I can continue to send a normal message (one that does not cause an exception in the Subscribe) through the Rx pipeline with no issue; this is the behaviour we tend to want. 

    However, if I use .ObserveOn((IScheduler)DispatcherScheduler.Instance) instead of .ObserveOn(DispatcherScheduler.Instance), then as soon as the exception is raised from the Subscribe, no more messages will be piped through after this. 

    The reason our dispatcher scheduler is cast to IScheduler is that it is wrapped up behind an interface that allows us to test appropriately by replacing the dispatcher scheduler with Scheduler.Immediate. 

    Any thoughts on this?
    Thanks,
    Hamish

     

All Replies

  • Tuesday, July 31, 2012 2:39 PM
     

    Are you saying that if the only thing you change is swapping the commented-out line for the new line (as per this code)

    private void OnLoaded(object sender, RoutedEventArgs e)
    {
      subject = new Subject<string>();
    
      subject
        //.ObserveOn(DispatcherScheduler.Instance)
        .ObserveOn((IScheduler)DispatcherScheduler.Instance)
        .Subscribe(
          s =>
          {
            if (s == "ErrorInSubscribe")
              throw new Exception("Error in Subscribe");
            AddToListBox("Observable", s);
          }, 
          ex => AddToListBox("Observable Error", ex.ToString()));
    }
    

    Then you observe different behavior?

    If this is the case then I will have a deeper look, but my suspicion is that there is more code to this than you are revealing (for example, I assume you are not actually using a UserControl if you are unit testing this stuff, and you have an interface that exposes the schedulers too?)

    I would suggest that it is very common for people to observe on the dispatcher via the IScheduler interface for testing, and I am sure this would have come up before had there been an issue. I know there are at least two major investment banks that have been building Silverlight 4.0 clients with Rx for the last 2-3 years. As an aside, you may want to look into the TestScheduler for testing (though the ImmediateScheduler is handy too)

    http://www.introtorx.com/Content/v1.0.10621.0/16_TestingRx.html

    I am keen to hear your feedback.

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

  • Tuesday, July 31, 2012 3:00 PM
     
     

    Hi Lee, 

    Yes - that's correct, changing just the one line exactly as you say changes the behaviour. There is no additional code - I can send you the sample if you like.  

    You're right: in the real application there are no user controls and it is all MVVM, and yes, we have an interface that exposes the schedulers. That is exactly why this issue is appearing, because it is that interface that returns the DispatcherScheduler as IScheduler.  I have just brought it back to its simplest form to reproduce it here :) 

    Thanks, 
    Hamish

  • Tuesday, July 31, 2012 3:12 PM
     
     

    Cool. If you post both the XAML and cs and let me know which versions of everything you are using, I will try to repro it when I get home.


    Lee Campbell http://LeeCampbell.blogspot.com

  • Tuesday, July 31, 2012 3:18 PM
     
     

    Hi Hamish,

    I agree with Lee that there must be some other changes that you've introduced, perhaps unknowingly.

    Your claim implies two possibilities that I see, which as it turns out are quite easy to investigate:

    1. Casting to IScheduler causes a different overload of ObserveOn to be chosen by the compiler.
    2. -or- Casting causes a different object to be returned (with different behavior), yet the ObserveOn overload remains the same.

    If you take a look at the ObserveOn overloads in Rx v1.0.10621, you'll notice that there are only two:

    http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.observeon(v=vs.103).aspx

    One has an IScheduler parameter and the other has a SynchronizationContext parameter.

    1. To disprove the first possibility, we simply need to check whether DispatcherScheduler.Instance returns an object that is implicitly convertible to SynchronizationContext.  As you can see from the documentation, it returns an instance of DispatcherScheduler, which only implements IScheduler and derives from System.Object.

    Therefore both of your examples must be calling the ObserveOn(IScheduler) overload.  You can confirm this through IntelliSense in Visual Studio.  Please do :)

    2. This leaves the second possibility, which we can disprove by checking whether DispatcherScheduler defines any conversion operator overloads that would allow it to return a different instance for IScheduler than itself.  (However, I actually doubt the C# compiler would choose a conversion operator over an interface which the object actually implements, but it can't hurt to assume it's possible for the sake of argument.  Furthermore, I highly doubt the Rx team would take advantage of such a "feature" even if it were possible.)

    As you can see from the documentation, DispatcherScheduler does not define any conversion operators.

    Conclusion:

    You must either be introducing another change that is causing a change in behavior, or your code is somehow already non-deterministic and it's simply coincidence that when you changed a single line of code without any effect on compilation another behavioral possibility revealed itself.

    Frankly, it doesn't sound like a good design to begin with.  I would never rely on global exception handlers to catch and display any exception to the user in a prompt and then let the application continue running as if nothing critical has happened to user data (assuming that's what you're doing).

    Instead, consider the following designs (non-exhaustive list):

    • Don't throw an exception for a non-exceptional event.
    • Throw an exception if the state of the application may no longer be valid.  If you must, catch the exception in a global exception handler for logging purposes only.  Then create new state (read: create a new instance of a Subject) and hook everything up again to ensure that no corrupt data remains in memory.  If you can't do this, which is likely, then fail fast to protect user data.

    - Dave


    http://davesexton.com/blog

  • Tuesday, July 31, 2012 3:24 PM
     
     

    Hi Hamish,

    > I have just brought it back to its simplest form to reproduce it here

    Though you left out code.  Your example cannot be compiled as is.  It would be helpful if you could reproduce the problem in a short but complete program.

    - Dave


    http://davesexton.com/blog

  • Tuesday, July 31, 2012 4:13 PM
     

    Hi Dave - you are onto something there when you said: 

    1. Casting to IScheduler causes a different overload of ObserveOn to be chosen by the compiler.

    That is something that I had not thought of, and it indeed seems to be what is happening here. When cast to IScheduler, it calls the extension method you posted here http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.observeon(v=vs.103).aspx which is part of the System.Reactive assembly. 

    When the concrete type DispatcherScheduler however is used, it uses a further ObserveOn overload which is defined in the System.Reactive.Windows.Threading assembly - you can see these here: http://msdn.microsoft.com/en-us/library/system.reactive.linq.dispatcherobservable.observeon(v=vs.103).aspx 

    So flicking between the concrete type and the interface actually selects implementations of ObserveOn in completely different assemblies, which I'm sure is on the right track to explaining what is going on. 
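
    To illustrate the compiler behaviour without any Rx dependency, here is a minimal sketch. IFakeScheduler, FakeDispatcherScheduler and the two extension classes are hypothetical stand-ins I made up for IScheduler, DispatcherScheduler and the two assemblies' ObserveOn methods; the only point is that the compile-time type of the argument decides which extension method is bound.

```csharp
using System;

// Hypothetical stand-ins for IScheduler and DispatcherScheduler (not the Rx types).
public interface IFakeScheduler { }
public sealed class FakeDispatcherScheduler : IFakeScheduler { }

// Stand-in for the general overload that accepts the interface.
public static class CoreExtensions
{
    public static string ObserveOn(this string source, IFakeScheduler scheduler)
    {
        return "general overload";
    }
}

// Stand-in for the more specific overload that accepts the concrete type.
public static class DispatcherExtensions
{
    public static string ObserveOn(this string source, FakeDispatcherScheduler scheduler)
    {
        return "dispatcher-specific overload";
    }
}

public static class OverloadDemo
{
    public static string ViaConcreteType()
    {
        // Static type is the concrete class, so the identity conversion wins.
        var scheduler = new FakeDispatcherScheduler();
        return "source".ObserveOn(scheduler);
    }

    public static string ViaInterface()
    {
        // Static type is the interface, so only the general overload applies.
        IFakeScheduler scheduler = new FakeDispatcherScheduler();
        return "source".ObserveOn(scheduler);
    }

    public static void Main()
    {
        Console.WriteLine(ViaConcreteType()); // dispatcher-specific overload
        Console.WriteLine(ViaInterface());    // general overload
    }
}
```

    The same object is passed in both cases; only the declared type differs, which matches what the cast to IScheduler does in my sample.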

    Lee/Dave, here is the rest of the sample application for completeness, which can be created from a brand new Silverlight project:

    MainPage.xaml

    <UserControl x:Class="RxTestSilverlight.MainPage"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        mc:Ignorable="d"
        d:DesignHeight="300" d:DesignWidth="400">
    
        <Grid x:Name="LayoutRoot" Background="White">
            <Grid.RowDefinitions>
                <RowDefinition Height="*"></RowDefinition>
                <RowDefinition Height="auto"></RowDefinition>
            </Grid.RowDefinitions>
            <ListBox x:Name="OutputListBox" Grid.Row="0">            
            </ListBox>
            <StackPanel Orientation="Horizontal" Grid.Row="1">
                <Button Click="Message_Click" >Add message</Button>
                <Button Click="ErrorInSubscribe_Click">Add error in Subscribe</Button>
            </StackPanel>
        </Grid>
    </UserControl>
    


    MainPage.xaml.cs

    using System;
    using System.Windows;
    using System.Windows.Controls;
    
    namespace RxTestSilverlight
    {
        using System.Reactive.Concurrency;
        using System.Reactive.Linq;
        using System.Reactive.Subjects;
    
        public partial class MainPage : UserControl
        {
            public MainPage()
            {
                InitializeComponent();
    
                Loaded += OnLoaded;
            }
    
            private Subject<string> subject;
    
            private void OnLoaded(object sender, RoutedEventArgs e)
            {
                subject = new Subject<string>();
    
                subject
                    .ObserveOn(DispatcherScheduler.Instance)
    //                .ObserveOn((IScheduler)DispatcherScheduler.Instance)
                    .Subscribe(
                        s =>
                        {
                            if (s == "ErrorInSubscribe")
                                throw new Exception("Error in Subscribe");
    
                            AddToListBox("Observable", s);
                        }, 
                        ex => AddToListBox("Observable Error", ex.ToString()));
            }
    
            private void Publish(string message)
            {
                OutputListBox.Items.Add(string.Format("Publishing '{0}'", message)); 
                subject.OnNext(message);    
            }
    
            private void AddToListBox(string name, string s)
            {
                OutputListBox.Items.Add(string.Format("{0}-->{1}", name, s));
            }
            
            private void Message_Click(object sender, RoutedEventArgs e)
            {
                Publish("Message");
            }
    
            private void ErrorInSubscribe_Click(object sender, RoutedEventArgs e)
            {
                Publish("ErrorInSubscribe");
            }
        }
    }
    

    App.xaml.cs

            private void Application_UnhandledException(object sender, ApplicationUnhandledExceptionEventArgs e)
            {
                e.Handled = true;
            }

    Dave - I note your comment about the root exception handling, and note the actual unhandled exception handler in the app is nothing like the above but the effect for the sample is the same.

    Remember the main reason we have this conundrum is due to injecting different schedulers in during testing. That is, the real code looks more like this: 

    .ObserveOn(_schedulerService.Dispatcher)

    Where _schedulerService is a: 

        public interface ISchedulerService
        {
            IScheduler CurrentThread { get; }
    
            IScheduler Dispatcher { get; }
    
            IScheduler Immediate { get; }
    
            IScheduler NewThread { get; }
    
            IScheduler ThreadPool { get; }
        }

    Our MockSchedulerService can then be used to return Scheduler.Immediate for all of these. Whereas the real SchedulerService looks like the following: 

        public class SchedulerService : ISchedulerService
        {
            public IScheduler CurrentThread
            {
                get { return Scheduler.CurrentThread; }
            }
    
            public IScheduler Dispatcher
            {
                get { return DispatcherScheduler.Instance; }
            }
    
            public IScheduler Immediate
            {
                get { return Scheduler.Immediate; }
            }
    
            public IScheduler NewThread
            {
                get { return Scheduler.NewThread; }
            }
    
            public IScheduler ThreadPool
            {
                get { return Scheduler.ThreadPool; }
            }
        }
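
    For comparison, a mock along the lines described above could look roughly like the following. This is a simplified sketch, not our actual test code: the body of MockSchedulerService is an assumption, and the interface is repeated only so the block compiles on its own (with a reference to the Rx assemblies).

```csharp
using System.Reactive.Concurrency;

// Repeated from above so that this sketch is self-contained.
public interface ISchedulerService
{
    IScheduler CurrentThread { get; }
    IScheduler Dispatcher { get; }
    IScheduler Immediate { get; }
    IScheduler NewThread { get; }
    IScheduler ThreadPool { get; }
}

// Hypothetical test double: every property hands back Scheduler.Immediate,
// so scheduled work runs synchronously on the calling (test) thread.
public class MockSchedulerService : ISchedulerService
{
    public IScheduler CurrentThread { get { return Scheduler.Immediate; } }
    public IScheduler Dispatcher { get { return Scheduler.Immediate; } }
    public IScheduler Immediate { get { return Scheduler.Immediate; } }
    public IScheduler NewThread { get { return Scheduler.Immediate; } }
    public IScheduler ThreadPool { get { return Scheduler.Immediate; } }
}
```

    Because each property is declared as IScheduler, .ObserveOn(_schedulerService.Dispatcher) binds to the ObserveOn(IScheduler) overload whether the real or the mock service is injected.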

    Thanks, 
    Hamish


  • Tuesday, July 31, 2012 4:23 PM
     
     
    Lee, I forgot you also wanted the versions of everything: Visual Studio 2010, Silverlight 4, Rx 1.0 Main or 1.1 Experimental, XP 32-bit. 
  • Tuesday, July 31, 2012 5:32 PM
     
     

    Hi Hamish,

    Nice find.  I was able to repro with your code.

    It turns out that the overload of ObserveOn that you've discovered internally calls the ObserveOn(SynchronizationContext) overload, one of the two that I had identified earlier.  It's actually a slightly different form of my second case that is occurring.

    The difference in behavior may therefore boil down to differences between DispatcherSynchronizationContext and DispatcherScheduler.

    It's possible that the Rx team will see this difference as a bug and fix it.  Though if so, I'd expect the behavior of both to be changed so they act in the way that you didn't want, which would ensure that a potentially corrupt observable will no longer generate notifications when an exception is thrown in Subscribe.  Or perhaps the behavior will be left "undefined", since the safer choice of the two isn't necessarily the "correct" choice in all circumstances.

    > Remember the main reason we have this conundrum is due to injecting different schedulers in during testing

    No, the main reason is that you're throwing an exception in Subscribe.  :)

    I still think you'll be better off either not throwing an exception in Subscribe or throwing an exception and letting the broken observable die, as the ObserveOn(IScheduler) overload does.

    - Dave


    http://davesexton.com/blog

  • Tuesday, July 31, 2012 8:25 PM
     

    Thanks for the extra detail of what is going on behind the scenes. Yes it will be interesting to see what the Rx team think about it and what the correct behavior should actually be.

    Just to be clear - I am not actually throwing an exception in the Subscribe in the real world, it is just bubbling out of calls to other code made within the subscribe.

    I accept that any exception that is raised within the Rx observable pipeline will cause the subscriber's OnError to be raised and the observable faulted/ended. However, for exceptions coming from the subscribe part, could you not argue that they are not strictly part of the observable and hence should not cause the observable itself to terminate? OnError of the observer is not called when an exception is raised from the observer itself (in its OnNext), which seems to imply that it isn't really part of the observable it's subscribing to and should not affect it (maybe I'm wrong).

    It also makes exceptions from the observer harder to catch and handle the Rx way (Catch and Retry operators etc) due to the OnError not being raised. Does one wrap everything in the subscribe in a try catch? Or does this imply you should always put everything in a .Do as part of the observable and use an empty Subscribe() ?

    I am curious about the topic of keeping observables alive - for example, take the following completely contrived example I just made up, say you have an observable stream of requests to open a report (imagine the open report requests subject being part of a message bus):

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    namespace ConsoleApplication1
    {
        class Program
        {
            private static Subject<string> _openReportRequests;
    
            static void Main(string[] args)
            {
                var reportService = new ReportService(); 
                
                _openReportRequests = new Subject<string>();
                _openReportRequests
                    .ObserveOn(TaskPoolScheduler.Default)
                    .Do(reportService.OpenReport)
                    .Subscribe(s => { }, ex => Console.WriteLine(ex));
    
                OpenReport("Report1");
                OpenReport("Report2");
                OpenReport("Report3");
    
                Console.ReadLine();
            }
    
            private static void OpenReport(string reportName)
            {
                Console.WriteLine("Opening report " + reportName);
                _openReportRequests.OnNext(reportName);
            }
    
            public class ReportService
            {
                public void OpenReport(string reportName)
                {
                    if (reportName == "Report2")
                        throw new Exception("Report2 error");
    
                    Console.WriteLine(reportName);
                }
            }
        }
    }

    I get the following output:

    Opening report Report1
    Opening report Report2
    Opening report Report3
    Report1
    System.Exception: Report2 error
       at ConsoleApplication1.Program.ReportService.OpenReport(String reportName) in c:\dev\ConsoleApplication1\ConsoleApplication1\Program.cs:line 45
       at System.Reactive.Linq.Observable.Do`1._.OnNext(TSource value)

    So, I really just want to show an error message to the user regarding Report2 and allow them to continue opening other reports that may not have the same issue as Report2. So, after reading Lee's (very great, by the way) introtorx, I tried putting in a Retry so it looks like this:

    _openReportRequests
      .ObserveOn(TaskPoolScheduler.Default)
      .Do(reportService.OpenReport)
      .Retry()
      .Subscribe(s => { }, ex => Console.WriteLine(ex));

    And I get the following output:

      Opening report Report1
      Opening report Report2
      Opening report Report3
      Report1

    And it stops. I don't really want to continue on with another observable (say with Catch or OnErrorResumeNext); I just want the existing one to carry on receiving the open report requests, as I have logged the exception or shown the issue regarding Report2 to the user. What is usually done here? I have no concern about the observable being in an invalid state; it is just receiving messages and I wish it to carry on doing so. I know that the way Rx works is that the stream is faulted, so what's the best practice here to "get it going again"? Maybe it's a simple answer. Sorry for the tangent, but I think it still relates in a way. 

    Thanks,
    Hamish


    • Edited by HamishG Tuesday, July 31, 2012 8:28 PM
    • Edited by HamishG Wednesday, August 01, 2012 6:16 AM
  • Tuesday, July 31, 2012 9:44 PM
     
     Answered

    Hi,

    > [snip] OnError of the subscriber is not called when an exception is raised from subscriber itself (in it's OnNext), which seems
    > to infer that it isn't really part of the observable it's subscribing to and should not affect it (maybe I'm wrong).

    Actually, I agree that "[the observer] isn't really part of the observable it's subscribing to", which is why OnError isn't called when an exception is thrown by an observer; however, I disagree that, in general, an exception thrown by an observer doesn't affect the observable it's subscribing to.  Whether it should or shouldn't is a different issue, and I'd agree that it shouldn't.  It just so happens that in reality, it may. 

    For example, imagine that some observer processes data from some observable.  A bug in the observable causes it to push some bad data to the observer.  The observer throws an exception, which is caught and suppressed (as you're doing).  This leaves the observer's state corrupted (in this scenario).  As a result, the bad data may cause the observable to become more corrupted too - assuming that there's some physical or behavioral relationship between the observer's data and the data source of the observable.  Now, subsequent notifications from the observable containing bad data will cause the observer to either continue throwing exceptions or to continue processing bad data.  What's the point of letting the application continue at all?  What if the bad data is being persisted?  What if it's overwriting good data?

    As I wrote previously, it's "safer" to let the observable die.  But I respect the fact that it's not always "correct", which is why the Rx team may just leave the behavior as undefined.  I could also understand if they "fix" this behavior instead, but it may actually cause more problems than it fixes.  It's a breaking change for people who are depending upon the safer behavior, which means that "fixing" the problem could actually cause existing applications to start producing notifications containing more bad data that wasn't previously being observed.

    Frankly, nobody should depend upon either behavior.  Again, consider failing fast or creating new observables, but I certainly wouldn't attempt to reuse existing observables under any exceptional circumstances within observers.

    > Does one wrap everything in the subscribe in a try catch?

    No.  Well, not necessarily, but it's probably not a good idea to just blindly wrap code in try...catch blocks.  See my previous comments.

    > Or does this imply you should always put everything in a .Do as part of the observable and use an empty Subscribe() ?

    No, I don't think it implies that at all.

    Do is intended for side-effects within the observable.  If an exception is thrown in Do, the observable faults.   Subscribe is intended for side-effects outside of the observable; i.e., you're no longer in the query.  If an exception is thrown, it may have a larger impact than an exception thrown within the observable, because the scope generally becomes larger when exiting observables.  Therefore, if you want to make it easier to debug your application and you want to protect end-user data, then don't move your faulty code from Subscribe to Do just to suppress exceptions.

    > [snip] say you have an observable stream of requests to open a report [snip]

    I wouldn't use Do here.  I'd use Subscribe.  I'd also consider one of two possibilities:

    1. Is the exception actually exceptional?  In other words, if your program is prepared to handle "faulty" reports then don't throw an exception to begin with, if at all possible.  Or if an exception originates from an internal API that you can't control, then read the documentation and experiment to be sure that there won't be any irreversible side-effects and then catch it at the closest place to the API that is throwing it, using a finally block to reverse any global state.

      In other words, don't throw boneheaded, vexing and exogenous exceptions to callers of OpenReport.
    2. -or- If the OpenReport method is beyond your control and it may throw exceptions (e.g., a custom add-in scenario) and you require your observer to be robust enough to handle all exceptions gracefully, then you could add a try...catch around the call to OpenReport within the observer that you pass to Subscribe.  But be very careful here: you must ensure that all related state is released; i.e., lose the bad report.  Don't make any assumptions about it or any other state that the OpenReport method may have corrupted.  However, especially if it's an add-in scenario, it's possible that the entire add-in is now corrupted.  Consider attempting to unload it without notification and then possibly reload the add-in from scratch.  However, it's also possible that the broken add-in may have already corrupted global state in your application, if you give it such access, which means that you can't really be sure about anything at this point.  It's probably safest to just crash here.  In other words, fail fast.
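
    As a rough illustration of the second option only, here is a sketch of a guarded observer. The ReportService below is a stand-in based on the example earlier in the thread, GuardedObserverDemo and the failures list are names I made up, and catching InvalidOperationException specifically is an assumption about what OpenReport throws. It deliberately leaves out the state clean-up described above, which is the hard part.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Stand-in service: fails for "Report2", as in the contrived example.
public class ReportService
{
    public void OpenReport(string reportName)
    {
        if (reportName == "Report2")
            throw new InvalidOperationException("Report2 error");

        Console.WriteLine("Opened " + reportName);
    }
}

public static class GuardedObserverDemo
{
    public static List<string> Run()
    {
        var failures = new List<string>();
        var reportService = new ReportService();
        var requests = new Subject<string>();

        // The try...catch sits as close as possible to the call that may throw,
        // and catches only the exception type we are prepared to handle.
        using (requests.Subscribe(name =>
        {
            try
            {
                reportService.OpenReport(name);
            }
            catch (InvalidOperationException ex)
            {
                // Drop the bad report; keep nothing that OpenReport may have touched.
                failures.Add(name + ": " + ex.Message);
            }
        }))
        {
            requests.OnNext("Report1");
            requests.OnNext("Report2");
            requests.OnNext("Report3");
        }

        return failures;
    }

    public static void Main()
    {
        foreach (var failure in Run())
            Console.WriteLine("Failed " + failure);
    }
}
```

    No scheduler is involved here, so the three notifications are handled synchronously and the observer never throws back into the subject.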

    What happens if the application crashes?  Well, considering that there's a bug somewhere, crashing protects user data.  You should also have some kind of log or error report that can help you to determine the cause and fix the bug.

    - Dave


    http://davesexton.com/blog

    • Marked As Answer by HamishG Wednesday, August 01, 2012 9:40 AM
  • Tuesday, July 31, 2012 9:48 PM
     

    In summary, this is your problem :)

    private void Application_UnhandledException(object sender, ApplicationUnhandledExceptionEventArgs e)
    {
        e.Handled = true;
    }

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, July 31, 2012 9:48 PM Formatting
  • Tuesday, July 31, 2012 10:10 PM
    Owner
     
     Answered

    I've had a look at this behavior and while it's unfortunate there's a difference in behavior in v1.0 (due to the way the SynchronizationContext and the Dispatcher work together), the problem is rooted in undefined behavior when an observer throws. Compare this to the often ill-defined behavior when an event handler in .NET throws: the poor piece of code trying to invoke the event's delegate gets to handle your error, and who knows what it will do with it in return? Dave's suggestions on revising your error handling are the right advice.

    Having said this, the error behavior has been revisited for the Rx v2.0 release. Before I go into much detail: fundamentally, nothing is changing; all core principles stay the same. However, what we're doing in addition is this: when an observer throws, we'll shut down the observable sequence as soon as possible (effectively triggering a Dispose call on the subscription of the faulting observer) and propagate the exception up the stack. Therefore, it will still reach the originator of the On* call to the observer, which likely is a scheduler that may trigger its global exception handling mechanism.

    Prior to taking this approach, an exception in an observer could leave sequences in an orphaned state where things couldn't get cleaned up properly or - in the worst case - invalid data may be produced (e.g. a CombineLatest's failure of one channel may leave the other one ticking along, producing combinations with the last value before the observer faulted). Notice that exceptions in an On* call can't possibly be redirected to the same observer's OnError channel. Doing so would violate the observer grammar in that the OnError message would overlap the failing OnNext message. (Worse, if an error from OnError would tunnel back to OnError, you can see a potential stack overflow coming up as well.)
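
    A small sketch of the point about OnError, using only Subject<T> and the Subscribe(onNext, onError) extension, with no scheduler in between. ObserverThrowDemo and Run are made-up names for illustration. It shows just two things: the exception thrown by the observer surfaces at the caller of OnNext, and the observer's own OnError handler is not invoked. What happens to later notifications is not shown, since that is the part that differs between versions and overloads.

```csharp
using System;
using System.Reactive.Subjects;

public static class ObserverThrowDemo
{
    public static string Run()
    {
        var onErrorCalled = false;
        var subject = new Subject<string>();

        subject.Subscribe(
            s => { throw new InvalidOperationException("Error in Subscribe"); },
            ex => onErrorCalled = true);

        try
        {
            // The subject calls the observer directly, so the observer's
            // exception unwinds through this call.
            subject.OnNext("ErrorInSubscribe");
            return "no exception reached the caller";
        }
        catch (InvalidOperationException)
        {
            return "caught by the caller of OnNext; onErrorCalled=" + onErrorCalled;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```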

    One final word of advice. Consider whether the continuation of the query logically belongs to the query itself or not. If it does, e.g. each OnNext call of a source leads to a service request, asynchronously returning a status code, then you may want to lift that logic into the query (e.g. using a SelectMany) rather than performing this work in a handler passed to Subscribe. Once in the world of querying, you can manipulate errors in a more Rx-natural fashion, e.g. using Catch, Retry, etc. or even by materializing those into notification objects (e.g. to provide a separate error stream, much like PowerShell does on its pipeline).
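
    To make the SelectMany suggestion a bit more concrete, here is one possible shape, offered as a sketch rather than a recommendation for any particular application. OpenReport is a hypothetical stand-in for the service call from the report example earlier in the thread, and LiftedQueryDemo, BuildQuery and Run are made-up names. Each request becomes its own inner sequence, so a failure is caught per request and the outer sequence of requests keeps running.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class LiftedQueryDemo
{
    // Hypothetical stand-in for the service call; fails for "Report2".
    private static string OpenReport(string name)
    {
        if (name == "Report2")
            throw new InvalidOperationException("Report2 error");

        return name;
    }

    public static IObservable<string> BuildQuery(IObservable<string> requests)
    {
        return requests.SelectMany(name =>
            // Observable.Start runs the call asynchronously and turns a thrown
            // exception into an OnError on the inner sequence.
            Observable.Start(() => OpenReport(name))
                .Select(opened => "Opened " + opened)
                // Catch replaces the inner error with a status message, so the
                // outer sequence never sees OnError.
                .Catch((Exception ex) =>
                    Observable.Return("Failed " + name + ": " + ex.Message)));
    }

    public static List<string> Run()
    {
        var requests = new[] { "Report1", "Report2", "Report3" }.ToObservable();

        // ToEnumerable blocks until the query completes; the order of the
        // results is not guaranteed because the inner work runs concurrently.
        return BuildQuery(requests).ToEnumerable().ToList();
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

    The handler passed to Subscribe then only has to display status messages, and the error handling lives inside the query where Catch and Retry apply.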


    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }


  • Tuesday, July 31, 2012 11:45 PM
     
     

    Thanks Bart.  I actually read your post but forgot about the change.  It's good to know.

    - Dave


    http://davesexton.com/blog