Subject and sharing subscriptions to server

    Question

  • Hi,

    I'm creating a Service class that handles trades in a financial application. Simple enough, I create a property with a Subject as a backing field, like this:

    public class TradeService
    {
        private readonly Subject<Trade> trades = new Subject<Trade>();
    
        public IObservable<Trade> Trades { get {return trades; } }
    }

    Next step is to have some kind of RefCount on the Trades, so that when the first subscription occurs, I want to send a subscribe command to the server. When the last unsubscribe (via .Dispose()) happens, an unsubscribe command should be sent to the server.

    One way to do this is to add a list of observers to the TradeService class, and have a helper method do the ref counting, like this:

    // backing field - a list of observers
    private readonly List<IObserver<Trade>> tradeObservers = new List<IObserver<Trade>>();
    
    // modified property
            public IObservable<Trade> Trades
            {
                get
                {
                    return CreateTradeObservable(tradeObservers);
                }
            }
    // helper method that does the ref-counting
            private IObservable<T> CreateTradeObservable<T>(List<IObserver<T>> observerList)
            {
                return Observable.Create<T>(o =>
                {
                    observerList.Add(o);
                    if (observerList.Count == 1)
                    {
                        // send subscribe to server here
                    }
                    return () =>
                    {
                        observerList.Remove(o);
                        if (observerList.Count == 0)
                        {
                            // send unsubscribe to server here
                        }
                    };
                }).Publish().RefCount();
            }
    
    This works, but my question is: is there a simpler way to do it, without having to manage the list of observers myself? I feel I'm doing something that's already available in the Rx framework.

    April 24, 2012 7:25 AM

Answer

  • Hi, 

    > I don't see how the Defer() call would enable the first side-effect (subscribe to server), neither the .Finally().
    > The documentation says that .Defer() will invoke the observableFactoryFunction whenever a new observer subscribes.

    That is correct, unless you use Publish.  Defer creates a cold observable and Publish makes it hot.  By publishing the observable, you're sharing subscription side-effects; i.e., if you were to call Subscribe on your published observable twice, then the code in Defer would only have executed once, when you called Connect.  If you use RefCount, then it calls Connect for you upon the first subscription.  So again, even with RefCount, the code in Defer will only execute once for one or more simultaneous subscriptions.
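    The Publish/Connect behaviour described above can be sketched without RefCount. This is only an illustration: the class name PublishConnectSketch and the log strings are made up for the sketch, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx: records the order in which things happen.
public static class PublishConnectSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Publish() returns a connectable observable; nothing upstream runs yet.
        var published = Observable
            .Defer(() =>
            {
                log.Add("Defer called");
                return Observable.Never<int>();
            })
            .Publish();

        // Two observers attach to the published sequence before it is connected.
        using (published.Subscribe())
        using (published.Subscribe())
        {
            log.Add("Two observers subscribed");

            // Connect() subscribes to the Defer source exactly once for both observers.
            using (published.Connect())
            {
                log.Add("Connected");
            }
        }

        return log;
    }
}
```

    In this sketch the Defer factory is not invoked by either Subscribe call; it runs a single time when Connect is called, which is the step RefCount performs on your behalf.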

    > I wrote a small unit test to see how it behaves, and I get two calls to "subscribe to server" and two calls to "unsubscribe to server".

    Feel free to post your code.

    If you're using RefCount, then be sure not to dispose of the previous subscription before subscribing again; otherwise, the code in Defer will execute more than once, which I believe is the correct behavior according to your specification.

    Here's an example:

    using System;
    using System.Reactive.Linq;
    
    namespace Rxx.Labs.Reactive
    {
    	public sealed class DeferPublishLab : BaseConsoleLab
    	{
    		protected override void Main()
    		{
    			var published = Observable
    				.Defer(() =>
    				{
    					TraceLine("Defer Called");
    					return Observable.Never<int>();
    				})
    				.Finally(() => TraceLine("Finally Called"))
    				.Publish()
    				.RefCount();
    
    			var s1 = published.Subscribe();
    			var s2 = published.Subscribe();
    			var s3 = published.Subscribe();
    
    			TraceLine("Created Subscriptions");
    
    			s1.Dispose();
    			s2.Dispose();
    			s3.Dispose();
    
    			TraceLine("Disposed Subscriptions");
    
    			published.Subscribe();
    			published.Subscribe();
    			published.Subscribe();
    		}
    	}
    }

    Here's the output:

    Defer Called
    Created Subscriptions
    Finally Called
    Disposed Subscriptions
    Defer Called

    Notice that Defer and Finally are only called once for 3 subscriptions.  That's the behavior of the Publish operator.  When all of the subscriptions have been disposed, the next subscription starts the entire observable again.  That's the behavior of the RefCount operator.
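    The difference between overlapping and sequential subscriptions can also be sketched as a simple counter. The class and method names below are hypothetical, chosen only for this illustration, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx: counts how often the Defer factory runs.
public static class RefCountRestartSketch
{
    public static int CountDeferCalls(bool overlap)
    {
        int deferCalls = 0;

        var shared = Observable
            .Defer(() =>
            {
                deferCalls++;
                return Observable.Never<int>();
            })
            .Publish()
            .RefCount();

        var first = shared.Subscribe();

        if (!overlap)
        {
            // The reference count drops to zero here, so the next
            // subscription has to start the source again.
            first.Dispose();
        }

        var second = shared.Subscribe();

        first.Dispose();   // Disposing a second time is a no-op.
        second.Dispose();

        return deferCalls;
    }
}
```

    With overlap set to true, both subscriptions share one run of the Defer factory; with overlap set to false, the first subscription is disposed before the second starts, so the factory runs twice.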

    > the "return trades" should always happen

    Do you mean that it must execute a side-effect, or simply that all observables should share the same trade sequence?

    In your original example, you're using Subject<Trade> to represent trades, which implies that you want all observables to share the same trade sequence.  That would work fine if you simply return trades from within Defer.

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, April 25, 2012 8:19 AM: Spelling
    • Marked as answer by blooksa, April 25, 2012 9:27 AM
    April 25, 2012 8:19 AM

All replies

  • Hi,

    It seems that you're trying to take a hot observable and create a warm observable that is cold at first and then becomes hot upon the first subscription, and finally becomes cold again upon the last cancellation, with side-effects at two places.

    The first side-effect must occur upon the first subscription.  The second side-effect must occur upon the last cancellation.

    .Publish().RefCount() ensures that an underlying cold observable is made hot, thus it's only subscribed to one time for many observers.  But it only remains hot until all subscriptions are cancelled, at which point the underlying subscription is also cancelled and the observable becomes cold again.

    So to meet your requirements you'll first need to make a hot observable cold, to create a subscription side-effect, then apply an operator that executes when the subscription is cancelled, and finally apply .Publish().RefCount() to make this cold observable warm.

    The Defer operator makes a hot observable cold.

    The Finally operator executes a side-effect upon cancellation.

    Publish() with RefCount() makes a cold observable warm.

    Thus, your operator could be implemented as follows: (untested)

    public IObservable<Trade> Trades
    {
    	get
    	{
    		return Observable.Defer(() =>
    		{
    			// Subscribe to server
    
    			return trades;	// Hot or cold observable; can be a subject or, preferably, an observable query
    		})
    		.Finally(() =>
    		{
    			// Send unsubscribe to server
    			// This will be called upon cancellation and also source termination; e.g., OnCompleted and OnError.
    		})
    		.Publish()
    		.RefCount();
    	}
    }

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton, April 24, 2012 12:37 PM: Clarification
    April 24, 2012 12:33 PM
  • Thanks for taking the time to answer, and yes, your description of the functionality is correct. But I don't see how the Defer() call would enable the first side-effect (subscribe to server), nor how .Finally() would enable the second. The documentation says that .Defer() will invoke the observableFactoryFunction whenever a new observer subscribes.

    I wrote a small unit test to see how it behaves, and I get two calls to "subscribe to server" and two calls to "unsubscribe to server".

    For one, I would like to do something like this (note: very pseudo code):

    return Observable.Defer(() =>
    		{
    			// Subscribe to server
    		})
     	        return trades;
    		.Finally(() =>
    		{
    			// Send unsubscribe to server
    		})
    		.Publish()
    		.RefCount();

    By that I mean that only the "subscribe to server" should happen once in the Defer(), and "unsubscribe to server" should happen once in Finally(), but the "return trades" should always happen.

    Or perhaps the issue has something to do with the .Publish().RefCount(), but I'm not sure. I'll do some more testing and thinking, and scratch my head to see if I can crack this one.
    April 25, 2012 7:37 AM
  • Well, ignore my previous post. The code was OK (as per your suggestions, Dave), and my unit tests were fine. But the implementation of the IObservable<Trade> property always returned a new Observable.Defer(...), which meant I was getting a new one every time someone subscribed. The proper way to implement it would be:

    IObservable<Trade> trades;
    
    public IObservable<Trade> Trades
    {
        get
        {
            if (trades == null)
            {
                trades = Observable.Defer(..)
            }
            return trades;
        }
    }
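    A fuller sketch of this caching approach might look like the following. It builds the shared observable once in the constructor instead of lazily in the getter. The Trade class, the Push method and the two counters are assumptions added for illustration; the counters stand in for the real subscribe and unsubscribe commands sent to the server, and they are not thread-safe.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical trade type, only for this sketch.
public sealed class Trade
{
    public string Symbol { get; set; }
}

public sealed class TradeService
{
    // Receives trades pushed from the server connection.
    private readonly Subject<Trade> source = new Subject<Trade>();

    // Built once, so every caller of Trades shares the same ref-counted sequence.
    private readonly IObservable<Trade> trades;

    // Placeholders that count the commands a real service would send.
    public int SubscribeCommands { get; private set; }
    public int UnsubscribeCommands { get; private set; }

    public TradeService()
    {
        trades = Observable
            .Defer(() =>
            {
                SubscribeCommands++;            // send subscribe to server here
                return source.AsObservable();
            })
            .Finally(() =>
            {
                UnsubscribeCommands++;          // send unsubscribe to server here
            })
            .Publish()
            .RefCount();
    }

    public IObservable<Trade> Trades
    {
        get { return trades; }
    }

    // Hypothetical entry point for trades arriving from the server.
    public void Push(Trade trade)
    {
        source.OnNext(trade);
    }
}
```

    With this shape, two overlapping subscriptions to Trades should increment SubscribeCommands once, and UnsubscribeCommands should only increment when the last of them is disposed.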
    
    Thanks for your help.

    April 25, 2012 9:27 AM