Why does the managed thread ID keep changing?

    Question

  • Hi,

    I have some really simple code (to watch messages arrive in different execution contexts, and to check the Rx contract that only one OnNext call can be executing at a time):

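    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            int count = 0;
            // Interval is cold: each Subscribe call below starts its own timer.
            var perSec = Observable.Interval(TimeSpan.FromSeconds(2));

            // Both subscriptions update the shared counter from different threads,
            // so use Interlocked instead of count++ to avoid a data race.
            var sub1 = perSec.Subscribe(d =>
                Console.WriteLine(String.Format("on main thread with id: {0} value: {1} count: {2}",
                    Thread.CurrentThread.ManagedThreadId, d, Interlocked.Increment(ref count))));

            var sub2 = perSec
                .ObserveOn(NewThreadScheduler.Default)
                .Subscribe(e => Console.WriteLine(String.Format("on new thread with id: {0} value: {1} count: {2}",
                    Thread.CurrentThread.ManagedThreadId, e, Interlocked.Increment(ref count))));

            Console.ReadKey();
            sub1.Dispose();
            sub2.Dispose();
        }
    }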

    When I run this code, I see that the managed thread ID printed by the first subscription ("on main thread") keeps changing, although the managed thread ID of the new thread I create stays the same. Why does the thread ID keep changing?

    According to MSDN, the managed thread ID is a unique identifier for a thread, so I assumed it would stay constant throughout the program's execution.

    Thank you.

    Thursday, August 14, 2014 1:03 AM

Answers

  • Hi,

    It's true that the Rx contract (§4.2 of the Rx Design Guidelines) requires notifications to be serialized, but that does not mean that all notifications must leave on the same thread.  (Nor does it mean that the notifications of two subscriptions to a cold observable must be serialized with respect to each other, in case you were wondering.)
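
    To make the distinction concrete, here is a minimal BCL-only sketch (not how Rx implements it; `SerialDispatcher` and `Run` are hypothetical names): each simulated notification is chained as a continuation of the previous one, so no two ever overlap, yet each runs on whichever thread-pool thread picks it up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not an Rx type: serializes callbacks without pinning them to one thread.
public static class SerialDispatcher
{
    // Runs `count` simulated OnNext calls one after another on thread-pool threads and reports
    // the highest number that were ever running at once, plus the thread IDs that ran them.
    public static (int MaxConcurrent, List<int> ThreadIds) Run(int count)
    {
        var gate = new object();
        int running = 0, maxConcurrent = 0;
        var threadIds = new List<int>();
        Task chain = Task.CompletedTask;

        for (int i = 0; i < count; i++)
        {
            // Each continuation is queued to the pool only after the previous one completes.
            chain = chain.ContinueWith(_ =>
            {
                lock (gate)
                {
                    running++;
                    maxConcurrent = Math.Max(maxConcurrent, running);
                    threadIds.Add(Thread.CurrentThread.ManagedThreadId);
                }
                Thread.Sleep(1); // stand-in for the observer's OnNext work
                lock (gate) running--;
            }, TaskScheduler.Default);
        }

        chain.Wait();
        return (maxConcurrent, threadIds);
    }
}
```

    On a typical run `ThreadIds` contains more than one distinct value, but that depends on the pool; only the serialization (`MaxConcurrent == 1`) is guaranteed.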

    Rx is a free-threaded programming model.  In general, notifications arrive on some thread and leave on that same thread; however, we often need to schedule notifications, causing them to leave on different threads.  That's the purpose of the IScheduler interface.
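
    A rough illustration of the free-threaded default, using only the BCL's `IObservable<T>`/`IObserver<T>` interfaces: the hypothetical `InlineSubject<T>` below adds no scheduling, so each notification leaves on exactly the thread that called `OnNext`. It is a sketch, not Rx's `Subject<T>`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal subject (not Rx's Subject<T>). It adds no scheduling:
// OnNext invokes every observer synchronously on the caller's thread.
public sealed class InlineSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_observers) _observers.Add(observer);
        return new Unsubscriber(() => { lock (_observers) _observers.Remove(observer); });
    }

    public void OnNext(T value)
    {
        IObserver<T>[] snapshot;
        lock (_observers) snapshot = _observers.ToArray();
        foreach (var observer in snapshot) observer.OnNext(value); // same thread as the caller
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        // Runs the removal at most once, even if Dispose is called repeatedly.
        public void Dispose() => Interlocked.Exchange(ref _dispose, null)?.Invoke();
    }
}

// Hypothetical observer that records which thread delivered each value.
public sealed class ThreadRecorder<T> : IObserver<T>
{
    public readonly List<int> ThreadIds = new List<int>();
    public void OnNext(T value) { lock (ThreadIds) ThreadIds.Add(Thread.CurrentThread.ManagedThreadId); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

    If a worker thread calls `subject.OnNext(1)`, the recorder sees that worker's thread ID; a later `OnNext(2)` from the main thread is delivered on the main thread.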

    All operators in Rx that may introduce asynchrony offer an overload that accepts an IScheduler parameter.  Many of those operators use CurrentThreadScheduler by default (meaning that they use the typical free-threaded model; it does not mean that they are bound to the "current" thread at the time of subscription: a common misunderstanding), though some operators actually introduce concurrency by default because it's intuitive with regard to their particular semantics.
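
    The overload pattern can be sketched with a deliberately tiny, hypothetical scheduler interface (`IMiniScheduler` is an assumption for illustration, not Rx's `IScheduler`, which also handles due times and returns an `IDisposable` for cancellation). One overload of the hypothetical `Return` operator falls back to an inline default; the other lets the caller inject concurrency. The inline default is a simplification: a real CurrentThreadScheduler also trampolines nested work onto a queue.

```csharp
using System;
using System.Threading;

// Hypothetical, heavily simplified stand-in for a scheduler: it only knows how to run an action.
public interface IMiniScheduler
{
    void Schedule(Action work);
}

// Runs work inline on the calling thread (roughly the free-threaded default).
public sealed class InlineMiniScheduler : IMiniScheduler
{
    public static readonly InlineMiniScheduler Instance = new InlineMiniScheduler();
    public void Schedule(Action work) => work();
}

// Introduces concurrency by queuing work to the thread pool.
public sealed class PoolMiniScheduler : IMiniScheduler
{
    public static readonly PoolMiniScheduler Instance = new PoolMiniScheduler();
    public void Schedule(Action work) => ThreadPool.QueueUserWorkItem(_ => work());
}

public static class MiniOperators
{
    // Hypothetical operator: this overload picks a default scheduler...
    public static void Return<T>(T value, Action<T> onNext) =>
        Return(value, onNext, InlineMiniScheduler.Instance);

    // ...and this one lets the caller choose where the notification is delivered.
    public static void Return<T>(T value, Action<T> onNext, IMiniScheduler scheduler) =>
        scheduler.Schedule(() => onNext(value));
}
```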

    For example, Observable.Interval uses ThreadPoolScheduler by default (conceptually speaking only; technically, it's a platform-dependent periodic timer via the concurrency abstraction layer (CAL), which uses the ThreadPool if it's available).  This means that each notification may arrive on a different pooled thread, which is why the thread ID keeps changing in your example.
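
    The effect is easy to reproduce without Rx. In this BCL-only sketch (an approximation of a pool-backed periodic timer, not the CAL itself), `System.Threading.Timer` delivers each tick on a thread-pool thread, so the recorded thread IDs may differ from tick to tick, and none of them is the thread that started the timer. `PoolTicker` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: a BCL approximation of a pool-backed periodic timer, not Rx's CAL.
public static class PoolTicker
{
    // Collects `ticks` timer callbacks and records which thread delivered each one.
    public static List<(int ThreadId, bool IsPoolThread)> Collect(int ticks, TimeSpan period)
    {
        var results = new List<(int ThreadId, bool IsPoolThread)>();
        if (ticks <= 0) return results;

        using (var done = new ManualResetEventSlim())
        {
            using (var timer = new Timer(_ =>
            {
                lock (results)
                {
                    if (results.Count >= ticks) return; // ignore ticks that fire late
                    var current = Thread.CurrentThread;
                    results.Add((current.ManagedThreadId, current.IsThreadPoolThread));
                    if (results.Count == ticks) done.Set();
                }
            }, null, period, period))
            {
                done.Wait(); // this thread blocks, so it never runs a tick itself
            }
        }
        return results;
    }
}
```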

    Note that NewThreadScheduler actually has the same behavior, though it's optimized to reuse the same thread for consecutive/overlapping notifications, due to the cost of warming up new threads.  So it's possible that the thread remains the same even when using NewThreadScheduler; however, that's not at all why you're seeing the same thread ID in your second test.

    Your second test uses ObserveOn, which also takes advantage of the CAL, though instead of using periodic scheduling it uses long-running scheduling.  As a result, passing in NewThreadScheduler to ObserveOn causes every notification to execute on the same new thread.  It keeps a notification queue and loops (over a semaphore) until the subscription is disposed, which is why your second test always reports the same thread ID.
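
    The long-running pattern described above can be sketched as follows. This is a simplified assumption about its general shape, not Rx's source: the hypothetical `DedicatedThreadQueue<T>` owns one dedicated thread that waits on a semaphore, drains a queue, and exits only when disposed, so every delivery reports the same thread ID no matter which threads enqueue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch of "long-running" delivery (not Rx's ObserveOn implementation):
// one dedicated thread loops over a semaphore and a queue until disposed.
public sealed class DedicatedThreadQueue<T> : IDisposable
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Action<T> _onNext;
    private readonly Thread _thread;

    public DedicatedThreadQueue(Action<T> onNext)
    {
        _onNext = onNext;
        _thread = new Thread(Loop) { IsBackground = true };
        _thread.Start();
    }

    // May be called from any thread, e.g. a thread-pool timer callback.
    public void Enqueue(T value)
    {
        _queue.Enqueue(value);
        _signal.Release(); // one permit per queued item
    }

    private void Loop()
    {
        try
        {
            while (true)
            {
                _signal.Wait(_cts.Token);
                if (_queue.TryDequeue(out var value)) _onNext(value); // always on this thread
            }
        }
        catch (OperationCanceledException) { } // disposed: leave the loop
    }

    public void Dispose()
    {
        _cts.Cancel();
        _thread.Join();
        _cts.Dispose();
        _signal.Dispose();
    }
}
```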

    Recommended Alternative

    ObserveOn is not necessary in your second test.  As I mentioned previously, it doesn't take advantage of the CAL's periodic scheduling capability, which seems more appropriate for Interval than the CAL's long-running capability.

    Operators that may introduce asynchrony have overloads accepting an IScheduler parameter.  You should always specify scheduling as close to the source as possible to take advantage of semantic optimizations in the CAL.  Use ObserveOn as a last resort only.

    var perSec2 = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default);

    However, you'll find that passing NewThreadScheduler to Interval doesn't really change the behavior compared to ObserveOn, because Interval always tries to use periodic scheduling if it's available, and the periodic scheduling implementation of NewThreadScheduler is basically the same as its long-running implementation.  Still, the advantage of avoiding ObserveOn here is that there's one fewer internal "queue" mechanism, which keeps your observable a bit leaner.
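
    Here is a sketch of periodic scheduling on one dedicated thread with no intermediate queue. It assumes only the general shape (it is not NewThreadScheduler's code, and `DedicatedPeriodicTimer` is hypothetical): unlike ObserveOn's queue-and-semaphore loop, the thread that keeps time invokes the tick callback directly.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of periodic scheduling on a single dedicated thread, with no queue.
public sealed class DedicatedPeriodicTimer : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Thread _thread;

    public DedicatedPeriodicTimer(TimeSpan period, Action<long> onTick)
    {
        _thread = new Thread(() =>
        {
            long tick = 0;
            // WaitOne returns true once cancellation is signaled, false when the period elapses.
            while (!_cts.Token.WaitHandle.WaitOne(period))
                onTick(tick++); // called directly on the timekeeping thread
        }) { IsBackground = true };
        _thread.Start();
    }

    // Do not call this from inside onTick: Join would wait on the current thread forever.
    public void Dispose()
    {
        _cts.Cancel();
        _thread.Join();
        _cts.Dispose();
    }
}
```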

    It's also worth noting that ObserveOn actually ignores the fact that you requested NewThreadScheduler specifically to get a new thread.  If you were to pass ThreadPoolScheduler or TaskPoolScheduler to ObserveOn instead, you'd get exactly the same results!  ObserveOn assumes that all we really care about is concurrency, since all three of these schedulers introduce concurrency.  Even though conceptually only NewThreadScheduler gives the "long-running" hint, ObserveOn applies long-running behavior for each of them anyway.

    See this discussion for details.
    See also this related discussion and work item.

    - Dave


    http://davesexton.com/blog

    • Marked as answer by kharesp Friday, August 15, 2014 2:12 PM
    Thursday, August 14, 2014 2:16 PM

All replies

  • Hi Dave,

    Thanks a lot for the detailed explanation. I had difficulty understanding the Rx contract regarding serialized OnNext calls, and now I understand that OnNext calls within a single pipeline are serialized, but calls in multiple pipelines (subscribed to the same source) can occur concurrently.

    Also, my example program is misguided, since it uses a cold observable, which creates a new pipeline every time it is subscribed to.
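
    The cold-observable point can be shown with a tiny, hypothetical `ColdRange` type built on the BCL's `IObservable<int>` (a single-threaded sketch, not Rx's `Observable.Range`): nothing is produced until `Subscribe` is called, and each call runs its own independent producer loop, so two subscribers each receive the full sequence from the start.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cold observable (not Rx's Observable.Range): nothing is produced until
// Subscribe is called, and every call runs its own producer from the start.
public sealed class ColdRange : IObservable<int>
{
    private readonly int _count;
    public ColdRange(int count) { _count = count; }

    // How many independent producers ("pipelines") have been started.
    public int ProducersStarted { get; private set; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        ProducersStarted++;
        for (int i = 0; i < _count; i++) observer.OnNext(i); // fresh loop per subscriber
        observer.OnCompleted();
        return NoopDisposable.Instance; // already completed, nothing left to cancel
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Records every value a single subscriber receives.
public sealed class ListObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public void OnNext(int value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```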

    But I do understand to some extent why the managed thread ID keeps changing: as you said, Rx operators can introduce asynchrony based on the default schedulers they use, and notifications can leave on different threads.

    I will read more on this.

    Thanks again!

    Friday, August 15, 2014 2:11 PM