none
Scheduler.TaskPool doesn't always schedule things on the task pool

    Question

  • Hi,

    I've just spent most of a day puzzling with some misbehaviour relating to scheduling. Here's a simplified version of what I was doing:

    Subject<Unit> requestStream = new Subject<Unit>();
    IObservable<ResultData> resultStream = requestStream.ObserveOn(Scheduler.TaskPool).Select(_ => FetchData());
    

    So the idea is that each time a Unit object gets pushed onto the requestStream subject, the fetch operation gets scheduled on a pool thread and the returned data appears on the resultStream. So far so good, and this worked fine when I tried it in a little test app. However, when I did this within a service object in our real codebase, FetchData got executed on the UI thread.

    The difference is, the service initialisation was itself being done in a TPL task. This means that there is a current TPL scheduler in effect (one that uses the UI thread, as it happens) and it turns out that despite its name, the Rx TaskPoolScheduler is does not specifically use the task pool. From MSDN (http://msdn.microsoft.com/en-us/library/system.reactive.concurrency.scheduler.taskpool(v=vs.103).aspx): "The TaskPool scheduler schedules actions to execute using the Task Factory from the Task Programming Library (TPL)."

    Using Reflector to see the implementation of TaskPoolScheduler.Schedule() it appears that it's using an override of TaskFactory.StartNew() that doesn't take a TaskScheduler parameter. This means that it will use the current TPL scheduler if there is one in effect from higher up the call chain. If fact, the situation in my test app where the code was working could really be regarded as an edge case, because StartNew is fully intended to use TaskScheduler.Current and the MSDN entry for that says "When not called from within a task, Current will return the Default scheduler". So really the thing only worked in the first place because of this fallback behaviour.

    To the Rx team: there are two ways this problem could be addressed in Rx. One would be to change to using an explicit scheduler in the call to StartNew() inside TaskPoolScheduler.Schedule(), while the other would be to replace the Scheduler.TaskPool instance with one that uses a non-default TaskFactory that has been given TaskScheduler.Default as its TPL scheduler. I don't know whether there are strong reasons for the existing behaviour, but it's unexpected and I now know of others who have been caught out by it.

    In the meantime, others affected by this problem can apply the latter technique in their own code - just declare your own Rx scheduler like this:

    TaskPoolScheduler TaskPoolSchedulerThatWorks = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default));
    

     

    Use that in calls to ObserveOn() or whatever and the problem goes away.

    Kind regards to all,

    Mike

    • Edited by mike_nunan Friday, November 04, 2011 7:14 PM formatting
    Friday, November 04, 2011 7:13 PM

Answers

  • Thanks for bringing this to our attention; we'll investigate this behavior taking your feedback in consideration.
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Friday, November 04, 2011 8:58 PM

All replies

  • Thanks for bringing this to our attention; we'll investigate this behavior taking your feedback in consideration.
    using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
    Friday, November 04, 2011 8:58 PM
  • Hi Bart,

    Very much appreciate the response, I'll be most interested in your further thoughts once you've had time to look into this. Meanwhile we have a viable workaround, so no big dramas.

    Cheers,

    Mike

    PS. Thanks for the Rx Workshop series on Ch#9 :)

    Monday, November 07, 2011 10:34 AM
  • Hi Bart,

    Great to see this mentioned among the fixes in the v1.1.11111 experimental release, thanks v much for that!

    Mike

    Tuesday, November 15, 2011 4:20 PM