How to extend ReplaySubject TimeSpan buffer with latest message outside the TimeSpan.

    Question

  • Hello,

    I have a problem. I need to create an observable sequence that provides the messages from the last few minutes (like a ReplaySubject(TimeSpan)) plus the latest message that is older than the specified TimeSpan.

    How can this be achieved with Rx?

    Best Regards,

      Hubert




    Thursday, October 17, 2013 2:30 PM

Answers

  • Hi Hubert,

    I must admit that this is a strange request.  I can't recall ever seeing it before in this forum.

    To solve this problem, consider applying Scan to pair each value with its previous value, then apply Replay over the pairs. Finally, use the Publish overload with a selector parameter to create a cold observable. The selector creates a scope for each observer in which to extract the previous value from the first replayed pair and then concatenate the remaining sequence of replayed and live notifications.

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    
    namespace RxLabs.Net45
    {
    	class ReplayOldAndTimeLab
    	{
    		internal static void Main()
    		{
    			var xs = Observable.Interval(TimeSpan.FromSeconds(2));
    
    			var replay = xs
    				.Scan(
    					new { Prev = (long?)null, Cur = (long?)null },
    					(acc, cur) => new { Prev = acc.Cur, Cur = (long?)cur })
    				.Replay(TimeSpan.FromSeconds(5));
    
    			var query = replay.Publish(published =>
    				(from pair in published.Take(1)
    				 from value in pair.Prev.HasValue
    					 ? Observable.Return(pair.Prev.Value)
    						 .Concat(Observable.Return(pair.Cur.Value))
    					 : Observable.Return(pair.Cur.Value)
    				 select value)
    				 .Concat(
    					 from pair in published
    					 select pair.Cur.Value));
    
    			using (replay.Connect())
    			{
    				var subscription = new SerialDisposable();
    
    				do
    				{
    					subscription.Disposable = query.Subscribe(Console.WriteLine);
    				}
    				while (Console.ReadLine().Length == 0);
    			}
    		}
    	}
    }
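
    To see what the Scan step produces on its own, here is a minimal sketch that runs the same accumulator over a short finite sequence. The class name ScanPairingSketch, the Pairs method and the "none" placeholder are only illustrative; Observable.Range stands in for the timed source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class ScanPairingSketch
{
    // Pairs each value with its predecessor, using the same accumulator
    // as the Scan call above. The names here are hypothetical.
    public static IList<string> Pairs()
    {
        return Observable.Range(0, 3)
            .Scan(
                new { Prev = (long?)null, Cur = (long?)null },
                (acc, cur) => new { Prev = acc.Cur, Cur = (long?)cur })
            // Render each pair as text; the first pair has no predecessor.
            .Select(p => (p.Prev.HasValue ? p.Prev.Value.ToString() : "none") + " -> " + p.Cur)
            .ToList()
            .Wait();
    }

    static void Main()
    {
        foreach (var line in Pairs())
            Console.WriteLine(line);
    }
}
```

    This prints "none -> 0", "0 -> 1" and "1 -> 2". Because every replayed pair carries its predecessor, the first pair an observer receives also holds the newest value that has already left the replay window.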

    - Dave


    http://davesexton.com/blog

    Friday, October 18, 2013 5:34 PM

All replies

  • Hello Dave,


    Thanks for the solution. It works as required. It seems quite complex to me, but it will become clearer as my Rx experience grows :)

    I have managed to solve this problem using a slightly different approach. The ReplaySubject provides an almost complete solution, but it lacks the extension I was seeking. The quick and dirty solution is to implement the required functionality inside the ReplaySubject itself:

    public ReplaySubjectEx(int bufferSize, TimeSpan window, bool extendWindow, IScheduler scheduler)

    Then the only method that requires modification is:

    void Trim(TimeSpan now)
    {
        while (_queue.Count > _bufferSize)
            _queue.Dequeue();
    
        // This if statement has been added.
        if (_extendWindow)
        {
            // Find the newest item that has already left the time window.
            TimeInterval<T>? extension = null;
    
            foreach (TimeInterval<T> item in _queue)
            {
                if (now.Subtract(item.Interval).CompareTo(_window) > 0)
                {
                    extension = item;
                }
                else
                {
                    break;
                }
            }
    
            // Dequeue everything older than that item, but keep the item itself.
            if (extension.HasValue)
            {
                while (!_queue.Peek().Interval.Equals(extension.Value.Interval))
                {
                    _queue.Dequeue();
                }
            }
        }
        else
        {
            while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                _queue.Dequeue();
        }
    }
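
    For comparison, the added branch could also be written without comparing Intervals for equality. This is only a sketch that works on a stand-alone queue, not on the real ReplaySubject internals. The TrimSketch class, the KeyValuePair entries (Key is the timestamp, Value is the message) and the parameter names are assumptions made for illustration. It still assumes that the timestamps in the queue never decrease.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class TrimSketch
{
    // Hypothetical stand-in for the subject's internal queue:
    // Key is the timestamp of a message, Value is the message itself.
    public static void Trim<T>(
        Queue<KeyValuePair<TimeSpan, T>> queue, TimeSpan now, TimeSpan window)
    {
        // Drop the head only while the item behind it has also left the
        // window, so the newest expired item always stays in the queue.
        while (queue.Count > 1 && now - queue.ElementAt(1).Key > window)
            queue.Dequeue();
    }
}
```

    With timestamps of 0, 2, 4, 6 and 8 seconds, a window of 5 seconds and now at 10 seconds, the queue is trimmed to 4, 6 and 8. The item at 4 seconds is the newest one outside the window, so it is kept. Two items with the same timestamp do not confuse the loop, because it never compares items for equality.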

    As I said, my change to Trim is quick and dirty: it assumes that the messages in the queue are sorted and that there are no duplicate Intervals. These assumptions do not hold in the general case, but everything is fine for our use case. And this is why we need this little feature:

    Part of our application (an automated box test environment) listens for changes of some state and stores them in a buffer (a ReplaySubject). In some cases a test can take a very long time, so the ReplaySubject is created with a time window declared by the test.

    A verification of the state defines which state is expected and at what time, given as backward and forward verification times measured from the moment the verification is invoked. To limit memory usage, the time window used to create the ReplaySubject (declared by the test) is the maximum of the backward verification times in the test.

    In many cases the only requirement is that one of the states within the specified time (backward and forward verification time) had a specific value. This must work even if there was no state change in that time, in which case the ReplaySubject could be empty. This is why we need to remember one more element: it ensures that the ReplaySubject does not become empty, so we can say what the value of the state was across the entire ReplaySubject time window.
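
    The check described above can be sketched roughly as follows. This is not our actual test code. The StateVerificationSketch class, the HeldValue method and the KeyValuePair representation (Key is the time of a state change, Value is the new state) are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class StateVerificationSketch
{
    // Returns true if the state held the expected value at any moment in
    // [start, end]. 'changes' must be ordered by time (Key); Value is the
    // new state. All names here are hypothetical.
    public static bool HeldValue<T>(
        IEnumerable<KeyValuePair<TimeSpan, T>> changes,
        TimeSpan start, TimeSpan end, T expected)
    {
        var comparer = EqualityComparer<T>.Default;
        var list = changes.ToList();

        // The state in effect at 'start' is the last change at or before it.
        // This is the extra element that must survive trimming.
        var before = list.Where(c => c.Key <= start).ToList();
        if (before.Count > 0 && comparer.Equals(before[before.Count - 1].Value, expected))
            return true;

        // Otherwise look for a change to the expected value inside the interval.
        return list.Any(c => c.Key > start && c.Key <= end
                             && comparer.Equals(c.Value, expected));
    }
}
```

    If the only change to "A" happened at second 1 and the verification covers seconds 5 to 10, the check succeeds only when that old change is still in the buffer. With an empty buffer the same check fails, even though the state was "A" the whole time.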

    I hope I have described our use case clearly. There may be more convenient ways to achieve what we need, but I know of no approach other than the one described.

    Best regards,

      Hubert

    Tuesday, October 22, 2013 10:53 AM