How to simulate queries on Edge type generation using LINQPad?
-
Tuesday, April 3, 2012 2:03 PM
Hello,
I am using LINQPad to handle a specific use case, but I am not sure I am doing it right. I haven't found a good LINQPad example that simulates Edge type generation.
To summarize, I want to filter out events that fall under a threshold that depends on the current mode. For example, if the previous status event was sent with the value "ecoMode", the threshold is 12; if the previous status event was sent with the value "normalMode", the threshold is 20.
So I made a first sample using the Interval type:
void Main()
{
    Func<int, DateTimeOffset> t = (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);

    var values = new []
    {
        // Business data
        new { Timestamp = 0,  Type = 1, Value = 3 },
        new { Timestamp = 2,  Type = 1, Value = 13 },
        new { Timestamp = 3,  Type = 1, Value = 22 },
        new { Timestamp = 5,  Type = 1, Value = 10 },
        new { Timestamp = 6,  Type = 1, Value = 14 },
        new { Timestamp = 7,  Type = 1, Value = 16 },
        new { Timestamp = 9,  Type = 1, Value = 26 },
        new { Timestamp = 11, Type = 1, Value = 7 },
    };

    // Status data (2 modes: normal and eco)
    //    normal  eco     normal
    //    |___||________||_______|
    // ---------------------------
    // 0  3    6        15      20
    var reference = new []
    {
        new { Timestamp = 3,  TimestampEnd = 6,  Type = 2, Value = "normalMode" },
        new { Timestamp = 6,  TimestampEnd = 15, Type = 2, Value = "ecoMode" },
        new { Timestamp = 15, TimestampEnd = 20, Type = 2, Value = "normalMode" },
    };

    // Create the data stream
    var dataStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), e),
        AdvanceTimeSettings.IncreasingStartTime, "dataStream");

    // Import the CTI so the result will be based on the dataStream CTI
    var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

    // Create the reference stream (interval version)
    var referenceStream = reference.ToIntervalStream(Application,
        e => IntervalEvent.CreateInsert(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStream");

    // Get the events whose value is greater than 12 in eco mode or greater than 20 in normal mode
    var overConsumption = from d in dataStream
                          from r in referenceStream
                          where (r.Value == "ecoMode" && d.Value > 12)
                             || (r.Value == "normalMode" && d.Value > 20)
                          select d;

    overConsumption.ToPointEnumerable().Dump("Overconsumption");
}

And I get the following result, which is what I expected:

But in the real world, I don't know exactly when the mode will switch to normal or eco, so the Edge type seems to fit exactly here. However, I don't know how to simulate it through LINQPad: when should I create the end or the start event? I tried to do that by merging two streams, but I don't think this is the right way to do it:
void Main()
{
    Func<int, DateTimeOffset> t = (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);

    var values = new []
    {
        // Business data
        new { Timestamp = 0,  Type = 1, Value = 3 },
        new { Timestamp = 2,  Type = 1, Value = 13 },
        new { Timestamp = 3,  Type = 1, Value = 22 },
        new { Timestamp = 5,  Type = 1, Value = 10 },
        new { Timestamp = 6,  Type = 1, Value = 14 },
        new { Timestamp = 7,  Type = 1, Value = 16 },
        new { Timestamp = 9,  Type = 1, Value = 26 },
        new { Timestamp = 11, Type = 1, Value = 7 },
    };

    // Status data (2 modes: normal and eco)
    //    normal  eco     normal
    //    |___||________||_______|
    // ---------------------------
    // 0  3    6        15      20
    var reference = new []
    {
        new { Timestamp = 3,  TimestampEnd = 6,  Type = 2, Value = "normalMode" },
        new { Timestamp = 6,  TimestampEnd = 15, Type = 2, Value = "ecoMode" },
        new { Timestamp = 15, TimestampEnd = 20, Type = 2, Value = "normalMode" },
    };

    // Create the data stream
    var dataStream = values.ToPointStream(Application,
        e => PointEvent.CreateInsert(t(e.Timestamp), e),
        AdvanceTimeSettings.IncreasingStartTime, "dataStream");

    // Import the CTI so the result will be based on the dataStream CTI
    var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

    // // Create the reference stream (interval version)
    // var referenceStream = reference.ToIntervalStream(Application,
    //     e => IntervalEvent.CreateInsert(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStream");

    // Create the reference stream for start events (edge version)
    var referenceStream = reference.ToEdgeStream(Application,
        e => EdgeEvent.CreateStart(t(e.Timestamp), e), ats, "referenceStream");

    // Create the reference stream for end events (edge version)
    var referenceStreamEnd = reference.ToEdgeStream(Application,
        e => EdgeEvent.CreateEnd(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStreamEnd");

    // Create one stream with both start and end events (edge version)
    referenceStream.Union(referenceStreamEnd);

    // Get the events whose value is greater than 12 in eco mode or greater than 20 in normal mode
    var overConsumption = from d in dataStream
                          from r in referenceStream
                          where (r.Value == "ecoMode" && d.Value > 12)
                             || (r.Value == "normalMode" && d.Value > 20)
                          select d;

    overConsumption.ToPointEnumerable().Dump("Overconsumption");
}

And here is the result:
So my current questions are:
- Is the Edge type really the right way to do this?
- How do I create the reference stream to simulate the Edge type with LINQPad?
- Why do I get the last event twice?
And finally, to use the Edge type in a real application, I have to create a start edge event in the input adapter, and then I will have to create two events: an end event to "close" the first one and a new start event. Am I right?
Thank you for your help and sorry for the long topic.
All replies
-
Tuesday, April 3, 2012 2:34 PM
First, yes, this is a good candidate for an Edge event. It is, in fact, a perfect use case for edges ... you get a start when it goes out of range and an end when it comes back in range.
Second, when you are dumping your overConsumption, you are using ToPointEnumerable. That won't give you edges, no matter how hard you try. ;-)
As for what you are doing, check out my blog entry on turning alarms into edge events. It does exactly what you need.
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful.
- Proposed as answer by DevBikerMVP Tuesday, April 3, 2012 2:35 PM
-
Wednesday, April 4, 2012 9:39 AM
Actually, I used ToPointEnumerable because I want the output adapter to send me a point result each time a data value goes out of range according to the current status (eco or normal). I don't care about when it goes back in range. So I use the Edge type to correlate the data (points) with the current status (edge), and I get a point event as the result.
I switched to ToEdgeEnumerable to analyze the behavior, but I got something I didn't expect:
For example, for the first "end" edge event, I noticed that the StartTime and EndTime are actually the same (08:03:00), but I expected the EndTime to be set to 08:06:00 according to the following code:
// Create the reference stream for end events (edge version)
var referenceStreamEnd = reference.ToEdgeStream(Application,
    e => EdgeEvent.CreateEnd(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStreamEnd");
So why is the EndTime not set correctly? Did I miss something?
I also have another issue with LINQPad: when I run the code, I sometimes get different results (for example, 2 additional rows, or 0 rows in total). Is it possible to get different behavior from the same code?
-
Wednesday, April 4, 2012 11:05 AM
The ToEdgeEnumerable() gave you what you saw because the events are natural/native points and this is how a point is expressed when it is then coerced to an edge.
It may help to think of events in StreamInsight as intervals, with points and edges being simply special cases of an interval. That's not exactly how the engine works, but it can help to think of it that way, especially when doing joins. The resulting event that goes to the output adapter, and its shape, is simply an expression of that "interval" in whatever shape you specify.
Now ... as for why you are getting points as your output ... StreamInsight joins happen in time and the resulting event's temporal properties are based on the overlap between events. When you are joining any event with a point (think .. an interval with an event duration of 1 tick), the resulting event will have a duration of 1 tick ... a point. When expressed as an edge, this gives you a Start and End edge that are 1 tick apart. Also, each event that comes through that matches your where clause will generate a new output event.
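As an illustration of the overlap rule described above, here is a minimal sketch in plain C#. It does not use the StreamInsight API: `OverlapSketch` and `Overlap` are hypothetical names, and lifetimes are modelled as half-open intervals, with a point treated as an interval that is one tick long. It only shows why joining a long status interval with a point yields a one-tick result.

```csharp
using System;

static class OverlapSketch
{
    // Hypothetical helper (not a StreamInsight API): intersect two
    // half-open lifetimes [start, end). Returns null when they do not overlap.
    public static (DateTimeOffset Start, DateTimeOffset End)? Overlap(
        DateTimeOffset aStart, DateTimeOffset aEnd,
        DateTimeOffset bStart, DateTimeOffset bEnd)
    {
        var start = aStart > bStart ? aStart : bStart; // later of the two starts
        var end = aEnd < bEnd ? aEnd : bEnd;           // earlier of the two ends
        if (start < end)
        {
            return (start, end);
        }
        return null;
    }

    public static void Main()
    {
        var t0 = new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero);

        // Status event: "ecoMode" from 08:06 to 08:15.
        var statusStart = t0.AddMinutes(6);
        var statusEnd = t0.AddMinutes(15);

        // Data event at 08:07, modelled as a one-tick interval.
        var pointStart = t0.AddMinutes(7);
        var pointEnd = pointStart.AddTicks(1);

        var joined = Overlap(statusStart, statusEnd, pointStart, pointEnd);

        // The joined lifetime is the point's own lifetime: one tick long.
        Console.WriteLine((joined.Value.End - joined.Value.Start).Ticks);
    }
}
```

Read as an edge, that one-tick lifetime is a Start and an End edge one tick apart, which is the shape reported earlier in the thread.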
I have seen some strange behavior when working with LinqPad to model StreamInsight queries, particularly when importing the CTIs from another stream. I suspect that this has to do with a timing issue when calling To[Shape]Stream(). You shouldn't look at LinqPad as the final test for the correctness of a query ... it is a tool to help you model them and you do need to understand that a couple of things may be a little different when running under the engine. Part of the challenge is modelling the scenario correctly ... which leads me to the question of how are your status changes (eco vs normal) actually being enqueued? Typically, there aren't many real-world scenarios where sensors provide a natural edge ... you have to alter the events to get that edge event. For example, if your status changes are actually going to be enqueued as points, then you need to "convert" them to edges.
Take a look at the following:
var referenceSource = reference.ToPointStream(Application,
    e => PointEvent.CreateInsert(t(e.Timestamp), e), ats, "referenceStream");
var referenceStream = referenceSource
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(referenceSource, (e1, e2) => true);
In this case, we're getting the source of the status changes as points (input ... new state). We then alter the event duration to the max value and clip it. This gives us an event that starts at the original point's enqueue time and ends when the next status change event comes in from the adapter ... effectively converting that point to an edge.
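To picture what the extend-then-clip step does to the sample data from this thread, here is a rough sketch in plain C# with no StreamInsight dependency. `ClipSketch`, `ToSignal` and `OverConsumption` are hypothetical names, times are minutes after 08:00, and `int.MaxValue` stands in for "still open". It imitates the temporal behavior described above and is not how the engine implements it.

```csharp
using System;
using System.Linq;

static class ClipSketch
{
    // Extend each status point until the next status point arrives
    // (int.MaxValue means the status is still open).
    public static (int Start, int End, string Mode)[] ToSignal((int Time, string Mode)[] points)
    {
        var ordered = points.OrderBy(p => p.Time).ToArray();
        return ordered
            .Select((p, i) => (
                Start: p.Time,
                End: i + 1 < ordered.Length ? ordered[i + 1].Time : int.MaxValue,
                Mode: p.Mode))
            .ToArray();
    }

    // Keep the timestamps of data points whose value exceeds the threshold
    // of the mode that is active at that time (12 for eco, 20 for normal).
    public static int[] OverConsumption(
        (int Time, int Value)[] data,
        (int Start, int End, string Mode)[] signal)
    {
        return (from d in data
                from s in signal
                where s.Start <= d.Time && d.Time < s.End
                where (s.Mode == "ecoMode" && d.Value > 12)
                   || (s.Mode == "normalMode" && d.Value > 20)
                select d.Time).ToArray();
    }

    public static void Main()
    {
        // Status changes arrive as points: only the start time is known.
        (int Time, string Mode)[] status =
        {
            (3, "normalMode"), (6, "ecoMode"), (15, "normalMode")
        };

        // Business data from the original sample.
        (int Time, int Value)[] data =
        {
            (0, 3), (2, 13), (3, 22), (5, 10), (6, 14), (7, 16), (9, 26), (11, 7)
        };

        var result = OverConsumption(data, ToSignal(status));
        Console.WriteLine(string.Join(", ", result)); // prints "3, 6, 7, 9"
    }
}
```

Note that the last status has no known end here, which matches the real-world case where the next mode switch has not happened yet.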
You can see this by dumping the new referenceStream. However, in order to do that, you need to set the AdvanceTimeSettings to IncreasingStartTime, as it won't join with the data stream (where the CTIs are coming from) until later. You do need to do a join with the CTI source stream before any ToQuery() operation ... and To[Shape]Enumerable() is, under the hood, a ToQuery() operation.
DevBiker (aka J Sawyer)
- Marked as answer by qerluisdhivdufhwxcs Wednesday, April 4, 2012 12:23 PM
-
Wednesday, April 4, 2012 12:23 PM
OK, I understand how you artificially create the edge in the referenceStream. I also understand why I had an exception in LINQPad when I simulated it by altering the event duration: it was because of the imported stream settings...
So I get the result I expect now.
Now for the next part: I will try to implement it in the real application and make it work :)
Thank you very much, and sorry to bother you with my questions. I hope it will help someone else!

