StreamInsight best practices for common CEP issues
-
Wednesday, February 10, 2010 06:23
Hi,
Some common problems that you face in Complex Event Processing include:
- Eliminating duplicate Events from the sources
- Detecting the absence of Events
- Handling correctional Events that should replace earlier data
- Allowing Events to be sent to multiple outputs
How should I solve the above problems with StreamInsight? I've not found any obvious ways.
Thanks!
/Johan Åhlén
http://blogical.se/blogs/jahlen
All Replies
-
Wednesday, February 10, 2010 15:10
I have more or less the same kind of problems... Does anyone have good ideas?
-
Wednesday, February 10, 2010 17:10
Johan,
What a great list of CEP scenarios! Normally I'd wait a couple of days for the community to reply instead of jumping in myself, but here I just need to :)
First of all, we have added significant features in the RTM version that enable 2) and 4) above in a much more straightforward way than CTP3. But before describing solutions, let me try to clarify the use cases:
1) Can you explain what you mean by duplicate? A StreamInsight Event has timestamps and payload fields. Do you consider a duplicate to be an exact copy of all of these?
2) I assume the detection of absence of events should look like this: "Tell me if you have not seen a (specific) event within X time units after the last one" or "Count the number of events every 10 minutes, and tell me when the count is zero". Correct?
3) Does a "correctional event" modify the temporal properties or the payload of a previously enqueued event, or both?
4) This sounds like you want a single query that feeds its results to multiple output adapters, right?
Regards,
Roman
MS StreamInsight Team
Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights.
-
Wednesday, February 10, 2010 17:44
Hi Roman,
Thanks for your answer. I will try to clarify my scenarios a bit:
1) Many source systems (for example, in finance) provide a transaction number or similar ID that should be unique. When two Events have the same transaction number, some action must be taken.
I would say:
- If they have same timestamp and payload, they are duplicates.
- Otherwise, if they have same transaction number but differ in timestamp or payload, they are something else (a correctional event or an error)
2) Yes
3) Primarily I'm thinking of payload corrections. Also I think there could be scenarios where it would modify the temporal properties (most probably in cases of edge events).
4) Yes
Kind Regards,
Johan
-
Saturday, February 13, 2010 00:05
Johan,
Our RTM is still several weeks out, so this is what I can say at this point about 2) and 4):
2) You have to take into account that StreamInsight only works with application time, never system time. So if you have a single stream that does not move forward at all, there is no way for StreamInsight to know that events are not occurring, because time stands still. One possible approach is to provide a second stream, like a "heartbeat", which can then be compared to the actual event stream, so that you can take action on periods in which the heartbeat contains events but the actual event stream does not.
4) We are planning to provide various methods for combining queries in a way that enables multiple output adapters to feed off the same query. In CTP3, a query can feed only a single output adapter, so the adapter code would need to take care of distributing the resulting events to the intended destinations.
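The CTP3 workaround for 4) amounts to a single output adapter that copies every result event to several destinations. The sketch below is a language-neutral simulation in Python, not StreamInsight adapter code; the `FanOutAdapter` class, its `register`/`on_event` methods, and the dictionary-shaped events are all hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, List

# A "sink" is any destination that accepts one result event (hypothetical shape).
Sink = Callable[[dict], None]


class FanOutAdapter:
    """Hypothetical stand-in for a single output adapter that forwards every
    result event it dequeues to several destinations (not StreamInsight API)."""

    def __init__(self) -> None:
        self._sinks: Dict[str, Sink] = {}

    def register(self, name: str, sink: Sink) -> None:
        # Destinations are kept in registration order (dicts preserve insertion order).
        self._sinks[name] = sink

    def on_event(self, event: dict) -> None:
        # The query feeds only this adapter; the adapter copies each event to all sinks.
        for sink in self._sinks.values():
            sink(event)


if __name__ == "__main__":
    log: List[dict] = []
    alerts: List[dict] = []
    adapter = FanOutAdapter()
    adapter.register("log", log.append)
    # A destination may also filter what it keeps.
    adapter.register("alerts", lambda e: alerts.append(e) if e["value"] > 90 else None)
    for ev in [{"item": "A", "value": 92}, {"item": "B", "value": 60}]:
        adapter.on_event(ev)
    print(len(log), len(alerts))  # prints "2 1"
```

The trade-off is that distribution logic (and any per-destination filtering or error handling) lives in adapter code rather than in the query.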
Now, regarding the de-duplication of events: every operation in StreamInsight is deterministic. If you use a TopK(1), you will get more than one result if there are ties in the ranking. So there is no straightforward way to drop duplicates, even though removing an exact duplicate is not really a nondeterministic procedure. However, there is a workaround: inject a GUID into each event, rank by the GUID, and pick the top event. This prevents ties and guarantees that only one event is returned. The question is how to set up the window for the TopK: if you can ensure that you don't have any overlapping events except for the duplicates, you can use a snapshot window:
var guidstream = from e in inputStream
                 select new
                 {
                     // project all existing payload fields and add this:
                     guid = System.Guid.NewGuid()
                 };

var deduped = (from win in guidstream.Snapshot()
               from e in win
               orderby e.guid ascending
               select e).Take(1);

var removeGuid = from e in deduped
                 select new MeterData
                 {
                     // project all original payload fields, but not the guid
                 };
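To make the snapshot/TopK semantics above concrete, here is a small Python simulation (not StreamInsight code). It assumes events are `(start, end, payload)` tuples with half-open lifetimes, builds snapshot windows from the event endpoints, tags each event with a random UUID, and keeps the lowest-UUID event per window. The helper names `snapshot_windows` and `dedupe_top1` are hypothetical.

```python
import uuid
from typing import Any, List, Tuple

# (start, end, payload); lifetimes are half-open intervals [start, end).
Event = Tuple[int, int, Any]


def snapshot_windows(events: List[Event]) -> List[Tuple[int, int]]:
    """Windows between consecutive event endpoints that contain at least one event."""
    points = sorted({t for s, e, _ in events for t in (s, e)})
    return [(lo, hi) for lo, hi in zip(points, points[1:])
            if any(s < hi and e > lo for s, e, _ in events)]


def dedupe_top1(events: List[Event]) -> List[Tuple[int, int, Any]]:
    # Step 1: tag every event with a fresh GUID (like guid = Guid.NewGuid()).
    tagged = [(s, e, p, uuid.uuid4()) for s, e, p in events]
    result = []
    for lo, hi in snapshot_windows(events):
        active = [ev for ev in tagged if ev[0] < hi and ev[1] > lo]
        # Step 2: order by GUID and take one; GUIDs are unique, so there is no tie.
        winner = min(active, key=lambda ev: ev[3])
        # Step 3: drop the GUID again; the output event spans the snapshot window.
        result.append((lo, hi, winner[2]))
    return result


if __name__ == "__main__":
    events = [(0, 5, "tx1"), (0, 5, "tx1"), (10, 12, "tx2")]
    print(dedupe_top1(events))  # [(0, 5, 'tx1'), (10, 12, 'tx2')]
```

This also shows why the "no overlapping events except duplicates" condition matters: two different payloads alive in the same window would also be reduced to one arbitrary survivor.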
Now to your last question: in our current version, there is no built-in way to retract events after they have been issued. We do handle such retraction events internally, though, and are considering various ways to expose that functionality in future releases. The only thing you can currently do is submit an end edge event with an end time equal to the start time, which fully removes that event. If you want to change the payload of an event, you could design your input as point events and treat the current value as the last submitted payload, so that a new point event "updates" that value.
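The "last submitted point wins" idea can be sketched in Python as stretching each point event until the next point with the same key, so that a later point corrects the earlier value. This is a simulation of the semantics, not StreamInsight code; the names `to_signal` and `value_at`, and keying by transaction number, are illustrative assumptions. A correction arriving at the same timestamp produces an empty `[ts, ts)` lifetime, loosely analogous to the end-edge-equals-start-time removal described above.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Point = Tuple[int, str, float]  # (timestamp, key, value), e.g. key = transaction number
Interval = Tuple[int, Optional[int], str, float]  # (start, end or None if still open, key, value)


def to_signal(points: List[Point]) -> List[Interval]:
    """Stretch each point event until the next point with the same key,
    so that a later point 'corrects' (replaces) the earlier value."""
    by_key: Dict[str, List[Tuple[int, float]]] = defaultdict(list)
    # sorted() is stable: points with equal timestamps keep their arrival order.
    for ts, key, value in sorted(points, key=lambda p: p[0]):
        by_key[key].append((ts, value))
    intervals: List[Interval] = []
    for key, seq in by_key.items():
        for i, (ts, value) in enumerate(seq):
            end = seq[i + 1][0] if i + 1 < len(seq) else None
            # A correction at the same timestamp yields [ts, ts): an empty lifetime,
            # so the superseded value never counts.
            intervals.append((ts, end, key, value))
    return intervals


def value_at(intervals: List[Interval], key: str, t: int) -> Optional[float]:
    """Current value for key at application time t, or None if none is valid."""
    for start, end, k, value in intervals:
        if k == key and start <= t and (end is None or t < end):
            return value
    return None


if __name__ == "__main__":
    signal = to_signal([(1, "tx42", 100.0), (3, "tx7", 5.0), (4, "tx42", 95.0)])
    print(value_at(signal, "tx42", 2), value_at(signal, "tx42", 4))  # 100.0 95.0
```

This handles payload corrections only; corrections to the temporal properties of an earlier event are not covered by this pattern.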
Let us revisit your cases 2) and 4) as soon as RTM is out!
Regards,
Roman
MS StreamInsight Team
Marked as answer by Roman Schindlauer (Microsoft Employee), Friday, February 19, 2010 18:03
-
Tuesday, February 23, 2010 01:53
Just another note on the de-duplication problem: there is an alternative approach that is more concise than the TopK approach described above. Simply grouping by every payload field and applying a dummy aggregation lets you retrieve all the original fields as grouping keys and consolidates all exact event copies into one:
var deduped = from e in inputStream
              group e by new { e.field1, e.field2, e.field3 } into eachGroup
              from window in eachGroup.Snapshot()
              select new
              {
                  dummy = window.Count(),
                  field1 = eachGroup.Key.field1,
                  field2 = eachGroup.Key.field2,
                  field3 = eachGroup.Key.field3
              };

Each unique event will be contained in a separate group. Duplicate events have the same grouping key and thus fall into the same group, where (if they overlap in time) the Snapshot operator reduces them to a single event at each point in time. The dummy count can be removed in a subsequent projection.
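A Python simulation of the group-by-all-fields variant (not StreamInsight code) follows. It assumes each payload is a tuple holding all payload fields, uses that tuple as the grouping key, and applies a snapshot per group with a count as the dummy aggregate; `dedupe_by_grouping` is a hypothetical helper name.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# (start, end, payload); payload is a tuple of ALL payload fields, e.g. (field1, field2, field3).
Event = Tuple[int, int, Tuple[Any, ...]]


def dedupe_by_grouping(events: List[Event]) -> List[Tuple[int, int, Tuple[Any, ...], int]]:
    # Like "group e by new { e.field1, e.field2, e.field3 }": the whole payload is the key.
    groups: Dict[Tuple[Any, ...], List[Tuple[int, int]]] = defaultdict(list)
    for start, end, payload in events:
        groups[payload].append((start, end))
    result = []
    for key, spans in groups.items():
        # Snapshot() per group: one window between each pair of consecutive endpoints.
        points = sorted({t for s, e in spans for t in (s, e)})
        for lo, hi in zip(points, points[1:]):
            dummy = sum(1 for s, e in spans if s < hi and e > lo)  # the dummy Count()
            if dummy:
                # One output event per group and window, however many copies overlap.
                result.append((lo, hi, key, dummy))
    # Order by window; the stable sort keeps group order for equal windows.
    return sorted(result, key=lambda r: (r[0], r[1]))


if __name__ == "__main__":
    events = [(0, 5, ("tx1", 100)), (0, 5, ("tx1", 100)), (3, 8, ("tx2", 7))]
    print(dedupe_by_grouping(events))  # [(0, 5, ('tx1', 100), 2), (3, 8, ('tx2', 7), 1)]
```

Unlike the GUID/TopK sketch, overlapping events with different payloads land in different groups and both survive. Copies that only partially overlap come out as several adjacent snapshot pieces rather than a single event.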
Regards,
Roman
MS StreamInsight Team
-
Wednesday, February 24, 2010 09:54
Hi Roman,
Thanks for your answers. I look forward to the RTM version.
About 2) Detecting the absence of events:
My first idea (before I wrote the original post) was to test whether the CTI Events could function as a "heartbeat". I made sure my input adapter provided CTI Events even when there were no INSERT Events. However, the window functions still seemed to produce no windows when there were no INSERT Events.
Using a second stream as a "heartbeat" should work, but I would welcome a more "platform-supported" way to detect the gaps.
Regards,
Johan
http://blogical.se/blogs/jahlen
-
Thursday, September 9, 2010 09:33
Quoting Roman's earlier reply:
"2) You have to take into account that StreamInsight only works with application time, never system time. So if you have a single stream that does not move forward at all, there is no way for StreamInsight to know that events are not occurring, because time stands still. One possible method to approach this is to provide a second stream, like a "heartbeat", which can then be compared to the actual event stream, so that I can take an action on periods where the heartbeat contains something, but not the actual event stream."
Could you give an example of such an implementation?
-
Thursday, September 9, 2010 16:32
Pavel,
There are different incarnations of the "detecting non-occurrence" problem. They can all be tackled with the left-anti-semi-join operator, which subtracts the join result of two streams from one of them. The result of the subtraction is the set of periods in time during which something was not occurring. How to model the two input streams depends on the use case. The stream to subtract is the actual event stream, likely with a lifetime modification applied to it. The stream to subtract from could be an "artificial" heartbeat stream, as suggested, or again a modified event stream.
I am preparing a blog posting about this problem, so please bear with me for a couple more days!
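The left-anti-semi-join idea can be simulated in Python as interval subtraction (again, a sketch of the semantics, not StreamInsight code). It assumes a reference stream with one long-lived interval per item, point readings whose lifetimes are stretched by a timeout (the role `AlterEventDuration` plays in StreamInsight), and half-open intervals in minutes. The helpers `subtract` and `offline_periods` are hypothetical names; the sample readings mirror the LINQPad example later in this thread.

```python
from typing import List, Tuple

Span = Tuple[int, int]  # half-open [start, end), in minutes


def subtract(interval: Span, holes: List[Span]) -> List[Span]:
    """Parts of `interval` not covered by any hole (interval difference)."""
    lo, hi = interval
    out: List[Span] = []
    cursor = lo
    for s, e in sorted(holes):
        if e <= cursor or s >= hi:
            continue  # hole lies entirely before the cursor or after the interval
        if s > cursor:
            out.append((cursor, s))  # uncovered gap before this hole
        cursor = max(cursor, e)
        if cursor >= hi:
            break
    if cursor < hi:
        out.append((cursor, hi))
    return out


def offline_periods(reference: List[Tuple[int, int, str]],
                    readings: List[Tuple[int, str]],
                    timeout: int) -> List[Tuple[int, int, str]]:
    """Left anti-semi-join: keep the parts of each reference event's lifetime
    during which no matching reading (stretched by `timeout`) is alive."""
    result = []
    for start, end, item in reference:
        # Each point reading stays "alive" for `timeout` minutes.
        holes = [(ts, ts + timeout) for ts, it in readings if it == item]
        result.extend((lo, hi, item) for lo, hi in subtract((start, end), holes))
    return result


if __name__ == "__main__":
    reference = [(0, 20, "Variable1"), (0, 20, "Variable2")]
    readings = [(t, "Variable1") for t in (0, 2, 15, 18, 20)] + \
               [(t, "Variable2") for t in (0, 2, 3, 5, 8, 12, 15, 18, 20)]
    print(offline_periods(reference, readings, timeout=5))  # [(7, 15, 'Variable1')]
```

Variable1's last reading before the gap is at minute 2, so with a 5-minute timeout it is reported offline from minute 7 until its next reading at minute 15. Variable2 never goes 5 minutes without a reading.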
Regards,
Roman
MS StreamInsight Team
-
Wednesday, October 13, 2010 18:49
In case anyone is still looking for a way to send a stream to multiple output adapters: I did it by calling BindOutputToPublishedStream on my queryBinder object:
queryBinder.BindOutputToPublishedStream(publishedStreamUri, inputEventShape, cep.StreamEventOrder.FullyOrdered);
Then I created multiple output adapters and bound each one to the publishedStreamUri in the following way:
var consumedStream = CepStream<DummyPayload>.Create(
    name,
    typeof(cepa.PublishedStreamAdapterFactory),
    new cepa.PublishedStreamInputAdapterConfiguration
    {
        PublishedStreamName = publishedStreamUri
    },
    inputEventShape);
-
Friday, March 16, 2012 08:13
Hello,
I am also looking for a query that detects that an event is missing. Does anyone have an example for this use case? May I have a link to your blog post? I haven't been able to find it :(
Thanks
-
Friday, March 16, 2012 15:10
As a LINQPad query:
void Main()
{
    Func<int, DateTimeOffset> t = (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);

    var values = new[]
    {
        // Good data
        new { Item = "Variable1", Value = 92, Timestamp = 0 },
        new { Item = "Variable2", Value = 60, Timestamp = 0 },
        new { Item = "Variable1", Value = 93, Timestamp = 2 },
        // Variable1 is offline
        new { Item = "Variable2", Value = 75, Timestamp = 2 },
        new { Item = "Variable2", Value = 81, Timestamp = 3 },
        new { Item = "Variable2", Value = 82, Timestamp = 5 },
        new { Item = "Variable2", Value = 82, Timestamp = 8 },
        new { Item = "Variable2", Value = 82, Timestamp = 12 },
        // Variable1 is online again
        new { Item = "Variable1", Value = 92, Timestamp = 15 },
        new { Item = "Variable2", Value = 60, Timestamp = 15 },
        new { Item = "Variable1", Value = 92, Timestamp = 18 },
        new { Item = "Variable2", Value = 60, Timestamp = 18 },
        new { Item = "Variable1", Value = 92, Timestamp = 20 },
        new { Item = "Variable2", Value = 60, Timestamp = 20 }
    };

    var reference = new[] { new { Item = "Variable1" }, new { Item = "Variable2" } };

    var dataStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), e),
        AdvanceTimeSettings.IncreasingStartTime);

    // Create an interval stream that overlaps all values.
    // We would typically use ToSignal(), but this is simpler.
    // In the real world, this would be a slow-moving reference stream
    // converted with ToSignal().
    var referenceStream = reference.ToIntervalStream(Application,
        e => IntervalEvent.CreateInsert(t(0), t(20), e),
        AdvanceTimeSettings.IncreasingStartTime);

    // The left anti-semi-join detects events in the reference stream that are not in the data stream.
    // Altering the event duration on the data stream provides a "timeout":
    // how long between values before we say the item is "offline".
    var offline = from r in referenceStream
                  where (from d in dataStream.AlterEventDuration(e => TimeSpan.FromMinutes(5))
                         where d.Item == r.Item
                         select d).IsEmpty()
                  select r;

    offline.ToEdgeEnumerable().Dump("Offline");
}
DevBiker (aka J Sawyer)

