An explanation of Observable.Start

    Question

  • Guys, 

    In Rx, a pipeline stays on the same thread as long as we do not add operators that introduce concurrency by default, for example Buffer or Throttle. When we do add one, the scheduler is switched and the rest of the pipeline, from that operator onward, runs on the new scheduler.
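    A minimal sketch of the scheduler switch described above, assuming the System.Reactive (Rx.NET) NuGet package and C# top-level statements. A Subject is pushed synchronously on the main thread, and Throttle(TimeSpan), which has no scheduler argument here and so uses Rx's default scheduler, delivers its output on a thread-pool thread:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Thread that pushes values into the pipeline (the main thread here).
int sourceThread = Environment.CurrentManagedThreadId;
int throttledThread = -1;
using var done = new ManualResetEventSlim(false);

var subject = new Subject<int>();

// Throttle(TimeSpan) without an explicit scheduler uses the default scheduler,
// so everything downstream of it runs on a scheduler (thread pool) thread.
using var subscription = subject
    .Throttle(TimeSpan.FromMilliseconds(100))
    .Subscribe(_ =>
    {
        throttledThread = Environment.CurrentManagedThreadId;
        done.Set();
    });

subject.OnNext(1);                    // pushed synchronously on the main thread
done.Wait(TimeSpan.FromSeconds(5));   // wait for Throttle's timer to fire

Console.WriteLine($"OnNext called on {sourceThread}, observed after Throttle on {throttledThread}");
```

    The main thread is blocked in done.Wait while the timer fires, so the observer necessarily runs on a different thread.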

    So my question is: why is this not the case for Observable.Start? Take the code below:

    Observable.Start(() => GetData(), NewThreadScheduler.Default).Select(FormatResults);

    In this sample the GetData method is called on a worker thread, but the Select is called on the thread on which the observable was subscribed.
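    A runnable version of the snippet above, offered as a sketch under these assumptions: System.Reactive is referenced, C# top-level statements are used, and GetData and FormatResults are hypothetical stand-ins that only record the managed thread they run on. Wait() subscribes on the main thread and blocks until the single value arrives.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

int mainThread = Environment.CurrentManagedThreadId;
int getDataThread = -1;
int selectThread = -1;

// Hypothetical stand-ins for GetData and FormatResults from the question;
// each records the managed thread it runs on.
int GetData()
{
    getDataThread = Environment.CurrentManagedThreadId;
    return 21;
}

string FormatResults(int value)
{
    selectThread = Environment.CurrentManagedThreadId;
    return $"result = {value * 2}";
}

// The pipeline from the question; Wait() subscribes on the main thread and blocks.
string result = Observable.Start(() => GetData(), NewThreadScheduler.Default)
    .Select(FormatResults)
    .Wait();

// GetData always runs on the NewThreadScheduler thread; which thread runs
// FormatResults depends on whether GetData finished before Wait() subscribed.
Console.WriteLine($"main: {mainThread}, GetData: {getDataThread}, Select: {selectThread}");
```

    Because the Select thread depends on timing, the program only prints it rather than relying on a particular value.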

    Can someone explain why this is the case? I have heard people say that transforms are cheap and therefore do not need to run on background threads, which is why Select has no scheduler parameter, but I am not sure that is always the case. Transforming N items could be very expensive.

    Also, isn't it inconsistent that some operators automatically switch the pipeline to a different scheduler and others (such as Observable.Start()) do not?

    Thoughts? 


    David Hanson




    • Edited by DaveHanson Friday, October 11, 2013 1:12 PM
    Friday, October 11, 2013 1:08 PM

Answers

  • Hi David,

    It has nothing to do with the Select operator.  It's simply a race condition between GetData and Subscribe.  I assume that you're calling Subscribe immediately after Start in your particular test?

    Try putting the thread to sleep for a couple of seconds in GetData and you'll see the behavior that you expected.
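    The sleep experiment can be sketched as follows, assuming System.Reactive and C# top-level statements; GetData is a hypothetical stand-in that sleeps for one second before returning. Since Wait() subscribes immediately, the observer is already attached when the value is produced, so it is pushed from the worker thread:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

int mainThread = Environment.CurrentManagedThreadId;
int getDataThread = -1;
int selectThread = -1;

// Hypothetical slow GetData: the sleep makes it (practically) certain that
// Subscribe happens before the value is produced.
int GetData()
{
    Thread.Sleep(TimeSpan.FromSeconds(1));
    getDataThread = Environment.CurrentManagedThreadId;
    return 21;
}

int result = Observable.Start(() => GetData(), NewThreadScheduler.Default)
    .Select(x =>
    {
        selectThread = Environment.CurrentManagedThreadId;
        return x * 2;
    })
    .Wait();   // subscribes immediately on the main thread, then blocks

// The value reaches the already-subscribed observer from the worker thread,
// so Select runs on the same thread as GetData, not on the main thread.
Console.WriteLine($"main: {mainThread}, GetData: {getDataThread}, Select: {selectThread}");
```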

    The reason for this race condition is that Start invokes GetData as soon as possible on the specified scheduler and then caches the result for all observers.  That's the semantics of Start - it's supposed to be hot.  Contrast that with ToAsync, for example, which also converts a function into an observable, though ToAsync uses deferred execution; i.e., it's cold.
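    The "hot" behaviour of Start can be checked with a small sketch, assuming System.Reactive and C# top-level statements; GetData is a hypothetical counter. GetData runs before anyone subscribes, and two later subscriptions share the single cached result:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

int calls = 0;
using var ran = new ManualResetEventSlim(false);

// Hypothetical GetData that counts how often it is invoked.
int GetData()
{
    Interlocked.Increment(ref calls);
    ran.Set();
    return 42;
}

// Start schedules GetData right away; nobody has subscribed yet.
IObservable<int> start = Observable.Start(() => GetData(), NewThreadScheduler.Default);
bool ranWithoutSubscriber = ran.Wait(TimeSpan.FromSeconds(5));

// Both subscriptions see the same cached result; GetData is not called again.
int first = start.Wait();
int second = start.Wait();

Console.WriteLine($"ran without subscriber: {ranWithoutSubscriber}, calls: {calls}, results: {first}, {second}");
```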

    So if GetData completes before Subscribe is called, then Start caches the result, and when Subscribe is finally called it observes the cached value synchronously, which is why it seems as if the observable is marshaling back to the main thread, though in fact it's not.  It just so happens that you're subscribing on the main thread and observing the cached value synchronously.  If you were to subscribe on any other thread after the value has been cached, then you'd observe the result synchronously on the thread on which you subscribed, not the thread of the specified scheduler.
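    The late-subscriber case can be sketched as below, assuming System.Reactive and C# top-level statements. The first Wait() is used only to make sure the value has been produced and cached; the later subscriptions, one on the main thread and one on a separate thread, each receive the value before Subscribe returns, on their own thread:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

int getDataThread = -1;

// Hypothetical GetData: records the worker thread it runs on.
var source = Observable.Start(() =>
{
    getDataThread = Environment.CurrentManagedThreadId;
    return 42;
}, NewThreadScheduler.Default);

// Block until Start has produced and cached its value.
source.Wait();

// Late subscription on the main thread: the cached value is delivered
// synchronously, i.e. before Subscribe returns, on the subscribing thread.
int mainThread = Environment.CurrentManagedThreadId;
int observedOnMain = -1;
source.Select(x => x * 2)
      .Subscribe(_ => observedOnMain = Environment.CurrentManagedThreadId)
      .Dispose();

// Same thing from another thread: the value is observed on that thread,
// not on the NewThreadScheduler thread that ran GetData.
int otherThread = -1;
int observedOnOther = -1;
var t = new Thread(() =>
{
    otherThread = Environment.CurrentManagedThreadId;
    source.Subscribe(_ => observedOnOther = Environment.CurrentManagedThreadId).Dispose();
});
t.Start();
t.Join();

Console.WriteLine($"GetData: {getDataThread}, late main subscriber: {observedOnMain}, late other subscriber: {observedOnOther}");
```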

    - Dave


    http://davesexton.com/blog

    • Marked as answer by DaveHanson Tuesday, October 15, 2013 9:06 AM
    Friday, October 11, 2013 5:43 PM

All replies

  • @Dave,

    So the fact that Observable.Start() returns an IObservable led me to believe that the pipeline would be lazy until someone subscribes. But having just checked what you said, you're 100% correct. However, I am not sure I like this behaviour.

    Is there an operator I can use to make this lazy? Or would I have to use some combination of Defer to get this? I could always create my own operator, but it would be good if there were one out of the box. I noticed the ToAsync operators do not define a scheduler.
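    One way to get the lazy behaviour asked about here is to wrap Start in Defer. This is a sketch rather than a built-in operator, assuming System.Reactive and C# top-level statements, with a hypothetical counting GetData. Defer runs its factory once per subscription, so nothing executes until Subscribe and each subscriber gets its own GetData call:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

int calls = 0;

// Hypothetical GetData that counts how often it is invoked.
int GetData()
{
    Interlocked.Increment(ref calls);
    return 42;
}

// Defer runs its factory, and therefore Observable.Start, once per Subscribe,
// so nothing executes until someone subscribes.
IObservable<int> lazy = Observable.Defer(() =>
    Observable.Start(() => GetData(), NewThreadScheduler.Default));

Thread.Sleep(100);                                   // still no subscriber
int callsBeforeSubscribe = Volatile.Read(ref calls);

int first = lazy.Wait();    // first subscription: GetData runs now
int second = lazy.Wait();   // second subscription: GetData runs again

Console.WriteLine($"before subscribe: {callsBeforeSubscribe}, after two subscriptions: {calls}");
```

    Note the trade-off: unlike plain Start, each subscriber re-runs GetData instead of sharing one cached result.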

    Cheers,

    Dave


    David Hanson

    Monday, October 14, 2013 8:21 AM
  • Hi David,

    There are several overloads of ToAsync with a scheduler parameter.  Are you using an older version of Rx?
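    For reference, a sketch of the scheduler-taking overload, assuming a current System.Reactive package and C# top-level statements; GetData is a hypothetical counter. Observable.ToAsync(Func&lt;TResult&gt;, IScheduler) only wraps the function; GetData runs when the returned function is invoked, once per invocation:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

int calls = 0;

// Hypothetical GetData that counts how often it is invoked.
int GetData()
{
    Interlocked.Increment(ref calls);
    return 42;
}

Func<int> getData = GetData;

// ToAsync(Func<TResult>, IScheduler) only wraps the function; nothing runs yet.
Func<IObservable<int>> getDataAsync = Observable.ToAsync(getData, NewThreadScheduler.Default);
int callsAfterToAsync = Volatile.Read(ref calls);

// Each call to the returned function schedules GetData on the given scheduler
// and returns an observable for that one invocation.
int first = getDataAsync().Wait();
int second = getDataAsync().Wait();

Console.WriteLine($"after ToAsync: {callsAfterToAsync}, after two invocations: {calls}");
```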

    - Dave


    http://davesexton.com/blog

    Tuesday, October 15, 2013 3:25 PM