Request / Response pattern

  • Question

  • Say I have a sequence 'source' that contains events that start to appear once I make a request on another channel. What is the best way to wrap this in an IObservable<T>, in particular ensuring there is no race-condition of dropped events?

     

    Observable.Create(observer =>
    {
        // react to response events
        source.Subscribe(observer);
    
        StartEvents();
    
        return () => StopEvents();
    });
    
    

     


    Is there a cleaner way, without subscribing into the observer, or is this best practice?


    Regards, Fil.
    • Edited by Fil Mackay Monday, September 12, 2011 8:17 AM
    Monday, September 12, 2011 8:16 AM

All replies

  • Hi Fil,

    > events that start to appear once I make a request on another channel.

    What exactly do you mean by "another channel"?  I can't tell for sure from your example code, but do you simply mean that the call to StartEvents causes source to start pushing values?

    > What is the best way to wrap this in an IObservable<T>, in particular ensuring there is no race-condition of dropped events?

    It seems like you're doing it correctly by subscribing to source before calling StartEvents.  That will eliminate the race condition.

    > Is there a cleaner way, without subscribing into the observer, or is this best practice?

    You don't want to subscribe the observer to the source?  Why not?

    If the problem is that you want to share the subscription side-effects with multiple observers before calling StartEvents, then simply use Publish.  This will allow you to call Subscribe many times and then finally call Connect, which will cause StartEvents to be called.

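    var connectable = 
    	Observable.Create(observer =>
    	{
    		// subscribe first so that no response events are missed
    		var subscription = source.Subscribe(observer);
    
    		StartEvents();
    
    		// on disconnect: unsubscribe from source, then stop the events
    		return () =>
    		{
    			subscription.Dispose();
    			StopEvents();
    		};
    	})
    	.Publish();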
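
    As a rough usage sketch of that pattern: the Subject and the two delegates below are hypothetical stand-ins for source, StartEvents and StopEvents, and the "request" is assumed to answer synchronously, which is the worst case for the race. It assumes the System.Reactive package is referenced. Both observers attach before Connect, so neither should miss the first response.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishConnectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>(); // stand-in for the real response stream

        // Hypothetical StartEvents/StopEvents: the request answers immediately.
        Action startEvents = () => { log.Add("start"); source.OnNext(1); source.OnNext(2); };
        Action stopEvents = () => log.Add("stop");

        var connectable = Observable.Create<int>(observer =>
        {
            var subscription = source.Subscribe(observer); // listen first
            startEvents();                                  // then request
            return () => { subscription.Dispose(); stopEvents(); };
        })
        .Publish();

        // Observers attach first; nothing has been requested yet.
        using (connectable.Subscribe(x => log.Add("A:" + x)))
        using (connectable.Subscribe(x => log.Add("B:" + x)))
        {
            // Connect subscribes to the Create sequence once, which calls startEvents.
            // Disposing the connection unsubscribes and calls stopEvents.
            using (connectable.Connect())
            {
            }
        }

        return log;
    }
}
```

    Calling PublishConnectDemo.Run() returns the log of what happened, so the ordering of the request and the observed values can be inspected.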
    

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Monday, September 12, 2011 5:41 PM Formatting
    Monday, September 12, 2011 5:40 PM
  • Disregard the "another channel" reference.

    How would I subscribe the observer to the source, without risking race conditions?

    Observable.Defer(() =>
    {
        StartEvents();
        // items may appear here - and get missed!
        return source;
    });
    



    Regards, Fil.
    Monday, September 12, 2011 10:24 PM
  • Hi,

    Use Publish, Subscribe, and then Connect.

    - Dave


    http://davesexton.com/blog
    Monday, September 12, 2011 10:56 PM
  • Hi Fil,

    And use Create instead of Defer, just like your original example.

    - Dave


    http://davesexton.com/blog
    Monday, September 12, 2011 10:58 PM
  • Hi Fil,

    To be clear:

    Keep in mind that Defer is used to add side-effects to an observable.  In your example, you're adding unwanted side-effects in Defer.  So, simply don't do that :)

    Subscribe to source first, then call StartEvents.

    To wrap that in an observable sequence, use Create like you did.

    To share the subscription side-effects with multiple observers, use Publish.
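
    A minimal sketch of why the order matters, assuming the System.Reactive package: the Subject and the startEvents delegate are hypothetical stand-ins, and the request is assumed to produce its first value immediately. The Defer version requests before anyone listens to source, while the Create version listens first.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubscribeOrderDemo
{
    // Returns the values an observer receives through each wrapper.
    public static (List<int> viaDefer, List<int> viaCreate) Run()
    {
        var source = new Subject<int>();

        // Hypothetical StartEvents that responds straight away.
        Action startEvents = () => source.OnNext(42);

        var deferred = Observable.Defer<int>(() =>
        {
            startEvents(); // the value is pushed before anyone listens to source
            return source;
        });

        var created = Observable.Create<int>(observer =>
        {
            var subscription = source.Subscribe(observer); // listen first
            startEvents();                                  // then request
            return subscription;
        });

        var viaDefer = new List<int>();
        var viaCreate = new List<int>();

        using (deferred.Subscribe(x => viaDefer.Add(x))) { }
        using (created.Subscribe(x => viaCreate.Add(x))) { }

        return (viaDefer, viaCreate);
    }
}
```

    With a real source the response usually arrives a little later, so the Defer version may appear to work most of the time; the sketch just makes the window deterministic.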

    - Dave


    http://davesexton.com/blog
    Monday, September 12, 2011 11:02 PM
  • I might be on a slightly different track, but I've done a number of Request/Response interfaces using observables with this kind of signature:

    IObservable<Token> StoreGraphChanges(IObservable<IGraphChange> graphChanges, IScheduler scheduler);
    

    Now this lets me connect to an underlying data store (could be web services or a database) to then process the incoming observable, return corresponding response values on the outgoing observable, and then close the connection when complete.

    The implementation of these methods internally uses an extension method that asynchronously plays the results into a replayable observable, so that the method's "side-effect" happens only once. The signature of the extension method is:

    IObservable<T> ToAsyncReplayable<T>(this IObservable<T> @this, IScheduler scheduler)
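
    The body of that extension method isn't shown here. Purely as a guess at one possible shape, assuming the System.Reactive package, it could subscribe once on the scheduler and feed a ReplaySubject; the actual implementation may well differ.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayableExtensions
{
    // Assumed behaviour (not the poster's actual code): subscribe to the source
    // exactly once, on the given scheduler, and replay everything it produces
    // to every subscriber, including ones that arrive later.
    public static IObservable<T> ToAsyncReplayable<T>(this IObservable<T> @this, IScheduler scheduler)
    {
        var replay = new ReplaySubject<T>();

        // The single subscription that triggers the side-effect.
        // In this sketch it is never disposed and the buffer is unbounded.
        @this.SubscribeOn(scheduler).Subscribe(replay);

        // Hide the subject so callers cannot push values into it.
        return replay.AsObservable();
    }
}
```

    Note that this starts the work as soon as the method is called rather than on first subscription, which is a design choice of the sketch.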
    

    This kind of thing isn't exactly what your question is asking, but with Rx there are many ways to skin the cat, so I thought this might be worth putting forward.


    James C-S

    • Edited by James C-S Monday, September 12, 2011 11:45 PM Re-formatting again - someone @ ms please fix the firefox compat.
    Monday, September 12, 2011 11:43 PM
  • How would you use these as an alternative to the Observable.Create() code I originally posted?

    This is why I was interested in a ConnectableObservable.Create() where you can define Subscribe and Connect implementations directly. Or perhaps a Publish() overload where you provide a Connect() delegate?

    source.Publish(() =>
    {
       StartEvents();
       return () => FinishEvents();
    });

     


    Regards, Fil.
    Monday, September 12, 2011 11:45 PM
  • Hi Fil,

    It's an interesting idea, although it would only save us from typing a few characters.  It's pretty simple now to use Create and Publish, as in my original example.

    Proposed solution:

    source.Publish(() =>
    {
    	StartEvents();
    
    	return StopEvents;
    });
    

    vs. Existing solution:

    Observable.Create(observer =>
    {
    	var subscription = source.Subscribe(observer);
    
    	StartEvents();
    
    	return () =>
    	{
    		subscription.Dispose();
    		StopEvents();
    	};
    })
    .Publish();
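
    For what it's worth, the proposed form could be layered on top of the existing one as a small extension method. This is only a sketch, assuming the System.Reactive package; PublishWithConnect is a hypothetical name, not something that ships with Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ConnectableExtensions
{
    // Hypothetical helper, not part of Rx. onConnect runs after the observer is
    // attached to source, and returns the action to run on disconnect.
    public static IConnectableObservable<T> PublishWithConnect<T>(
        this IObservable<T> source, Func<Action> onConnect)
    {
        return Observable.Create<T>(observer =>
        {
            // Attach to the source before triggering the request.
            var subscription = source.Subscribe(observer);
            var onDisconnect = onConnect();

            return () =>
            {
                subscription.Dispose();
                onDisconnect();
            };
        })
        .Publish();
    }
}
```

    It would then be called as source.PublishWithConnect(() => { StartEvents(); return StopEvents; }), followed by Subscribe and Connect as usual.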
    

    - Dave


    http://davesexton.com/blog
    • Edited by Dave Sexton Tuesday, September 13, 2011 1:55 AM Fixed observable example to dispose of subscription
    Tuesday, September 13, 2011 1:50 AM
  • Hi Dave,

    Yes, but Publish() does not add anything to the original question; it simply allows the subscription to be shared. The original question was whether there was an alternative way to set up the observer and avoid race conditions. I'll take it from this that there isn't.

     


    Regards, Fil.
    Tuesday, September 13, 2011 2:41 AM
  • Hi Fil,

    I figured that's what you actually wanted since you used Publish in your example.  Publish creates a connectable observable.  The fact that it also shares the subscription is just a bonus.

    If you only wanted a connectable observable in order to avoid the race condition, and there is just a single observer, then your original Create example is fine.

    - Dave


    http://davesexton.com/blog
    Tuesday, September 13, 2011 4:51 AM