Choosing the correct event type for a scenario and for input and output adapters
Monday, 19 December 2011 11:12
Hi,
Point events - instantaneous events --> Example: a speedometer reading
Interval events - the payload is relevant for a specific period of time (both start and end times are known) --> Any example of this?
Edge events are similar to interval events, except that the duration of the event isn't known when the event arrives (only the start time is set; the end time stays empty until another edge event arrives that sets it).
--> Can readings from a temperature sensor be considered edge events? For example: at 10:00 AM, my input adapter reads the temperature sensor, say 20 degrees C. A second later it reads 20 degrees C again. Like this, my input adapter reads the temperature sensor every second, and the value is always the same, 20 degrees C, until 3:00 PM.
At 3:00:01 PM the input adapter reads 25 degrees C. So my input adapter has simply created and dumped 18000 events (60*60*5) with the same value into the StreamInsight engine. The value changes only at 3:00:01 PM (and this can be detected only by the output adapter).
So if I use the edge event type for this scenario (in the input adapter), how does StreamInsight behave?
If I use the point or interval event type, how does StreamInsight behave?
Which event type should I choose for the output adapter?
Could anyone clarify this for me?
Venkat
All Replies
Tuesday, 20 December 2011 03:08
An example of an interval event would be this: let's say that you are developing a StreamInsight application to calculate the tolls on the Katy Managed Lanes on I-10 in Houston. These are toll lanes whose tolls vary with the time of day. From 5:00 AM to 10:00 AM, the toll for each booth is $1.50. From 10:00 AM to 3:00 PM, the toll is $0.50. At 3:00 PM, it goes back to $1.50 until 7:00 PM. After 7:00 PM, it drops back to $0.50 until 5:00 AM the next morning. On weekends, the toll is always $0.50. You would enqueue the tolls as interval events via a reference data input adapter pulling from, say, SQL Server. You would then join this reference stream to the live stream from the toll sensors to calculate tolls in real time.
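In StreamInsight this lookup is done by the temporal join described above. To show only the underlying idea, here is a plain C# sketch (not StreamInsight API; `TollInterval` and `KatyTolls` are hypothetical names, and splitting the overnight $0.50 period at midnight is an assumption so that every interval has Start < End) in which each reading picks up the toll whose interval contains its timestamp:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reference row: a [Start, End) range of the weekday clock
// and the toll that is valid during that range.
public record TollInterval(TimeSpan Start, TimeSpan End, decimal Toll);

public static class KatyTolls
{
    // Weekday schedule from the post; the overnight period is split at
    // midnight so each interval is a simple Start < End range.
    static readonly List<TollInterval> Weekday = new List<TollInterval>
    {
        new TollInterval(TimeSpan.FromHours(0),  TimeSpan.FromHours(5),  0.50m),
        new TollInterval(TimeSpan.FromHours(5),  TimeSpan.FromHours(10), 1.50m),
        new TollInterval(TimeSpan.FromHours(10), TimeSpan.FromHours(15), 0.50m),
        new TollInterval(TimeSpan.FromHours(15), TimeSpan.FromHours(19), 1.50m),
        new TollInterval(TimeSpan.FromHours(19), TimeSpan.FromHours(24), 0.50m),
    };

    // "Join" a point-in-time reading to the interval that contains it.
    public static decimal TollAt(DateTime passage)
    {
        if (passage.DayOfWeek is DayOfWeek.Saturday or DayOfWeek.Sunday)
            return 0.50m; // weekends are always $0.50

        TimeSpan timeOfDay = passage.TimeOfDay;
        return Weekday.Single(i => i.Start <= timeOfDay && timeOfDay < i.End).Toll;
    }
}
```

A weekday reading at 6:00 AM falls into the 5:00 to 10:00 interval and gets $1.50, while any Saturday reading gets $0.50.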
Now, to your question about the temperature sensor ... you should enqueue those readings as point events. In most cases, sensor data should be enqueued as point events ... that's what they naturally are. While you could convert them to edge events in the input adapter, that's not ideal. First, your input adapter, rather than the queries, would then be doing the data processing; the queries are what should really be doing the processing. Second, it is often desirable to know when the sensor is no longer transmitting ... if your input adapter is designed the way you suggest, you won't have any way to do that. Finally ... and most importantly ... your input adapter won't properly handle data interruptions. How would you handle startup? What happens if you enqueue an end without a start? Or a start and never an end? If you want to do checkpointing, your complexity has now increased exponentially.
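One way to see the second point: with raw point events, a silent sensor is visible as a gap between consecutive timestamps, whereas an adapter that emits only value changes cannot distinguish "no change" from "no data". A minimal plain C# sketch of such a check (outside StreamInsight; `SensorGaps.FindGaps` and its threshold parameter are hypothetical), assuming the readings are already sorted by time:

```csharp
using System;
using System.Collections.Generic;

public static class SensorGaps
{
    // Returns (LastSeen, NextSeen) pairs where two consecutive readings are
    // further apart than maxSilence, i.e. where the sensor seems to have
    // stopped transmitting. Assumes timestamps are in ascending order.
    public static List<(DateTime LastSeen, DateTime NextSeen)> FindGaps(
        IReadOnlyList<DateTime> readings, TimeSpan maxSilence)
    {
        var gaps = new List<(DateTime LastSeen, DateTime NextSeen)>();
        for (int i = 1; i < readings.Count; i++)
        {
            if (readings[i] - readings[i - 1] > maxSilence)
                gaps.Add((readings[i - 1], readings[i]));
        }
        return gaps;
    }
}
```

With one-second readings and a five-second threshold, a sensor that goes quiet from 10:00:02 to 10:05:00 produces exactly one gap; an edge-only adapter would simply report "still 20 degrees C" for that whole stretch.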
Internally, all events are intervals, and StreamInsight handles them all in pretty much the same way. The difference is in how the end time is handled. With a point, the end time is 1 tick past the start. Intervals are self-explanatory. Edges will (usually) initially have infinity as the end time, which is later retracted and replaced with the actual end time.
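The lifetimes described above can be modelled with a hypothetical `Lifetime` record holding a half-open [Start, End) range. This is a conceptual sketch in plain C#, not StreamInsight's own event classes; `DateTimeOffset.MaxValue` stands in for "infinity":

```csharp
using System;

// Hypothetical model: every event shape is stored as a [Start, End) lifetime.
public record Lifetime(DateTimeOffset Start, DateTimeOffset End);

public static class Shapes
{
    // A point lasts exactly one tick.
    public static Lifetime Point(DateTimeOffset start) =>
        new Lifetime(start, start.AddTicks(1));

    // An interval carries both times up front.
    public static Lifetime Interval(DateTimeOffset start, DateTimeOffset end) =>
        end > start
            ? new Lifetime(start, end)
            : throw new ArgumentException("End must be after start.");

    // An edge start is open-ended until its end edge arrives.
    public static Lifetime EdgeStart(DateTimeOffset start) =>
        new Lifetime(start, DateTimeOffset.MaxValue);

    // The end edge replaces the open-ended lifetime with the real end time.
    public static Lifetime EdgeEnd(Lifetime open, DateTimeOffset end) =>
        open.End == DateTimeOffset.MaxValue
            ? new Lifetime(open.Start, end)
            : throw new InvalidOperationException("Edge is already closed.");
}
```

All three shapes end up as the same kind of value; only the way the end time is obtained differs.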
The biggest difference, and the most important part, is how the events are then exposed to the output adapter. Any stream can be "turned into" point, interval or edge events when you call ToQuery(). In your example, it may well be desirable to expose your temperature events to your output adapter as intervals or edge events.
In your scenario, you could say this: I am getting inbound point events with temperature readings. I don't want to store each individual event, just temperature change events.
In StreamInsight, you would:
- Get only the changed items. This would be done using the FoldPairs macro that is in the LinqPad samples for StreamInsight.
- Convert the stream of changed items into a signal that shows the start and end of each value reading. You can use the ToSignal macro mentioned on the StreamInsight blog for this.
- Determine your event shape when you call ToQuery(). You will get the correct result whether you send the events to your output adapter as intervals or as edges. But with an edge, you get a separate start and end, while an interval gives you only one event.
Take the code below and run it in LinqPad to see what I mean:
void Main()
{
    // Helper: builds a timestamp at h:m on 25 Jan 2011 (UTC).
    Func<int, int, DateTimeOffset> t = (h, m) =>
        new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

    // Sample readings from two sources, A and B, with many repeated values.
    var sourceData = new []
    {
        new { SourceId = "A", Value = 0, TimeStamp = t(4, 12) },
        new { SourceId = "B", Value = 0, TimeStamp = t(4, 12) },
        new { SourceId = "A", Value = 23, TimeStamp = t(4, 13) },
        new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
        new { SourceId = "A", Value = 23, TimeStamp = t(4, 14) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
        new { SourceId = "A", Value = 23, TimeStamp = t(4, 15) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 30) },
        new { SourceId = "A", Value = 87, TimeStamp = t(4, 35) },
    };

    // Enqueue the readings as point events.
    var source = sourceData.ToPointStream(
        Application,
        ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), new { ev.SourceId, ev.Value }),
        AdvanceTimeSettings.IncreasingStartTime);

    // Pair each reading with the previous reading from the same source.
    var valueChanges = FoldPairs(source,
        (e1, e2) => e1.SourceId == e2.SourceId,
        TimeSpan.MaxValue,
        (e1, e2) => new { HasChanged = e1.Value != e2.Value, Item = e2 });

    // Keep only readings whose value differs from the previous one.
    var changedValuesOnly = from e in valueChanges
                            where e.HasChanged
                            select e.Item;

    // Each value now lasts until the next change for the same source.
    var changeSignal = ToSignal(changedValuesOnly, e => e.SourceId);

    // Expose the same signal in each of the three shapes.
    changeSignal.ToPointEnumerable().Where(e => e.EventKind == EventKind.Insert).Dump("Point");
    changeSignal.ToIntervalEnumerable().Where(e => e.EventKind == EventKind.Insert).Dump("Interval");
    changeSignal.ToEdgeEnumerable().Where(e => e.EventKind == EventKind.Insert).Dump("Edge");
}

public static CepStream<T> ToSignal<T, K>(
    CepStream<T> inputstream,
    Expression<Func<T, K>> keySelector)
{
    // Extend every event indefinitely, then clip it at the next event with the same key.
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream,
            (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    var signal = input
        .AlterEventDuration(e => timeout)
        .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));

    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}
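The choice in the last step of the list above comes down to how many output events one signal produces. As a conceptual sketch only (plain C#, not how `ToIntervalEnumerable()` or `ToEdgeEnumerable()` are implemented; `Segment`, `EdgeOut` and `OutputShapes` are hypothetical names), here is a signal for source A in each shape. The segment times are worked out by hand from `sourceData` (ignoring the `ToLocalTime()` conversion), and the still-current last segment is modelled as ending at `DateTime.MaxValue`:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical signal segment: Value held by SourceId over [Start, End).
// End == DateTime.MaxValue means no later change has been seen yet.
public record Segment(string SourceId, int Value, DateTime Start, DateTime End);

public enum EdgeKind { Start, End }

public record EdgeOut(EdgeKind Kind, string SourceId, int Value, DateTime Start, DateTime? End);

public static class OutputShapes
{
    // Interval shape: one output event per segment, carrying both times.
    public static List<Segment> AsIntervals(IEnumerable<Segment> signal) =>
        new List<Segment>(signal);

    // Edge shape: a start edge for every segment, plus an end edge once the
    // end time is known. Edges are listed per segment, not in global time order.
    public static List<EdgeOut> AsEdges(IEnumerable<Segment> signal)
    {
        var edges = new List<EdgeOut>();
        foreach (var s in signal)
        {
            edges.Add(new EdgeOut(EdgeKind.Start, s.SourceId, s.Value, s.Start, null));
            if (s.End != DateTime.MaxValue)
                edges.Add(new EdgeOut(EdgeKind.End, s.SourceId, s.Value, s.Start, s.End));
        }
        return edges;
    }
}
```

For source A (23 from 4:13 to 4:16, 54 from 4:16 to 4:35, 87 from 4:35 onward) this gives three intervals but five edges: each value is announced as soon as it starts, and closed when the next change arrives.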
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful.
Tuesday, 20 December 2011 11:10
Hi DevBiker,
You said:
"What happens if you enqueue an end without a start? Or a start and never an end? If you want to do checkpointing, your complexity has now increased exponentially."
To answer this, I think I need to understand the behavior of these event types in StreamInsight. But based on your reply, I think the following is correct:
Input adapter: point event type; output adapter: edge event type.
I understand the definitions of these event types, but what I don't get is:
In the case of edge events:
What happens if you enqueue an end without a start? --> Is this event not available for dequeuing? What is the real problem here?
A start and never an end --> Is this event not available for dequeuing? What is the real problem here?
I am a bit confused about the lifetime of an event inside StreamInsight. Can you explain the lifetime of these event types inside StreamInsight?
Point event --> How long is it available for dequeuing? When will this event be removed from StreamInsight?
Edge event --> How long is it available for dequeuing? When will this event be removed from StreamInsight?
Interval event --> How long is it available for dequeuing? When will this event be removed from StreamInsight?
I am running a test here with point events (for both the input and output adapters), with this query condition:
var filtered = from i in inputStream where i.Value > 100 select i;
My input adapter is sending values, some below 100 and some above 100. All of these events are getting enqueued into StreamInsight.
When I dequeue in my output adapter, I get only the events that are > 100. Once I dequeue these events, I remove them from the queue. But there are thousands of other events with values below 100 that are accumulating in the StreamInsight engine. I am wondering when these events will be removed from StreamInsight.
Another question/clarification: when do standing queries get executed? Based on my observation, these queries get executed only when I try to dequeue an event. Otherwise, if these queries execute earlier (as per your earlier reply, the StreamInsight scheduler executes these queries), where are the results stored? When I looked at the MS Event Flow Debugger, I could see only the events that were enqueued by the input adapter. Where are the results of the query execution? Who stores these results, and when and where? Can you please clarify this?
I have yet to understand your example on "storing only change events"; maybe if I get answers in this post I will understand your sample better. It looks like you have some magic way, but as I mentioned here, when my input adapter enqueues the events, they are stored directly in StreamInsight; no queries are running at this point. So all 18000 events (please see the first post here) get stored in StreamInsight, and I'm wondering how the queries can help to store only change events.
Or do you mean, with your example, that if I dequeue I get only the changed events (and StreamInsight still has all the events)? (In that case, how long do I have to store the previous events/data for comparison?)
Thanks.
Venkat
Tuesday, 20 December 2011 13:37
First ... whether you dequeue or not, once Query.Start() is called, the queries run and execute. Results are queued up for the output adapter to get, but they are always processed. Try this: start a query without an output adapter defined. Attach to the query and record it in the event flow debugger. Guess what? You still have events. Take it up a step ... in your query (with no output adapter), add a simple UDF. In your UDF, add a Debug.WriteLine() that writes every time the query engine calls it. Guess what? It still gets called.
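The experiment above boils down to a push model: work is done when an event is enqueued, and the output side only collects results that already exist. A deliberately simplified C# sketch of that behavior (hypothetical `StandingQuery<T>` class; it does not model StreamInsight's scheduler, CTIs or adapters), where the call counter plays the role of the `Debug.WriteLine()` in the UDF:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based pipeline: the filter runs when an event is
// enqueued, not when the output side asks for results.
public class StandingQuery<T>
{
    private readonly Func<T, bool> _predicate;
    private readonly Queue<T> _results = new Queue<T>();

    // Counts predicate evaluations, like the Debug.WriteLine() in a UDF.
    public int PredicateCalls { get; private set; }

    public StandingQuery(Func<T, bool> predicate) => _predicate = predicate;

    // Called by the "input adapter": evaluation happens here, immediately.
    public void Enqueue(T item)
    {
        PredicateCalls++;
        if (_predicate(item)) _results.Enqueue(item);
    }

    // Called by the "output adapter": only hands over results already computed.
    public bool TryDequeue(out T item) => _results.TryDequeue(out item);

    public int PendingResults => _results.Count;
}
```

With the `i.Value > 100` filter from the question, every enqueued value is evaluated right away, and only the matches wait for the output side; values below 100 are never queued as results.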
With edge input adapters/events, you cannot enqueue an end without a corresponding start; you'll get an exception. So that's problem #1. Enqueueing a start that never ends could well extend the validity of an event far longer than it should be valid within the application timeline. This would mean that joins and unions that use that event would not produce the proper results. See http://msdn.microsoft.com/en-us/library/ee391157.aspx for a simple diagram that shows how the timeline affects joins.
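Both edge problems can be sketched with a hypothetical bookkeeping class (plain C#, not the StreamInsight adapter API; `EdgeTracker` and keying edges by a string id are assumptions): an end with no matching start throws, and a start that never ends remains valid at any later time, which is what would keep it participating in joins:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical edge bookkeeping; only open (not yet ended) edges are tracked.
public class EdgeTracker
{
    private readonly Dictionary<string, DateTimeOffset> _open =
        new Dictionary<string, DateTimeOffset>();

    public void Start(string id, DateTimeOffset at)
    {
        if (!_open.TryAdd(id, at))
            throw new InvalidOperationException($"Edge '{id}' already started.");
    }

    // An end edge needs a matching start, mirroring the exception in the post.
    public (DateTimeOffset Start, DateTimeOffset End) End(string id, DateTimeOffset at)
    {
        if (!_open.Remove(id, out var start))
            throw new InvalidOperationException($"End edge for '{id}' without a start.");
        return (start, at);
    }

    // A start that never receives an end stays valid at every later time.
    public bool IsValidAt(string id, DateTimeOffset t) =>
        _open.TryGetValue(id, out var start) && start <= t;
}
```

An edge started and never ended is still "valid" ten years later in this model, which is the timeline distortion described above.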
And don't think of events as "available for dequeueing" ... think of them as "available for queries". Events have a start and end time that represent their validity within the application's timeline. CTIs move the application along this timeline. Events are removed from StreamInsight when they are no longer valid in the timeline, as determined by their start and end times. You have methods like AlterEventDuration() and AlterEventLifetime() to manipulate the start and end times, thereby controlling their validity in the timeline. So no, not all 18K events are "stored" in StreamInsight. Only the currently valid events are in memory.
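A simplified model of that cleanup, using a hypothetical `EventStore` that drops an event once a CTI has passed its end time (real cleanup happens inside the engine and depends on the operators in the query; this sketch ignores that). Roughly, a CTI promises that no later event will start before its timestamp, so an event whose [Start, End) range ends at or before the CTI can no longer overlap anything new:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event with a [Start, End) lifetime and a payload.
public record TimedEvent(DateTimeOffset Start, DateTimeOffset End, double Value);

public class EventStore
{
    private readonly List<TimedEvent> _live = new List<TimedEvent>();

    public int LiveCount => _live.Count;

    public void Add(TimedEvent e) => _live.Add(e);

    // After a CTI at time cti, events that ended at or before it cannot
    // overlap any future event, so this model discards them.
    public void AdvanceTime(DateTimeOffset cti) => _live.RemoveAll(e => e.End <= cti);
}
```

With the 18000 one-second temperature points from the first post, a CTI at the timestamp of the last reading leaves a single event in memory.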
The event flow debugger does show you the results of your query execution, but don't think of it as similar to, say, an SSMS query window. It's not. Query results are continuously processed; your output (result) events will be in the last operator in the event flow debugger view after recording the query. It's not, however, a query/result request/response process ... it's always running. You seem to be trying to fit StreamInsight queries into the paradigm that developers are used to and that we've been using for ages ... you really need to step back from that paradigm, because StreamInsight is inherently different.
And the sample has no "magic way" ... it simply works with the temporal model of StreamInsight to a) determine change between previous and current event then b) filter only those events that have changed and, finally c) convert the changed events to a "signal" that starts with the first and ends when the next change event happens. The result of the query is a consistent timeline of start/end temperatures that would then be stored in a database, CSV or other durable storage.
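Steps a) to c) can be imitated without StreamInsight to see what the resulting timeline looks like. This plain C# sketch swaps the temporal self-join for a per-source dictionary holding the last value (so it is not how the LinqPad query executes), uses hypothetical `Reading`, `ValueSpan` and `ChangeSignal` names, and assumes readings arrive in timestamp order. Like FoldPairs, it emits nothing for the first reading of a source, and the last span per source stays open (`End == null`):

```csharp
using System;
using System.Collections.Generic;

public record Reading(string SourceId, int Value, DateTime TimeStamp);
public record ValueSpan(string SourceId, int Value, DateTime Start, DateTime? End);

public static class ChangeSignal
{
    // Assumes readings are ordered by TimeStamp.
    public static List<ValueSpan> Build(IEnumerable<Reading> readings)
    {
        var previous = new Dictionary<string, int>(); // last value per source (step a)
        var openSpan = new Dictionary<string, int>(); // index of each source's current span
        var spans = new List<ValueSpan>();

        foreach (var r in readings)
        {
            bool hasPrevious = previous.TryGetValue(r.SourceId, out int prev);
            previous[r.SourceId] = r.Value;

            // Step b: keep only readings whose value differs from the previous one.
            if (!hasPrevious || prev == r.Value) continue;

            // Step c: a change closes the source's current span and opens a new one.
            if (openSpan.TryGetValue(r.SourceId, out int idx))
                spans[idx] = spans[idx] with { End = r.TimeStamp };
            openSpan[r.SourceId] = spans.Count;
            spans.Add(new ValueSpan(r.SourceId, r.Value, r.TimeStamp, null));
        }
        return spans;
    }
}
```

Fed with the values and times of `sourceData` from the LinqPad sample, it yields six spans, for example A = 23 from 4:13 to 4:16 and B = 31 from 4:14 to 4:30. Note that this sketch keeps only one previous value per source.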
DevBiker (aka J Sawyer)
- Marked as answer by Roman Schindlauer (Microsoft Employee), Thursday, 29 December 2011 22:30
Wednesday, 21 December 2011 14:30
You said: "determine change between previous and current event"
But my input adapter events are point events. When an event comes in, it immediately goes through the standing query. So how does the standing query get the opportunity to look back at the previous event and compare?
The input adapter reads the sensor value, calls CreateInsertEvent and then sets the payload. This happens whenever the input adapter reads the sensor value, and then the event is enqueued into StreamInsight.
This event then goes through the standing query (assume that by this time the next event has not been enqueued yet). How does the standing query get the previous value, when the previous event has already been processed?
I am a bit confused here...
Venkat
Wednesday, 21 December 2011 17:02
First ... internally, everything is an Interval. StreamInsight also provides statements and methods to manipulate the temporal properties of events that allow you to do things like comparing current with previous.
Second ... to understand how you "look back", you need to look at the query macro FoldPairs. Here is the macro from the code above, with comments added.
public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    //Take the input stream and convert it to a signal.
    var signal = input
        //Extend the events by a timeout. This creates overlap.
        .AlterEventDuration(e => timeout)
        //Clip the events by the next event's start time.
        //This creates a series of intervals that are end-to-end
        //and eliminates overlap between the events.
        .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));

    //Our signal stream is end-to-end events.
    //By shifting this stream's start and end times forward 1 tick
    //we create overlap (of 1 tick) between the current (input) stream
    //and the signal stream. This allows us to compare the previous
    //value (l) with the current value (r).
    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}
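To see why the one-tick shift lines up "previous" with "current", here is the same sequence of operations on plain integer tick ranges for a single source (a hypothetical `FoldPairsSketch` in plain C#, not StreamInsight's operators; `long.MaxValue` stands in for an open end). Each point lasts one tick, the clipped signal ends where the next point starts, and after shifting by one tick each signal event overlaps exactly the next point:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event over integer ticks with a [Start, End) lifetime.
public record Ev(long Start, long End, int Value);

public static class FoldPairsSketch
{
    static bool Overlaps(Ev a, Ev b) => a.Start < b.End && b.Start < a.End;

    // Single-key version of the macro: points -> clipped signal -> shift -> join.
    public static List<(int Previous, int Current)> Pairs(
        IReadOnlyList<long> times, IReadOnlyList<int> values)
    {
        // Input as point events: each lasts exactly one tick.
        var input = times.Select((t, i) => new Ev(t, t + 1, values[i])).ToList();

        // AlterEventDuration + ClipEventDuration: each event now ends where
        // the next one starts; the last one stays open-ended.
        var signal = input.Select((e, i) => new Ev(
            e.Start,
            i + 1 < input.Count ? input[i + 1].Start : long.MaxValue,
            e.Value)).ToList();

        // ShiftEventTime(+1 tick): each signal event now overlaps the next point.
        var shifted = signal.Select(e => new Ev(
            e.Start + 1,
            e.End == long.MaxValue ? long.MaxValue : e.End + 1,
            e.Value));

        // Temporal join: overlapping pairs are (previous, current).
        return (from l in shifted
                from r in input
                where Overlaps(l, r)
                select (Previous: l.Value, Current: r.Value)).ToList();
    }
}
```

For points at ticks 10, 20 and 30 with values 0, 23 and 54, the join yields (0, 23) and (23, 54). The last signal event, shifted to start at tick 31, overlaps nothing until another point arrives, so only one previous event per key has to stay alive.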
Have you downloaded and installed LinqPad? If not, I strongly suggest that you do so and install the StreamInsight samples and driver. LinqPad is an essential tool for understanding and prototyping StreamInsight queries ... and the samples cover a TON of common query patterns.
DevBiker (aka J Sawyer)
- Marked as answer by VenkatABC, Friday, 23 December 2011 14:41
- Unmarked as answer by VenkatABC, Friday, 23 December 2011 15:25
- Marked as answer by Roman Schindlauer (Microsoft Employee), Thursday, 29 December 2011 22:31