Answered ReplayUntilSubscribe

  • יום שני 30 יולי 2012 13:03
     
      קוד כלול

    Hi, 

    I've written a small ReplayUntilSubscribe, any ideas how to turn it into an IConnectableObservable? Any other suggestions/anything I'm missing, or alternative solutions I've missed out there?

    public static IObservable<T> ReplayUntilSubscribe<T>(this IObservable<T> obs)
    {
        var lockToken = new object();
        var replay = new Queue<T>();
        var replaySubscription = obs.Subscribe(replay.Enqueue);
        return Observable.Create<T>(o =>
        {
            if (replay == null)
            {
                return obs.Subscribe(o);
            }
    
            lock (lockToken)
            {
                if (replay == null)
                {
                    return obs.Subscribe(o);
                }
    
                var response = replay.ToObservable().Merge(obs).Subscribe(o);
                replaySubscription.Dispose();
                replay = null;
                return response;
            }
        });
    }
    

    • נערך על-ידי znite יום שני 30 יולי 2012 13:25
    •  

כל התגובות

  • יום שלישי 31 יולי 2012 19:19
     
     

    Hi,

    I'm concerned that your operator's actual behavior is not your intended behavior.

    Is the following specification correct?

    1. Subscribe to the source immediately when the method is called.  (Usually the intention is to share subscription side-effects among all observers, but as you'll see later that's not the behavior of your operator.)
    2. Buffer data until the first observer subscribes.  (Though if the observable completes or throws an exception, that information is lost.)
    3. When the first observer subscribes, create another subscription to the source.  (This may cause subscription side-effects, again.)
    4. Merge the buffer with notifications from the new subscription.  (This may cause duplicate values; e.g., with a cold or replaying source.  Furthermore, it may cause old values to be observed after new values due to concurrency and Merge.  Together, these behaviors could cause an old value to be observed before and after a new value.)
    5. Dispose of the original subscription.
    6. When subsequent observers subscribe, create new subscriptions to the source.  (This may cause subscription side-effects, again.  Furthermore, it may cause values that you didn't intend to be replayed to be replayed; e.g., if the source is a ReplaySubject<T>)

    Of course, this operator may meet your particular requirements - it doesn't have to be a general-purpose operator.  But your question about IConnectableObservable<T> leads me to believe that your intention is for subscription side-effects to be shared.

    If you'd like it to be more robust, then consider the following specification instead:

    1. Subscribe to the source once and share the subscription side-effects for all observers.
    2. Buffer all notifications until the first observer subscribes.
    3. When the first observer subscribes, replay all buffered notifications and then concatenate any subsequent notifications from the source.
    4. When subsequent observers subscribe they will simply receive notifications directly from the source.

    Is the former specification accurate?

    - Dave


    http://davesexton.com/blog

  • יום שלישי 31 יולי 2012 19:54
     
     

    Yep, your more concise spec is definitely what I'm after - I've tweaked it slightly, but still mostly the same form and risking either duplicate or missing values depending on the ordering - any suggestions Dave? What do you mean by 'subscription side-effects'? 

  • יום שלישי 31 יולי 2012 22:38
     
     

    Hi,

    > What do you mean by 'subscription side-effects'?

    Here's a video about hot and cold observables:

    http://channel9.msdn.com/Blogs/J.Van.Gogh/Rx-API-in-depth-Hot-and-Cold-observables

    - Dave


    http://davesexton.com/blog

  • יום שלישי 31 יולי 2012 23:32
     
     תשובה קוד כלול

    Hi,

    After having reviewed your question more thoroughly, I must say that the requirements are quite strange.  I'd even consider not creating an operator and instead allowing callers to determine whether or not to use the Observable.Replay operator directly themselves.

    Regardless, here's an implementation that may work for you, though I haven't tested it.

    Note: Although it's easy to convert this operator into an IConnectableObservable<T> by simply applying the Publish operator to the observable that it returns, it defeats the purpose of the operator since it will wrap the observable with another hot observable.  As a result, all observers will see replayed values because subscription side-effects are shared; i.e., the custom operator will only receive a single subscription.

    public static IObservable<T> ReplayOnce_Hot<T>(IObservable<T> source, out IDisposable disconnect)
    {
    	var published = source.Publish();
    	var replay = published.Replay();
    
    	disconnect = new CompositeDisposable(
    		replay.Connect(),
    		published.Connect());
    
    	int first = 1;
    
    	return Observable.Create<T>(
    		observer =>
    			Interlocked.Exchange(ref first, 0) == 1
    			? replay.Subscribe(observer)
    			: published.Subscribe(observer));
    }

    - Dave


    http://davesexton.com/blog

    • סומן כתשובה על-ידי znite יום שלישי 04 ספטמבר 2012 16:51
    •  
  • יום שלישי 31 יולי 2012 23:38
     
     תשובה

    Hi,

    It's probably also worth noting that the replay buffer is unbounded in my example.  In other words, it will buffer the entire source sequence, even after the first observer unsubscribes.  You may want to consider using one of the overloads of Replay to pass in a limit such as a count or duration.

    Actually, I just realized that calling RefCount would help a bit, though it still may allow unnecessary buffering while the first sequence remains connected and has already joined the "live" source.

    Alternatively, you may want to fallback to some kind of stateful strategy similar to your original example.

    - Dave


    http://davesexton.com/blog

    • סומן כתשובה על-ידי znite יום שלישי 04 ספטמבר 2012 16:51
    •