Retract events not updating End Timestamp on Edge events
-
miércoles, 14 de septiembre de 2011 15:22
Hi folks,
I'm stumped by the StreamInsight engine not translating my Edge End events into valid end timestamps. Here's where I'm at currently:
1. I've ensured that Start and End events have the same start timestamps and payload (verified in debugger - insert and retract start times are identical, as well as payload values).
2. All timestamps are assigned as DateTimeOffsets
3. Verified no CTI violations or errors stating matching edge start events were not found for the edge end events
Here's an example of the debug output I see from the untyped edge input adapter when I attach WinDbg to the StreamInsightHost process on the server:
INSERTED EDGE Start: Dev2 TestAppActive [09/13/2011 22:39:45 +00:00] [01/01/0001 00:00:00 -08:00]
INSERTED EDGE End: Dev2 TestAppActive [09/13/2011 22:39:45 +00:00] [09/13/2011 22:39:47 +00:00]Notice that the end timestamp is valid (2 seconds after the start time)
However, when I look at a recording in the Event Flow Debugger, the retract event corresponding to this edge does not updated the end timestamp from plus infinity:
Insert 2011-09-13 15:39:45.3564822 +∞ Dev2 TestAppActive
Retract 2011-09-13 15:39:45.3564822 +∞ Dev2 TestAppActive
Notice that the Retract's end time is also +∞, which is wrong.
Another thing I noticed is that the Retract events remain throughout the query pipeline in the debugger, which is unexpected. I expected that they are used to adjust the end times on the intervals and are then discarded.
If anyone can help me figure out how to fix this situation so that I get correct edge times going into my queries, that would be greatly appreciated!!
--Frederick
--Frederick
Todas las respuestas
-
viernes, 23 de septiembre de 2011 5:17Moderador
Hi Frederick,
Could you please post some sample code for investigating?
Best Regards,
Stephanie Lv
Forum Support
Please remember to mark the replies as answers if they help and unmark them if they provide no help. If you have feedback for TechNet Subscriber Support, contact tnmff@microsoft.com. -
martes, 27 de septiembre de 2011 6:03
Frederick,
Where exactly do you see the end edge events with the infinite end times in the debugger? Can you post you queries? You should see your edge events as enqueued at the beginning of the operator tree in the debugger.
Roman
MS StreamInsight Team
Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights.- Marcado como respuesta Roman SchindlauerMicrosoft Employee miércoles, 05 de octubre de 2011 5:07
-
jueves, 05 de abril de 2012 10:07
Hello,
I have exactly the same problem. The query i use is quite simple in a first time, it is a basic select from the edge inputstream. And here is the code of my input adapter to produced edges:
private void ProduceEvents() { // if the engine asked the adapter to stop if (AdapterState == AdapterState.Stopping) { // cleanup state this.Cleanup(); Stopped(); // exit the worker thread return; } if (deserializedMessage != null) { // produce events only for info message (colletedData with eventType == 2 if (deserializedMessage is M3XmlAdapter.Messages.CollectedData.CollectedData) { M3XmlAdapter.Messages.CollectedData.CollectedData message = deserializedMessage as M3XmlAdapter.Messages.CollectedData.CollectedData; if (message.Data.EventType != 2) return; } // newEvent will be used to build the current event and stored as the lasteEvent later EdgeEvent<M3BusinessDataInput> newEvent = null; // Did we have a previous state? // If so, then we need to send the end edge for a previous start edge. if (this.lastEvent != null) { // End edge with equivalent value to the previous start edge newEvent = CreateInsertEvent(EdgeType.End); currentEvent = CreateEventFromRow(ref newEvent); // Try to enqueue. If it fails, we will just ignore the current // event and try to send the proper end edge the next time. if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)) { ReleaseEvent(ref currentEvent); Ready(); return; } // We don't need this state anymore. lastEvent = null; } // Now enqueue the new data as a start edge. newEvent = CreateInsertEvent(EdgeType.Start); if (null == newEvent) { // Throw away the current event. At this point, there is no // previous event stored for this data source, so we can try // a new start edge next time. return; } currentEvent = CreateEventFromRow(ref newEvent); if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)) { ReleaseEvent(ref currentEvent); Ready(); // Throw away the current event. At this point, there is no // previous event stored for this data source, so we can try // a new start edge next time. return; } // If we arrived here, we did enqueue a start edge. Now we need // to remember it for the next round, when the corresponding end // edge will be enqueued. lastEvent = newEvent; } } private EdgeEvent<M3BusinessDataInput> CreateEventFromRow(ref EdgeEvent<M3BusinessDataInput> newEvent) { EdgeEvent<M3BusinessDataInput> validEvent = null; if (newEvent != null) { CultureInfo culture = new CultureInfo(this.config.CultureName); if (deserializedMessage is M3XmlAdapter.Messages.CollectedData.CollectedData) { M3XmlAdapter.Messages.CollectedData.CollectedData message = deserializedMessage as M3XmlAdapter.Messages.CollectedData.CollectedData; if (newEvent.EdgeType == EdgeType.Start) { newEvent.Payload = new M3BusinessDataInput(); newEvent.StartTime = DateTime.UtcNow; newEvent.Payload.ALEDH_Acknowledge = null; newEvent.Payload.ALEDH_AlertEvent = message.Data.EventDateTime.StartTime; newEvent.Payload.ALRID_AlertRule = config.AlertRuleId; newEvent.Payload.CPNID_Component = message.Data.DataType.ComponentId; newEvent.Payload.EQMID_Equipment = message.Data.EquipmentId; newEvent.Payload.EVTTID_EventType = message.Data.EventType; newEvent.Payload.EVTVA_Value = Convert.ToString(message.Data.Value); validEvent = CreateInsertEvent(EdgeType.Start); } else { newEvent.StartTime = lastEvent.StartTime; newEvent.Payload = lastEvent.Payload; newEvent.EndTime = DateTime.UtcNow; validEvent = CreateInsertEvent(EdgeType.End); validEvent.EndTime = newEvent.EndTime; } } } validEvent.StartTime = newEvent.StartTime; validEvent.Payload = newEvent.Payload; //return the current event wich will be used by streamInsight return validEvent; }And here the result from the flow debugger:
The endTime is not updated into the flow debugger and is still set to infinite :(
Has anyone solved this issue?
Thank you!
- Editado qerluisdhivdufhwxcs jueves, 05 de abril de 2012 10:13
-
jueves, 05 de abril de 2012 14:30Instead of using an Edge input adapter ... which can potentially leave you with "dangling edges", have you tried simply enqueuing points and then using AlterEventLifetime() + ClipEventDuration() to create the edge events? This will give you what you are looking for and be a heck of a lot easier to follow. I have to admit, in looking at the code above, I'm going a little cross-eyed trying to figure out exactly what's going where so it seems that it violates the KISS principle.
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
jueves, 05 de abril de 2012 15:03
Well i agree the code is a little crapy, here is a new simpler code:
private void ProduceEvents() { // if the engine asked the adapter to stop if (AdapterState == AdapterState.Stopping) { // cleanup state this.Cleanup(); Stopped(); // exit the worker thread return; } if (deserializedMessage != null) { // produce events only for info message (colletedData with eventType == 2 if (deserializedMessage is M3XmlAdapter.Messages.CollectedData.CollectedData) { M3XmlAdapter.Messages.CollectedData.CollectedData message = deserializedMessage as M3XmlAdapter.Messages.CollectedData.CollectedData; if (message.Data.EventType != 2) return; } // Did we have a previous state? // If so, then we need to send the end edge for a previous start edge. if (this.startEventStored) { // End edge with equivalent value to the previous start edge CreateEventFromRow(EdgeType.End); } // Now enqueue the new data as a start edg.e CreateEventFromRow(EdgeType.Start); } } private void CreateEventFromRow(EdgeType edgeType) { EdgeEvent<M3BusinessDataInput> newEvent = CreateInsertEvent(edgeType); if (newEvent != null) { M3XmlAdapter.Messages.CollectedData.CollectedData message = deserializedMessage as M3XmlAdapter.Messages.CollectedData.CollectedData; if (newEvent.EdgeType == EdgeType.Start) { newEvent.Payload = new M3BusinessDataInput(); newEvent.StartTime = DateTime.UtcNow; newEvent.Payload.ALEDH_Acknowledge = null; newEvent.Payload.ALEDH_AlertEvent = message.Data.EventDateTime.StartTime; newEvent.Payload.ALRID_AlertRule = config.AlertRuleId; newEvent.Payload.CPNID_Component = message.Data.DataType.ComponentId; newEvent.Payload.EQMID_Equipment = message.Data.EquipmentId; newEvent.Payload.EVTTID_EventType = message.Data.EventType; newEvent.Payload.EVTVA_Value = Convert.ToString(message.Data.Value); // Keep startEvent data this.startEventPayload = newEvent.Payload; this.startEventStartTime = newEvent.StartTime; this.startEventStored = true; } else { newEvent.StartTime = this.startEventStartTime; newEvent.Payload = this.startEventPayload; newEvent.EndTime = DateTime.UtcNow; } // Try to enqueue. If it fails, we will just ignore the current // event and try to send the proper end edge the next time. if (EnqueueOperationResult.Full == Enqueue(ref newEvent)) { ReleaseEvent(ref newEvent); Ready(); } } }But the dangling edges is actually what i am looking for ( or i misunderstood what you wanted to say). At an instant T0, i don't know when the end edge will close the start edge. I find it weird that the edge events still have an infinite time. Anyway i will try the AlterEventLifetime() method.
- Editado qerluisdhivdufhwxcs jueves, 05 de abril de 2012 15:04
-
jueves, 05 de abril de 2012 15:11
By "dangling edge", I meant an edge that has no associated end ... or no associated start. It doesn't look like the latter will be an issue but the former may be.
And yes, this version is much easier to follow. I know it's an interative process ... get it working then clean it up. :-)
Enqueuing your events as points would simplify things still further ... and also prevent you from needing to maintain state. Just enqueue the event as a point when it comes in. By using Alter/Clip (or ToSignal ...), you'll get the same effect that you're looking for, won't need to maintain state and can take any number of id-based events inbound without having to match them with the proper start. It looks, right now, like your adapter would only take one type of event ... but using points and ToSignal would allow you to have one adapter handle events from any number of components, equipment, etc ... depending on what your unique key would be.
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful. -
jueves, 05 de abril de 2012 15:57
I tried to use the alter/clip function like i did into my linqpad test as you suggested it:
void Main() { Func<int, DateTimeOffset> t = (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s); //Status data (2 modes: start and eco) // start eco start // |___||________||_______| //--------------------------- //0 3 6 15 20 var reference = new [] { new {Timestamp=3, TimestampEnd=6,Type = 2,Value = "normalMode",}, new {Timestamp=6, TimestampEnd = 15,Type = 2, Value = "ecoMode",}, new {Timestamp=15, TimestampEnd = 20,Type=2, Value = "normalMode",}, }; var referenceSource = reference.ToPointStream(Application, e=> PointEvent.CreateInsert(t(e.Timestamp), e), AdvanceTimeSettings.IncreasingStartTime, "referenceStream"); var referenceStream = referenceSource.AlterEventDuration(e => TimeSpan.MaxValue).ClipEventDuration(referenceSource, (e1, e2) => true); // get the events such as the value is superior to 12 in eco mode and > 20 in start mode var overConsumption = from r in referenceStream select r; overConsumption.ToEdgeEnumerable().Dump("Overconsumption"); }In linqpad i get Edges with correct end time but in my application, the same queries give me infinite end time :(
From points, it becomes edges with infinite time but stay with infinite even after the clip function...
-
jueves, 05 de abril de 2012 22:54
So you are saying that the same StreamInsight code produces different results between a standalone app and LINQPad? Could you also post the code of the standalone app? Note that the clip operator will pass through the unclipped start event and only follow up with a retraction if the end event has been processed. You also see that in the LINQPad result, where start events are output first, and then followed by end events (since you are forcing edge output). If you change that to ToIntervalEnumerable you will only see the intervals.
Thanks,
RomanMS StreamInsight Team
Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights. -
viernes, 06 de abril de 2012 7:22
//code to create input adapter configuration here... ActiveMqInputConfig iConfigPoint = new ActiveMqInputConfig { ... }; var referenceSource = CepStream<M3BusinessDataInput>.Create("referenceSource", typeof(ActiveMqInputFactory), iConfigPoint, EventShape.Point, AdvanceTimeSettings.IncreasingStartTime); //filter by type (keep only status) var statusFilter = from r in referenceSource where (r.EVTTID_EventType == 2) select r; // transform into edges var referenceStream = statusFilter.AlterEventDuration(e => TimeSpan.MaxValue).ClipEventDuration(statusFilter, (e1, e2) => true); // get all events as edge type var overConsumption = from r in referenceStream select r; // code to create output adapter configuration here... MailOutputConfig oConfig = new MailOutputConfig { ... }; var query = overConsumption.ToQuery(M3EventEngineManager.Instance.application, "overConsumption", "overConsumption", typeof(MailOutputFactory), oConfig, EventShape.Point, StreamEventOrder.FullyOrdered); query.Start();I don't see a big difference into the process with what i did into linqpad or maybe i missed something?
edit: well i have found the solution here: Flow debugger tips
- Editado qerluisdhivdufhwxcs viernes, 06 de abril de 2012 8:51

