Reducing frequency of updates from a stream to at most once per interval


  • We're looking for a way to reduce a stream's frequency of updates to a certain rate. We want it to publish as soon as possible, but at most once per interval. If multiple updates arrive within the interval, the most recent one should be published when the interval ends.

    We looked at Throttle and Sample, but neither quite fits the bill. Throttle won't push anything at all while updates keep arriving faster than its due time, and Sample waits until the end of each sample period to publish, even when updates are infrequent.
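    To pin the requirement down, here is a small, self-contained C# model of the behaviour described above. It deliberately does not use Rx: it replays a list of timestamped events (in milliseconds) and returns when each value would be published. `ConstrainRateModel` and the sample data are hypothetical, for illustration only, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Model of the desired behaviour (hypothetical helper, not an Rx operator). Times are in ms.
// - If nothing was published during the last 'interval', a value is published immediately.
// - Otherwise only the latest value is held back and published once 'interval' has elapsed
//   since the previous publication; that publication starts a new interval.
static List<(long Time, int Value)> ConstrainRateModel(
    IEnumerable<(long Time, int Value)> events, long interval)
{
    var published = new List<(long Time, int Value)>();
    long? lastPublished = null; // time of the most recent publication
    int? pending = null;        // latest held-back value, if any

    foreach (var (t, v) in events.OrderBy(e => e.Time))
    {
        // A held-back value whose interval ended by time t went out at the end of that interval.
        if (pending.HasValue && lastPublished.Value + interval <= t)
        {
            lastPublished += interval;
            published.Add((lastPublished.Value, pending.Value));
            pending = null;
        }

        if (lastPublished is null || t >= lastPublished.Value + interval)
        {
            // Quiet period: publish straight away and start a new interval.
            published.Add((t, v));
            lastPublished = t;
        }
        else
        {
            // Inside the current interval: keep only the most recent value.
            pending = v;
        }
    }

    // Flush a value still held back when the source completes.
    if (pending.HasValue)
        published.Add((lastPublished.Value + interval, pending.Value));

    return published;
}

// A burst of five values followed by a quiet spell, with a 500 ms interval.
var burstThenQuiet = new (long, int)[] { (0, 0), (100, 1), (200, 2), (300, 3), (400, 4), (1500, 5) };
foreach (var (time, value) in ConstrainRateModel(burstThenQuiet, 500))
    Console.WriteLine($"t={time}ms value={value}");
// t=0ms value=0
// t=500ms value=4
// t=1500ms value=5
```

    The first value goes out immediately, the burst collapses to its latest value (4) at the end of the interval, and the value after the quiet spell is again published without delay, which is the combination neither Throttle nor Sample gives.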

    It feels like Window would be a useful tool to solve this problem. We had an initial stab:

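    public static IObservable<T> ConstrainPublishingRate<T>(this IObservable<T> source, TimeSpan interval)
    {
    	return source
    			.Window(interval) // consecutive, non-overlapping windows of length 'interval'
    			.SelectMany(w => w.Take(1).Concat(w.TakeLast(1)).DistinctUntilChanged()); // first + last value per window (Take/TakeLast, unlike FirstAsync/LastAsync, tolerate empty windows)
    }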

    This achieves some success, but it's not perfect. If you use it in the following way:


    The values pushed are: 0, 4, 5, 10, 11. The values published at the end of one window and at the beginning of the next (e.g., 4, 5 and 10, 11) come in quick succession, because the constraint only holds back publishing within an individual window.
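    The effect can be reproduced with an idealized model of the Window approach (hypothetical helper `WindowFirstLastModel`, plain C#, no Rx). It assumes perfectly aligned windows and a source ticking every 100 ms, so the exact values differ from the run described above, but the pattern is the same: the last value of one window and the first value of the next go out at the same instant.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Idealized model of the Window attempt (hypothetical helper, not Rx). Times are non-negative ms.
// Windows cover [k*interval, (k+1)*interval). A window's first value is published on arrival;
// if the window received more than one value, its last value is published when it closes.
static List<(long Time, int Value)> WindowFirstLastModel(
    IEnumerable<(long Time, int Value)> events, long interval)
{
    var published = new List<(long Time, int Value)>();
    foreach (var window in events.OrderBy(e => e.Time).GroupBy(e => e.Time / interval))
    {
        var items = window.ToList();
        published.Add(items[0]); // first value, immediately
        if (items.Count > 1)
            published.Add(((window.Key + 1) * interval, items[^1].Value)); // last value, at window end
    }
    return published;
}

// A source producing 0, 1, 2, ... every 100 ms, with 500 ms windows.
var everyHundredMs = Enumerable.Range(0, 12).Select(i => ((long)i * 100, i));
Console.WriteLine(string.Join(", ", WindowFirstLastModel(everyHundredMs, 500)));
// (0, 0), (500, 4), (500, 5), (1000, 9), (1000, 10), (1500, 11)
```

    Because the windows are fixed and independent, nothing stops a window's closing value and the next window's opening value from being published back to back.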

    Another option we came up with that fits the bill better is:

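    public static IObservable<T> ConstrainPublishingRate<T>(this IObservable<T> source, TimeSpan interval)
    {
    	// Timer that emits -1 on subscription, then 0, 1, 2, ... once per interval.
    	var sampleInterval = Observable.Interval(interval).StartWith(-1);
    	return source
    			// Emits whenever either side emits, once both have produced a value.
    			.CombineLatest(sampleInterval, (value, timer) => new { value, timer })
    			// Let through only the first combination seen for each timer tick.
    			.DistinctUntilChanged(x => x.timer)
    			.Select(x => x.value);
    }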

    It's not perfect because the interval starts when the resulting stream is subscribed to, not when the first update from the source is received. This is probably not a major issue for our use case. I guess it would be possible to Publish() sampleInterval and Connect() it when the first update comes from the source.
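    A comparable model of the CombineLatest version helps show what DistinctUntilChanged(x => x.timer) actually lets through. The helper `CombineLatestModel` is hypothetical; it treats the StartWith(-1) value as a tick at subscription time (0 ms) and assumes a tick is handled before an event at the same instant. If the model is faithful, it suggests two behaviours worth checking in a real run: once the source has produced a value, the last value is re-published on every tick even while the source is silent, and a value arriving after a quiet spell waits for the next tick, because CombineLatest emits each time the timer does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Model of source.CombineLatest(Observable.Interval(interval).StartWith(-1), ...)
//                 .DistinctUntilChanged(x => x.timer)   (hypothetical helper, not Rx).
// Assumptions: times are ms since subscription; the StartWith(-1) value is a tick at 0 ms and
// real ticks follow at interval, 2*interval, ... up to 'horizon'; events are sorted and
// <= horizon; a tick is handled before an event occurring at the same instant.
static List<(long Time, int Value)> CombineLatestModel(
    IReadOnlyList<(long Time, int Value)> events, long interval, long horizon)
{
    var published = new List<(long Time, int Value)>();
    int? latestValue = null;     // latest value from the source
    long? latestTick = null;     // latest timer value (-1, 0, 1, ...)
    long? lastPassedTick = null; // key last let through by DistinctUntilChanged
    long nextTickTime = 0, nextTickIndex = -1;
    int i = 0;

    while (nextTickTime <= horizon || i < events.Count)
    {
        long now;
        if (i >= events.Count || (nextTickTime <= horizon && nextTickTime <= events[i].Time))
        {
            now = nextTickTime;              // timer emits
            latestTick = nextTickIndex++;
            nextTickTime += interval;
        }
        else
        {
            now = events[i].Time;            // source emits
            latestValue = events[i++].Value;
        }

        // CombineLatest emits once both sides have a value; DistinctUntilChanged keeps the
        // first combination for each tick.
        if (latestValue.HasValue && latestTick.HasValue && latestTick != lastPassedTick)
        {
            published.Add((now, latestValue.Value));
            lastPassedTick = latestTick;
        }
    }
    return published;
}

var sample = new (long, int)[] { (0, 0), (100, 1), (200, 2), (300, 3), (400, 4), (1600, 5) };
Console.WriteLine(string.Join(", ", CombineLatestModel(sample, 500, 2000)));
// (0, 0), (500, 4), (1000, 4), (1500, 4), (2000, 5)
```

    In this model 4 is re-published at 1000 ms and 1500 ms, and 5, which arrives at 1600 ms after a quiet second, is held until the 2000 ms tick.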

    Any comments on this approach? Is there a cleaner, more direct way to do this?

    Monday, June 24, 2013 1:29 AM

All replies

  • Hi, 

    Normally, requirements like yours are meant to relieve pressure on an observer that may take longer to process an individual notification than the time between notifications.  Is that the case for you too?

    Choosing a static frequency seems a bit arbitrary to me.  Perhaps a better solution is to dynamically react to the changing latency of the observer.

    There are two common scenarios of which I'm aware:

    1. An observer can process a buffer in the same amount of time as an individual notification, or more efficiently; thus, buffering notifications improves throughput.
    2. An observer processes buffers in linear or worse time; thus, dropping notifications is the only way to improve throughput.

    Either way, we can buffer values and drop notifications if necessary.  The question is how many values go into each buffer.

    For example, you could hard-code a frequency using an overload of Buffer, entirely independent of the observer's changing latency.  This is essentially the same as your Window implementation, though perhaps instead of taking the first and last notification, you would just take the last (most recent) one.
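    The fixed-frequency Buffer option can be modelled the same way (hypothetical `BufferLastModel`, plain C#, no Rx): buffers cover consecutive intervals, are emitted when they close, and only the most recent value of each non-empty buffer is kept. The sketch makes one trade-off visible: nothing is published before a buffer closes, even when the source has been quiet.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Model of a fixed-interval Buffer that keeps only the most recent value of each non-empty
// buffer (hypothetical helper, not Rx). Buffers cover [k*interval, (k+1)*interval) and are
// emitted when they close; empty buffers produce nothing. Times are non-negative ms.
static List<(long Time, int Value)> BufferLastModel(
    IEnumerable<(long Time, int Value)> events, long interval) =>
    events.OrderBy(e => e.Time)
          .GroupBy(e => e.Time / interval)
          .Select(g => (Time: (g.Key + 1) * interval, Value: g.Last().Value))
          .ToList();

// Ten values, one every 100 ms, with 300 ms buffers.
var tenValues = Enumerable.Range(0, 10).Select(i => ((long)i * 100, i));
Console.WriteLine(string.Join(", ", BufferLastModel(tenValues, 300)));
// (300, 2), (600, 5), (900, 8), (1200, 9)
```

    Value 0 arrives at 0 ms but is superseded before its buffer closes at 300 ms, so the first publication is delayed by a full interval.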

    Alternatively, the size of each buffer can be calculated dynamically by reacting to an observer's capabilities.  That way you don't have to hard-code any frequency.

    Rxx offers an "Introspective" family of operators for reacting to an observer's latency.

    The first scenario is implemented by BufferIntrospective.*
    The second scenario is implemented by SampleIntrospective.  (This simply uses BufferIntrospective and takes only the last value of each buffer.)
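    The idea behind the introspective operators can be illustrated with a rough model in which the observer's latency decides the buffer boundaries. This is a hypothetical sketch (`IntrospectiveBufferModel` and the fixed per-delivery `cost` are assumptions); it is not Rxx's code and makes no claim about how BufferIntrospective or SampleIntrospective are implemented. Each delivery keeps the observer busy for `cost` ms; values that arrive while it is busy are buffered and handed over together as soon as it is free.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of latency-driven ("introspective") buffering. Times are in ms.
// A value arriving while the observer is idle is delivered at once; values arriving while it
// is busy are buffered and delivered together when it becomes free.
static List<(long Time, List<int> Batch)> IntrospectiveBufferModel(
    IEnumerable<(long Time, int Value)> events, long cost)
{
    var deliveries = new List<(long Time, List<int> Batch)>();
    var buffer = new List<int>();
    long freeAt = long.MinValue; // when the observer finishes its current batch

    foreach (var (t, v) in events.OrderBy(e => e.Time))
    {
        // The observer became free before t: hand it everything that piled up meanwhile.
        if (buffer.Count > 0 && freeAt <= t)
        {
            deliveries.Add((freeAt, buffer));
            freeAt += cost;
            buffer = new List<int>();
        }

        buffer.Add(v);
        if (freeAt <= t)
        {
            // Idle observer: deliver immediately.
            deliveries.Add((t, buffer));
            freeAt = t + cost;
            buffer = new List<int>();
        }
    }

    // Deliver anything still buffered once the observer is free.
    if (buffer.Count > 0)
        deliveries.Add((freeAt, buffer));
    return deliveries;
}

// Ten values, one every 100 ms; each delivery keeps the observer busy for 300 ms.
var steady = Enumerable.Range(0, 10).Select(i => ((long)i * 100, i));
foreach (var (time, batch) in IntrospectiveBufferModel(steady, 300))
    Console.WriteLine($"t={time}ms buffer=[{string.Join(", ", batch)}] latest={batch[^1]}");
// t=0ms buffer=[0] latest=0
// t=300ms buffer=[1, 2] latest=2
// t=600ms buffer=[3, 4, 5] latest=5
// t=900ms buffer=[6, 7, 8] latest=8
// t=1200ms buffer=[9] latest=9
```

    Taking only the last value of each buffer mirrors the SampleIntrospective idea. In this model, a constant per-delivery cost equal to some interval produces exactly the "publish immediately, then at most the latest value once per interval" timing from the question, which is one way to see how the two approaches relate.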

    Here's a related lab:

    - Dave

    * As an optimization, WindowIntrospective should really be refactored into BufferIntrospective because it artificially creates windows from buffers, and the existing BufferIntrospective should be deleted.  I just haven't gotten around to it.

    • Edited by Dave Sexton Saturday, June 29, 2013 1:37 AM Forgot to end a sentence
    Saturday, June 29, 2013 1:35 AM
  • Hi Dave,

    Thanks for your answer. I've been meaning to have a good sit down with Rxx and learn about it. Do you have any suggestions for the best place to start reading about its capabilities at a high(ish) level?

    For this specific problem, our issue isn't so much coping with the observer's performance. The problem is that the end subscriber of this chain of observables publishes information over a shared network connection with quite limited bandwidth.

    So ultimately we want to publish on the network as soon as possible, but if another notification arrives within a certain timespan, we want to hold it back until the end of that timespan and then publish only the latest update at that time.

    This will let us make a judgement call about how many updates per second we want and how long we can tolerate an update being delayed, while still having immediate notification in scenarios where load is low.
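    With the behaviour described above, both knobs are governed by the same interval I. If t_k is the time of the k-th publication, successive publications are at least I apart, so any window of length W contains at most ⌊W/I⌋ + 1 of them, and the newest value waits at most I before it goes out. For example, I = 250 ms gives a sustained rate of at most 4 updates per second and an added delay of at most 250 ms.

```latex
t_{k+1} - t_k \ge I
\quad\Longrightarrow\quad
N(W) \le \left\lfloor \frac{W}{I} \right\rfloor + 1,
\qquad
\text{delay} \le I

% Example with I = 250\,\text{ms}:
\text{sustained rate} \le \frac{1}{0.25\,\text{s}} = 4\ \text{updates/s},
\qquad
\text{delay} \le 250\,\text{ms}
```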

    Tuesday, July 02, 2013 11:12 PM