Is there a way to reset an Observable.Window's windowClosingSelect function?

Unanswered Is there a way to reset an Observable.Window's windowClosingSelect function?

  • Thursday, March 07, 2013 11:08 PM
     
     
    My goal is to have an update handler be called no more than once per 5 seconds for a specific type of value ("Type2" objects in my case). The observable will be generating more than one value per 5 seconds but I want to ignore all of them that occur within 5 seconds of the last handled update.

    I asked this question here: Rx Throttle only if specific condition met

    and got good feedback. It led me to use Observable.Window to try to achieve my goal. I thought I had it working but it turns out that it can produce incorrect output if the first update comes right before a window closes (so the update is handled) and then once the next window opens, another update arrives and is also handled, when I don't want it to since it came within 5 seconds of the last handled update.

    Here is some code to demonstrate the issue, slightly modified from the code in the link:

    var source = new Subject<Thing>();    
    var feed = source.Publish().RefCount();

    var ofType1 = feed.Where(t => t.ActivationType == "Type1");
    var ofType2 = feed
        .Where(t => t.ActivationType == "Type2")
        .Window(() =>
                Observable.Timer(TimeSpan.FromSeconds(5))
                .Do(t => Console.WriteLine("\nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
        .Select(x => x.Take(1))
        .Merge()
        .Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));


    var query = ofType1.Merge(ofType2);        
    query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    

    int msDelay = 3000;
    Task task = Task.Factory
        .StartNew(() => { Thread.Sleep(msDelay); })
        .ContinueWith((Task starter) =>
            {
                while (running)
                {
                    var thing = new Thing();  //Note that all Things are by default Type2
                    source.OnNext(thing);
                    Thread.Sleep(100);
                }
            }, TaskContinuationOptions.LongRunning);

    Console.ReadLine();
    So, the subscription is made and once the subscription is made, the Observable.Timer used in the Window begins. The while loop that is used to produce values does not begin until after a 3000 ms delay.

    The output looks like:

    A new window opened 03:48:03:725
    UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752

    TICK: 03:48:05:714
    A new window opened 03:48:05:754
    UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754

    TICK: 03:48:10:730
    A new window opened 03:48:10:755
    UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755

    TICK: 03:48:15:738
    A new window opened 03:48:15:755
    UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755

    TICK: 03:48:20:747
    A new window opened 03:48:20:756
    UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756

    TICK: 03:48:25:755
    A new window opened 03:48:25:756
    UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756
    As you can see, the first Type2 update comes in while a window is open, so it gets handled. Then, 2 secs later, the Window's timer ticks and a new window is opened. It immediately handles the next Type2 update, which I don't want it to do. After that it looks to work normally (one update every 5 secs as defined in the Window declaration).

    Is there a way or another method I can use to make sure that only one update per 5 seconds (or whatever timeframe I choose) is ever handled?  It seems like a common requirement but I just can't get it to work right.


All Replies

  • Friday, March 08, 2013 11:12 AM
     
      Has Code

    Hi,

    The overload of the Window operator that you're using opens the first window when you subscribe and then opens subsequent windows as soon as the previous window closes.  But it seems that your requirement is to open a window when observing the first notification and close it after 5 seconds, and then don't open another window right away; instead, you want to wait for the next notification before opening another window.

    Another way to look at the problem is that you want to project each value into a waiting period, taking the first value that arrives from the beginning of the sequence or after the previous waiting period has elapsed, and ignoring all values during the waiting periods.

    Here's a marble diagram of the problem showing 4 notifications with overlapping durations:

    (Edit: Please excuse the bad formatting.  The forum editor shows a fixed-width font, but apparently it's not used by browsers.)

        1      2       3     4
    |---o------o-------o-----o------>
        |      |__|____|_|   |___|__
        |_________| 5s |_________|
             5s             5s
        .              .
        .              .
    |---o--------------o------------>

    The result sequence must contain 1 because it's the first value at the beginning of the ofType2 sequence, and it must contain 3 because it's the first value to arrive after 1's waiting period has elapsed.  Notifications 2 and 4 must be dropped because they arrive within waiting periods for 1 and 3, respectively.

    Do I understand your requirements correctly?

    So it seems that Window is not the correct solution to your problem because you need to open data-based windows, not time-based windows.  In other words, it's the data within your sequence plus a constant duration that indicates the silent periods between values.

    There's a different overload of Window that allows you to specify an observable that indicates when windows are opened; however, this overload permits sliding windows, so you can't simply pass in the source sequence (ofType2).  It would need to be filtered so that notifications which are meant to be dropped don't open new windows for themselves.  Unfortunately, converting ofType2 into a sequence of window opening notifications is actually just a different way of stating the original problem.  If you can solve that, then you don't need the Window operator.  Furthermore, to define windows seems to require recursion because you can't prevent a value from opening a window while it's within the duration of a previous window without first having defined a query that could generate notifications about the previous window!

    In implementing a solution to this problem, I realized that it can be done using Scan in a fairly straightforward manner; however, the only catch is that OnCompleted is delayed until the last notification's duration elapses, even if that notification was dropped because it overlapped a previous waiting period.

    Here's my test:

    using System;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;
    
    namespace ConsoleApplication1
    {
    	struct Thing
    	{
    		public string ActivationType;
    		public int Id;
    	}
    
    	class Program : ReactiveTest
    	{
    		static void Main()
    		{
    			new Program().Run();
    		}
    
    		void Run()
    		{
    			const string type1 = "Type1";
    			const string type2 = "Type2";
    
    			var scheduler = new TestScheduler();
    
    			var source = scheduler.CreateColdObservable(
    				OnNext(TimeSpan.FromSeconds(1).Ticks, new Thing { ActivationType = type1, Id = 0 }),
    				OnNext(TimeSpan.FromSeconds(1.5).Ticks, new Thing { ActivationType = type1, Id = 1 }),
    				OnNext(TimeSpan.FromSeconds(2).Ticks, new Thing { ActivationType = type2, Id = 2 }),
    				OnNext(TimeSpan.FromSeconds(3).Ticks, new Thing { ActivationType = type1, Id = 3 }),
    				OnNext(TimeSpan.FromSeconds(3.5).Ticks, new Thing { ActivationType = type1, Id = 4 }),
    				OnNext(TimeSpan.FromSeconds(4).Ticks, new Thing { ActivationType = type2, Id = 5 }),
    				OnNext(TimeSpan.FromSeconds(5).Ticks, new Thing { ActivationType = type2, Id = 6 }),
    				OnNext(TimeSpan.FromSeconds(5.5).Ticks, new Thing { ActivationType = type2, Id = 7 }),
    				OnNext(TimeSpan.FromSeconds(6).Ticks, new Thing { ActivationType = type1, Id = 8 }),
    				OnNext(TimeSpan.FromSeconds(7.5).Ticks, new Thing { ActivationType = type2, Id = 9 }),
    				OnNext(TimeSpan.FromSeconds(8).Ticks, new Thing { ActivationType = type2, Id = 10 }),
    				OnNext(TimeSpan.FromSeconds(8.5).Ticks, new Thing { ActivationType = type1, Id = 11 }),
    				OnNext(TimeSpan.FromSeconds(9).Ticks, new Thing { ActivationType = type2, Id = 12 }),
    				OnNext(TimeSpan.FromSeconds(10).Ticks, new Thing { ActivationType = type2, Id = 13 }),
    				OnNext(TimeSpan.FromSeconds(10.5).Ticks, new Thing { ActivationType = type1, Id = 14 }),
    				OnNext(TimeSpan.FromSeconds(11).Ticks, new Thing { ActivationType = type2, Id = 15 }),
    				OnNext(TimeSpan.FromSeconds(11.5).Ticks, new Thing { ActivationType = type1, Id = 16 }),
    				OnNext(TimeSpan.FromSeconds(12).Ticks, new Thing { ActivationType = type1, Id = 17 }),
    				OnNext(TimeSpan.FromSeconds(12.2).Ticks, new Thing { ActivationType = type2, Id = 18 }),
    				OnNext(TimeSpan.FromSeconds(16).Ticks, new Thing { ActivationType = type2, Id = 19 }),
    				OnNext(TimeSpan.FromSeconds(16.5).Ticks, new Thing { ActivationType = type2, Id = 20 }),
    				OnCompleted<Thing>(TimeSpan.FromSeconds(17).Ticks));
    
    			var query = source.Publish(
    				published => published
    					.Where(t => t.ActivationType == "Type1")
    					.Merge(published
    						.Where(t => t.ActivationType == "Type2")
    						.Select((obj, index) => new { obj, index })
    						.Publish(ofType2Published =>
    							(from item in ofType2Published
    							 select new
    							 {
    								 Index = item.index,
    								 Value = item.obj,
    								 HasValue = true
    							 })
    							.Merge(
    								from item in ofType2Published
    								from __ in Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
    								select new
    								{
    									Index = item.index,
    									Value = default(Thing),
    									HasValue = false
    								})
    							.Scan(
    								new
    								{
    									Index = 0,
    									Value = default(Thing),
    									Waiting = false,
    									StartedWaiting = false
    								},
    								(acc, cur) => new
    								{
    									Index = acc.Waiting ? acc.Index : cur.Index,
    									Value = acc.Waiting ? acc.Value : cur.Value,
    									Waiting = cur.HasValue || (cur.Index == acc.Index ? false : acc.Waiting),
    									StartedWaiting = cur.HasValue && !acc.Waiting,
    								})
    							.Where(result => result.StartedWaiting)
    							.Select(result => result.Value))));
    
    			var observer = scheduler.Start(
    				() => query,
    				created: 0,
    				subscribed: 0,
    				disposed: TimeSpan.FromSeconds(100).Ticks);
    
    			foreach (var message in observer.Messages)
    			{
    				if (message.Value.Kind == System.Reactive.NotificationKind.OnNext)
    				{
    					Console.WriteLine(
    						"{0:0.00}: {1} {2}",
    						TimeSpan.FromTicks(message.Time).TotalSeconds,
    						message.Value.Value.ActivationType,
    						message.Value.Value.Id);
    				}
    				else
    				{
    					Console.WriteLine(message);
    				}
    			}
    
    			ReactiveAssert.AssertEqual(
    				observer.Messages,
    				OnNext(TimeSpan.FromSeconds(1).Ticks + 1, new Thing { ActivationType = type1, Id = 0 }),
    				OnNext(TimeSpan.FromSeconds(1.5).Ticks + 1, new Thing { ActivationType = type1, Id = 1 }),
    				OnNext(TimeSpan.FromSeconds(2).Ticks + 1, new Thing { ActivationType = type2, Id = 2 }),
    				OnNext(TimeSpan.FromSeconds(3).Ticks + 1, new Thing { ActivationType = type1, Id = 3 }),
    				OnNext(TimeSpan.FromSeconds(3.5).Ticks + 1, new Thing { ActivationType = type1, Id = 4 }),
    				OnNext(TimeSpan.FromSeconds(6).Ticks + 1, new Thing { ActivationType = type1, Id = 8 }),
    				OnNext(TimeSpan.FromSeconds(7.5).Ticks + 1, new Thing { ActivationType = type2, Id = 9 }),
    				OnNext(TimeSpan.FromSeconds(8.5).Ticks + 1, new Thing { ActivationType = type1, Id = 11 }),
    				OnNext(TimeSpan.FromSeconds(10.5).Ticks + 1, new Thing { ActivationType = type1, Id = 14 }),
    				OnNext(TimeSpan.FromSeconds(11.5).Ticks + 1, new Thing { ActivationType = type1, Id = 16 }),
    				OnNext(TimeSpan.FromSeconds(12).Ticks + 1, new Thing { ActivationType = type1, Id = 17 }),
    				OnNext(TimeSpan.FromSeconds(16).Ticks + 1, new Thing { ActivationType = type2, Id = 19 }),
    				OnCompleted<Thing>(TimeSpan.FromSeconds(21.5).Ticks + 1));
    
    			Console.WriteLine();
    			Console.WriteLine("Test passed");
    			Console.ReadLine();
    		}
    	}
    }

    Results:

    1.00: Type1 0
    1.50: Type1 1
    2.00: Type2 2
    3.00: Type1 3
    3.50: Type1 4
    6.00: Type1 8
    7.50: Type2 9
    8.50: Type1 11
    10.50: Type1 14
    11.50: Type1 16
    12.00: Type1 17
    16.00: Type2 19
    OnCompleted()@215000001
    
    Test passed

    - Dave


    http://davesexton.com/blog

    • Edited by Dave Sexton Friday, March 08, 2013 11:13 AM Note about bad formatting in forum
    •