Higher-Order Hot Observables (e.g. Hot GroupedObservables)

Question
-
I have a hot stream of CarLocation (id: int, location: point) elements, received over a comm channel.
I grouped it on the id field, and exposed it in my model as Cars of type IObservable<IGroupedObservable<int,CarLocation>>.
In my view-model I subscribed to the Cars stream, and in the subscription I subscribed to the grouped observables.
The problem is that the hot nature of the system causes my code to miss the first elements in the grouped observables.
I tried applying the Replay() operator on the Cars stream, but that didn't help.
As a workaround I ditched the grouping altogether, but I think there must be a better way.
Any help would be appreciated.
Omer Mor
- Edited by Omer Mor Friday, April 6, 2012 1:26 PM
Wednesday, March 28, 2012 11:20 AM
All replies
-
Hi Omer,
If you can't control when CarLocation begins generating values, then you'll either have to buffer the data or you'll just have to accept that some values may be missed. That's a price we pay for concurrency; i.e., race conditions.
If you'd like to buffer the data then you'll have to buffer the individual groups. In other words, Replay won't work unless it's applied to each individual group. Applying Replay to the source won't affect each individual group because groups are hot themselves.
However, if you can control when CarLocation begins generating values, then consider modeling it as an IConnectableObservable<T> so that your view model can subscribe first, before you call Connect.
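For example, a minimal sketch of that connectable approach, assuming the raw stream coming off the comm channel is called rawCarLocations (the name is just for illustration):

// using System; using System.Reactive.Linq; using System.Reactive.Subjects;
IConnectableObservable<CarLocation> cars = rawCarLocations.Publish();

// Wire up the whole subscription tree first; nothing flows yet because Connect
// hasn't been called, so no groups (and no first elements) can be missed.
cars.GroupBy(cl => cl.Id)
    .Subscribe(group => group.Subscribe(cl => { /* handle this car's locations */ }));

// Only now open the tap.
IDisposable connection = cars.Connect();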
- Dave
http://davesexton.com/blog
Wednesday, March 28, 2012 3:07 PM
-
The general rule of thumb is that when a higher-order stream is returned, you have to obey two requirements when receiving an inner stream:
- Don't block - you will likely stall the entire pipeline if you do.
- Subscribe immediately, before returning from the OnNext call.
For the second point, you won't miss values when you have, e.g., xs.GroupBy(x => ...).SelectMany(g => g), because the SelectMany operator follows this rule.
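If you subscribe to the groups by hand, following that second rule looks roughly like this (just a sketch):

// using System; using System.Reactive.Linq;
xs.GroupBy(x => x % 2).Subscribe(group =>
{
    // Subscribe synchronously, inside the OnNext that delivers the group and before
    // returning from it, so the group's first elements can't slip by unobserved.
    group.Subscribe(value => Console.WriteLine(group.Key + ": " + value));
});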
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
Wednesday, March 28, 2012 9:17 PM
-
Thanks guys.
Bart,
in my onNext handler, I do something akin to this:
void onNextGroup(IGroupedObservable<int, CarLocation> group)
{
    group
        .ObserveOnDispatcher()
        .Subscribe(onNextCarLocation);
}
Is it possible that the ObserveOnDispatcher operator does not follow your second rule?
Omer Mor
Wednesday, March 28, 2012 9:32 PM
-
I tried applying the Replay to each group - didn't work.
I can control when CarLocation begins generating values (when I open my comm channel), but once it's open, new groups are generated whenever new cars are being observed, and for each group I need to subscribe without missing the first elements. So your second suggestion is not applicable.
Omer Mor
Wednesday, March 28, 2012 9:37 PM
-
Hi Omer,
> I tried applying the Replay to each group - didn't work.
> [snip] for each group I need to subscribe without missing the first elements.
Can you post a short but complete example that reproduces the problem?
- Dave
http://davesexton.com/blog
Wednesday, March 28, 2012 11:27 PM
-
We'd need a minimal repro, as Dave suggested. Handling a group the way you seem to do should not cause you to miss messages. Are you sure the production starts after the Subscribe call to the GroupBy result stream?
Below is a simple test showing produce/consume count across groups balancing out, indicating no elements were lost (tested with the v1.0 SP1 stable release).
while (true)
{
    var s = new Subject<int>();

    var N = 10000;
    var cd = new CountdownEvent(N);

    s.GroupBy(x => x % 10).Subscribe(g =>
    {
        g.ObserveOn(Scheduler.NewThread).Subscribe(_ =>
        {
            cd.Signal();
        });
    });

    new Thread(() =>
    {
        var r = new Random(19830211);
        for (int i = 0; i < N; i++)
            s.OnNext(r.Next());
        s.OnCompleted();
    }).Start();

    cd.Wait();

    Console.WriteLine("Done!");
}
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
Friday, March 30, 2012 10:07 AM
-
I'll post a repro next week. Thanks.
Omer Mor
Friday, March 30, 2012 2:50 PM
-
I managed to reproduce this. The problem shows up when I apply ObserveOn with the dispatcher twice: once on the outer observable and once on the inner group observable.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Threading;
using NUnit.Framework;

namespace TestRx
{
    public class CarLocation
    {
        public CarLocation(int id, double location)
        {
            Id = id;
            Location = location;
        }

        public int Id { get; set; }
        public double Location { get; set; }
    }

    public class RxBug
    {
        [Test]
        public void Repro()
        {
            var task = CreateDispatcherOnBackgroundThread();
            var dispatcher = task.Result;

            const int N = 1000;
            var scheduler = new DispatcherScheduler(dispatcher);
            var source = Observable.Generate(0, i => i < N, i => ++i, i => new CarLocation(i % 100, i), scheduler);

            var cd = new CountdownEvent(N);

            var wtcs = new TaskCompletionSource<Window>();
            Window window = null;
            dispatcher.BeginInvoke(() =>
            {
                window = new Window();
                window.Show();
                wtcs.SetResult(window);
            });
            wtcs.Task.Wait();

            source
                .GroupBy(cl => cl.Id)
                .ObserveOn(dispatcher) // commenting this line will make this test pass
                .Subscribe(grp => grp
                    .ObserveOn(dispatcher)
                    .Subscribe(cl =>
                    {
                        Thread.SpinWait(10); // simulate work
                        cd.Signal();
                        window.Title = cd.CurrentCount.ToString();
                    }));

            cd.Wait(TimeSpan.FromMilliseconds(N * 30));
            Assert.AreEqual(0, cd.CurrentCount);
        }

        public Task<Dispatcher> CreateDispatcherOnBackgroundThread()
        {
            var taskCompletionSource = new TaskCompletionSource<Dispatcher>();
            var thread = new Thread(() =>
            {
                var dispatcher = Dispatcher.CurrentDispatcher;
                taskCompletionSource.SetResult(dispatcher);
                Dispatcher.Run();
            }) { Name = "UI", IsBackground = true };
            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();
            var task = taskCompletionSource.Task;
            return task;
        }
    }
}
- Edited by Omer Mor Thursday, April 5, 2012 7:33 PM
Thursday, April 5, 2012 7:31 PM
-
This is expected. The main problem is that such queries end up with a time gap between producing the inner observable (here, the group) and subscribing to it. In this case, the time gap is caused by ObserveOnDispatcher, which puts the group (before it's subscribed to) in a scheduler queue. At that point, the OnNext call of the GroupBy operator to its observer returns, and elements are produced in the group. However, the group is only subscribed to when the dispatcher gets around to draining the work item from the queue. A similar problem appears when you use Delay, because of similar queuing. Other higher-order stream producers, such as Window, exhibit the same behavior.
If possible, try to do away with the first ObserveOnDispatcher. If that's not possible, you'll have to pull off a trick that makes the group hot and backed by a subject before it hits an enqueuing operator such as ObserveOn[Dispatcher] or Delay. The use of Replay comes to mind, but beware that you need to make the replay subject hot. An alternative approach is to use a ReplaySubject directly.
The code below is from a unit test project using Microsoft.Reactive.Testing, with minimal repros of the behavior you're seeing and a possible fix at the end:
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Repro
{
    [TestClass]
    public class ObserveOnBehavior : ReactiveTest
    {
        [TestMethod]
        public void GroupBy()
        {
            var s = new TestScheduler();

            var xs = s.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            var res = s.Start(() => xs.GroupBy(x => x % 2));

            Assert.AreEqual(3, res.Messages.Count);
            Assert.IsTrue(res.Messages[0].Time == 210 && res.Messages[0].Value.Value.Key == 1 /* odd */);
            Assert.IsTrue(res.Messages[1].Time == 220 && res.Messages[1].Value.Value.Key == 0 /* even */);
            Assert.IsTrue(res.Messages[2].Time == 250 && res.Messages[2].Value.Kind == NotificationKind.OnCompleted);
        }

        [TestMethod]
        public void GroupBy_Count()
        {
            var s = new TestScheduler();

            var xs = s.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            var res = s.Start(() => xs.GroupBy(x => x % 2).SelectMany(g => g.Count()));

            res.Messages.AssertEqual(
                OnNext(250, 2),
                OnNext(250, 2),
                OnCompleted<int>(250)
            );
        }

        [TestMethod]
        public void GroupBy_ObserveOn()
        {
            var s = new TestScheduler();

            var xs = s.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            var res = s.Start(() => xs.GroupBy(x => x % 2).ObserveOn(s));

            //
            // Notice the additional tick in the timing assertions below. Each OnNext call with a new group is scheduled
            // through the scheduler due to the use of ObserveOn. In this case, the scheduler queue is empty at that point,
            // so the propagation of the OnNext message on the scheduler takes place one tick later.
            //
            Assert.AreEqual(3, res.Messages.Count);
            Assert.IsTrue(res.Messages[0].Time == 210 + 1 && res.Messages[0].Value.Value.Key == 1 /* odd */);
            Assert.IsTrue(res.Messages[1].Time == 220 + 1 && res.Messages[1].Value.Value.Key == 0 /* even */);
            Assert.IsTrue(res.Messages[2].Time == 250 + 1 && res.Messages[2].Value.Kind == NotificationKind.OnCompleted);
        }

        [TestMethod]
        public void GroupBy_ObserveOn_Count()
        {
            var s = new TestScheduler();

            var xs = s.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            //
            // It should be clear why we're missing messages now. The groups are subscribed to by Count at the point
            // they are received, which is one tick post the first value in the group. In realistic scenarios, the
            // scheduler queue may have more work, postponing the subscription to the group even further.
            //
            var res = s.Start(() => xs.GroupBy(x => x % 2).ObserveOn(s).SelectMany(g => g.Count()));

            res.Messages.AssertEqual(
                OnNext(250, 1),
                OnNext(250, 1),
                OnCompleted<int>(251)
            );
        }

        [TestMethod]
        public void GroupBy_Replay_ObserveOn_Count()
        {
            var s = new TestScheduler();

            var xs = s.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnNext(230, 3),
                OnNext(240, 4),
                OnCompleted<int>(250)
            );

            //
            // Using an imperative island of code, we now cure all the groups by a ReplaySubject by subscribing to it
            // before control is handed to ObserveOn which can cause time gaps due to the queueing behavior.
            //
            var res = s.Start(() =>
                (from g in xs.GroupBy(x => x % 2)
                 let replay = new ReplaySubject<int>()
                 let _ = g.Subscribe(replay)
                 select new { g.Key, Items = replay.AsObservable() })
                .ObserveOn(s)
                .SelectMany(g => g.Items.Count()));

            res.Messages.AssertEqual(
                OnNext(250, 2),
                OnNext(250, 2),
                OnCompleted<int>(251)
            );
        }
    }

    static class Extensions
    {
        public static void AssertEqual<T>(this IEnumerable<T> actual, params T[] expected)
        {
            ReactiveAssert.AreElementsEqual(expected, actual);
        }
    }
}
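For what it's worth, the same fix can also be written with Replay instead of a hand-rolled ReplaySubject. A minimal sketch, reusing the xs and s from the tests above; note the Connect call that makes the replayed group hot right away (its disposable is dropped here for brevity):

var res = s.Start(() =>
    xs.GroupBy(x => x % 2)
      .Select(g =>
      {
          var replayed = g.Replay();   // IConnectableObservable<int>, backed by a ReplaySubject
          replayed.Connect();          // make it hot immediately, before any scheduling hop
          return new { g.Key, Items = replayed.AsObservable() };
      })
      .ObserveOn(s)
      .SelectMany(g => g.Items.Count()));

res.Messages.AssertEqual(
    OnNext(250, 2),
    OnNext(250, 2),
    OnCompleted<int>(251)
);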
Hope this helps!
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
- Proposed as answer by Bart De Smet [MSFT] Friday, April 6, 2012 11:14 AM
- Marked as answer by Omer Mor Friday, April 6, 2012 1:26 PM
Friday, April 6, 2012 6:28 AM
-
Thanks,
I think I got it.
However let me quote you:
The general rule of thumb is that when a higher-order stream is returned, you have to obey two requirements when receiving an inner stream:
- Don't block - you will likely stall the entire pipeline if you do.
- Subscribe immediately, before returning from the OnNext call.
So if I got it correctly, the ObserveOn operator does not obey the second rule. Is there any way you think Rx could handle this better?
BTW - I thought I understood Rx, and I still got bitten by this badly (it took me a lot of time to nail down why I was missing elements - it is not easy to debug Rx). Consider that not all users would debug it, and those who would won't necessarily post a question in the forums. If you care about the adoption of Rx, you should give this some thought.
Omer Mor
Friday, April 6, 2012 6:59 AM
-
ObserveOn's type doesn't allow it to obey this rule - after all, it operates on an IObservable<T> where T can be anything, including a stream, a stream of streams, a stream of streams of streams, etc. That's the price of its higher-order nature. And even if it could detect (statically or dynamically) this situation, it would be undesirable to mix concerns: making a sequence hot eagerly (the inner one, but up to what nesting level?) is something you can't take back...
Notice, on the other hand, that operators like Merge do have overloads that take an IObservable<IObservable<T>>, and hence those can (and do) subscribe to the inner stream as soon as it's received (unless you're using a special overload to limit the degree of concurrency), thereby obeying the second rule. This is why Map/Reduce style jobs in Rx work: take the source input, partition it (e.g. using GroupBy or Window), perform an ObserveOn on the inner streams (this is safe as long as the whole problem isn't applied recursively, e.g. by going beyond second-order streams) to offload each chunk to a separate worker thread, apply some computation to the inner streams, and finally use Merge to bring everything together again. See the section "Real-world performance and better scalability" in our Rx v2.0 Beta blog post to see this in action.
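A rough sketch of that shape, with an assumed integer source and a trivial per-chunk sum standing in for the real computation:

// using System.Reactive.Concurrency; using System.Reactive.Linq;
var results =
    source                                        // e.g. an IObservable<int>
        .Window(count: 10000)                     // partition the input ("map" side)
        .Select(chunk => chunk
            .ObserveOn(Scheduler.ThreadPool)      // hop schedulers on the *inner* stream only
            .Aggregate(0L, (sum, x) => sum + x))  // per-partition computation
        .Merge();                                 // subscribes to each chunk as soon as it arrives ("reduce" side)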
Having operators that produce higher-order streams return ReplaySubjects rather than Subjects could have been done, but things can get out of hand quickly with regard to storage, and there's the lurking potential for leaks if no one ever subscribes to an inner stream. After all, there could be anywhere from zero to an arbitrary number of subscriptions on the resulting inner streams.
Other tricks, like blocking the OnNext that sends out the inner stream until someone handles it, are also not feasible. Users may not subscribe to the inner streams at all (e.g. indirectly, by using a Skip operator on the sequence of groups, dropping the first ones). And worse, stalling the pipeline prevents progress and can lead to a complete deadlock. To remedy issues like these, you end up with queues again, and the problem of potentially unbounded storage strikes once more.
The best way to debug such things is to distill the essence by drawing a marble diagram, putting it through virtual time schedulers, and checking timings. If results are unexpected, use composition in reverse: decompose the query into its constituents and check your assumptions for each piece. One of the nice things about Rx is the layering on top of schedulers, so you can intercept the lowest layer of execution for testing purposes, as illustrated earlier.
Finally, one note about higher-order streams in other domains. In the world of LINQ to Objects, operators like GroupBy don't have this issue, but at a very big cost. GroupBy over enumerable sequences isn't truly lazy: upon the first MoveNext call to obtain the first group, the entire source input is drained and stuffed into a dictionary. If you just wanted the first element of the first group in a sequence of 100M elements, you've done a lot of unnecessary computation. Luckily, enumerable sequences tend to be finite (and often persisted, e.g. List<T>, hence not spread out in time for enumeration), unlike event streams where large volumes are common practice and timing enters the picture. Trying to build a truly lazy GroupBy over enumerable sequences is a good non-trivial exercise, and it will look very familiar to readers of this forum thread. In fact, you'll encounter the equivalent of subjects in the world of pull-based sequences, and have to make the same decisions as the ones discussed here (e.g. ballooning storage, repeatable behavior of inner sequences, etc.). You may also want to optimize for different design points and considerations in the world of pull versus push.
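As a quick illustration of that eagerness (a sketch using LINQ to Objects):

// using System; using System.Collections.Generic; using System.Linq;
static IEnumerable<int> Numbers()
{
    for (int i = 0; i < 10; i++)
    {
        Console.WriteLine("pulled " + i);   // prints for all ten elements...
        yield return i;
    }
}

// ...before the first group is ever handed back: GroupBy drains the source into a
// lookup on the first MoveNext over its result.
var firstGroup = Numbers().GroupBy(x => x % 2).First();
Console.WriteLine("first group key: " + firstGroup.Key);   // 0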
using (Microsoft.Sql.Cloud.DataProgrammability.Rx) { Signature.Emit("Bart De Smet"); }
Friday, April 6, 2012 11:14 AM