How do I ignore all-except-the-latest value when my Subscribe method is running?

    Question

  • Using Rx, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. I.e. it sometimes takes me longer to process a message than the time between messages, so I want to drop the messages I don't have time to process - but still process the newest one so I always have the most up-to-date result.

    That is, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.

    So, if I have some code which does:

    messages.OnNext(100);
    messages.OnNext(1);
    messages.OnNext(2);


    and we assume the '100' takes a long time to process, then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.

    Here's an example of the result I want, using a background task and Latest():

    var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
    
    Task.Factory.StartNew(() =>
    {
        foreach(var n in messages.Latest())
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(250));
            Console.WriteLine(n);
        }
    });

    However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).

    I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:

    var buffer = new BroadcastBlock<long>(n => n);
    Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
    
    buffer.AsObservable()
        .Subscribe(n =>
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(250));
            Console.WriteLine(n);
        });

    But I'd prefer to use Rx for all of it, so I can test it properly. At the moment, my tests contain Thread.Sleep calls because Dataflow doesn't have anything like IScheduler that I can fake.

    Wednesday, April 17, 2013 1:53 PM

All replies

  • Ahhh, the mythical ObserveLatestOn(IScheduler) extension method.

    I have had this requirement before. Specifically for displaying prices on the UI. If for some reason the dispatcher got busy/flooded, I don't want it to show an old/dirty price, it should show me the latest. We wrote this method on another project by decompiling ObserveOn(IScheduler) and replacing the internal Queue<T> with a simple backing value of T and potentially a flag to say it was unread.

    public static IObservable<TSource> ObserveLatestOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        return Observable.CreateWithDisposable<TSource>(observer =>
        {
            //var q = new Queue<Notification<TSource>>();
            //var q = new Stack<Notification<TSource>>();
            Notification<TSource> q = null;
            var gate = new object();
            bool active = false;
            var cancelable = new MutableDisposable();
            var disposable = source.Materialize().Subscribe(delegate(Notification<TSource> n)
            {
                bool flag;
                lock (gate)
                {
                    flag = !active;
                    active = true;
                    //q.Enqueue(n);
                    //q.Push(n);
                    q = n;
                }

                if (flag)
                {
                    cancelable.Replace(scheduler.Schedule(self =>
                    {
                        Notification<TSource> notification = null;
                        lock (gate)
                        {
                            //notification = q.Dequeue();
                            notification = q;
                            q = null;
                            //notification = q.Pop();
                            //q.Clear();
                        }
                        notification.Accept(observer);
                        bool flag2 = false;
                        lock (gate)
                        {
                            flag2 = active = (q != null);
                        }
                        if (flag2)
                        {
                            self();
                        }
                    }));
                }
            });
            return new CompositeDisposable(new[] { disposable, cancelable });
        });
    }

    This was the old code; however, it is clearly using a very old version of Rx, I am guessing circa 2010/2011.

    Try modifying this code, and I will work on an Rx2-appropriate version.

    Lee


    Lee Campbell http://LeeCampbell.blogspot.com

    • Proposed as answer by LeeCampbell Wednesday, April 17, 2013 6:11 PM
    Wednesday, April 17, 2013 3:00 PM
  • Thanks, with a few small tweaks this seems to work fine in Rx2 and I can remove all the Thread.Sleep nonsense from my tests.

    The code I'm using is

    public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
    {
    	return Observable.Create<T>(observer =>
    	{
    		Notification<T> outsideNotification = null;
    		var gate = new object();
    		bool active = false;
    		var cancelable = new MultipleAssignmentDisposable();
    		var disposable = source.Materialize().Subscribe(thisNotification =>
    		{
    			bool wasNotAlreadyActive;
    			lock (gate)
    			{
    				wasNotAlreadyActive = !active;
    				active = true;
    				outsideNotification = thisNotification;
    			}
    
    			if (wasNotAlreadyActive)
    			{
    				cancelable.Disposable = scheduler.Schedule(self =>
    				{
    					Notification<T> localNotification = null;
    					lock (gate)
    					{
    						localNotification = outsideNotification;
    						outsideNotification = null;
    					}
    					localNotification.Accept(observer);
    					bool hasPendingNotification = false;
    					lock (gate)
    					{
    						hasPendingNotification = active = (outsideNotification != null);
    					}
    					if (hasPendingNotification)
    					{
    						self();
    					}
    				});
    			}
    		});
    		return new CompositeDisposable(disposable, cancelable);
    	});
    }

    • Proposed as answer by LeeCampbell Wednesday, April 17, 2013 6:11 PM
    Wednesday, April 17, 2013 5:36 PM
  • If you are happy, can you mark one of these as the answer so it shows the right icon on the main page :-)

    Lee Campbell http://LeeCampbell.blogspot.com

    Monday, April 22, 2013 4:27 PM
  • I came up with an improvement to Lee's version that simplifies the code a bit and increases efficiency by using CAS-style atomic swaps (Interlocked.Exchange) instead of locks:

    public static IObservable<TSource> ObserveLatestOn<TSource>(
        this IObservable<TSource> source,
        IScheduler scheduler)
    {
        return Observable.Create<TSource>(observer =>
            {
                Notification<TSource> pendingNotification = null;
                var cancelable = new MultipleAssignmentDisposable();
                var sourceSubscription = source.Materialize().Subscribe(notification =>
                        {
                            var previousNotification = Interlocked.Exchange(ref pendingNotification, notification);
                            if (previousNotification == null)
                            {
                                cancelable.Disposable = scheduler.Schedule(() =>
                                    {
                                        var notificationToSend = Interlocked.Exchange(ref pendingNotification, null);
                                        notificationToSend.Accept(observer);
                                    });
                            }
                        });
                return new CompositeDisposable(sourceSubscription, cancelable);
            });
    }

    Adding a bit of explanation:

    The key idea here is that we keep track of the notification to be pushed on to the target scheduler in pendingNotification - and whenever an event is received, we swap the pendingNotification for the new notification. We ensure the new notification will be scheduled for dispatch on the target scheduler - but we may not need to do this...

    If the previousNotification is null we know that either (a) there was no previous notification as this is the first one or (b) the previousNotification was already dispatched. How do we know this? Because in the scheduler action that does the dispatch we swap the pendingNotification for null! So if previousNotification is null, we know we must schedule a new dispatch action.

    This approach keeps the checks, locks and scheduled actions to a minimum.

    I added a post on this, with a diagram, here: http://www.zerobugbuild.com/?p=192
    • Proposed as answer by James World Thursday, May 9, 2013 8:23 PM
    • Edited by James World Friday, May 10, 2013 2:00 PM Added link to blog post.
    Thursday, May 9, 2013 8:10 PM
  • Personally I like James's implementation better.

    Because: 

    1. It is more elegant
    2. More importantly, it proves that you don't need to be a 6ft4, ex-bouncer, who jogs faster than I sprint, to grok RX.
    Friday, May 10, 2013 11:04 AM
  • James, Toddler,

    I am seeing problems in your implementation of ObserveLatestOn. I agree it looks a little neater. However, if you use the test code below and try the two differing implementations of ObserveLatestOn in this thread, you get different results. The code using CAS pumps out every notification, the implementation from Lee and Wilka pumps out every 20th notification.

    Here's the repro code:

    Observable.Interval(TimeSpan.FromMilliseconds(500))
        .ObserveLatestOn(NewThreadScheduler.Default)
        .Subscribe(i =>
        {
            Console.WriteLine("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(TimeSpan.FromSeconds(10));
        });


    Output from Lee/Wilka's implementation:

    0 on thread 12
    20 on thread 12
    40 on thread 12
    60 on thread 12


    Output from the CAS version:

    0 on thread 12
    1 on thread 14
    2 on thread 15
    3 on thread 16
    4 on thread 17
    5 on thread 18
    6 on thread 19
    7 on thread 20
    8 on thread 21
    9 on thread 22
    10 on thread 23

    The problem is that the scheduled action doesn't wait until the notification is Accepted before resetting the pendingNotification to null. When you have a slow observer, Accept will take longer to run than the time for the next notification to come along. When another notification arrives, pendingNotification is already null and a new action is scheduled.

    If you use this altered implementation of the scheduled action, it works, although I haven't put much testing or thought into it:

    cancelable.Disposable = scheduler.Schedule(() =>
    {
    	//var notificationToSend = Interlocked.Exchange(ref pendingNotification, null);
    	var notificationToSend = pendingNotification;
    	notificationToSend.Accept(observer);
    	Interlocked.Exchange(ref pendingNotification, null);
    });

    In the test case, the notifications received are slightly different, presumably due to race conditions between the two timers:

    0 on thread 12
    21 on thread 14
    42 on thread 15
    63 on thread 16

    Notice that the CAS version processes subsequent notifications on different threads each time, whereas Lee/Wilka's version is consistent on the thread. I assume this is to do with the calling of self() instead of re-scheduling.

    Have you encountered these problems with your implementation?

    Clarification: Lee/Wilka's version continues execution on the same thread in the case where a new notification is available by the time the previous notification is Accepted. Otherwise all bets are off.
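
    To make the timing concrete, here is a minimal single-threaded sketch that replays one slow Accept by hand, using only Interlocked and no Rx types. The class and member names (LatestSlotDemo, Run, clearBeforeAccept) are made up for illustration, and the "scheduler" is just a counter of how many dispatch actions would have been scheduled. It is a simulation of the interleaving described above, not a test of the real operators.

```csharp
using System;
using System.Threading;

public static class LatestSlotDemo
{
    // Replays: "0" arrives, its dispatch starts, "1" arrives while the
    // observer is still busy in Accept, then Accept returns.
    // clearBeforeAccept = true  -> original CAS version (swap to null first)
    // clearBeforeAccept = false -> altered version (swap to null after Accept)
    public static (int scheduled, string leftInSlot) Run(bool clearBeforeAccept)
    {
        string pending = null;   // stands in for pendingNotification
        int scheduled = 0;       // how many dispatch actions were scheduled

        // Source side: swap the new value in; schedule only if the slot was empty.
        Action<string> arrive = n =>
        {
            if (Interlocked.Exchange(ref pending, n) == null)
            {
                scheduled++;
            }
        };

        arrive("0");             // slot was empty, so dispatch #1 is scheduled

        // Dispatch #1 starts running on the target scheduler.
        string toSend = clearBeforeAccept
            ? Interlocked.Exchange(ref pending, null)
            : pending;

        // The observer is now busy inside Accept(toSend). Meanwhile:
        arrive("1");

        // Accept(toSend) returns.
        if (!clearBeforeAccept)
        {
            Interlocked.Exchange(ref pending, null);
        }

        return (scheduled, pending);
    }
}
```

    Run(true) returns (2, "1"): a second dispatch was scheduled while the first was still inside Accept, which is how the CAS version ends up on a new thread for every notification. Run(false) returns (1, null): no extra dispatch was scheduled, but the "1" that arrived during Accept was wiped by the final exchange without ever being delivered.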

    • Edited by Niall Connaughton Tuesday, July 2, 2013 7:55 AM More info on thread re-use
    Tuesday, July 2, 2013 7:23 AM
  • Hi,

    Rxx provides an "Introspective family" of operators that satisfies Rx's contracts.

    The SampleIntrospective operator should meet your needs, though you'll only get the expected results if you pass in a concurrent scheduler; e.g., if the source pushes notifications on pooled threads, then you can pass in DispatcherScheduler.  If the source pushes notifications on the UI thread, then you can pass in ThreadPoolScheduler, NewThreadScheduler, TaskPoolScheduler, etc.

    (As an optimization, I plan to change and rename WindowIntrospective to BufferIntrospective and drop the existing BufferIntrospective implementation since the current windowing implementation artificially creates windows from buffers.)

    Related lab: https://rxx.codeplex.com/SourceControl/latest#Main/Testing/Rxx.Labs/Reactive/SelfObservingLab.cs

    Unit test: https://rxx.codeplex.com/SourceControl/latest#Main/Testing/Rxx.UnitTests/Reactive/IntrospectiveTests.cs

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Tuesday, July 2, 2013 9:12 AM Clarification
    Tuesday, July 2, 2013 9:11 AM
  • Nice spot Niall!

    The unit tests I had only accounted for long gaps between time-slices on the consuming scheduler - as you might get with the Dispatcher (which was my scenario) - rather than a slow consumption of individual events.

    I don't see the thread you are called on as so much of an issue (obviously with the DispatcherScheduler you don't see that behaviour anyway) - I'll have to do some testing to see if there are significant performance implications.

    I'll have a think about the fix too - I'm on a train at the moment :) I *think* the difference in behaviour comes from the fact that Lee's implementation will reschedule immediately if there were any notifications during Accept, whereas with the fix any pending notifications arriving during Accept are dropped by the exchange. I think both approaches are valid.

    Which approach is more appropriate will come down to what kind of breathing room you want/need - similar to the kind of choice you make with reader/writer locks when you need to avoid write-starvation.

    I'll have a think and see if I can find a way to keep CAS neat and make the choice configurable.

    James.


    James World / http://www.zerobugbuild.com

    Wednesday, July 3, 2013 7:04 PM
  • Hi James,

    The problem with the CAS implementation is that it violates one of Rx's contracts that requires notifications to be pushed serially.  See §4.2 in the Rx Design Guidelines document.  It may cause threading bugs when defining a query with existing Rx operators.

    I don't think you're going to be able to use CAS here.  You'll either have to use explicit locking or a queue.  ConcurrentQueue<T> could be used, though I believe that it's not totally CAS - it actually uses locks in some places as well.

    (EDIT) Lee's implementation (translated by Wilka to Rx 2.0) uses Materialize but it's not necessary.  Also, the Rx team eventually decided to avoid using the Materialize operator themselves in general because it unnecessarily puts pressure on the GC in high-velocity sequences due to the creation of many ephemeral objects; i.e., if it's simple to avoid using it, then it should be avoided when implementing operators in general.
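
    As a rough illustration of the two suggestions above (explicit locking, no Materialize), here is one way it might look. This is a sketch under assumptions, not code from Rx or Rxx: the name ObserveLatestOnLocked is made up, it requires the System.Reactive package, and it makes one design choice that differs from the Materialize-based versions in this thread, namely that a value still waiting in the slot is delivered before a terminal OnError/OnCompleted rather than being overwritten by it.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LatestExtensions
{
    // Hypothetical operator name, for illustration only.
    public static IObservable<T> ObserveLatestOnLocked<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var cancelable = new SerialDisposable();
            T latest = default(T);
            bool hasValue = false;     // a value is waiting in the slot
            bool terminated = false;   // source sent OnError/OnCompleted
            Exception error = null;
            bool active = false;       // a drain is scheduled or running

            // Runs on the target scheduler. Only one drain is ever active,
            // so the downstream observer is called serially.
            Action<Action> drain = self =>
            {
                T value;
                bool deliverValue;
                Exception failure;
                lock (gate)
                {
                    deliverValue = hasValue;
                    value = latest;
                    hasValue = false;
                    latest = default(T);
                    failure = error;
                }

                if (deliverValue)
                {
                    observer.OnNext(value);
                    bool more;
                    lock (gate)
                    {
                        // Anything that arrived during OnNext keeps us active.
                        more = hasValue || terminated;
                        active = more;
                    }
                    if (more) self();
                }
                else if (failure != null) observer.OnError(failure);
                else observer.OnCompleted();
            };

            var subscription = source.Subscribe(
                value =>
                {
                    bool start;
                    lock (gate)
                    {
                        latest = value; hasValue = true;
                        start = !active; active = true;
                    }
                    if (start) cancelable.Disposable = scheduler.Schedule(drain);
                },
                ex =>
                {
                    bool start;
                    lock (gate)
                    {
                        error = ex; terminated = true;
                        start = !active; active = true;
                    }
                    if (start) cancelable.Disposable = scheduler.Schedule(drain);
                },
                () =>
                {
                    bool start;
                    lock (gate)
                    {
                        terminated = true;
                        start = !active; active = true;
                    }
                    if (start) cancelable.Disposable = scheduler.Schedule(drain);
                });

            return new CompositeDisposable(subscription, cancelable);
        });
    }
}
```

    The slot holds a plain T plus a flag, so no Notification<T> object is allocated per value, and the drain only clears the active flag after OnNext has returned, which is what keeps a slow observer from being re-entered.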

    - Dave


    http://davesexton.com/blog


    • Edited by Dave Sexton Thursday, July 4, 2013 12:26 AM Removed bad advice
    Thursday, July 4, 2013 12:23 AM
  • Good day!
    There is probably a mistake in the use of the MultipleAssignmentDisposable class. I think you intended to dispose the previous subscription when cancelable.Disposable is assigned, in which case SerialDisposable should probably be used instead.
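
    For reference, the difference between the two classes can be seen in isolation with a small sketch like the one below (it requires the System.Reactive package; the class name DisposableAssignmentDemo and the log strings are made up for illustration). It only shows what happens on reassignment; whether that matters for ObserveLatestOn depends on whether the previously scheduled action can still be pending when a new one is assigned.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class DisposableAssignmentDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // MultipleAssignmentDisposable: reassigning does NOT dispose the old value.
        var multiple = new MultipleAssignmentDisposable();
        multiple.Disposable = Disposable.Create(() => log.Add("multiple: first disposed"));
        multiple.Disposable = Disposable.Create(() => log.Add("multiple: second disposed"));

        // SerialDisposable: reassigning disposes the old value immediately.
        var serial = new SerialDisposable();
        serial.Disposable = Disposable.Create(() => log.Add("serial: first disposed"));
        serial.Disposable = Disposable.Create(() => log.Add("serial: second disposed"));

        // Disposing the container disposes whatever it currently holds.
        multiple.Dispose();
        serial.Dispose();

        return log;
    }
}
```

    Run() returns "serial: first disposed", "multiple: second disposed", "serial: second disposed", in that order. The first disposable given to MultipleAssignmentDisposable is never disposed.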
    Sunday, October 19, 2014 11:31 AM
  • Yes, this does seem to make sense. If I can, I will have a play/test and see if this is correct.

    Lee Campbell http://LeeCampbell.blogspot.com

    Monday, October 20, 2014 3:14 PM