subscribeOn behavior

  • Question

  • Hi,

    I was reading the following post to understand the difference in behavior between ObserveOn and SubscribeOn.

    http://blogs.msdn.com/b/rxteam/archive/2009/11/21/observable-context-observable-subscribeon-and-observable-observeon.aspx

    The post describes the behavior of SubscribeOn with an example of the Concat operator concatenating three observable sequences, each of which produces notifications on a different thread. When the first sequence's OnCompleted is called, it triggers the Subscribe call for the next sequence passed to Concat. That subscription takes place on the same thread on which the previous sequence's OnCompleted was called. If this is not the desired behavior, SubscribeOn can be used to schedule the subscription on a chosen scheduler.

    Observable.Concat(ob1,ob2).SubscribeOn(new NewThreadScheduler()).Subscribe();

    However, in the code below, although I have used SubscribeOn, the next observable sequence (ob2) gets subscribed on the same thread on which ob1's OnCompleted was called. (The message "ob2 getting created on tid:" displays the same thread id as the thread on which ob1's OnCompleted was called.)

    I would expect the following messages to show the same thread id, namely the one on which the subscription is made:

    ob1 getting created on tid:3 

    ob2 getting created on tid:3 


    public static void testSubscribeOn()
    {
        // ob1 logs the subscribing thread, then emits and completes on a new thread.
        var ob1 = Observable.Create<int>(observer =>
        {
            Console.WriteLine("ob1 getting created on tid:{0}", Thread.CurrentThread.ManagedThreadId);
            IScheduler scheduler = new NewThreadScheduler();
            scheduler.Schedule(() =>
            {
                Console.WriteLine("onNext with value 1, called on thread:{0}", Thread.CurrentThread.ManagedThreadId);
                observer.OnNext(1);
                Console.WriteLine("ob1 calling onComplete on tid:{0}", Thread.CurrentThread.ManagedThreadId);
                observer.OnCompleted();
            });

            return Disposable.Create(() => Console.WriteLine("dispose for ob1 subscription called on tid:{0}",
                Thread.CurrentThread.ManagedThreadId));
        });

        // ob2 has the same shape as ob1 and emits the value 2.
        var ob2 = Observable.Create<int>(observer =>
        {
            Console.WriteLine("ob2 getting created on tid:{0}", Thread.CurrentThread.ManagedThreadId);
            IScheduler scheduler = new NewThreadScheduler();
            scheduler.Schedule(() =>
            {
                Console.WriteLine("onNext with value 2, called on thread:{0}", Thread.CurrentThread.ManagedThreadId);
                observer.OnNext(2);
                Console.WriteLine("ob2 calling onCompleted on thread:{0}", Thread.CurrentThread.ManagedThreadId);
                observer.OnCompleted();
            });

            return Disposable.Create(() => Console.WriteLine("dispose for ob2 subscription called on tid:{0}",
                Thread.CurrentThread.ManagedThreadId));
        });

        Observable.Concat(ob1, ob2)
            .SubscribeOn(new NewThreadScheduler())
            .Subscribe(
                d => Console.WriteLine("received {0} on tid:{1}", d, Thread.CurrentThread.ManagedThreadId),
                () => Console.WriteLine("final onCompleted called on tid:{0}", Thread.CurrentThread.ManagedThreadId));
    }
    Wednesday, September 10, 2014 12:28 PM

Answers

  • SubscribeOn only applies to when Subscribe is called and when Dispose is called on the resulting subscription.

    In your example, you've applied the SubscribeOn operator to the result of Concat, which means that it only applies when you call Subscribe on the result of Concat (and Dispose on Concat's subscription, which you've elided from your example).

    In order to get the behavior that you expect, you must apply SubscribeOn to each observable on which you plan to call Subscribe, either directly or indirectly.

    The semantics of Concat are such that when ob1 calls OnCompleted, the operator will call ob2.Subscribe for you.  If you want the Concat operator to subscribe to ob2 on a particular context, then you must give it an observable with SubscribeOn applied.  If you also want the outer subscription to Concat's observable to subscribe on a particular context, then you must apply SubscribeOn to both observables.

    Observable.Concat(ob1, ob2.SubscribeOn(s)).SubscribeOn(s).Subscribe(...);

    The takeaway is that you must always be careful where you apply operators within a query.  Not all locations are equal, in general.
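
    As a rough sketch of that difference in placement, assuming Rx.NET (the System.Reactive package): the Traced helper below is hypothetical and only mimics the shape of ob1 and ob2 from the question, and NewThreadScheduler.Default is picked for s purely for illustration. The printed thread ids will vary from run to run, so no particular output is claimed.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class SubscribeOnPlacement
{
    // Hypothetical helper (not from the thread): logs the subscribing thread,
    // then emits one value and completes on a thread from NewThreadScheduler.
    static IObservable<int> Traced(string name, int value) =>
        Observable.Create<int>(observer =>
        {
            Console.WriteLine("{0} subscribed on tid:{1}", name, Thread.CurrentThread.ManagedThreadId);
            return NewThreadScheduler.Default.Schedule(() =>
            {
                observer.OnNext(value);
                Console.WriteLine("{0} completing on tid:{1}", name, Thread.CurrentThread.ManagedThreadId);
                observer.OnCompleted();
            });
        });

    static void Main()
    {
        IScheduler s = NewThreadScheduler.Default; // assumption: any scheduler could stand in for s
        var ob1 = Traced("ob1", 1);
        var ob2 = Traced("ob2", 2);

        // SubscribeOn on the Concat result only moves the outer Subscribe call.
        // Concat still subscribes to ob2 on the thread where ob1 called OnCompleted.
        Observable.Concat(ob1, ob2).SubscribeOn(s).ToList().Wait();

        Console.WriteLine("---");

        // SubscribeOn applied to ob2 as well: Concat's internal ob2.Subscribe
        // is now scheduled on s instead of running on ob1's completing thread.
        Observable.Concat(ob1, ob2.SubscribeOn(s)).SubscribeOn(s).ToList().Wait();
    }
}
```

    Comparing the "ob1 completing" and "ob2 subscribed" lines before and after the separator is one way to see where each Subscribe call actually runs.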

    Note that SubscribeOn is rarely the correct choice.  I'm working on a blog post that explains this in great detail, but it's still a work-in-progress.

    - Dave


    http://davesexton.com/blog

    • Marked as answer by kharesp Wednesday, September 10, 2014 7:39 PM
    Wednesday, September 10, 2014 5:01 PM

All replies

  • Hi Dave,

    Thank you for the explanation. I understand that to get the expected behavior I will have to pass ob2.SubscribeOn(s) as a parameter to Concat. 

    If I run: 

    Observable.Concat(ob1.SubscribeOn(NewThreadScheduler.Default),ob2).Subscribe(); 

    I get the following output messages:

    ob1 getting created on tid:3 

    dispose for ob1 subscription called on tid:5 

    So ob1's Subscribe (tid:3) and Dispose (tid:5) are each getting called on a separate thread?

    Sorry for being dense in this regard, but I modified the example as follows (the definitions of ob1 and ob2 are the same) to see the final observable's Dispose method being invoked on the specified scheduler:

    var final_observable = Observable.Create<int>(observer =>
    {
        Console.WriteLine("final observable getting created on tid:{0}", Thread.CurrentThread.ManagedThreadId);
        Observable.Concat(ob1, ob2).Subscribe(d => observer.OnNext(d),
            observer.OnError, observer.OnCompleted);
        return Disposable.Create(() => Console.WriteLine("dispose for final subscription called on tid:{0}", Thread.CurrentThread.ManagedThreadId));
    });

    final_observable
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(
            d => Console.WriteLine("received {0} on tid:{1}", d, Thread.CurrentThread.ManagedThreadId),
            () => Console.WriteLine("final onCompleted called on tid:{0}", Thread.CurrentThread.ManagedThreadId));

    In this case I see the following messages: 

    final observable getting created on tid:3

    ob2 calling onCompleted on tid:5 

    dispose for final subscription called on tid:5 

    Dispose for the final observable is getting called on the same thread as the one on which ob2's OnCompleted gets called. Shouldn't the Dispose for the final observable be called on tid:3 ("final observable getting created on tid:3")? What am I failing to understand here?




    • Edited by kharesp Wednesday, September 10, 2014 7:41 PM
    Wednesday, September 10, 2014 7:38 PM
  • The semantics of NewThreadScheduler are that every action is scheduled on a new thread.*

    Likewise, the semantics of ThreadPoolScheduler are that every action is scheduled on a different pooled thread.**

    If you want a single background thread, then use EventLoopScheduler instead.

    * There is an optimization that reuses the current thread when overlapping actions are scheduled (but in general you shouldn't rely on it).
    ** Of course, pooled threads are reused as well.
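
    A minimal sketch of the single-thread case, assuming Rx.NET (the System.Reactive package). The never-completing source and the two ManualResetEventSlim handshakes are illustrative assumptions, added only so the program waits for both log lines. Since SubscribeOn routes both the Subscribe call and an explicit Dispose through the scheduler, the two lines should report the same thread id when that scheduler is an EventLoopScheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

static class SingleThreadSubscribe
{
    static void Main()
    {
        var loop = new EventLoopScheduler(); // one dedicated background thread
        var subscribed = new ManualResetEventSlim();
        var disposed = new ManualResetEventSlim();

        // Hypothetical source: never emits, only logs where Subscribe and Dispose run.
        var source = Observable.Create<int>(observer =>
        {
            Console.WriteLine("subscribed on tid:{0}", Thread.CurrentThread.ManagedThreadId);
            subscribed.Set();
            return Disposable.Create(() =>
            {
                Console.WriteLine("disposed on tid:{0}", Thread.CurrentThread.ManagedThreadId);
                disposed.Set();
            });
        });

        Console.WriteLine("main thread is tid:{0}", Thread.CurrentThread.ManagedThreadId);

        // Subscribe is scheduled on the event loop thread.
        IDisposable subscription = source.SubscribeOn(loop).Subscribe();
        subscribed.Wait();

        // Dispose is called here on the main thread, but SubscribeOn schedules
        // the underlying disposal on the same scheduler.
        subscription.Dispose();
        disposed.Wait();

        loop.Dispose(); // stop the event loop thread
    }
}
```

    Swapping loop for NewThreadScheduler.Default in this sketch is a quick way to compare against the behavior described above, where each scheduled action may land on a different thread.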

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Wednesday, September 10, 2014 9:54 PM Small clarification
    Wednesday, September 10, 2014 8:42 PM