If "CurrentThreadScheduler.IsScheduleRequired" is set, how do you know which scheduler to use?

Answered If "CurrentThreadScheduler.IsScheduleRequired" is set, how do you know which scheduler to use?

  • 2012年7月17日 下午 08:23
     
     

    In writing a custom observable, it is sometimes the case that CurrentThreadScheduler.IsScheduleRequired is asserted for a given subscriber (detected in my Subscribe function). What does this indicate for the associated subscription request?

    I am aware of the different types of schedulers which are statically available, but how do I find out which one of them has been requested by this subscriber, so that I can schedule my work items to it?

    Please be gentle, I'm a Rx n00b.  Using Rx 2.0.

所有回覆

  • 2012年7月17日 下午 11:22
    擁有者
     
     已答覆 包含代碼

    The CurrentThreadScheduler allows asynchronous subscriptions on the current thread. To do so, it uses a so-called trampoline, which needs to be installed on the current thread's stack, as deeply as possible (i.e. the closer to the thread start point, the better). When a request is made to schedule something on the current thread's scheduler, we're already running on that thread (hence the name "current thread", i.e. you're referring to the place you're already running on). Instead of running the work immediately (which would be the ImmediateScheduler's behavior), we queue it up at the end of the trampoline's queue. When we return from the current work item, back to the trampoline, we run the next item in the queue, etc.

    For example, try to predict the outcome of the code below:

    Scheduler.CurrentThread.Schedule(() =>
    {
        Console.WriteLine("A");
    
        Scheduler.CurrentThread.Schedule(() =>
        {
            Console.WriteLine("C");
        });
    
        Scheduler.CurrentThread.Schedule(() =>
        {
            Console.WriteLine("D");
        });
    
        Console.WriteLine("B");
    });
    The remaining question is whose responsibility it is to install the trampoline if none is present on the current thread. Each call to any of the Schedule methods on the CurrentThreadScheduler will first check whether the trampoline is installed. If not, it will insert one at the calling stack frame depth, i.e. start to dispatch a loop on a queue it associates with the current thread. When the work passed to the Schedule method (as the action delegate) runs it may schedule more work by calling Schedule again, which will now notice there's already a queue, causing the work to be enqueued for it to be picked up eventually by the top-level dispatching loop. As a result, the top-level Schedule call becomes long-running as it takes responsibility for dispatching the items in the queue. In fact, it cannot return an IDisposable to cancel work unless it's drained all the work. (Recall the guidance to install the trampoline "as deeply as possible" which means the blocking occurs at a much deeper level in the stack. This is effectively what message loops do, often at the very start of the thread as their sole purpose in life.)

    But there's another place where the trampoline gets installed... Each call to Subscribe on an observable sequence generated by Rx or through the use of either the Observable.Create<T> method or by deriving from ObservableBase<T> will include a check for the presence of a trampoline on the current thread. If none is present, the same logic applies as discussed above: the call to the Subscribe method becomes effectively the victim for running the current thread's trampoline. Other Rx work may land on the trampoline for the current thread, so you may be faced with a synchronous Subscribe that cannot return an IDisposable for cancellation any time soon. (Note, however, that Rx tries to avoid using the CurrentThreadScheduler for anything but short-lived work. This said, long-running user callbacks may still take place and land on the trampoline. How long user code takes is entirely beyond our control.)

    What the IsScheduleRequired property tells you is whether or not there's a trampoline on the current thread already. If not, calling Schedule will install one. If there is, you could also just call Schedule to enqueue work. So, what's the point of this property if Schedule does the right thing either way? There are a couple of reasons why we have it.

    First of all, higher layers of the Rx framework may need to check for this, without peeking at internal state (such as the queue which gets installed to thread local storage, except for platforms where TLS is not available, where we use another technique). Hence, we expose this as a property.

    Secondly, knowing that there's no trampoline on the current thread yet, other frameworks can prevent blocking Subscribe calls down the line. (As we discussed higher up, a Subscribe could be long-running if it's the top-level method call that drains the trampoline.) For example, if you're writing a framework or tool that runs user code, you may want to check whether there's already a trampoline. If not, wrap all of the user code in CurrentThreadScheduler.Schedule, and Subscribe (or other Schedule) methods that run in that context are more likely to return immediately. This ensures an IDisposable for cancellation will be returned. An example of this use is LinqPad which wraps user code involving Rx in a CurrentThreadScheduler frame. By doing so, there'll always be an IDisposable for cancellation (unless the ImmediateScheduler is used, you're on your own then), which can be hooked up to the "stop" button in LinqPad.

    Finally, if you're implementing an IObservable<T> from scratch - that is, without using Observable.Create<T> or ObservableBase<T> - it's recommended to play nice with the CurrentThreadScheduler by doing a check for IsScheduleRequired and conditionally wrap the core subscribe logic in a call to Schedule. If we weren't to expose the IsScheduleRequired property, you'd always have to go through a Schedule call "just in case", which is suboptimal. This said, we don't recommend our users to implement IObservable<T> from scratch. It's harder than you may think!

    In the end, the answer to your question is most likely: "don't bother to call this property in your code". In fact, in Rx v2.0 we marked the property with an Advanced EditorBrowsable flag to indicate its uncommon specialized use.


    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

  • 2012年7月18日 下午 08:54
     
     

    Great information and this helps with what I am trying to design. Thanks Bart,

    Glenn