Buffering events that share a common identifier for a specified amount of time, where each event also has a Timestamp
We're thinking of using Reactive Extensions for a rewrite of a problem at my financial firm.
The premise is that we get events identified by a Guid (stock symbol + uniqueness entropy embedded into it), a timestamp, and a Value field. These come at a high rate, and we cannot act on these objects until "at least" X seconds (say, 10 seconds) have passed, after which we have to act on them and remove them from the system.
Think about it like two windows, an initial window of "10 seconds" (for example T0 to T10), where we identify all the unique events (basically, group by guid), then we look into the next "10 seconds", "secondary window" (T10-T20), to make sure we're implementing the policy of "at least" 10 seconds. From the "initial window", we remove all the events (because we've accounted for them), and then from the "secondary window", we remove the ones that occurred in the "initial window". And we keep on moving 10 second sliding windows, so now we're looking at window T20-T30, repeat and rinse.
How could I implement this in Rx? It seems like the way to go.
It's easier for me to think in terms of the route that individual notifications ("events") will take through the query. Also, are you sure that you want sliding windows or do you really want hopping windows? Sliding implies overlapping, but with respect to an individual group, it seems like you want 10 second non-overlapping buffers. It's also unclear from the image why event "A" is lost at the end, if I'm reading that correctly.
Please let me know whether the following specification is accurate, and if not please provide any corrections.
1. Your data source is a sequence of events: IObservable<Event>
2. Given an Event, the query must first direct it into a group of similar events; i.e., group by Guid:
   IGroupedObservable<Guid, Event> : IObservable<Event>
   Groups will be reactive, so it's best not to think of them like buckets; instead, think of them as keyed IObservable<T> routes that events can flow through.
3. When an event enters its group, it will be sent into the current, non-overlapping 10 second buffer for that group. This requires an additional operator, so it's best to think of buffering as another step in the query.
4. Every 10 seconds for that group, all of the events in the current buffer will be pushed to the next phase of the query to be processed, and a new 10 second buffer will be created for that group.
5. Subsequent events routed to the same group are gathered into the latest 10 second buffer.
Based on #3, a given event is processed with a maximum delay of 10 seconds, but it could be less if it enters the current buffer closer to the 10 second mark.
IObservable<Event> events = ...;
IObservable<IList<Event>> query =
    from e in events
    group e by e.Guid into eventsByGuid
    from buffer in eventsByGuid.Buffer(TimeSpan.FromSeconds(10))
    select buffer;
Thanks for the reply Dave.
After re-reading my post (which itself is confusing), I have more info to add.
1. What I meant when I said sliding is that, for this system to be "correct", we always have to analyze 20 seconds of data, or more simply stated, twice the "at least" period, which in this case is 10 seconds. In other words, if an Event with Guid "A" fires at T0, and that's the first such occurrence of "A", I will have to wait till T10, and only then can I say that all my events with Guid "A" have arrived. This is my SLA, and the reason why the last "A" is lost in the image is that I'm only looking at the time range T0-T20. The arrival of "A" between T20 and T30 is considered a new unique Guid (even though it's not really). Does that make sense? It's an SLA thing. Eventually I have to aggregate these events into a single "CompetitiveQuotationObject", and write it to a log or do whatever.
2. If you read 1. you'll notice I mentioned "the first such occurrence". That's crucial, though because of grouping and the "at least" policy we don't have to worry about it. Think about it: if I had to care about the exact expiration of each event, it would be a nightmare. By doubling the window size, I guarantee my SLA.
I'm going to read what you have said more carefully now, because at first glance I'm a bit confused. I would think that I would need to buffer 20 seconds, and then move my window 10 seconds at a time.
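For comparison, here is a minimal sketch of the two window shapes being discussed, using the time-based Buffer overloads in Rx. The class and method names (WindowShapes, Hopping, Overlapping) are made up for illustration, and the scheduler is passed in only so that time can be controlled in a test. Whether the overlapping shape is what the SLA really needs is still an open question in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper names; only the Buffer overloads come from Rx.
public static class WindowShapes
{
    // Hopping: consecutive 10 second buffers with no overlap.
    public static IObservable<IList<T>> Hopping<T>(
        IObservable<T> source, IScheduler scheduler)
    {
        return source.Buffer(TimeSpan.FromSeconds(10), scheduler);
    }

    // Overlapping: a 20 second buffer is opened every 10 seconds,
    // so an item can appear in two consecutive buffers.
    public static IObservable<IList<T>> Overlapping<T>(
        IObservable<T> source, IScheduler scheduler)
    {
        return source.Buffer(
            TimeSpan.FromSeconds(20),   // how long each buffer stays open
            TimeSpan.FromSeconds(10),   // how often a new buffer is opened
            scheduler);
    }
}
```

Both overloads measure arrival time on the scheduler, not the Timestamp field carried by the event.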
May 4, 2012 21:26
Also, from your query, it looks like you're using time, but I want to buffer based on the Timestamp field of the Event. So basically, we somehow have to remember the time of the first event? Hmm...
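To make the "remember the time of the first event" idea concrete, here is a rough sketch in plain C# (no Rx) that keys buffers by Guid and decides when to release them from the Timestamp field alone. The Event shape and the EventTimeBuffer class are assumptions for illustration, not anything from the actual system.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape; the property names are assumptions.
public sealed class Event
{
    public Guid Guid { get; set; }
    public DateTime Timestamp { get; set; }
    public decimal Value { get; set; }
}

// Buffers events per Guid and closes a buffer once an event for the same
// Guid arrives whose Timestamp is at least `window` later than the
// Timestamp of the first event in that buffer.
public sealed class EventTimeBuffer
{
    private readonly TimeSpan _window;
    private readonly Dictionary<Guid, List<Event>> _pending =
        new Dictionary<Guid, List<Event>>();

    public EventTimeBuffer(TimeSpan window) { _window = window; }

    // Returns the completed buffer for e.Guid, or null if it is still open.
    public IList<Event> Add(Event e)
    {
        IList<Event> completed = null;
        List<Event> group;
        if (_pending.TryGetValue(e.Guid, out group) &&
            e.Timestamp - group[0].Timestamp >= _window)
        {
            completed = group;   // the first event is old enough: close it
            group = null;
        }
        if (group == null)
        {
            group = new List<Event>();   // e is the first event of a new buffer
            _pending[e.Guid] = group;
        }
        group.Add(e);
        return completed;
    }
}
```

One limitation of this sketch is that a buffer is only released when a later event with the same Guid shows up. A real implementation would probably also need to flush stale buffers based on the timestamps of events for other Guids.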
Here's the list of requirements so far:
- The data source is a sequence of Event objects.
- An Event object is identified by its Guid property.
- Many Event objects can have the same Guid value.
- Event objects also have a TimeSpan property.
- When the first event arrives for a particular Guid value...
  - The Event is added to a new buffer, which is created for a group of Events with the same Guid.
  - At Now + Event.TimeSpan + (AtLeastDuration * 2)...
    - The buffer is sent to observers.
    - A new buffer is created for the group of Events with this Guid.
- When a subsequent Event arrives with the same Guid value...
  - It is added to an existing buffer, which hasn't been sent to observers yet.
Is this correct? Is it complete?
It's probably not correct or complete, but unless you can describe your requirements in these terms, it's going to be difficult for me to understand what you want.
In particular, I don't understand why any events would be "lost", unless "lost" simply means deferred into the next buffer? If in fact you mean "dropped", without any observation, then that implies some kind of throttling or sampling. If possible, it may be easier at first to simply ignore this requirement and then come back to it later.
Furthermore, I still don't understand the consecutive window analogy - specifically, how does it apply to grouping and buffering? Is the first 10 second window buffered separately from the next consecutive 10 second window? Or are all of the events grouped into the same buffer?
And finally, why are you considering it to be a sliding window as opposed to a hopping window? The term sliding implies that windows will overlap, but in all of your examples it seems that windows are consecutive without overlapping, which is a behavior that is typically described as hopping.
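If the requirements list above turns out to be close, one possible shape for the query is GroupByUntil, where each group is closed by a timer that starts when the group's first event arrives. This is only a sketch under assumptions: the Event class, the BufferPerGuid name and the single holdFor duration are placeholders (the list above mentions Event.TimeSpan and AtLeastDuration * 2, which are collapsed into one value here), and timing is based on arrival time on the scheduler rather than on any field of the event.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical event shape; the property names are assumptions.
public sealed class Event
{
    public Guid Guid { get; set; }
    public DateTime Timestamp { get; set; }
    public decimal Value { get; set; }
}

public static class EventQueries
{
    // For each Guid: the first event opens a group, the group is closed
    // `holdFor` after it was opened, and everything that arrived in the
    // meantime is emitted as one list. The next event with the same Guid
    // then opens a fresh group.
    public static IObservable<IList<Event>> BufferPerGuid(
        this IObservable<Event> events, TimeSpan holdFor, IScheduler scheduler)
    {
        return events
            .GroupByUntil(
                e => e.Guid,
                g => Observable.Timer(holdFor, scheduler))  // closes the group
            .SelectMany(g => g.ToList());                   // one list per closed group
    }
}
```

In production code the scheduler argument could be Scheduler.Default; passing it in explicitly just makes it possible to drive time by hand when testing.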
I see how you have it set up, and I think it is both complete and correct.
The way I think about it is that you get events and have to analyze 20 seconds, but you can only definitively say that all the events of the same Guid you care about are in for the first 10 seconds, since you have the next 10 seconds to verify. An event that arrives at the 13th second, for example, could have related events up until the 23rd second, which is outside the window you're considering. So for any given window of 2*SLA_Window_In_Seconds, you can only act on the first half (i.e. from 0 to SLA_Window_In_Seconds), and at that point you can say with certainty that the first 10 seconds are accounted for. Then you create a new window starting from the mid-point, i.e. from the 10th second to the 30th second, in which you can only analyze the 10th to 20th seconds and not the next 10, and so on and so forth.
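Here is the logic for doing this in regular code:
DateTime? beginTime = null, midTime = null;
foreach (Event e in events)   // as each event comes in
{
    if (beginTime == null)
        beginTime = e.Timestamp;   // start of the initial window

    if (midTime == null)
    {
        // First 10 seconds have passed: the initial window is closed.
        if (e.Timestamp - beginTime >= TimeSpan.FromSeconds(10))
            midTime = e.Timestamp;
    }
    else if (e.Timestamp - midTime >= TimeSpan.FromSeconds(10))
    {
        // Another 10 seconds have passed: the secondary window is closed.
        DateTime endTime = e.Timestamp;
        FireEvent(beginTime.Value, midTime.Value, endTime);   // send to observers
        beginTime = midTime;   // move the window forward by one 10 second period
        midTime = endTime;
    }
}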
From the looks of it you're saying the same thing, I think.