Buffering Streams without Using Subjects

    Question

  • Hey all,

    I've been trying to create an observable which streams a state-of-the-world (snapshot) from a repository, as a single pulse, followed by live updates from a separate feed. The catch is that the snapshot call is blocking, so the updates have to be buffered during that time.

    This is what I've come up with, a little simplified. The GetStream() method is the one I'm concerned with. I'm wondering whether there is a more elegant solution.

    private static readonly IConnectableObservable<long> _updateStream;
    
    static Program() // Static constructor; the enclosing class is assumed to be named Program
    {
          _updateStream = GetDataFeed().Publish();
          _updateStream.Connect();
    }
    
    static void Main(string[] args)
    {
          _updateStream.Subscribe(Console.WriteLine);
          Console.ReadLine();
          GetStream().Subscribe(l => Console.WriteLine("Stream: " + l));
          Console.ReadLine();
    }
    
    public static IObservable<long> GetStream()
    {
          return Observable.Create<long>(observer =>
                {
                      var bufferedStream = new ReplaySubject<long>();
                      _updateStream.Subscribe(bufferedStream);
                      var data = GetSnapshot();
                      // This returns the ticks from GetSnapshot
                      //  followed by the buffered ticks from _updateStream
                      //  followed by any subsequent ticks from _updateStream
                      data.ToObservable().Concat(bufferedStream).Subscribe(observer);
    
                      return Disposable.Empty;
                });
    }
    
    private static IObservable<long> GetDataFeed()
    {
          var feed = Observable.Interval(TimeSpan.FromSeconds(1));
          return Observable.Create<long>(observer =>
          {
                // Return the subscription so that unsubscribing also stops the feed.
                return feed.Subscribe(observer);
          });
    }

    I keep hearing that Subjects shouldn't be used, but I can't find a way of doing this without a ReplaySubject. The Replay filter on a hot observable doesn't work because it replays everything, not just from the start of the subscription.

    I'm also concerned about race conditions. Is there a way to guarantee sequencing of some sort, should an earlier update be buffered before the snapshot?

    Thanks.

    -Will

    Friday, June 14, 2013 11:27 AM

All replies

  • Hi Will,

    > I keep hearing that Subjects shouldn't be used [snip]

    Generally you should only explicitly use subjects when you need event-like observables; i.e., when you must store an observable in a field and push notifications imperatively, similar to raising an event.  For example, BehaviorSubject<T> is useful as the backing field of a property because it can represent an observable sequence of property changed notifications while simultaneously storing the latest value.
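
    A minimal sketch of that property pattern, assuming the System.Reactive package is referenced. The Thermostat class and its member names are hypothetical, and the Value property assumes a reasonably recent Rx release:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical class: a property backed by a BehaviorSubject<T>.
public sealed class Thermostat
{
    // Stores the latest value and pushes every change to subscribers.
    private readonly BehaviorSubject<double> _temperature =
        new BehaviorSubject<double>(20.0);

    public double Temperature
    {
        get { return _temperature.Value; }
        set { _temperature.OnNext(value); }
    }

    // Exposed as a plain IObservable<T> so that callers cannot push into it.
    public IObservable<double> TemperatureChanges
    {
        get { return _temperature.AsObservable(); }
    }
}
```

    A subscriber to TemperatureChanges receives the current value on subscription and then each later assignment, which is the "event plus latest value" behavior described above.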

    > The Replay filter on a hot observable doesn't work because it replays everything,
    > not just from the start of the subscription.

    That's only true if you immediately call Connect, but clearly that's not what you want.  Try the Replay overload that has a selector parameter instead.

    For example:

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;
    
    namespace Rx.Labs
    {
    	class ConnectLab
    	{
    		static readonly IConnectableObservable<long> live = Observable
    			.Interval(TimeSpan.FromSeconds(.5d))
    			.Publish();
    
    		public static void Main()
    		{
    			using (live.Subscribe(Console.WriteLine))
    			using (GetStream()
    				.DelaySubscription(TimeSpan.FromSeconds(2))
    				.Subscribe(l => Console.WriteLine("Stream: " + l)))
    			using (live.Connect())
    			{
    				Console.ReadLine();
    			}
    
    			Console.ReadLine();
    		}
    
    		public static IObservable<long> GetStream()
    		{
    			return live.Replay(r => GetSnapshot().ToObservable().Concat(r));
    		}
    
    		public static IEnumerable<long> GetSnapshot()
    		{
    			for (var i = 1; i <= 5; i++)
    			{
    				yield return -i;
    
    				Thread.Sleep(TimeSpan.FromSeconds(.5d));
    			}
    		}
    	}
    }

    Output:

    0
    1
    2
    3
    Stream: -1
    4
    Stream: -2
    5
    Stream: -3
    6
    Stream: -4
    7
    Stream: -5
    8
    Stream: 3
    Stream: 4
    Stream: 5
    Stream: 6
    Stream: 7
    Stream: 8
    9
    Stream: 9
    10
    Stream: 10
    11
    Stream: 11
    12
    Stream: 12
    ...

    > I'm also concerned about race conditions [snip]

    If the sequences being concatenated are from the same source and you want to eliminate duplicate notifications or ensure that notifications aren't missed, that can be much more difficult to accomplish.

    You could use the Collect extension in Rxx if you were to ensure that your sources meet the operator's contracts (see the XML documentation).  Its usage could look something like this:

    public static IObservable<long> GetStream()
    {
    	return GetSnapshot()
    		.Collect(live.Select(v => CollectionModification.CreateAdd(v)))
    		.ExistingOrAddedOrReplacements();
    }

    Note that ExistingOrAddedOrReplacements is merely a convenience operator.  It's only available in Rxx 2.0, which hasn't been released to NuGet yet, so you could either build the source code yourself or copy the operator into your library.

    - Dave


    http://davesexton.com/blog

    Friday, June 14, 2013 6:42 PM
  • Hey,

    I just posted a lengthy reply but MSDN seems to have hijacked it. It said something like "the post has to be verified". I'm hoping it's actually been cached!

    Thanks.

    -Will

    Monday, June 17, 2013 11:34 AM
  • Hi Will,

    I received an email notification containing your post, which I've reproduced below.  Strange that it doesn't appear in the forum.

    Hey Dave,

    Thanks for the reply. I think I see what you're doing here but I'm unable to get it to work as I don't have a DelaySubscription() method. Is that an extension method you wrote that simply sleeps for the given time?

    To give some context, the live updates are Tibco callbacks (ticking over the wire), whereas the snapshot is the result of a WCF call.

    As an example, take 'Orders': GetSnapshot should make a WCF call that returns, say, 100 orders, and meanwhile the Tibco callback would be firing notifications for orders changed, added or deleted. Those should be buffered and appended after the snapshot is delivered.

    The part I'm struggling to see in your solution is how to handle multiple subscribers. The goal of this class is to expose a GetStream() method which returns a snapshot of (say) orders, followed by buffered updates, followed by live updates. Many users could subscribe from many different places. Do I have to call Connect every time? My solution was to make a different ReplaySubject every time.

    Only at subscription should WCF be called and Tibco be subscribed to.

    I hope this makes sense. Your solution may well work perfectly; I just can't test it without DelaySubscription (replacing it with a Sleep() causes the Replay to not return any buffered values).

    Many thanks again.

    -Will

    P.S.

    When I talk about race conditions, I mean the race between the WCF call and the buffered Tibco updates. I'm wondering whether the only way to solve this is to add sequencing to the orders.

    - Dave



    Monday, June 17, 2013 3:51 PM
  • Hi Will,

    > I don't have a DelaySubscription() method

    Make sure that you're using the latest version of Rx.

    > the live updates are Tibco callbacks (ticking over the wire),
    > whereas the snapshot is the result of a WCF call

    Requiring two different service requests for a snapshot and the live stream complicates things for you because it places part of the burden of avoiding race conditions on the services and a lot of it on the client.  See this related discussion.

    For example, if you think about the live stream as an observable, the model that you want is the same as if the service itself started by streaming a snapshot followed immediately by the live stream.  That's the ideal approach, if possible.

    Otherwise, you can use Rxx's Collect operator as shown above, though it has some strict requirements for the inputs (see the documentation linked above), which in your case means that both services must behave properly.

    > As an example, take 'Orders': [snip]

    That's exactly what Rxx's Collect operator is designed to handle, though as mentioned it has some strict preconditions.

    > The part I'm struggling to see in your solution is how to handle multiple subscribers [snip]
    > Do I have to call Connect every time?

    Paste my example into Visual Studio and you'll see that the Replay overload I'm using returns IObservable<T>, not IConnectableObservable<T>.  It generates a cold observable.

    The Collect operator returns a subject, which is hot.  To make it cold you should wrap the entire query in Defer, though the result will no longer be a subject - it will be a normal IObservable<T>.
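
    As a rough illustration of the Defer wrapping only (not of Collect itself, whose signature isn't reproduced here), the sketch below defers a snapshot-then-live query so that each subscriber triggers its own snapshot request. It assumes the System.Reactive package; GetSnapshot and SnapshotCalls are hypothetical stand-ins, and this simplified query does not buffer live updates that arrive while the snapshot is being read:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferSketch
{
    // Counts how many times the snapshot has been requested.
    public static int SnapshotCalls;

    // Hypothetical stand-in for a blocking snapshot request.
    private static IEnumerable<long> GetSnapshot()
    {
        SnapshotCalls++;
        return new long[] { -1, -2, -3 };
    }

    // Defer postpones the factory until Subscribe is called and runs it
    // once per subscriber, so the resulting observable is cold.
    public static IObservable<long> GetStream(IObservable<long> live)
    {
        return Observable.Defer(() => GetSnapshot().ToObservable().Concat(live));
    }
}
```

    Calling GetStream does no work by itself; the snapshot is requested only when somebody subscribes, and again for every additional subscriber.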

    - Dave



    Monday, June 17, 2013 4:20 PM
  • Hi Will,

    > I'm wondering whether the only way to solve [race conditions] is to add sequencing to the orders.

    Rxx's Collect operator is designed to handle race conditions through a reconciliation process that uses the specified equality comparer, which defaults to EqualityComparer<T>.Default.  If you were to add sequence numbers to the items then you could pass in a custom equality comparer.
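
    A sketch of what such a comparer could look like, using only the base class library. The Order type and its members are hypothetical, and whether equality should cover the sequence number as well as the identity depends on Collect's documented contract, so treat the choice below as an assumption:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical order message carrying a per-order sequence number.
public sealed class Order
{
    public Order(int id, long sequence, decimal price)
    {
        Id = id;
        Sequence = sequence;
        Price = price;
    }

    public int Id { get; private set; }
    public long Sequence { get; private set; }
    public decimal Price { get; private set; }
}

// Treats two messages as equal when they describe the same version
// of the same order, regardless of the other fields.
public sealed class OrderVersionComparer : IEqualityComparer<Order>
{
    public bool Equals(Order x, Order y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null) return false;
        return x.Id == y.Id && x.Sequence == y.Sequence;
    }

    public int GetHashCode(Order obj)
    {
        if (obj == null) return 0;
        unchecked
        {
            return (obj.Id * 397) ^ obj.Sequence.GetHashCode();
        }
    }
}
```

    With a comparer like this, an order that arrives once in the snapshot and again as a buffered update with the same sequence number compares as equal, while a later version of the same order does not.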

    - Dave



    Monday, June 17, 2013 4:29 PM
  • Hey Dave,

    Thanks for picking up the email and composing such a thorough reply.

    The discussion you linked to is very interesting, and exactly the problem I'm facing. I agree with all the points you made about the difficulty of the two-service model, but like Andrew I don't see any alternative.

    I chose Tibco because it's pub-sub, light and broadcast. Everyone shares the same stream. If I were to publish the snapshot every time someone made a data request, I'd flood the network. On the flip side, if I opened a separate stream for each user, the server side would go from firing one notification with each DB update to firing hundreds, and having to maintain some sort of state and/or heartbeat for each one.

    So I chose to go WCF for the snapshot request, because it gave me point-to-point communication, and was statically typed. I don't see any other way of doing it without using third-party middleware.

    The server unfortunately faces the same issue. Its job is to mirror another database, and keep an internal cache. It gets an initial snapshot via WCF and then listens to updates via MQ.

    I haven't had time to upgrade RX yet, and try Connect, but I'd be interested to hear your opinion about this reply to the same question on StackOverflow: http://stackoverflow.com/questions/17107924/rx-how-to-concat-a-snapshot-stream-and-an-update-stream

    Thanks again for the response. It's much appreciated.

    -Will

    Wednesday, June 19, 2013 5:13 PM
  • Hi Will,

    Thanks for the details.  I understand that in your situation you didn't define the Tibco service, so you can't adjust it to push an initial snapshot to new subscribers.  So it seems that you're stuck with a two-service model, unless you're willing to wrap the service with your snapshot service; i.e., clients won't use the Tibco service directly - they'll all subscribe to your WCF service instead.  Of course, the point of this would be two-fold:

    1. Clients are greatly simplified since they only require a single web request.  Buffering is no longer required either.
    2. The WCF service is capable of returning a snapshot followed by the live Tibco stream, without any race conditions, though it depends on how you define "snapshot".

    > I haven't had time to upgrade RX yet, and try Connect [snip]

    Collect is available in Rxx 1.3, which targets Rx 1.0 SP1.  There's no need to upgrade to Rx 2.0, if that's what you mean.

    > I'd be interested to hear your opinion about this reply [snip]

    The first code example is similar to mine except that it's more verbose.  I'm using the Replay overload with a selector parameter, which avoids having to call Connect on the output and having another disposable to keep around.  It does this by returning a cold observable rather than a hot observable.
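
    To make the contrast concrete, here is a sketch of the two forms side by side, assuming the System.Reactive package. The Verbose method is only a guess at what a more explicit version might look like, not the code from the linked answer, and the class and method names are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ReplayForms
{
    // Verbose form: Replay() yields a connectable observable, so the query
    // must call Connect itself and keep the connection around for disposal.
    public static IObservable<long> Verbose(IObservable<long> live, IEnumerable<long> snapshot)
    {
        return Observable.Create<long>(observer =>
        {
            var buffered = live.Replay();
            var connection = buffered.Connect(); // start buffering first
            var subscription = snapshot.ToObservable().Concat(buffered).Subscribe(observer);
            return new CompositeDisposable(subscription, connection);
        });
    }

    // Selector form: the result is an ordinary cold IObservable<T>; the
    // connection is created and disposed per subscription by the operator.
    public static IObservable<long> WithSelector(IObservable<long> live, IEnumerable<long> snapshot)
    {
        return live.Replay(buffered => snapshot.ToObservable().Concat(buffered));
    }
}
```

    Both methods return an observable that callers simply subscribe to; the difference is that the selector overload removes the explicit Connect call and the extra disposable from the query.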

    The second example follows the procedure that I stated in the linked discussion.  It's fine if you can't combine the services into a single stream, though it also requires consistent sequential version numbers in your messages, a permanent filter in addition to buffering, and it doesn't support collection semantics (add, remove, clear) as Collect does.

    Perhaps it's unlikely, but it doesn't avoid all possible race conditions either.  The snapshot request may return before the live stream request, which could cause a gap between the last element in the snapshot and the first element in the live stream.  This possibility is technically unavoidable with the two-service model, due to network inconsistencies.  You could try detecting gaps within your query, but it's not going to work unless you completely buffer the output up to the first element of the live stream, just to be sure there aren't any gaps, before pushing elements to observers.  Alternatively, you could insert an artificial delay between the live request and the snapshot request and hope for the best, or you could wait for the first element of the live request before requesting the snapshot, though it may cause a lot of overlap.
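
    The version filter and the gap check discussed above can be sketched with plain LINQ. Everything here is hypothetical: the Update type, the assumption that versions increase by exactly one per message, and the method names. The same predicate could be applied to an observable with Rx's Where operator:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message shape: every update carries a version number.
public sealed class Update
{
    public Update(long version, string payload)
    {
        Version = version;
        Payload = payload;
    }

    public long Version { get; private set; }
    public string Payload { get; private set; }
}

public enum Continuity { Overlap, Contiguous, Gap }

public static class SnapshotReconciliation
{
    // Drops updates that the snapshot already reflects.
    public static IEnumerable<Update> NewerThan(this IEnumerable<Update> updates, long snapshotVersion)
    {
        return updates.Where(u => u.Version > snapshotVersion);
    }

    // Compares the snapshot version with the first live version received.
    public static Continuity Classify(long snapshotVersion, long firstLiveVersion)
    {
        if (firstLiveVersion <= snapshotVersion) return Continuity.Overlap;
        if (firstLiveVersion == snapshotVersion + 1) return Continuity.Contiguous;
        return Continuity.Gap; // at least one update was missed
    }
}
```

    An Overlap result is harmless because the filter discards the duplicates, whereas a Gap result means the snapshot would have to be requested again or the missing updates recovered some other way.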

    - Dave



    Friday, June 21, 2013 1:06 AM