About Rx Schedulers and Parallelism

    Question

  • Hi,

    We are using Rx to add concurrency to a process.
    The process consists of:

    1. Generate a set of chunks of information
    2. Measure each of them
    3. Store the measured chunks
    4. Split them into parts
    5. Store those parts.
    6. Notify each stored part

    Here is the code used to bind all those parts.

    var chunks = Observable.FromEventPattern<ChunksAvailableEventArgs>(AddHandler, RemoveHandler);
    var parts = Observable.FromEventPattern<PartAvailableEventArgs>(AddHandler, RemoveHandler);
    var splitPartsChunkSuscription = chunks
        .ObserveOn(scheduler).Select(Measure)
        .ObserveOn(scheduler).Select(Store)
        .ObserveOn(scheduler).Subscribe(Divide);
    var notifyPartsSuscription = parts
        .ObserveOn(scheduler).Select(Store)
        .ObserveOn(scheduler).Subscribe(Notify, EndWithError, EndWithSuccess);
    ...

    Measure/Store are functions of the form:

    EventPattern<ChunksAvailableEventArgs> fn(EventPattern<ChunksAvailableEventArgs> chunkEventArgs);
    EventPattern<PartAvailableEventArgs> fn(EventPattern<PartAvailableEventArgs> partEventArgs);

    that perform the operation and return the same event. The Divide function divides the chunks and generates Parts Available Events.

    The code works fine, but we want to ensure that the following invariant holds:

    • Chunk n starts to be measured only after Chunk n-1 has been measured (the two can't run in parallel).

    The same type of restriction should hold for the other stages Store/Divide/Notify.

    Based on the tests we have run with different schedulers (Scheduler.ThreadPool, Scheduler.NewThread, Scheduler.TaskPool, Scheduler.CurrentThreadScheduler.Immediate), this invariant always seems to hold, but we are not sure why (what about PFX?).

    Can someone explain why? 

    Thanks!

    Thursday, August 02, 2012 11:39 PM

Answers

  • Hi,

    Observers in a query with a single subscription always observe notifications in a serialized manner; i.e., you would need to create multiple subscriptions if you wanted to observe notifications in parallel.

    For example, some operators accept two or more observables as input, which implies that they make two or more subscriptions internally and thus may observe notifications concurrently; however, these operators always generate an observable that never pushes notifications concurrently.

    The Rx contract requires that notifications are pushed in a serialized manner to ensure thread-safety in observers (§§4.2, 6.8).  Perhaps you could also consider this contract to be part of the Rx grammar (§4.1).  See the Rx Design Guidelines document for details.
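    A minimal sketch of how this could be checked, assuming Rx 2.x (System.Reactive) in a console program.  The counters inFlight and maxInFlight and the 10 ms sleep are invented for this illustration; Observable.Range stands in for the chunk events.  If the contract holds, the reported maximum is 1 even though the scheduler is concurrent.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class SerializedNotifications
{
    static void Main()
    {
        int inFlight = 0;     // OnNext calls executing right now (hypothetical counter)
        int maxInFlight = 0;  // highest value inFlight ever reached
        var done = new ManualResetEventSlim();

        Observable.Range(1, 20)                    // stand-in for the chunk events
            .ObserveOn(TaskPoolScheduler.Default)  // a scheduler that introduces concurrency
            .Subscribe(
                n =>
                {
                    int now = Interlocked.Increment(ref inFlight);
                    maxInFlight = Math.Max(maxInFlight, now);
                    Thread.Sleep(10);              // simulate slow work such as Measure
                    Interlocked.Decrement(ref inFlight);
                },
                () => done.Set());                 // OnCompleted: release the main thread

        done.Wait();
        Console.WriteLine("Max concurrent OnNext calls: " + maxInFlight);
    }
}
```

    Swapping TaskPoolScheduler.Default for NewThreadScheduler.Default or ThreadPoolScheduler.Instance should not change the result, because the serialization comes from the Rx contract and not from the scheduler.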

    Furthermore, Rx doesn't introduce concurrency in operators that don't need it.  For example, when you use Select to project data into another form, no concurrency is introduced; the projection is executed synchronously on the thread that delivered the notification.  In your code, none of the operators apart from ObserveOn itself introduce concurrency; therefore, you don't have to call ObserveOn between each operation.  You should only call it where you want to change the scheduler on which notifications are observed.  This is typically done as close to the end of the query as possible, usually right before calling Subscribe.  But if you want to be sure that the entire query processes data concurrently with respect to the source (in your case, an event), then you can begin the query with Observable.FromEventPattern(...).ObserveOn(scheduler), where scheduler is a scheduler that introduces concurrency, such as NewThreadScheduler, ThreadPoolScheduler or TaskPoolScheduler.
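    As a rough illustration, assuming Rx 2.x, the sketch below places a single ObserveOn at the start of the query and prints the managed thread id at each stage.  The Stage helper is hypothetical and only stands in for Measure, Store and Divide; Observable.Range stands in for the event source.  For any given value, all three stages should report the same thread id, since Select and Subscribe simply run on whichever thread delivers the notification.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class SingleObserveOn
{
    // Hypothetical stand-in for Measure/Store/Divide: logs the thread and returns the value.
    static int Stage(string name, int value)
    {
        Console.WriteLine("{0}({1}) on thread {2}",
            name, value, Thread.CurrentThread.ManagedThreadId);
        return value;
    }

    static void Main()
    {
        var done = new ManualResetEventSlim();
        Console.WriteLine("Source thread: " + Thread.CurrentThread.ManagedThreadId);

        Observable.Range(1, 3)                       // stand-in for FromEventPattern(...)
            .ObserveOn(NewThreadScheduler.Default)   // the only scheduler hop in the query
            .Select(n => Stage("Measure", n))        // runs on the observing thread
            .Select(n => Stage("Store", n))          // same thread, no extra ObserveOn
            .Subscribe(
                n => { Stage("Divide", n); },
                () => done.Set());

        done.Wait();                                 // keep the process alive until OnCompleted
    }
}
```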

    You can read more about schedulers here:

    http://msdn.microsoft.com/en-us/library/hh242963(v=vs.103)

    Since you're using Select for side-effects only, without projecting the data into another form, it's more semantically appropriate to use the Do operator instead.  For example:

    var splitPartsChunkSuscription = chunks
    	.ObserveOn(scheduler)
    	.Do(Measure)
    	.Do(Store)
    	.Subscribe(Divide);

    However, the semantics of that query are the same as the following query:

    var splitPartsChunkSuscription = chunks
    	.ObserveOn(scheduler)
    	.Subscribe(result =>
    	{
    		Measure(result);
    		Store(result);
    		Divide(result);
    	});

    Of course, if you're building the query in a deferred manner then the former is better because it allows you to piece together the query with control flow.  Though if your algorithm is static, as you've shown, then placing all of the side-effects into Subscribe is perhaps clearer and easier to maintain (IMO).
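    To make "piece together the query with control flow" concrete, here is a small sketch, assuming Rx 2.x.  The Build helper and its measure/store switches are invented for this example, and the Console.WriteLine calls stand in for the real Measure, Store and Divide functions.

```csharp
using System;
using System.Reactive.Linq;

class DeferredQuery
{
    // Hypothetical helper: appends Do stages only when the corresponding switch is set.
    static IObservable<int> Build(IObservable<int> chunks, bool measure, bool store)
    {
        var query = chunks;
        if (measure) query = query.Do(c => Console.WriteLine("Measure " + c));
        if (store) query = query.Do(c => Console.WriteLine("Store " + c));
        return query;   // nothing runs until someone subscribes
    }

    static void Main()
    {
        Build(Observable.Range(1, 2), measure: true, store: false)
            .Subscribe(c => Console.WriteLine("Divide " + c));
        // Prints, in order:
        //   Measure 1
        //   Divide 1
        //   Measure 2
        //   Divide 2
    }
}
```

    With the side-effects placed directly in Subscribe, this kind of conditional composition would have to move into the body of the lambda instead.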

    Notice how the latter query is now much closer to the original specification.  It's simply using ObserveOn to "add concurrency to the process", as was originally stated.

    - Dave


    http://davesexton.com/blog

    • Proposed as answer by Josef Bláha Friday, August 03, 2012 6:14 AM
    • Edited by Dave Sexton Friday, August 03, 2012 7:27 AM Fixed grammar error
    • Marked as answer by Ing. Javier Coitiño Saturday, August 04, 2012 4:16 AM
    Friday, August 03, 2012 2:31 AM

All replies

  • Dave, thanks a lot!!!
    Saturday, August 04, 2012 4:26 AM
  • For further details on how and why the schedulers do what they do, you may find this to be a good resource:

    http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html


    Lee Campbell http://LeeCampbell.blogspot.com

    Tuesday, August 14, 2012 10:26 AM