Window TimeSpan overload does not guarantee non-overlapping windows
-
May 4, 2012 17:06
Hi everyone,
The documentation for the Window TimeSpan overload states: "Projects each element of an observable sequence into consecutive non-overlapping windows (...)". However, I've found empirically that this does not seem to be true.
Apparently, for two consecutive windows, OnCompleted is called on the first window only after the first OnNext has been called on the second window. This can break the expected behavior of Window in important ways, such as when using the Observable.Using method to ensure resources are closed on the first window before any events arrive for the second window.
Can anyone confirm to me that this is the case? If so, is this the expected behavior? If it is, why do you consider these windows to be non-overlapping? Is there any way to ensure OnCompleted is called in the proper order?
Thanks for everything!
- Edited by glopes May 4, 2012 18:55
All replies
-
May 4, 2012 18:54
For reference, I just checked the behavior of the "count" overload, where you specify the number of events in each window, and in that case, the operator does guarantee that each window is closed with OnCompleted before calling OnNext on the observers of the next window, so it seems that there's some sort of inconsistency between the two...
-
May 4, 2012 20:57
-
May 4, 2012 21:38
Hi,
The following unit test passes against Rx 2.0 Beta. Note that OnCompleted of the first window @ 251 is observed before the first OnNext of the second window @ 301. Maybe what you're seeing is a side-effect of some kind of explicit scheduling?
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class HoppingWindow : ReactiveTest
    {
        [TestMethod]
        public void HoppingWindowByTime()
        {
            var scheduler = new TestScheduler();

            var source = scheduler.CreateColdObservable(
                OnNext(100, 1),
                OnNext(200, 2),
                OnNext(300, 3),
                OnNext(400, 4),
                OnCompleted<int>(550));

            var expectedWindow0 = new[]
            {
                OnNext(101, 1),
                OnNext(201, 2),
                OnCompleted<int>(251)
            };

            var expectedWindow1 = new[]
            {
                OnNext(301, 3),
                OnNext(401, 4),
                OnCompleted<int>(501)
            };

            var query = source.Window(TimeSpan.FromTicks(250), scheduler)
                .Select(window =>
                {
                    var observer = scheduler.CreateObserver<int>();
                    window.Subscribe(observer);
                    return observer;
                });

            var messages = scheduler.Start(() => query, 0, 0, 1000).Messages;

            var window0 = messages[0].Value.Value.Messages;
            var window1 = messages[1].Value.Value.Messages;

            ReactiveAssert.AreElementsEqual(expectedWindow0, window0);
            ReactiveAssert.AreElementsEqual(expectedWindow1, window1);
        }
    }
}

- Dave
-
May 4, 2012 23:03
Hi Dave,
Thanks for your time and help, as always. Here's a unit test that illustrates my problem with accessing a disposable resource using Observable.Using and the Window operator.
using System;
using System.Reactive.Linq;
using System.Threading;

namespace RxLab
{
    class DisposableResource : IDisposable
    {
        public DisposableResource()
        {
            Console.WriteLine("Loaded resource.");
        }

        public void Access()
        {
            Console.WriteLine("Used resource.");
        }

        public void Dispose()
        {
            Console.WriteLine("Unloaded resource.");
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var source = Observable.Interval(TimeSpan.FromSeconds(1));

            Console.WriteLine("Window with TimeSpan overload:");
            var query = source
                .Window(TimeSpan.FromSeconds(2))
                .SelectMany(window => Observable.Using(
                    () => new DisposableResource(),
                    resource => window.Do(value => resource.Access())));
            query.Take(4).Last();
            Thread.Sleep(100);
            Console.WriteLine("=== END ===");
            Console.WriteLine();

            Console.WriteLine("Window with element count overload:");
            query = source
                .Window(2)
                .SelectMany(window => Observable.Using(
                    () => new DisposableResource(),
                    resource => window.Do(value => resource.Access())));
            query.Take(4).Last();
            Thread.Sleep(100);
            Console.WriteLine("=== END ===");
            Console.ReadLine();
        }
    }
}

The output I get from this using Rx v1.0 Stable SP1 is the following:

Window with TimeSpan overload:
Loaded resource.
Used resource.
Loaded resource.
Unloaded resource.
Used resource.
Used resource.
Used resource.
Unloaded resource.
=== END ===

Window with element count overload:
Loaded resource.
Used resource.
Used resource.
Unloaded resource.
Loaded resource.
Used resource.
Used resource.
Unloaded resource.
=== END ===
As you can see, with the TimeSpan overload the Resource for the second window is created before the Resource for the first window is disposed. This doesn't happen with the element count overload. I figured this would have something to do with the order OnCompleted is being called, since that is the signal for releasing resources with the Using operator, but I may be wrong, since I don't have access to Rx's internal implementation.
Maybe this has been fixed in experimental or next version branches?
- Edited by glopes May 4, 2012 23:05
-
May 4, 2012 23:15
I just thought of another reason why your unit test might pass even if the Window implementation is faulty. You may be right that OnNext on the second window is called only after OnCompleted is called on the first window, but the observer may be subscribed to the second window before that OnCompleted is called.
This would explain why Observable.Using actually creates the resource out of order and would also explain the strange Load->Unload->OnNext sequence showing up in the first test case of my code example.
Can you or anyone else confirm this?
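One way to check this hypothesis is to log when each window is subscribed to and when it completes. The following is only a minimal sketch, assuming Rx 2.0 or later; `WindowProbe` and `LogLifetimes` are hypothetical names made up for illustration and are not part of Rx.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical diagnostic helper (not part of Rx): logs the moment each
// window is subscribed to, every value it delivers, and its completion.
public static class WindowProbe
{
    public static IObservable<T> LogLifetimes<T>(
        this IObservable<IObservable<T>> windows, Action<string> log)
    {
        // Serialize log calls, since time-based windows may signal
        // from scheduler threads.
        var gate = new object();
        Action<string> write = message => { lock (gate) { log(message); } };

        return windows
            .Select((window, index) => Observable.Defer(() =>
            {
                // Defer runs this factory at subscription time, which is
                // also when Observable.Using would create its resource.
                write("subscribe " + index);
                return window.Do(
                    value => write("next " + index + ": " + value),
                    () => write("completed " + index));
            }))
            .Merge();
    }
}

// Possible usage, mirroring the example above:
//
//   Observable.Interval(TimeSpan.FromSeconds(1))
//       .Window(TimeSpan.FromSeconds(2))
//       .LogLifetimes(Console.WriteLine)
//       .Take(4)
//       .Wait();
```

If "subscribe 1" is logged before "completed 0", the subscription to the second window happened before the first window completed, which would match the resource being loaded out of order.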
-
May 5, 2012 13:16
Hi,
> maybe the observer is subscribed to the second window before OnCompleted is called
That's correct. I'm not sure of the purpose of this behavior though.
Perhaps the idea is that having overlapping window subscriptions is better than having breaks in time where there may be no active subscriptions at all. Although, breaks wouldn't seem to matter as long as the source observable is well-behaved.
- Dave
-
May 5, 2012 14:16
Hi Dave,
Well, I agree that there's definitely some debate regarding how to handle the breaks between each window. However, it appears to me most intuitive that Subscribe and OnCompleted should mark the boundaries of the window, meaning that non-overlapping behavior should imply that there's no subscription before a window completes. This is particularly important if operators such as Using rely on these moments to operate.
Quoting Observable.Using documentation for the return value: "The observable sequence whose lifetime controls the lifetime of the dependent resource object.". This wording seems to imply that an observable sequence's lifetime is tied to the moment of subscription. Again for the Window operator this would imply the current behavior is in fact overlapping, as there are two concurrently live sequences at the boundary where windows meet.
What's even more concerning for me is that behavior is not consistent between overloads of Window. As I've noted above, if you use the element count overload, it does in fact ensure that no two concurrent windows are live at the same time, even at window boundaries. It seems to me that one way or the other there is an issue, as operator behavior should be consistent between overloads, no?
As I currently rely on strict non-overlapping behavior to ensure only one resource ever exists at any point in the stream, I've turned to implementing my own version of the Window TimeSpan overload that ensures this behavior. Of course, I would much prefer to use Rx directly for this. Is there any place where one can submit issues in Rx for consideration? Any comments from the development team?
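The custom implementation mentioned here was not posted. Purely as an illustration of the idea, a strictly non-overlapping time window could be sketched as below, assuming Rx 2.0 or later. `StrictWindowExtensions` and `StrictWindow` are hypothetical names, not part of Rx and not the poster's actual code. The sketch completes the current window before it publishes the next one, so a downstream subscription to the next window cannot happen first.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical operator (not part of Rx): time-based windows where each
// window is completed before the next one is handed to observers.
public static class StrictWindowExtensions
{
    public static IObservable<IObservable<T>> StrictWindow<T>(
        this IObservable<T> source, TimeSpan span, IScheduler scheduler)
    {
        return Observable.Create<IObservable<T>>(observer =>
        {
            var gate = new object();   // serializes timer ticks and source signals
            var done = false;          // set once a terminal signal has been sent
            var window = new Subject<T>();

            // Publish the first window right away.
            observer.OnNext(window);

            var timer = Observable.Interval(span, scheduler).Subscribe(_ =>
            {
                lock (gate)
                {
                    if (done) return;
                    window.OnCompleted();        // close the current window first
                    window = new Subject<T>();
                    observer.OnNext(window);     // only then open the next one
                }
            });

            var subscription = source.Subscribe(
                value =>
                {
                    lock (gate) { if (!done) window.OnNext(value); }
                },
                error =>
                {
                    lock (gate)
                    {
                        if (done) return;
                        done = true;
                        window.OnError(error);
                        observer.OnError(error);
                    }
                },
                () =>
                {
                    lock (gate)
                    {
                        if (done) return;
                        done = true;
                        window.OnCompleted();
                        observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(timer, subscription);
        });
    }
}
```

A call such as `source.StrictWindow(TimeSpan.FromSeconds(2), Scheduler.Default)` could then replace `source.Window(TimeSpan.FromSeconds(2))` in the example above. The sketch holds a lock while calling observers, which keeps the ordering simple but is not how a production operator would necessarily be written.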
Thanks a lot for your time!
-
May 8, 2012 00:12 Owner
Thanks for your feedback. We've discovered this mismatch in behavior between both overloads recently as well, during our work on Rx v2.0 RC. In this upcoming release, we're improving time-based scheduling (more info on that later) and as part of this work, we're introducing periodic scheduling for recurring timers, sampling, windowing, buffering, etc. Long story short - the implementation of Window/Buffer using single-shot timers where the span and skip parameters happen to coincide exhibits the out of order behavior you're seeing, while the upcoming implementation using recurring timers for this special case (of span equals skip) doesn't. Because this leads to inconsistent behavior, we're contemplating "fixing" this (at the cost of a breaking change behaviorally compared to v1.0 SP1).
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
- Proposed as answer by Bart De Smet [MSFT] Owner May 8, 2012 00:18
- Marked as answer by glopes May 8, 2012 18:32
-
May 8, 2012 00:27
Thanks for the answer! It's very reassuring to hear you've detected this as well. Just to be sure, and before I mark the post as answered, by "fixing" you mean the behavior of Window will ensure that OnCompleted will be called before new subscriptions are done for the next window, right? It seems the most reasonable behavior for the non-overlapping overloads.
Looking forward to Rx v2.0!
-
May 8, 2012 17:39 Owner
Correct.
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
-
May 8, 2012 18:32
Thanks for the heads-up :)