How to remove events from a reference stream?
-
March 15, 2012 20:06
My colleagues and I are trying to code up a more complex StreamInsight example using LINQPad. We believed we had a good understanding of everything, until we came across the ClipEventDuration extension method. Please have a look at the code in this linq file: Location Filtering Sample.
The code works for determining when a package has reached its destination, even if the destination changes. However, what we'd like to have happen is that an event be clipped whenever either of the following conditions is met:
- A new destination event enters the stream.
- A package has reached its location.
To run through the sample, here is what we have going on.
1. The package ships from Philadelphia, headed for San Francisco.
2. Along the way, it is re-routed to Detroit (this clips the original destination of San Francisco).
3. The package then travels through a few cities, until it finally reaches Detroit. (A destination reached event is created for this.)
4. The package then travels to San Francisco (to prove that the clip from #2 works as expected).
5. Then, the package is sent from San Francisco, headed for Philadelphia.
6. Again, it travels through a few cities, no re-route occurs, until it finally ends up in Philadelphia. (A second destination reached event is created.)
7. Then, for one reason or another, it travels to New York, then back to Philadelphia. (This creates a third destination reached event.)
Basically, we don't want the third destination event to occur. We'd like the destination reached event from #6 to remove the destination entry from the destinations stream.
Note that we are using StreamInsight v1.2 with the LINQPad Driver for StreamInsight.
Any help or guidance that can be given is most appreciated.
All replies
-
March 16, 2012 04:07
Now, I think that - if I understand your use case - you want to reduce duplicate destination reached events. So ... with that assumption ... try this on for size:
    // Eliminate duplicates
    var reduceDupsDestinationReached =
        from current in destinationReachedFilter
        from previous in destinationReachedFilter
            .AlterEventDuration(e => TimeSpan.MaxValue)
            .ClipEventDuration(destinationReachedFilter, (ev, c) => c.AssetName == ev.AssetName)
            .ShiftEventTime(e => TimeSpan.FromTicks(1))
        where current.AssetName == previous.AssetName
        where current.City != previous.City
        select current;

    // Reduce dups misses the first event. LASJ to make sure that we have the first also
    var final =
        from first in destinationReachedFilter
        where (from d in reduceDupsDestinationReached
                   .AlterEventDuration(e => TimeSpan.MaxValue)
                   .ClipEventDuration(destinationReachedFilter, (ev, c) => c.AssetName == ev.AssetName)
                   .ShiftEventTime(e => TimeSpan.FromTicks(1))
               where first.AssetName == d.AssetName
               select d).IsEmpty()
        select first;
I do hope, also, that you use some of the common Linq macros (ToSignal, FoldPairs in LinqPad samples) since it'll make the code much easier to read. ;-)
Also, why do you have all the calls to ToPointEventStream()? Your events are enqueued as points and don't have their lifetime altered ... so they are still points. This adds additional operators into the stream that aren't necessary. I commented those lines out when I ran the above query.
    var currentStream = currentFilter; //.ToPointEventStream<Location>();
    var destinationStream = destinationFilter //.ToPointEventStream<Location>()
        .AlterEventLifetime(ev => ev.StartTime, ev => TimeSpan.MaxValue)
        .ClipEventDuration(locationStream, (ev, c) => c.LocationType == 2 && c.AssetName == ev.AssetName);
DevBiker (aka J Sawyer)
My Blog
My Bike - Concours 14
If I answered your question, please mark as answer.
If my post was helpful, please mark as helpful.
- Edited by DevBikerMVP, March 16, 2012 04:09
-
March 16, 2012 18:43
I was looking at this again because I wasn't quite sure that the query was exactly right - since I wasn't sure how it was working and why it got the correct results. This query first finds duplicates (Asset and City are the same) and then gets only those from the destination filter stream that aren't duplicates via a left anti semi-join. It's only slightly different from the one above but the comments are also more descriptive and, more importantly, correct about what's going on.
    // Get duplicates
    var duplicateEventReached =
        from current in destinationReachedFilter
        from previous in destinationReachedFilter
            .AlterEventDuration(e => TimeSpan.MaxValue)
            .ClipEventDuration(destinationReachedFilter, (ev, c) => c.AssetName == ev.AssetName)
            .ShiftEventTime(e => TimeSpan.FromTicks(1))
        where current.AssetName == previous.AssetName
        where current.City == previous.City
        select current;

    duplicateEventReached.ToPointEnumerable()
        .Where(e => e.EventKind != EventKind.Cti)
        .Dump("Dups Reduced");

    // Found the duplicates. Get only the first events with an LASJ
    var final =
        from first in destinationReachedFilter
        where (from d in duplicateEventReached
                   .AlterEventDuration(e => TimeSpan.MaxValue)
                   .ClipEventDuration(duplicateEventReached, (ev, c) => c.AssetName == ev.AssetName)
               where first.AssetName == d.AssetName
               where first.City == d.City
               select d).IsEmpty()
        select first;
DevBiker (aka J Sawyer)
- Proposed as answer by DevBikerMVP, March 16, 2012 18:44
- Marked as answer by David Mulford, March 20, 2012 17:35
-
March 19, 2012 20:50
Biker,
Thanks for the quick reply! We tried converting the original example we had (duplicates included) into a native StreamInsight application, using QueryBinders, etc. This has raised a few questions that are a little more fundamental than the one we posed previously. Here is what we have.
    InputAdapter inputAdapter = application.CreateInputAdapter<Adapters.CsvInputFactory>("CsvInput", "CSV Input Adapter");
    OutputAdapter outputAdapter = application.CreateOutputAdapter<CsvOutputFactory>("CsvOutput", "Console Output of All Location Events");

    var locationIStream = CepStream<Location>.Create("locationInput");
    QueryTemplate locationQT = application.CreateQueryTemplate("locationQT", "All Location events", locationIStream);
    QueryBinder queryBinder = new QueryBinder(locationQT);

    // Data input from Locations.csv (found below)
    queryBinder.BindProducer<Location>("locationInput", inputAdapter,
        new CsvInputConfig()
        {
            Delimiter = new char[] { ',' },
            InputFileName = "../../../Locations/Input/Locations.csv",
            BufferSize = 4096,
            CtiFrequency = 1,
            CultureName = "en-US",
            Fields = new List<string>() { "LocationType", "AssetName", "City" },
            NonPayloadFieldCount = 1,
            StartTimePos = 1,
            EndTimePos = 1
        },
        EventShape.Point);

    // This prints all events to All_Locations.csv (and works)
    queryBinder.AddConsumer("locationOutput", outputAdapter,
        new CsvOutputConfig()
        {
            Delimiter = new string[] { "," },
            OutputFileName = "../../../Locations/Output/All_Locations.csv",
            CultureName = "en-US"
        },
        EventShape.Edge, StreamEventOrder.FullyOrdered);

    Query locationQuery = application.CreateQuery("locationQuery", "All Locations", queryBinder);

    // Build from the input Stream
    CepStream<Location> LocationOStream = locationQuery.ToStream<Location>("locationQueryStream");

    // get all events from location stream where the type is "Destination"
    var destinationFilter = (from destLoc in LocationOStream
                             where destLoc.LocationType == 2
                             select destLoc);

    var allEventsEdge = LocationOStream.AlterEventDuration(ev => TimeSpan.MaxValue);

    // AllEdge.csv is always empty and we think it should contain *ALL* events from the file
    var allEventsEdgeQuery = allEventsEdge.ToQuery(application,
        "edgeQuery", "All events as edge events",
        typeof(CsvOutputFactory),
        new CsvOutputConfig()
        {
            CultureName = "en-US",
            Delimiter = new string[] { ", " },
            OutputFileName = "../../../Locations/Output/AllEdge.csv"
        },
        EventShape.Edge, StreamEventOrder.FullyOrdered);

    // Produces results of type Edge, but the start/end times are always the same
    var destinationQuery = destinationFilter
        .ToQuery(application,
            "destinationQuery", "All Destination Location events",
            typeof(CsvOutputFactory),
            new CsvOutputConfig()
            {
                CultureName = "en-US",
                Delimiter = new string[] { ", " },
                OutputFileName = "../../../Locations/Output/Destination_Locations.csv"
            },
            EventShape.Edge, StreamEventOrder.FullyOrdered);

    // Start queries here...
Here is the data we're using
    03/15/2012 9:00:00 AM,1,Package A,Philadelphia
    03/15/2012 09:01:00 AM,2,Package A,San Francisco
    03/15/2012 09:02:00 AM,2,Package A,Detroit
    03/15/2012 10:01:00 AM,3,Package A,West Chester
    03/15/2012 4:00:00 PM,3,Package A,Detroit
    03/15/2012 5:00:00 PM,3,Package A,San Francisco
    03/16/2012 5:00:00 AM,1,Package A,San Francisco
    03/16/2012 05:01:00 AM,2,Package A,Philadelphia
    03/16/2012 05:02:00 AM,3,Package A,Dallas
    03/16/2012 05:03:00 AM,3,Package A,Memphis
    03/16/2012 05:04:00 AM,3,Package A,Philadelphia
    03/16/2012 05:05:00 AM,3,Package A,New York
    03/16/2012 05:06:00 AM,3,Package A,Philadelphia
So our questions are as follows. When we look at the output of the input stream, everything looks good: point events, as expected. The destination output stream, however, contains edge data in which the start and end times for each insert event are the same. Does this basically create a bunch of PointEvents? In which case, how do we properly modify the event lifetime to make these true edge events?
Also, the AllEdge.csv file is completely empty. Since it is basically a copy of the input stream (with only the event lifetime altered), should it not contain all of the events?
Once we completely understand this much simpler scenario, we absolutely would like to get back to our original question of removing duplicates.
Thanks for all the help!
-
March 19, 2012 23:11
What do you expect the edges to look like?
The scenario that I imagine with this would have a start edge at the first destination then an end edge when it reaches the second, which is a corresponding start ... and so on and so on.
The problem with this, however, is that you'd have a "hanging" start edge at the end of the line ... and that can produce less-than-desirable results. Actually, it'll produce some pretty bad results long-term.
So ... since I may be missing something here, what would you expect the edges to look like?
DevBiker (aka J Sawyer)
-
March 20, 2012 02:50
We expect the edges to look like the following image. As a destination event enters the location stream, it is converted from a PointEvent to an EdgeEvent and replaces the Asset's current destination EdgeEvent, if one exists.
To simplify the scenario, all events are coming from the same asset. So, each time a destination event enters the stream, it would replace the currently existing destination event. Is our understanding of EdgeEvents correct, or is there something we're missing?
In the example above, suppose L3 is the asset telling us it's in Detroit, and L6 is telling us that the asset has entered Philadelphia. Ideally, L3 and L6 would generate a PointEvent in a third stream - the destinations reached stream. They would also remove / end the EdgeEvents in the destination stream for Detroit and Philadelphia, respectively.
Once again, thank you for the help. We are really trying to understand what's going on here.
- Edited by David Mulford, March 20, 2012 02:55
-
March 20, 2012 12:39
Getting your head around the temporal aspect of StreamInsight queries takes some time ... don't worry about it.
But I have to go back to one of my previous questions ... what would the end be for the final destination event? The start is easy enough - it's the end that worries me.
By using the ToSignal() pattern (Alter/Clip), you'll get nice, neat edges for the destinations except the final one. If you take the previous changes that I posted here and add the following in, you'll see what I mean:
    var signalEdges = final
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(final, (e1, e2) => e1.AssetName == e2.AssetName);

    signalEdges.ToEdgeEnumerable()
        .Where(e => e.EventKind == EventKind.Insert)
        .Dump("Edges");
What winds up happening is that the first destination reached event (Detroit) is fine ... start and end time as you would expect. But the final destination has no end time ... it never gets clipped and won't get clipped. Make sense?
DevBiker (aka J Sawyer)
-
March 20, 2012 13:09
Agreed that StreamInsight queries are going to take some time. It's just a different way of thinking!
To answer your question, as of the example we were writing, there would be no handling of the final destination events. We now understand why this is a problem, and have a working solution to that as well. The solution is that we also clip the destination events whenever a location event marks the destination as "reached". See the code below.
    var destinationLocations = (from e in inputStream
                                where e.LocationType == 2
                                select e)
        // convert everything to edge events
        .ToSignal<Location>((cur, dest) => cur.AssetName == dest.AssetName)
        // clip all destinations that have been reached
        .ClipEventDuration(currentLocations, (dest, cur) => ((dest.City == cur.City) && (dest.AssetName == cur.AssetName)))
        // shift forward 1 tick because the end time (from the clip) is being set to the start time of the next, minus 1 tick
        .ShiftEventTime(ev => TimeSpan.FromTicks(1));
We wrote the ToSignal, as per your suggestion, and it works beautifully (and looks a lot nicer).
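The ToSignal helper itself is not shown in this thread. A minimal sketch of what it might look like, based only on the Alter/Clip pattern described earlier, follows. The class name SignalExtensions and the parameter names are assumptions, and the sketch needs a reference to the StreamInsight assemblies to compile.

```csharp
using System;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing.Linq;

// Hypothetical helper class; the thread does not show the actual implementation.
public static class SignalExtensions
{
    // Turns a stream of point events into a "signal": each event is stretched
    // to an open-ended lifetime and then clipped by the next event in the same
    // stream that satisfies the match expression (for example, same asset).
    public static CepStream<T> ToSignal<T>(
        this CepStream<T> source,
        Expression<Func<T, T, bool>> match)
    {
        return source
            .AlterEventDuration(e => TimeSpan.MaxValue)   // Alter: extend lifetime
            .ClipEventDuration(source, match);            // Clip: end at next match
    }
}
```

With a helper of this shape, the last event for each asset still keeps its open-ended lifetime, which is why the query above adds a second ClipEventDuration against the current-location stream.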
Thanks a lot for your help in wrapping our heads around this. This post alone has definitely been a learning experience for us!
-
March 20, 2012 14:43
Glad to help.
So ... your final destination ... what does the edge look like? How is it marked as destination? I didn't see that in your queries but, then again, I may well have missed it. But ... as long as you have a solution for it, that's good. I will, however, reiterate that you want to make sure that you have an end for your edges ... never have one with just a start. The dangling edges will continue to stay in memory and will, sooner or later (depending on volume), start chewing up your RAM.
DevBiker (aka J Sawyer)
-
March 20, 2012 16:40
The second call to ClipEventDuration takes care of the final destination event. Once the asset reaches its destination, the event is clipped. Now, if the asset never reaches its destination, we most likely do have a problem.
Here is the output (CTI's removed) of the destinationLocations stream I posted above.
    Type   Start Time                   End Time                     Asset Name  City           Location Type
    Start  3/15/2012 9:01:00 AM +00:00  No end time                  Package A   San Francisco  2
    End    3/15/2012 9:01:00 AM +00:00  3/15/2012 9:02:00 AM +00:00  Package A   San Francisco  2
    Start  3/15/2012 9:02:00 AM +00:00  No end time                  Package A   Detroit        2
    End    3/15/2012 9:02:00 AM +00:00  3/15/2012 4:00:00 PM +00:00  Package A   Detroit        2
    Start  3/16/2012 5:01:00 AM +00:00  No end time                  Package A   Philadelphia   2
    End    3/16/2012 5:01:00 AM +00:00  3/16/2012 5:04:00 AM +00:00  Package A   Philadelphia   2
Note that there is no dangling Start event once the location event for Philadelphia comes through. As far as handling the case when a package doesn't reach its destination, I'm not sure how we should go about tackling that one. Are there any best practices you could point us to?
Thanks!
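To make the clipping logic in the post above easier to follow, here is a small model of it in plain LINQ to Objects. It does not use StreamInsight at all. It assumes that currentLocations holds the events with LocationType 3, that a clip only applies to events starting strictly after the clipped event's start, and it ignores the one-tick shift. The names Loc, ClipModel, Ev and FirstAfter are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for the Location payload plus its start time.
class Loc
{
    public DateTime Start;
    public int Type;      // 2 = destination; 3 = current location (assumed meaning)
    public string Asset;
    public string City;
}

static class ClipModel
{
    static Loc Ev(int day, int hour, int minute, int type, string city)
    {
        return new Loc { Start = new DateTime(2012, 3, day, hour, minute, 0),
                         Type = type, Asset = "Package A", City = city };
    }

    // Start of the earliest clip event strictly after 'start' that matches, or null.
    static DateTime? FirstAfter(DateTime start, IEnumerable<Loc> clips, Func<Loc, bool> match)
    {
        return clips.Where(c => c.Start > start && match(c))
                    .Select(c => (DateTime?)c.Start)
                    .Min();
    }

    static void Main()
    {
        // The sample rows posted earlier in the thread.
        var events = new List<Loc>
        {
            Ev(15,  9, 0, 1, "Philadelphia"),  Ev(15,  9, 1, 2, "San Francisco"),
            Ev(15,  9, 2, 2, "Detroit"),       Ev(15, 10, 1, 3, "West Chester"),
            Ev(15, 16, 0, 3, "Detroit"),       Ev(15, 17, 0, 3, "San Francisco"),
            Ev(16,  5, 0, 1, "San Francisco"), Ev(16,  5, 1, 2, "Philadelphia"),
            Ev(16,  5, 2, 3, "Dallas"),        Ev(16,  5, 3, 3, "Memphis"),
            Ev(16,  5, 4, 3, "Philadelphia"),  Ev(16,  5, 5, 3, "New York"),
            Ev(16,  5, 6, 3, "Philadelphia"),
        };
        var destinations = events.Where(e => e.Type == 2).ToList();
        var current = events.Where(e => e.Type == 3).ToList();

        foreach (var d in destinations)
        {
            // First clip (ToSignal): the next destination event for the same asset.
            DateTime? nextDestination = FirstAfter(d.Start, destinations, c => c.Asset == d.Asset);
            // Second clip: the first arrival at this destination's city.
            DateTime? arrival = FirstAfter(d.Start, current,
                c => c.Asset == d.Asset && c.City == d.City);
            // Whichever comes first ends the edge; Min skips nulls.
            DateTime? end = new[] { nextDestination, arrival }.Min();

            Console.WriteLine("{0}: {1} -> {2}", d.City, d.Start,
                end.HasValue ? end.Value.ToString() : "no end");
        }
    }
}
```

Under these assumptions, the San Francisco edge ends at 9:02 AM (re-route), Detroit at 4:00 PM (arrival) and Philadelphia at 5:04 AM the next day (arrival), which lines up with the end times in the table.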
-
March 20, 2012 17:29
Well, if it never reaches the end destination, that is itself an event and may well be interesting. You could, for example, take the stream of "open" (non-arrived) packages and clip it to a particular timeout/EOL, which would give you the packages still en route.
Now ... question ... how much time do you anticipate these events lasting? You may still wind up with some hanging edges if you have to reboot in the middle of the process. Have you given that any thought?
DevBiker (aka J Sawyer)
-
March 20, 2012 18:05
We'll definitely want to clip the stream of non-arrived packages. We do intend to see events lasting up to 3 months travel time, as a maximum. At that point, we'd ideally have the event end, and have a new PointEvent generated saying "what happened to Package A?" just as an alert sort of thing.
Given the 3 month event duration, I can definitely see instances where rebooting will come into play. Are you referring to the resiliency features that StreamInsight has? If so, we have not gotten that far, and intend to do so.
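One possible way to get the three-month cutoff, sketched here as an assumption rather than something tested in this thread, is to cap the lifetime in the Alter step instead of using TimeSpan.MaxValue. The name ToBoundedSignal is hypothetical, 90 days stands in for "3 months", and the sketch needs the StreamInsight assemblies to compile. It only ends the edge; it does not generate the "what happened to Package A?" alert event.

```csharp
using System;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing.Linq;

// Hypothetical variant of the Alter/Clip pattern with a bounded lifetime.
public static class BoundedSignalExtensions
{
    // Each event lasts at most 90 days (an assumed stand-in for the three-month
    // maximum transit time) and is clipped earlier by the next matching event.
    public static CepStream<T> ToBoundedSignal<T>(
        this CepStream<T> source,
        Expression<Func<T, T, bool>> match)
    {
        return source
            .AlterEventDuration(e => TimeSpan.FromDays(90))  // cap instead of MaxValue
            .ClipEventDuration(source, match);               // end early on next match
    }
}
```

Because every edge then has a finite end time, a destination that is never reached would expire on its own instead of staying in memory indefinitely.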
-
March 20, 2012 18:29
That's certainly an option. There are a couple of challenges with it though. First, you'll need to write your adapter to do whatever is appropriate to "catch up" events that happened during reboot/shutdown/etc. Second, the current version doesn't support checkpointing queries/streams created via dynamic query composition. This may or may not be an issue, depending on several variables in your use cases, but it is a serious limitation.
DevBiker (aka J Sawyer)