Scan with async seed

    Question

  • Hi,

    I need a Scan with an async seed. Is there a more straightforward way of implementing it than the following:

        public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source,
            Task<TAccumulate> seed,
            Func<TAccumulate, TSource, TAccumulate> accumulator)
        {
            if (source == null) throw new ArgumentNullException("source");
            if (seed == null) throw new ArgumentNullException("seed");
            if (accumulator == null) throw new ArgumentNullException("accumulator");
    
            return Observable.Create<TAccumulate>(observer =>
            {
                var gate = new object();
                var pendingUpdates = new List<TSource>();
                var snapshot = default(TAccumulate);
                var gotSnapshot = false;
    
                var updatesSubscription = Disposable.Empty;
                updatesSubscription = source.Subscribe(
                    update =>
                    {
                        lock (gate)
                        {
                            if (gotSnapshot)
                            {
                                try
                                {
                                    snapshot = accumulator(snapshot, update);
                                }
                                catch (Exception ex)
                                {
                                    updatesSubscription.Dispose();
                                    observer.OnError(ex);
                                    return;
                                }
                                observer.OnNext(snapshot);
                            }
                            else
                            {
                                pendingUpdates.Add(update);
                            }
                        }
                    },
                    ex =>
                    {
                        lock (gate)
                        {
                            updatesSubscription.Dispose();
                            observer.OnError(ex);
                        }
                    },
                    observer.OnCompleted);
    
                var cts = new CancellationDisposable();
                seed.ContinueWith(
                    s =>
                    {
                        lock (gate)
                        {
                            switch (s.Status)
                            {
                                case TaskStatus.RanToCompletion:
                                    snapshot = s.Result;
                                    gotSnapshot = true;
    
                                    observer.OnNext(snapshot);
                                    try
                                    {
                                        foreach (var update in pendingUpdates)
                                        {
                                            snapshot = accumulator(snapshot, update);
                                            observer.OnNext(snapshot);
                                        }
                                    }
                                    catch (Exception ex)
                                    {
                                        updatesSubscription.Dispose();
                                        observer.OnError(ex);
                                    }
    
                                    pendingUpdates.Clear();
                                    pendingUpdates = null;
                                    break;
                                case TaskStatus.Canceled:
                                    updatesSubscription.Dispose();
                                    break;
                                case TaskStatus.Faulted:
                                    updatesSubscription.Dispose();
                                    observer.OnError(s.Exception);
                                    break;
                            }
                        }
                    },
                    cts.Token);
    
                return new CompositeDisposable(cts, updatesSubscription);
            });
        }

    ?

    Thanks,

    Andrew


    Thursday, February 21, 2013 1:59 AM

All replies

  • Do you need the two subscriptions to run in parallel? Or can you start your subscription to the source once you have the seed? If so, then this could do the trick:

    var output = from seedValue in seed.ToObservable()
      from agg in values.Scan(seedValue, MyAggregateFunction)
      select agg;
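
    For reference, the query expression above is just sugar for SelectMany, if you prefer method syntax:

        var output = seed.ToObservable()
            .SelectMany(seedValue => values.Scan(seedValue, MyAggregateFunction));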

    If this is not what you are looking for (as your sample would indicate) then I will have to scratch my head a little bit on it.

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    Friday, February 22, 2013 5:38 PM
  • I need to subscribe to updates first (in order not to miss any), issue a request for the seed, and once I get the seed, start applying the updates.

    P.S. Thank you for the book!

    Andrew


    Saturday, February 23, 2013 5:20 AM
  • Hi Andrew,

    > I need to subscribe to updates first (in order to not miss any)

    Is the problem that seed has side-effects that cause source to start pushing notifications?

    Or is source a hot observable?  What guarantee is there that you haven't already missed notifications before subscribing?

    - Dave


    http://davesexton.com/blog

    Tuesday, February 26, 2013 2:22 PM
  • Hi Dave,

    It's OK to have missed notifications before subscribing, because the seed is supposed to have them applied.

    What I need to do is to distribute some mutable state across the network.

    The state is pretty large, so instead of broadcasting a new state once it gets mutated, I broadcast diffs to be applied to its previous version.

    Andrew


    Tuesday, February 26, 2013 3:03 PM
  • Hi Andrew. I have been meaning to post my attempt at a solution for a while now, but haven't had time until this evening (have been travelling). I'm actually a little surprised that there hasn't been more code posted in reply. Anyway - I think I understand the requirement, so my contribution is below.

    Firstly, let's write a little code which returns our 'seed' value. Since we want this to be async, and potentially long-running, I am faking it by returning a Task<T> with a Thread.Sleep in it. We can convert this to an observable later on.

    Task<long> CreateAsyncSeedTask()
    {
        return Task.Factory.StartNew(() =>
        {
            // Simulate a slow process for returning a seed value of -1
            Thread.Sleep(TimeSpan.FromSeconds(5));
            // Return our seed value. I'm sure your code here would be more intelligent than 'return -1'.
            return -1L;
        });
    }

    Now, we are going to want to subscribe to this Task, as an observable stream, so we can use:

    var seedObservable = CreateAsyncSeedTask().ToObservable();

    We also need an observable stream of 'updates', which we want to kick off at the time we start our 'Seed' subscription. I'm just going to use an 'Observable.Interval' to mock these up.

    var producerObservable = Observable.Interval(TimeSpan.FromSeconds(1));

    So, we now want to subscribe to both of these observables, but 'buffer' the "producerObservable's" values until the 'seedObservable' has returned, as well as include the seedObservable value in the output. I have written a 'BufferUntil' extension method which allows us to buffer a stream, until a closing selector OnNext's its first value:

    public static IObservable<T> BufferUntil<T>(this IObservable<T> sourceToBuffer, IObservable<T> endBufferWindow)
    {
        return sourceToBuffer.Publish(pSource =>
        {
            return endBufferWindow.Publish(pEndBuffer =>
            {
                return pSource.Buffer(pEndBuffer)
                              .SelectMany(_ => _)
                              .Merge(pEndBuffer.IgnoreElements().Select(_ => default(T)).Concat(pSource));
            });
        });
    }

    This takes a stream of T's, and buffers them until the 'endBufferWindow' produces a result. After 'endBufferWindow' produces its first element, the output stream is no longer buffered. So if the input stream produces '0, 1, 2, 3, 4', the output stream will buffer these (and output nothing). However, once the 'endBufferWindow' produces an item, the buffered cache of '0, 1, 2, 3, 4' will be published out on the result stream of 'BufferUntil'. Any subsequent items on the source such as '5, 6, 7' will not be buffered, and will play straight through.
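
    For example, driving it by hand with a couple of Subjects (just to illustrate the behaviour; the seed Subject stands in for the Task-backed observable, which likewise completes right after its single value):

        var updates = new Subject<long>();
        var seed = new Subject<long>();

        updates.BufferUntil(seed).Subscribe(Console.WriteLine);

        updates.OnNext(0);   // buffered
        updates.OnNext(1);   // buffered
        seed.OnNext(-1);     // releases 0 and 1 (the seed value itself is not part of BufferUntil's output)
        seed.OnCompleted();  // from here on the source plays straight through
        updates.OnNext(2);   // printed immediately

        // prints: 0, 1, 2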

    We then simply need to publish our seed value, so we can share it with our selector stream as well as use it as the input into our 'BufferUntil' stream, as follows:

    var subscription = seedObservable.Publish(seedPublished =>
    		seedPublished.Merge(producerObservable.BufferUntil(seedPublished)));

    When I subscribe to this final query, the following happens:

    - My 'producerObservable' starts producing values immediately (though I don't get them played through to me yet. They are buffered).

    - My 'seedObservable' is subscribed to immediately (though it produces no value for 5 seconds, because that is how long it takes for my AsyncTask to complete).

    - When the AsyncTask completes (after 5 seconds), I immediately receive the result of the Async Task (-1). I also immediately get the values that were buffered in the meantime (0, 1, 2, 3 in the run below).

    - I then continue to get the live updates (4, 5, 6...) for as long as I run the application.

    Here is the final code to run, and the output I get:

    	var seedObservable = CreateAsyncSeedTask().ToObservable();
    	var producerObservable = Observable.Interval(TimeSpan.FromSeconds(1));
    	
    	Console.WriteLine("Subscribing at " + DateTime.Now.ToLongTimeString());
    	var subscription = seedObservable.Publish(seedPublished =>
    		seedPublished.Merge(producerObservable.BufferUntil(seedPublished)));
    	subscription.Timestamp().Subscribe(output => {
    		Console.WriteLine(string.Format("{0} : {1}", output.Timestamp, output.Value));
    	});

    Output.

    As you can see - nothing happens for the first 5 seconds, then I get my seed value and any updates which have been cached, followed by the continuing updates.

    Subscribing at 23:47:26
    01/03/2013 23:47:31 +00:00 : -1
    01/03/2013 23:47:31 +00:00 : 0
    01/03/2013 23:47:31 +00:00 : 1
    01/03/2013 23:47:31 +00:00 : 2
    01/03/2013 23:47:31 +00:00 : 3
    01/03/2013 23:47:32 +00:00 : 4
    01/03/2013 23:47:33 +00:00 : 5
    01/03/2013 23:47:34 +00:00 : 6
    01/03/2013 23:47:35 +00:00 : 7
    01/03/2013 23:47:36 +00:00 : 8
    01/03/2013 23:47:37 +00:00 : 9
    01/03/2013 23:47:38 +00:00 : 10

    If you need to go on to apply a scan, you can just add it to my final observable query there. It will then apply the Scan over the seed, and all the cached updates (once the seed has completed).

    However, if you are able to, it may make sense to apply a scan over your updates (without the seed), and then scan your accumulated updates into your seed (so you spread the work). If you want to do that, you could easily apply a scan as part of the 'buffer' operation, passing in a Scan Func to the 'BufferUntil', effectively writing your own 'ScanUntil'.

    With Rx providing 'Buffer' features out of the box, you should be able to make it work without resorting to your own 'pendingUpdates' cache.
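
    For what it's worth, here is one (untested) sketch of that kind of 'ScanUntil', using Replay rather than Buffer to do the gating. The caveat is that Replay's cache is never trimmed, so every update stays cached for the lifetime of the subscription (unlike your pendingUpdates list, which is cleared once the seed arrives), and the error/cancellation handling differs a little from your version:

        public static IObservable<TAccumulate> ScanWithAsyncSeed<TSource, TAccumulate>(this IObservable<TSource> source,
            Task<TAccumulate> seed,
            Func<TAccumulate, TSource, TAccumulate> accumulator)
        {
            // Replay caches every update from the moment of subscription, so nothing is
            // missed while the seed is still in flight; once the seed task completes we
            // emit it and scan the cached updates (and everything that follows) over it.
            return source.Replay(replayedSource =>
                seed.ToObservable().SelectMany(seedValue =>
                    replayedSource.Scan(seedValue, accumulator)
                                  .StartWith(seedValue)));
        }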

    Hopefully I have given you enough to play around with to enhance your solution. The 'scan' part of it isn't really the tricky part - it's the buffering logic.

    Hope that helps - but shout if you are still stuck.

    Howard



    Friday, March 01, 2013 11:53 PM
  • Hi Andrew,

    > It's OK to have missed notifications before subscribing, because the seed is supposed to have them applied.

    Ok, but I'm still a bit confused by this design...

    > What I need to do is to distribute some mutable state across the network.
    > The state is pretty large, so instead of broadcasting a new state once it gets mutated, I broadcast diffs [snip]

    I assume that the web service design requires you to make 2 separate requests:

    Diffs Service -> Sequence of mutated states
    Seed Service -> Current state (somewhere within Diffs)

    This seems like a poor design.  It puts way too much burden on the consumer, having to concatenate a seed and mutations without any ability to synchronize at the source.  The service could probably do the concatenation itself with relative ease, so perhaps it would be better if the Diffs service returned a snapshot followed by state changes.

    Assuming that the service design is not under your control, this leads me to another question: 

    How do you determine the position within the buffered diffs that the seed belongs?

    For example, imagine that you subscribe to Diffs and receive the following changes:

    +A
    +B
    -A
    +C

    So the states are as follows:

    +A  => A
    +B  => AB
    -A   => B
    +C  => BC

    While receiving those, you subscribe to Seed, which returns the following state:

    AB

    The Diffs buffer contains extra state changes that must be discarded.  You'll need some mechanism within the data itself to be able to identify the position at which Seed belongs.

    +A (Discard)
    +B (Discard)
    AB (Seed)
    -A
    +C

    Hence my point that it's a poor service design to begin with.  It's not that it's impossible to resolve, it's just that it places a burden on consumers that seems unnecessarily complex.

    Have I understood your requirements correctly?

    - Dave


    http://davesexton.com/blog

    Sunday, March 03, 2013 3:06 AM
  • Hi Dave,

    I agree that such a design is questionable, but my interest is more of theoretical nature - I'm exploring different approaches to the problem.

    With regard to your question: the seed has a version number associated with it, and so do all of the updates, so you just need to discard all updates whose version is <= the version of the seed. So your understanding seems to be correct.
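
    In code the rule is just something like this (Seed/Update with a Version property are made-up shapes, and bufferedUpdates is the buffered stream - purely for illustration):

        class Seed   { public long Version; public object State;  }
        class Update { public long Version; public object Change; }

        // updates arrive in version order, so everything the seed already reflects
        // can be dropped from the front of the buffered update stream
        IObservable<Update> relevant = bufferedUpdates.SkipWhile(u => u.Version <= seed.Version);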

    Having said all that, does the design still look not OK to you?

    Andrew

    Monday, March 04, 2013 2:32 AM
  • Hi Dave, Andrew,

    Here is what I can add to the "bad/good design" question. If the state and the updates are relatively equal in size, then yes, it would be better to have a sequence of mutated states going out of the service. However, if the state is much larger than the updates, then it would be preferable to have the mutation logic on the client, otherwise it may overload the transport layer.

    Slava

    Monday, March 04, 2013 3:56 AM
  • Hi Andrew,

    > [snip] Having said all that, does the design still look not OK to you?

    Thanks for the clarification, but yes the design still doesn't seem right to me.

    Look at the semantics of your Diff service.  It's a sequence of mutations, though mutations are useless without a base state (Seed).  It implies that all clients either already know the latest Seed or that they can get it somehow.

    First, let's assume that a client already has a Seed.  Is it the latest Seed?  I.e., can the client simply subscribe to Diffs now and apply all of the mutations as they arrive?  Seems unlikely, but perhaps it's possible.

    Now let's assume that a client doesn't have a Seed.  It would have to make a separate request to the Seed service; however, due to the asynchronous nature of network services, it could be outdated before the client even receives it; i.e., the Seed service may actually (probably?) return a value that isn't the latest.  So essentially the second case is the same as the first case.

    To avoid the problem of stale data in both cases, you're forcing the client to:

    1. Call the Diffs service first.
    2. Buffer Diffs temporarily.
    3. Call the Seed service second.
    4. Compare the data in the Seed to the buffered Diffs and determine which mutations must be dropped to match the Seed.
    5. Replay the Seed, followed by the buffered sequence of Diffs, skipping old mutations, and concatenate the live sequence of Diffs.

    So we've already established how the Diffs service is essentially useless without a call to the Seed service.  And now the ordering of these dependencies reveals the problem with the two-service model: The Diffs service depends upon a Seed, yet the Diffs service must be called before the Seed service.

    So why force clients to make two separate requests?  Instead, the Diffs service could send an initial snapshot before the mutations, which I'd assume is the most likely use case for the Diffs service.

    Now you may argue that the two-service model makes some optimizations possible.  For instance, if there's any chance that a cached Seed is actually the latest, which is perhaps unlikely, though possible, then it may avoid a separate call to the Seed service in some cases, meaning that it may potentially avoid transferring the snapshot altogether.  So how will a client know if its Seed is the latest?  Answer: A client must subscribe to the Diffs service first and compare its cached Seed to the incoming data to determine whether a subsequent call to the Seed service is necessary, while buffering just in case it is.  But how often will clients benefit from this kind of optimization?  I.e., how often would you expect a cached Seed to be relevant?

    Furthermore, to quantify whether it's a potentially useful optimization to have separate Diffs and Seed services, you must be able to estimate how many irrelevant mutations a client must buffer and eventually drop before the latest Seed is retrieved.  And how many bytes in total are these irrelevant mutations?  Compared to a single Seed transfer, is this optimization still worth it?

    Another potential optimization in the two-service model is to have the server buffer mutations and parameterize the Diffs service with a version number.  A client would call the Seed service first, followed by a call to the Diffs service with the Seed's version number.  The server would replay all mutations starting from the specified version.

    However, as you may have logically deduced from the latter optimization, it turns out that both of the aforementioned optimizations are possible in the single-service model too, so I actually don't see any advantage for the two-service model at all.

    For instance, you could parameterize the Snapshot+Diffs service so that clients can specify whether or not they want an initial snapshot.  A client with a cached Seed could call Diffs without requesting an initial snapshot and then compare the data to determine whether its cached Seed is outdated.  If it's outdated, then the client would terminate the connection and call Diffs again, requesting an initial snapshot.

    You could also parameterize the Snapshot+Diffs service with a version number and a client could simply ask whether it wants an initial snapshot along with the mutations for a specific version number.  If it has a cached Seed, it won't request a snapshot; otherwise, it will.

    Though I'd like to add that the first optimization (the option to specify a snapshot) seems like it won't actually be very useful in practice, but of course it really depends on the semantics of your data, whether clients can cache state and your actual service specs.  The latter optimization could be useful, and may render the first optimization unnecessary, though of course it increases the burden on the server since it would have to buffer historical mutations so that it could replay them based on a requested version number.

    Am I missing anything?

    - Dave


    http://davesexton.com/blog

    Monday, March 04, 2013 5:46 AM
  • Hi Dave,

    You described everything correctly, but you base your reasoning on assumptions I can't agree with:

    • the cached seed will be out of date most of the time
    • you treat the Diff service and the Seed service as 2 separate services

    I look at the differ and seeder as 2 methods of the same service, so having a dependency between them is OK (like MoveNext and Current in IEnumerator).

    I also assume the following:

    • Both streams (seed and updates) are being served by the same service using the same communication channel, which preserves the ordering of messages, so there's a guarantee that if you've subscribed for updates first and for the seed later, the seed will not be too old
    • The seed is large (object like a volsurface)
    • Updates are small (single anchor point) and rare
    • You need to distribute data to hundreds of clients
    • Clients need to keep the data for a long period of time

    Having said all that, the signature should look like

        public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source,
            Func<Task<TAccumulate>> createSeed,
            Func<TAccumulate, TSource, TAccumulate> accumulator)
    
    so it would be possible to control when the seed is requested.
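
    For example (just a sketch), with a Func<Task<TAccumulate>> the request can be deferred until the result is actually subscribed to:

        // the seed request is only issued at subscription time
        var seedOnSubscribe = Observable.FromAsync(createSeed);

        // equivalent, without FromAsync
        var seedDeferred = Observable.Defer(() => createSeed().ToObservable());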

    Thanks a lot for your input, it's really helpful.

    Andrew

    Monday, March 04, 2013 9:00 AM
  • Hi Andrew,

    > you base your reasoning on assumptions I can't agree with:
    > the cached seed will be out of date most of the time

    Well, it was a fair assumption since you haven't provided any quantifications about your data.  So I take this to mean that you expect mutations to happen relatively infrequently, thus a retrieved Seed is most likely going to be up-to-date or only slightly outdated.  You seem to confirm this in your subsequent points.

    > you treat the Diff service and the Seed service as 2 separate services
    > I look at the differ and seeder as 2 methods of the same service, so having a dependency
    > between them is OK (like MoveNext and Current in IEnumerator).

    Is this just a semantic argument?  Does consuming them require two distinct asynchronous service requests or not?

    Assuming that they are in fact two separate services (or two "service methods" within the same "communication channel"), MoveNext and Current don't seem like a good analogy, because Current depends on a side-effect of MoveNext, but you've implied previously and seem to have confirmed in your bullet points that Diffs does not depend on a side-effect of Seed - Diffs is hot.  Essentially, MoveNext and Current are synchronous, not asynchronous, unlike requests for a Seed and its Diffs.

    A client requires a Seed followed by its Diffs, yet having two different services means that an acquired Seed could be outdated before it's even used by the client, just like a cached Seed could also be outdated.  The two-service model doesn't accurately represent the problem.  It seems to unnecessarily introduce asynchrony between dependent "methods".

    What do you think the advantages are of requiring two asynchronous requests for dependent data?

    What do you think the disadvantages are of requiring only a single request with an optional snapshot?

    - Dave


    http://davesexton.com/blog

    Monday, March 04, 2013 3:43 PM
  • Hi Dave,

    I came up with 2 methods because I'm not sure how to combine them into one, taking into account that the seed and the updates have different types. Would you combine them into a stream of pairs, where one component represents an optional seed?

    Andrew

    Tuesday, March 05, 2013 6:39 AM
  • Hi Andrew,

    That's a good point.  You could use Either<TLeft, TRight>, though I'd probably create a specific type as you've described.
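
    For example (names invented, just a sketch of the kind of type I mean):

        // one message type that carries either the initial snapshot or a single diff
        public class StateMessage<TState, TDiff>
        {
            public long Version { get; set; }
            public bool IsSnapshot { get; set; }
            public TState Snapshot { get; set; }   // populated only on the first (seed) message
            public TDiff Diff { get; set; }        // populated on every subsequent message
        }

    The combined stream would then simply be an IObservable<StateMessage<TState, TDiff>>.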

    - Dave


    http://davesexton.com/blog

    Tuesday, March 05, 2013 6:49 AM
  • Hi Howard,

    Your BufferUntil looks very cool, but I'm not sure that we won't miss any updates here:

    var subscription = seedObservable.Publish(seedPublished =>
    		seedPublished.Merge(producerObservable.BufferUntil(seedPublished)));

    Doesn't it subscribe to the updates too late?

    Also there's an assumption that the seed and updates have the same type.

    This is how it can be fixed

            public static IObservable<TSource> BufferUntil<TSource, TBufferClosing>(this IObservable<TSource> source, IObservable<TBufferClosing> bufferBoundaries)
            {
                return source.Publish(sharedSource =>
                {
                    return bufferBoundaries.Publish(sharedBufferBoundaries =>
                    {
                        var bufferedUpdates = sharedSource
                            .Buffer(sharedBufferBoundaries)
                            .SelectMany(_ => _);
    
                        var updates = sharedBufferBoundaries
                            .IgnoreElements()
                            .Select(_ => default(TSource))
                            .Concat(sharedSource);
    
                        return bufferedUpdates
                            .Merge(updates);
                    });
                });
            }

    But in this case it's not going to work with Merge.

    Andrew

    Tuesday, March 05, 2013 7:57 AM
  • Yes, I agree it is better to do buffered.Merge(seed), as opposed to seed.Merge(buffered) - good spot. I wouldn't expect you to lose updates given that change. The first subscription to the published observable is the one which does the buffering. I did indeed assume TSource == TBufferClosing, but easy enough to rectify that, and as Dave has suggested, there are ways to merge the different types into a single stream to work with (though it isn't quite as pretty).

    Perhaps you might consider projecting (.Select()) your deltas into a (mostly 'empty') typeof(seed), so you can merge. I would typically keep types like these the same, but have nullable (optional) values for much of the content. I guess it comes down to the specifics of your model. I also agree with the other comments about trying to have this on a single stream (where each subscriber receives the full seed state as their first result) - this is what I do most of the time, and it guarantees a good state without having to worry about sequence numbers/replay requests etc.
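
    For example (made-up types and property names, purely to illustrate the idea):

        // the delta reuses the seed's type, with only the changed points populated
        class Surface
        {
            public long Version;
            public Dictionary<string, double> Points;   // full grid for the seed, one or two entries for a delta
        }

        var deltasAsSurfaces = deltas.Select(d => new Surface { Version = d.Version, Points = d.ChangedPoints });
        var combined = seedObservable.Merge(deltasAsSurfaces);   // a single IObservable<Surface>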

    Sounds like you are almost there though. Hope I was able to help.

    Tuesday, March 05, 2013 11:08 PM