Rx Schedulers

  • Wednesday, July 18, 2012 4:40 PM
     

    I'm using version 1.1.10621.0 in case that makes any difference.

    I'm trying to get my head around the different dispatchers and one thing has just surprised me. I'm sure there is a logical explanation though.

    I have the following code that subscribes to a stream of ints. I was expecting the values to be delivered to me on the "Main Thread". However, this isn't happening; instead they arrive on what seems to be another random thread. I also have a .StartWith(someInts) in my stream, and those values arrive on yet another thread.

    So I guess the question is why does .ObserveOn(Scheduler.CurrentThread) not return the values on the "Main Thread" and why is the "Initial Image" returned on a different Thread to the other values?

    If I want to enforce ObserveOnDispatcher-style behaviour in a non-WPF context and get all the notifications on the same thread, how do I do that?

    Console.WriteLine("Main Thread is {0}", Thread.CurrentThread.ManagedThreadId);

    var eventStream = new EventStream();
    eventStream.GetStream()
        .SubscribeOn(Scheduler.ThreadPool)
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(vals => Console.WriteLine(
            vals.Contains(EventStream.InitialImageFlag)
                ? "Received Initial Image on Thread {0}"
                : "Received Values on Thread {0}, Last Value is {1}",
            Thread.CurrentThread.ManagedThreadId,
            vals.Last()));

    /* Code from the EventStream class */

    public IObservable<IEnumerable<int>> GetStream()
    {
        Console.WriteLine("Received Request on Thread {0}", Thread.CurrentThread.ManagedThreadId);
        var stream = Observable.FromEvent<IEnumerable<int>>(
                ev =>
                {
                    Console.WriteLine("Processing Subscription Request on Thread {0}", Thread.CurrentThread.ManagedThreadId);
                    NewIntEvent += ev;
                },
                ev => { NewIntEvent -= ev; })
            .Publish()
            .RefCount()
            .StartWith(new[] { GetInitialImage() });

        return stream;
    }


    Thanks in advance for any help.

All Replies

  • Friday, July 20, 2012 11:40 AM
     
     

    I too have bumped into this... I grabbed this explanation from StackOverflow:

    I had a similar problem a while back and asked this question about it. I think the responses (including the comments) there will answer your question. To summarize:

    • If you want to update controls on a GUI thread, use ObserveOn. If you reference System.Reactive.Windows.Forms.dll you get the .ObserveOn(form) overload, which is handy.
    • SubscribeOn controls the thread on which the actual call to subscribe happens. The problem solved here is that WinForms and WPF will throw exceptions if you add event handlers from multiple different threads.

    Also, this post was very helpful in figuring out the relationship between ObserveOn and SubscribeOn.


    JP Cowboy Coders Unite!

  • Tuesday, July 31, 2012 7:58 PM
     
     

    Hi,

    > why does .ObserveOn(Scheduler.CurrentThread) not return the values on the "Main Thread"

    Scheduler.CurrentThread does not represent a reference to the thread that is running when the property is read (and you're not the first person to assume it).  Instead, it returns an instance of CurrentThreadScheduler, which schedules work on the thread that is "current" whenever work is scheduled.

    What does that mean?  Well, contrast it with ImmediateScheduler, which also schedules work on the thread that is "current" whenever work is scheduled.  The difference in behavior is that ImmediateScheduler executes work immediately, while CurrentThreadScheduler inserts work into a queueing mechanism on the current thread (a.k.a., a trampoline).  See the following discussion for details:

    http://social.msdn.microsoft.com/Forums/en-US/rx/thread/7f75482f-eff2-4938-9491-47fe870989e8/
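
    To make the difference concrete, here is a minimal sketch that schedules a nested action on each scheduler. The class and method names (TrampolineDemo, Run) are made up for illustration; the only Rx calls used are Scheduler.Immediate, Scheduler.CurrentThread and the Schedule(Action) extension method. With the immediate scheduler the nested action should run in the middle of the outer one, while with the current-thread scheduler it should be queued and run only after the outer action returns. Everything stays on the calling thread in both cases.

    ```csharp
    using System;
    using System.Reactive.Concurrency;

    // Hypothetical demo class; not part of the original poster's code.
    class TrampolineDemo
    {
        static void Run(IScheduler scheduler, string name)
        {
            Console.WriteLine("--- {0} ---", name);
            scheduler.Schedule(() =>
            {
                Console.WriteLine("outer start");
                // Nested request made while the outer action is still running.
                scheduler.Schedule(() => Console.WriteLine("inner"));
                Console.WriteLine("outer end");
            });
        }

        static void Main()
        {
            // Immediate: the nested action executes right away, inside the outer action.
            Run(Scheduler.Immediate, "Immediate");

            // CurrentThread: the nested action is placed on the trampoline queue
            // and executes once the outer action has returned.
            Run(Scheduler.CurrentThread, "CurrentThread");
        }
    }
    ```

    Neither scheduler moves work to another thread, which is why passing either one to ObserveOn cannot bring notifications back to the "Main Thread".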

    > why is the "Initial Image" returned on a different Thread to the other values?

    SubscribeOn is passed Scheduler.ThreadPool, so the subscription itself runs on a pooled thread.  StartWith pushes the initial data synchronously from an array as part of that subscription, thus the initial data arrives on the pooled thread, asynchronously with respect to the thread that called Subscribe.  That's what you told Rx to do by using SubscribeOn.

    Subsequent event notifications are probably being sent on a pooled thread as well, independently of SubscribeOn, though you haven't posted the code.  Thus all notifications are being sent on pooled threads.

    Next, an instance of CurrentThreadScheduler receives all notifications via ObserveOn, which schedules them on the "current" thread, which of course is a pooled thread.  So a trampoline is being created on a pooled thread, which has no noticeable effect in your query.  It's as if ObserveOn wasn't even there.

    > If I want to enforce a kind of ObserveOnDispatcher type behaviour in a non WPF context and
    > get all the notifications on the same thread how do I do that?

    EventLoopScheduler
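
    A minimal sketch of how that might look, under a few assumptions: Observable.Interval stands in for the EventStream class (which wasn't posted in full), the value -1 stands in for the initial image, and the ManualResetEvent is only there to keep the console program alive until the stream completes. Scheduler.ThreadPool matches the Rx 1.x API used in the question; later Rx versions expose the same scheduler as ThreadPoolScheduler.Instance.

    ```csharp
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;

    // Hypothetical demo class; not part of the original poster's code.
    class EventLoopDemo
    {
        static void Main()
        {
            Console.WriteLine("Main Thread is {0}", Thread.CurrentThread.ManagedThreadId);

            // EventLoopScheduler owns one dedicated thread and runs queued work on it in order.
            using (var eventLoop = new EventLoopScheduler())
            using (var done = new ManualResetEvent(false))
            {
                Observable.Interval(TimeSpan.FromMilliseconds(100)) // stand-in for the real event source
                    .Take(5)
                    .StartWith(-1L)                     // stand-in for the initial image
                    .SubscribeOn(Scheduler.ThreadPool)  // subscription work happens on a pooled thread
                    .ObserveOn(eventLoop)               // every notification is marshalled to the loop thread
                    .Subscribe(
                        v => Console.WriteLine("Received {0} on Thread {1}",
                                               v, Thread.CurrentThread.ManagedThreadId),
                        () => done.Set());

                // Block the main thread until OnCompleted has been observed.
                done.WaitOne();
            }
        }
    }
    ```

    All "Received" lines should report the same thread id, but note that it is the scheduler's own thread, not the "Main Thread". Getting notifications onto the main thread itself would require that thread to run some kind of message loop, which is what the dispatcher provides in WPF.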

    - Dave


    http://davesexton.com/blog