ObserveOn(ThreadPool) Uses NewThread

  • Question

  • Hi,

    The new long-running scheduler mechanism in Rx is causing an unintuitive side-effect, IMO.

    In the following query, would you expect observations to occur on pooled threads or not?

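    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    namespace RxLabs.Net45
    {
    	internal static class Program
    	{
    		private static void Main()
    		{
    			var xs = Observable
    				.Range(0, 5)
    				// ObserveOn call reconstructed from the thread title (assumed form)
    				.ObserveOn(ThreadPoolScheduler.Instance);
    			using (xs.Subscribe(value =>
    			{
    				var thread = Thread.CurrentThread;
    				Console.WriteLine("{0}; Thread: {1}; Pooled: {2}", value, thread.ManagedThreadId, thread.IsThreadPoolThread);
    			}))
    			{
    				Console.ReadKey(); // assumed: keep the process alive while values are observed
    			}
    		}
    	}
    }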

    In fact, they're not.  Here's the output:

    0; Thread: 11; Pooled: False
    1; Thread: 11; Pooled: False
    2; Thread: 11; Pooled: False
    3; Thread: 11; Pooled: False
    4; Thread: 11; Pooled: False

    I assume that this behavior is intended.  It doesn't seem like a real problem to me - it was just unexpected.

    (Sorry if this was discussed already - I seem to remember it but I can't find any related threads.)

    - Dave

    Monday, August 20, 2012 7:11 PM


  • This behavior is by design. Both the TaskPool and CLR ThreadPool schedulers support long-running operations. The former does so through TaskCreationOptions.LongRunning (which really - in today's implementation of the TPL - amounts to creating a new thread); the latter calls into NewThread to achieve the same effect. What matters more than the specific type of scheduler used is the effect achieved. In this particular case, one expects asynchrony caused by the introduction of additional concurrency.

    For the long-running case, there's a point of tension though. If we don't support it, a lot of the performance benefits in operators like Range, Repeat, ObserveOn, etc. don't have any effect, and we have to fall back to "recursive one iteration at a time" behavior, which has a lot of overhead (in part due to closure and delegate allocations for each round through the loop). If we do support it, the question is how to do so. One way is to run a long-running piece of work on the thread pool. If a lot of such - potentially infinite - work items are queued, this will effectively overload the thread pool (a phenomenon called "oversubscription"), requiring it to grow the number of threads or delay work until a slot opens up. The TPL has a LongRunning hint precisely for this scenario, to bypass the thread/task pool altogether.
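
    To make the two dispatch styles concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The names DispatchStyles, RunRecursive and RunLongRunning are made up for illustration and are not part of Rx; the real operators are considerably more involved.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;

internal static class DispatchStyles
{
    private const int Count = 5;

    // Recursive style: every iteration is a separate work item that
    // reschedules itself, so each round allocates and queues new work.
    public static bool[] RunRecursive(IScheduler scheduler)
    {
        var pooled = new bool[Count];
        var done = new TaskCompletionSource<bool[]>();
        scheduler.Schedule(0, (i, self) =>
        {
            if (i == Count) { done.SetResult(pooled); return; }
            pooled[i] = Thread.CurrentThread.IsThreadPoolThread;
            self(i + 1); // queue the next iteration on the scheduler
        });
        return done.Task.Result; // block until the last iteration has run
    }

    // Long-running style: one work item runs a tight loop until done or cancelled.
    public static bool[] RunLongRunning(ISchedulerLongRunning scheduler)
    {
        var pooled = new bool[Count];
        var done = new TaskCompletionSource<bool[]>();
        scheduler.ScheduleLongRunning(0, (start, cancel) =>
        {
            for (var i = start; i < Count && !cancel.IsDisposed; i++)
                pooled[i] = Thread.CurrentThread.IsThreadPoolThread;
            done.SetResult(pooled);
        });
        return done.Task.Result;
    }

    private static void Main()
    {
        var pool = ThreadPoolScheduler.Instance;
        Console.WriteLine("Recursive:    {0}", string.Join(", ", RunRecursive(pool)));
        // AsLongRunning returns null for schedulers without long-running support;
        // the thread pool scheduler is assumed to support it here.
        Console.WriteLine("Long-running: {0}", string.Join(", ", RunLongRunning(pool.AsLongRunning())));
    }
}
```

    Each element of the returned array records whether that iteration ran on a pooled thread, so the two lines printed by Main can be compared directly with the output in the question.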

    Notice we can't even reliably predict whether work is going to be long-running: all we know (statically, during development of Rx operators) is that we'd really like to run a tight loop to dispatch work, but how many messages will be involved is unknown in a lot of cases (e.g. ObserveOn could be applied to a zero-element sequence as well as an infinite sequence, a property we can't determine in the general case). Because event streams tend to be long in a lot of scenarios, we have a bias for this case and optimistically go down the ISchedulerLongRunning route if we have more than a constant number of dispatch calls to make.

    Back to the design of the CLR ThreadPool scheduler implementation in Rx v2.0: we decided to go with a design that closely resembles what the TPL's LongRunning option achieves. As such, ambient properties such as IsThreadPoolThread aren't reliable when the long-running path is taken (the same holds for the TPL: a task created with LongRunning will report a different value for this property than one created without it).
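
    The TPL side of this comparison can be checked with plain framework code. The following is a minimal sketch; LongRunningProbe and RunsOnPoolThread are made-up names, and the result reflects how the default task scheduler happens to treat the LongRunning hint rather than any documented guarantee.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

internal static class LongRunningProbe
{
    // Runs a trivial task with the given options on the default scheduler
    // and reports whether it executed on a thread pool thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options)
    {
        return Task.Factory
            .StartNew(() => Thread.CurrentThread.IsThreadPoolThread,
                      CancellationToken.None, options, TaskScheduler.Default)
            .Result;
    }

    private static void Main()
    {
        Console.WriteLine("Default:     Pooled: {0}", RunsOnPoolThread(TaskCreationOptions.None));
        Console.WriteLine("LongRunning: Pooled: {0}", RunsOnPoolThread(TaskCreationOptions.LongRunning));
    }
}
```

    With the implementation described above, the first line is expected to report a pooled thread and the second a dedicated one.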

    In case you really want a thread pool thread under all circumstances (e.g. to enforce a global maximum degree of parallelism for your app - though keep in mind that things like TaskCreationOptions.LongRunning are wildcards that bypass this mechanism), you'll have to apply the DisableOptimizations extension method to the ThreadPoolScheduler instance to make it fall back to recursive behavior. Notice you can pass in the interfaces you want to disable (specifying none means disabling all optimizations); in this case typeof(ISchedulerLongRunning) suffices.
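
    Applied to the query from the question, that could look roughly like the sketch below, again assuming the System.Reactive package is referenced. PooledObserveOn and Run are made-up names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

internal static class PooledObserveOn
{
    // Returns, for each observed value, whether it was observed on a pool thread.
    public static IList<bool> Run()
    {
        // Hide the long-running capability so ObserveOn falls back to
        // recursive scheduling on the thread pool.
        var pooledOnly = ThreadPoolScheduler.Instance
            .DisableOptimizations(typeof(ISchedulerLongRunning));

        return Observable
            .Range(0, 5)
            .ObserveOn(pooledOnly)
            .Select(_ => Thread.CurrentThread.IsThreadPoolThread)
            .ToList()
            .Wait(); // block until the sequence completes
    }

    private static void Main()
    {
        foreach (var pooled in Run())
            Console.WriteLine("Pooled: {0}", pooled);
    }
}
```

    The trade-off is the one described earlier: every element now goes through a separately scheduled work item instead of a tight loop.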

    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }

    Tuesday, August 21, 2012 6:54 AM