Is there a way to reset an Observable.Window's windowClosingSelect function?
-
Thursday, March 07, 2013 11:08 PMMy goal is to have an update handler be called no more than once per 5 seconds for a specific type of value ("Type2" objects in my case). The observable will be generating more than one value per 5 seconds but I want to ignore all of them that occur within 5 seconds of the last handled update.
I asked this question here: Rx Throttle only if specific condition met
and got good feedback. It led me to use Observable.Window to try to achieve my goal. I thought I had it working but it turns out that it can produce incorrect output if the first update comes right before a window closes (so the update is handled) and then once the next window opens, another update arrives and is also handled, when I don't want it to since it came within 5 seconds of the last handled update.
Here is some code to demonstrate the issue, slightly modified from the code in the link:
var source = new Subject<Thing>();
var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
.Where(t => t.ActivationType == "Type2")
.Window(() =>
Observable.Timer(TimeSpan.FromSeconds(5))
.Do(t => Console.WriteLine("\nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
.Select(x => x.Take(1))
.Merge()
.Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));
var query = ofType1.Merge(ofType2);
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));
int msDelay = 3000;
Task task = Task.Factory
.StartNew(() => { Thread.Sleep(msDelay); })
.ContinueWith((Task starter) =>
{
while (running)
{
var thing = new Thing(); //Note that all Things are by default Type2
source.OnNext(thing);
Thread.Sleep(100);
}
}, TaskContinuationOptions.LongRunning);
Console.ReadLine();
So, the subscription is made and once the subscription is made, the Observable.Timer used in the Window begins. The while loop that is used to produce values does not begin until after a 3000 ms delay.
The output looks like:
A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752
TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754
TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755
TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755
TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756
TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756
As you can see, the first Type2 update comes in while a window is open, so it gets handled. Then, 2 secs later, the Window's timer ticks and a new window is opened. It immediately handles the next Type2 update, which I don't want it to do. After that it looks to work normally (one update every 5 secs as defined in the Window declaration).
Is there a way or another method I can use to make sure that only one update per 5 seconds (or whatever timeframe I choose) is ever handled? It seems like a common requirement but I just can't get it to work right.
All Replies
-
Friday, March 08, 2013 11:12 AM
Hi,
The overload of the Window operator that you're using opens the first window when you subscribe and then opens subsequent windows as soon as the previous window closes. But it seems that your requirement is to open a window when observing the first notification and close it after 5 seconds, and then don't open another window right away; instead, you want to wait for the next notification before opening another window.
Another way to look at the problem is that you want to project each value into a waiting period, taking the first value that arrives from the beginning of the sequence or after the previous waiting period has elapsed, and ignoring all values during the waiting periods.
Here's a marble diagram of the problem showing 4 notifications with overlapping durations:
(Edit: Please excuse the bad formatting. The forum editor shows a fixed-width font, but apparently it's not used by browsers.)
1 2 3 4 |---o------o-------o-----o------> | |__|____|_| |___|__ |_________| 5s |_________| 5s 5s . . . . |---o--------------o------------>The result sequence must contain 1 because it's the first value at the beginning of the ofType2 sequence, and it must contain 3 because it's the first value to arrive after 1's waiting period has elapsed. Notifications 2 and 4 must be dropped because they arrive within waiting periods for 1 and 3, respectively.
Do I understand your requirements correctly?
So it seems that Window is not the correct solution to your problem because you need to open data-based windows, not time-based windows. In other words, it's the data within your sequence plus a constant duration that indicates the silent periods between values.
There's a different overload of Window that allows you to specify an observable that indicates when windows are opened; however, this overload permits sliding windows, so you can't simply pass in the source sequence (ofType2). It would need to be filtered so that notifications which are meant to be dropped don't open new windows for themselves. Unfortunately, converting ofType2 into a sequence of window opening notifications is actually just a different way of stating the original problem. If you can solve that, then you don't need the Window operator. Furthermore, to define windows seems to require recursion because you can't prevent a value from opening a window while it's within the duration of a previous window without first having defined a query that could generate notifications about the previous window!
In implementing a solution to this problem, I realized that it can be done using Scan in a fairly straightforward manner; however, the only catch is that OnCompleted is delayed until the last notification's duration elapses, even if that notification was dropped because it overlapped a previous waiting period.
Here's my test:
using System; using System.Reactive.Linq; using Microsoft.Reactive.Testing; namespace ConsoleApplication1 { struct Thing { public string ActivationType; public int Id; } class Program : ReactiveTest { static void Main() { new Program().Run(); } void Run() { const string type1 = "Type1"; const string type2 = "Type2"; var scheduler = new TestScheduler(); var source = scheduler.CreateColdObservable( OnNext(TimeSpan.FromSeconds(1).Ticks, new Thing { ActivationType = type1, Id = 0 }), OnNext(TimeSpan.FromSeconds(1.5).Ticks, new Thing { ActivationType = type1, Id = 1 }), OnNext(TimeSpan.FromSeconds(2).Ticks, new Thing { ActivationType = type2, Id = 2 }), OnNext(TimeSpan.FromSeconds(3).Ticks, new Thing { ActivationType = type1, Id = 3 }), OnNext(TimeSpan.FromSeconds(3.5).Ticks, new Thing { ActivationType = type1, Id = 4 }), OnNext(TimeSpan.FromSeconds(4).Ticks, new Thing { ActivationType = type2, Id = 5 }), OnNext(TimeSpan.FromSeconds(5).Ticks, new Thing { ActivationType = type2, Id = 6 }), OnNext(TimeSpan.FromSeconds(5.5).Ticks, new Thing { ActivationType = type2, Id = 7 }), OnNext(TimeSpan.FromSeconds(6).Ticks, new Thing { ActivationType = type1, Id = 8 }), OnNext(TimeSpan.FromSeconds(7.5).Ticks, new Thing { ActivationType = type2, Id = 9 }), OnNext(TimeSpan.FromSeconds(8).Ticks, new Thing { ActivationType = type2, Id = 10 }), OnNext(TimeSpan.FromSeconds(8.5).Ticks, new Thing { ActivationType = type1, Id = 11 }), OnNext(TimeSpan.FromSeconds(9).Ticks, new Thing { ActivationType = type2, Id = 12 }), OnNext(TimeSpan.FromSeconds(10).Ticks, new Thing { ActivationType = type2, Id = 13 }), OnNext(TimeSpan.FromSeconds(10.5).Ticks, new Thing { ActivationType = type1, Id = 14 }), OnNext(TimeSpan.FromSeconds(11).Ticks, new Thing { ActivationType = type2, Id = 15 }), OnNext(TimeSpan.FromSeconds(11.5).Ticks, new Thing { ActivationType = type1, Id = 16 }), OnNext(TimeSpan.FromSeconds(12).Ticks, new Thing { ActivationType = type1, Id = 17 }), OnNext(TimeSpan.FromSeconds(12.2).Ticks, new Thing { ActivationType = type2, Id = 18 }), OnNext(TimeSpan.FromSeconds(16).Ticks, new Thing { ActivationType = type2, Id = 19 }), OnNext(TimeSpan.FromSeconds(16.5).Ticks, new Thing { ActivationType = type2, Id = 20 }), OnCompleted<Thing>(TimeSpan.FromSeconds(17).Ticks)); var query = source.Publish( published => published .Where(t => t.ActivationType == "Type1") .Merge(published .Where(t => t.ActivationType == "Type2") .Select((obj, index) => new { obj, index }) .Publish(ofType2Published => (from item in ofType2Published select new { Index = item.index, Value = item.obj, HasValue = true }) .Merge( from item in ofType2Published from __ in Observable.Timer(TimeSpan.FromSeconds(5), scheduler) select new { Index = item.index, Value = default(Thing), HasValue = false }) .Scan( new { Index = 0, Value = default(Thing), Waiting = false, StartedWaiting = false }, (acc, cur) => new { Index = acc.Waiting ? acc.Index : cur.Index, Value = acc.Waiting ? acc.Value : cur.Value, Waiting = cur.HasValue || (cur.Index == acc.Index ? false : acc.Waiting), StartedWaiting = cur.HasValue && !acc.Waiting, }) .Where(result => result.StartedWaiting) .Select(result => result.Value)))); var observer = scheduler.Start( () => query, created: 0, subscribed: 0, disposed: TimeSpan.FromSeconds(100).Ticks); foreach (var message in observer.Messages) { if (message.Value.Kind == System.Reactive.NotificationKind.OnNext) { Console.WriteLine( "{0:0.00}: {1} {2}", TimeSpan.FromTicks(message.Time).TotalSeconds, message.Value.Value.ActivationType, message.Value.Value.Id); } else { Console.WriteLine(message); } } ReactiveAssert.AssertEqual( observer.Messages, OnNext(TimeSpan.FromSeconds(1).Ticks + 1, new Thing { ActivationType = type1, Id = 0 }), OnNext(TimeSpan.FromSeconds(1.5).Ticks + 1, new Thing { ActivationType = type1, Id = 1 }), OnNext(TimeSpan.FromSeconds(2).Ticks + 1, new Thing { ActivationType = type2, Id = 2 }), OnNext(TimeSpan.FromSeconds(3).Ticks + 1, new Thing { ActivationType = type1, Id = 3 }), OnNext(TimeSpan.FromSeconds(3.5).Ticks + 1, new Thing { ActivationType = type1, Id = 4 }), OnNext(TimeSpan.FromSeconds(6).Ticks + 1, new Thing { ActivationType = type1, Id = 8 }), OnNext(TimeSpan.FromSeconds(7.5).Ticks + 1, new Thing { ActivationType = type2, Id = 9 }), OnNext(TimeSpan.FromSeconds(8.5).Ticks + 1, new Thing { ActivationType = type1, Id = 11 }), OnNext(TimeSpan.FromSeconds(10.5).Ticks + 1, new Thing { ActivationType = type1, Id = 14 }), OnNext(TimeSpan.FromSeconds(11.5).Ticks + 1, new Thing { ActivationType = type1, Id = 16 }), OnNext(TimeSpan.FromSeconds(12).Ticks + 1, new Thing { ActivationType = type1, Id = 17 }), OnNext(TimeSpan.FromSeconds(16).Ticks + 1, new Thing { ActivationType = type2, Id = 19 }), OnCompleted<Thing>(TimeSpan.FromSeconds(21.5).Ticks + 1)); Console.WriteLine(); Console.WriteLine("Test passed"); Console.ReadLine(); } } }Results:
1.00: Type1 0 1.50: Type1 1 2.00: Type2 2 3.00: Type1 3 3.50: Type1 4 6.00: Type1 8 7.50: Type2 9 8.50: Type1 11 10.50: Type1 14 11.50: Type1 16 12.00: Type1 17 16.00: Type2 19 OnCompleted()@215000001 Test passed
- Dave
- Edited by Dave Sexton Friday, March 08, 2013 11:13 AM Note about bad formatting in forum

