Is there a better way to detect stalls in an IObservable stream ?
-
Tuesday, December 18, 2012 9:35 PM
I'm just starting to play around wth Rx and so far enjoying it :-) Perhaps someone can give me some advice on the following scenario....
I have a continuous stream of measurements arriving on an aperiodic basis via an Observable called MeasurementFeed.
I'd like to detect whether this stream has stalled. A 'stall' is defined as occurring if no value arrives within 5 seconds of the previous value. Once a stall condition has occurred, then it is cleared as soon as the next value arrives. In the application I'd show a red/green light depending upon whether the measurement feed is flowing well or is currently stalled.
TimeoutInterval is not quite sufficient for this scenario since it can only return intervals between values rather than being able to timeout if no value arrives.
The best I've been able to come up with is...
// Generate the sequence [false ..5 second delay... true] HasTimedout = Observable.Generate<int, bool>( 0, s => s < 2, s => s + 1, s => s != 0, s => s == 0 ? TimeSpan.Zero : TimeSpan.FromSeconds(5) ); // kick off a new timeout sequence every time we get a value from MeasurementFeed // but cancel the sequence as soon as another value arrives // We should see lots of 'false' values with the occasional 'true' // so use DistinctUntilChanged to ensure we only get events on edges FeedIsStalled = (from measurement in MeasurementFeed from status in HasTimedout.TakeUntil(MeasurementFeed) select status) .DistinctUntilChanged();
but it feels like there ought to be a more elegant way to do this with some combination of standard methods.
Can anyone suggest a better approach ?
All Replies
-
Tuesday, December 18, 2012 11:41 PM
Hi,
You can use Throttle. It generates a sequence of stalled indicators by storing the last notification (buffer size = 1) until it is followed by dueTime, then it pushes the buffered notification; i.e., each notification in the output sequence indicates that the source has been silent for dueTime.
IObservable<bool> reds = MeasurementFeed.Throttle(TimeSpan.FromSeconds(5)).Select(_ => false);
According to your spec, it's only red when there's no value in the sequence for dueTime. Therefore, whenever there's any value, it's green.
IObservable<bool> greens = MeasurementFeed.Select(_ => true);
Now you can simply Merge these queries. However, if MeasurementFeed is cold then you'll probably want to share subscription side-effects using the Publish operator, as shown below.
IObservable<bool> isNotStalled = MeasurementFeed.Publish(feed => { var reds = feed.Throttle(TimeSpan.FromSeconds(5)).Select(_ => false); var greens = feed.Select(_ => true); return reds.Merge(greens); });- Dave
- Edited by Dave Sexton Tuesday, December 18, 2012 11:43 PM Renamed the result to isNOTStalled :)
-
Tuesday, December 18, 2012 11:53 PM
Hi,
Note the relationship between Throttle and Timeout:
Throttle = OnNext + dueTime -> OnNext
Timeout = OnNext + dueTime -> OnError
Undefined = OnNext + dueTime -> OnCompletedI don't think there's a native operator for that last case, but it's easy to convert either of the others:
Undefined = xs.Throttle(dueTime).Take(1).IgnoreElements()
-or-
Undefined = xs.Timeout(dueTime).Catch<X, TimeoutException>(_ => Observable.Empty<X>())
- Dave
- Edited by Dave Sexton Tuesday, December 18, 2012 11:56 PM Added IgnoreElements to make the conversion pure; Added missing type arg.
-
Wednesday, December 19, 2012 7:32 PM
Thanks Dave - using Throttle as you've suggested is definitely a much nicer solution than my attempt to hand-build a timeout! I'm still trying to get to grips with the way that observable streams work with multiple subscribers so I hope you don't mind a follow-up question :-)
I think I understand the use of Publish in your example above (this post http://stackoverflow.com/questions/2833904/iconnectableobservables-in-rx is a good explanation) but if we were to assume that MeasurementFeed were _hot_ then what would be the best approach to ensure that a new subscriber gets the current value of Is(Not)Stalled immediately without having to wait for a new value to arrive on MeasurementFeed ?
This seems plausible to me but I'm not completely confident that I have the Publish in the right place ...
var stallStatus = MeasurementFeed .Throttle(TimeSpan.FromSeconds(5)) .Select(_ => true) .Merge(MeasurementFeed.Select(_ => false)) .DistinctUntilChanged(); //ensure that we have a common stream but that every new subscriber (including the first?) //gets an immediate status report ConnectableStallStatus = stallStatus.Publish().StartWith(stallStatus.MostRecent(true));Thanks again,
Neil
-
Wednesday, December 19, 2012 8:36 PM
Hi Neil,
Good idea to use DistinctUntilChanged.
> what would be the best approach to ensure that a new subscriber gets the current value [snip]?
Replay with a buffer size of 1. Though since this is such a common requirement, and also since Replay doesn't generate any values until at least the first value is observed, Rx has a specialized overload of Publish that allows you to specify an initial value. The initial value serves two purposes:
- It is replayed until the first value is observed from the source.
- It is an indicator for Publish to behave like Replay(1), after the first value is observed from the source.
var stallStatus = MeasurementFeed .Throttle(TimeSpan.FromSeconds(5)) .Select(_ => true) .Merge(MeasurementFeed.Select(_ => false)) .DistinctUntilChanged(); .Publish(true);The above query begins with a "stalled" notification to all new observers, until the feed generates its first value. Later, a new observer will immediately receive the latest status, followed by the live status feed.
Note the difference between Publish(selector, [initialValue]) and Publish([initialValue]). In my previous example I had used the former, while in my latest example I'm using the latter. The latter returns an IConnectableObservable<T>, hence it's a (deferred) hot observable, while the former returns a cold IObservable<T> and merely injects a hot observable into the selector function. Each time that you subscribe to the observable in my former example, selector is invoked and another subscription is made to the underlying observable, so it's only hot within the scope of the selector function. In my latest example, every subscription shares the same underlying subscription.
When using the latter, don't forget to call stallStatus.Connect() to create the underlying subscription.
Your latest example:
> ConnectableStallStatus =stallStatus.Publish().StartWith(stallStatus.MostRecent(true));
does not share the subscription to stallStatus. You're using the latter overload of Publish, which returns a hot observable, though it's only being used once. Your MostRecent query is based on stallStatus again. If stallStatus were a cold observable, then subscription side-effects would occur twice. To solve this problem, you could either use the selector overload that I had used in my previous example, or you could assign the result of calling stallStatus.Publish() to a local variable and then base your MostRecent query on that variable.
- Dave
- Edited by Dave Sexton Wednesday, December 19, 2012 8:37 PM Didn't see that you flipped the bools (as I should have :)
-
Friday, December 21, 2012 4:43 PMThat's a great explanation and has given me a lot to think about ! Thanks again for your help :-)

