How to simulate queries on Edge type generation using LINQPad?

  • Tuesday, April 3, 2012 14:03

    Hello,

    I am using LINQPad to work through a specific use case, but I am not sure I am doing it right. I haven't found a good LINQPad example that simulates generating events of the Edge type.

    To summarize, I want to keep only the events whose value exceeds a threshold, and the threshold depends on the current mode. For example, if the most recent status event had the value "ecoMode", the threshold is 12; if it had the value "normalMode", the threshold is 20.

    So I made a first sample using the Interval type:

    void Main()
    {
    	Func<int, DateTimeOffset> t = 
    	(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);
    		
    	var values = new []
    	{
    		//Business data
    		new {Timestamp=0,Type = 1,Value = 3},
    		new {Timestamp=2,Type = 1,Value = 13,},
    		new {Timestamp=3,Type = 1,Value = 22,},
    		new {Timestamp=5,Type = 1,Value = 10,},
    		new {Timestamp=6,Type = 1,Value = 14,},
    		new {Timestamp=7,Type = 1,Value = 16,},
    		new {Timestamp=9,Type = 1,Value = 26,},
    		new {Timestamp=11,Type = 1,Value = 7,},
    	};
    	
    	//Status data (2 modes: normal and eco)
    	//	  normal  eco      normal
    	//    |___||________||_______|
    	//---------------------------
    	//0   3   6         15     20
    	var reference = new []
    	{
    		new {Timestamp=3, TimestampEnd=6,Type = 2,Value = "normalMode",}, 
    		new {Timestamp=6, TimestampEnd = 15,Type = 2, Value = "ecoMode",},
    		new {Timestamp=15, TimestampEnd = 20,Type=2, Value = "normalMode",},
    	};
    	
    	// Create the data stream
    	var dataStream = values.ToPointStream(Application, 
    		e=> PointEvent.CreateInsert(t(e.Timestamp), e), AdvanceTimeSettings.IncreasingStartTime, "dataStream");
    
    	// import the CTI so the result will be based on the dataStream CTI
    	var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);
    	
    	// Create the reference stream (interval version)
    	var referenceStream = reference.ToIntervalStream(Application, 
    		e=> IntervalEvent.CreateInsert(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStream");
    
    	// keep the events whose value is greater than 12 in eco mode or greater than 20 in normal mode
    	var overConsumption = from d in dataStream
    							from r in referenceStream
    							where (r.Value == "ecoMode" && d.Value > 12) || (r.Value == "normalMode" && d.Value > 20)
    							select d; 
    							
    	overConsumption.ToPointEnumerable().Dump("Overconsumption"); 
    }

    And I get the following result, which is what I expected:


    But in the real world, I don't know in advance when the mode will switch to normal or eco, so the Edge type seems to fit exactly. However, I don't know how to simulate it in LINQPad: when should I create the start and end events? I tried merging two streams, but I don't think this is the right way to do it:

    void Main()
    {
    	Func<int, DateTimeOffset> t = 
    	(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);
    		
    	var values = new []
    	{
    		//Business data
    		new {Timestamp=0,Type = 1,Value = 3},
    		new {Timestamp=2,Type = 1,Value = 13,},
    		new {Timestamp=3,Type = 1,Value = 22,},
    		new {Timestamp=5,Type = 1,Value = 10,},
    		new {Timestamp=6,Type = 1,Value = 14,},
    		new {Timestamp=7,Type = 1,Value = 16,},
    		new {Timestamp=9,Type = 1,Value = 26,},
    		new {Timestamp=11,Type = 1,Value = 7,},
    	};
    	
    	//Status data (2 modes: normal and eco)
    	//	  normal  eco      normal
    	//    |___||________||_______|
    	//---------------------------
    	//0   3   6         15     20
    	var reference = new []
    	{
    		new {Timestamp=3, TimestampEnd=6,Type = 2,Value = "normalMode",}, 
    		new {Timestamp=6, TimestampEnd = 15,Type = 2, Value = "ecoMode",},
    		new {Timestamp=15, TimestampEnd = 20,Type=2, Value = "normalMode",},
    	};
    	
    	// Create the data stream
    	var dataStream = values.ToPointStream(Application, 
    		e=> PointEvent.CreateInsert(t(e.Timestamp), e), AdvanceTimeSettings.IncreasingStartTime, "dataStream");
    
    	// import the CTI so the result will be based on the dataStream CTI
    	var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);
    	
    //	// Create the reference stream (interval version)
    //	var referenceStream = reference.ToIntervalStream(Application, 
    //		e=> IntervalEvent.CreateInsert(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStream");
    
    	// Create the reference stream for start events (edge version)
    	var referenceStream = reference.ToEdgeStream(Application, 
    		e => EdgeEvent.CreateStart(t(e.Timestamp), e), ats, "referenceStream");
    		
    	// Create the reference stream for end events (edge version)
    	var referenceStreamEnd = reference.ToEdgeStream(Application, 
    		e => EdgeEvent.CreateEnd(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStreamEnd");
    		
    	// Create one stream with both start and end events (edge version)	
    	referenceStream.Union(referenceStreamEnd);
    
    	// keep the events whose value is greater than 12 in eco mode or greater than 20 in normal mode
    	var overConsumption = from d in dataStream
    							from r in referenceStream
    							where (r.Value == "ecoMode" && d.Value > 12) || (r.Value == "normalMode" && d.Value > 20)
    							select d; 
    							
    	overConsumption.ToPointEnumerable().Dump("Overconsumption"); 
    }
    And here is the result:

    So my current questions are:

    - Is the Edge type really the right way to do this?

    - How do I create the reference stream to simulate the Edge type in LINQPad?

    - Why do I get the last event twice?

    And finally, to use the Edge type in a real application, I have to create a start edge event in the input adapter, and then, on the next status change, create two events: an end event to "close" the first one and a new start event. Am I right?

    Thank you for your help, and sorry for the long post.

All replies

  • Tuesday, April 3, 2012 14:34
     
     Proposed answer

    First, yes, this is a good candidate for an Edge event. It is, in fact, a perfect use case for edges: you get a start when the value goes out of range and an end when it comes back in range.

    Second, when you dump your overConsumption stream, you are using ToPointEnumerable. That won't give you edges, no matter how hard you try. ;-)

    As for what you are doing, check out my blog entry on turning alarms into edge events. It does exactly what you need.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    • Proposed as answer by DevBiker (MVP), Tuesday, April 3, 2012 14:35
  • Wednesday, April 4, 2012 9:39

    Actually, I used ToPointEnumerable because I want the output adapter to send me a point result each time a reading goes out of range for the current status (eco or normal). I don't care when it comes back in range. So I use the Edge type to correlate the data (points) with the current status (edge), and I get a point event as the result.

    I switched to ToEdgeEnumerable to analyze the behavior, but I got something I didn't expect:

    For example, for the first "end" edge, I noticed that the StartTime and EndTime are the same (08:03:00), but I expected the EndTime to be 08:06:00 according to the following code:

    	// Create the reference stream for end events (edge version)
    	var referenceStreamEnd = reference.ToEdgeStream(Application, 
    		e => EdgeEvent.CreateEnd(t(e.Timestamp), t(e.TimestampEnd), e), ats, "referenceStreamEnd");

    So why is the EndTime not set correctly? Did I miss something?

    I have another issue with LINQPad: when I run the code, I sometimes get different results (for example, 2 additional rows, or 0 rows in total). Is it possible to get different behavior from the same code?

  • Wednesday, April 4, 2012 11:05
     
     Answered

    ToEdgeEnumerable() gave you what you saw because the events are natively points, and this is how a point is expressed when it is coerced to an edge.

    It may help to think of events in StreamInsight as intervals, with points and edges being simply special cases of an interval. That's not exactly how the engine works, but it can help to think of it that way, especially when doing joins. The resulting event that goes to the output adapter, and its shape, is simply an expression of that "interval" in whatever shape you specify.

    Now, as for why you are getting points as your output: StreamInsight joins happen in time, and the resulting event's temporal properties are based on the overlap between the joined events. When you join any event with a point (think of an interval with a duration of 1 tick), the resulting event will have a duration of 1 tick: a point. When expressed as an edge, this gives you a Start and an End edge that are 1 tick apart. Also, each event that matches your where clause will generate a new output event.
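
    The overlap rule above can be modelled in a few lines of plain C#, outside StreamInsight. This is only a sketch under stated assumptions: `Lifetime` and `TemporalJoinModel.Overlap` are hypothetical names (not StreamInsight APIs), lifetimes are treated as half-open [Start, End) intervals, and a point is modelled as an interval one tick long.

```csharp
using System;

// Hypothetical model type (not a StreamInsight class):
// an event lifetime as a half-open interval [Start, End).
public struct Lifetime
{
    public DateTimeOffset Start;
    public DateTimeOffset End;

    public Lifetime(DateTimeOffset start, DateTimeOffset end)
    {
        Start = start;
        End = end;
    }

    // A point event is modelled as an interval lasting exactly one tick.
    public static Lifetime Point(DateTimeOffset at)
    {
        return new Lifetime(at, at.AddTicks(1));
    }
}

public static class TemporalJoinModel
{
    // Returns the overlap of two lifetimes, or null when they do not intersect.
    public static Lifetime? Overlap(Lifetime a, Lifetime b)
    {
        DateTimeOffset start = a.Start > b.Start ? a.Start : b.Start;
        DateTimeOffset end = a.End < b.End ? a.End : b.End;
        return start < end ? new Lifetime(start, end) : (Lifetime?)null;
    }
}
```

    In this model, overlapping a point at 08:07 with an eco-mode lifetime of [08:06, 08:15) yields a lifetime exactly one tick long, which is why the joined output still looks like a point when it is dumped as edges. A point at 08:06 does not overlap a lifetime of [08:03, 08:06), because the end is exclusive.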

    I have seen some strange behavior when using LINQPad to model StreamInsight queries, particularly when importing the CTIs from another stream. I suspect this is a timing issue when calling To[Shape]Stream(). You shouldn't treat LINQPad as the final test of a query's correctness; it is a tool to help you model queries, and a couple of things may behave a little differently when running under the engine.

    Part of the challenge is modelling the scenario correctly, which leads me to the question: how are your status changes (eco vs. normal) actually being enqueued? There aren't many real-world scenarios where sensors provide a natural edge; typically you have to alter the events to get an edge event. For example, if your status changes are actually going to be enqueued as points, then you need to "convert" them to edges.

    Take a look at the following:

    var referenceSource = reference.ToPointStream(Application, 
    	e=> PointEvent.CreateInsert(t(e.Timestamp), e), ats, "referenceStream");
    
    var referenceStream = referenceSource.AlterEventDuration(e => TimeSpan.MaxValue).ClipEventDuration(referenceSource, (e1, e2) => true); 

    In this case, we get the status changes as points (each input is a new state). We then alter the event duration to the maximum value and clip it. This gives us an event that starts at the original point's timestamp and ends when the next status change event comes in from the adapter, effectively converting that point to an edge.
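
    The effect of that AlterEventDuration/ClipEventDuration pair can be pictured with a plain-C# model. This is a sketch rather than StreamInsight code: `StatusModel.ToLifetimes` is a hypothetical helper, it assumes the status changes have distinct timestamps, and it represents the still-open last status with DateTimeOffset.MaxValue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StatusModel
{
    // Hypothetical helper (not a StreamInsight API): turns point-like status changes
    // into lifetimes, where each status lasts until the next change arrives.
    public static List<(DateTimeOffset Start, DateTimeOffset End, string Mode)> ToLifetimes(
        IEnumerable<(DateTimeOffset At, string Mode)> changes)
    {
        // Order by timestamp so "next change" is well defined.
        var ordered = changes.OrderBy(c => c.At).ToList();
        var result = new List<(DateTimeOffset Start, DateTimeOffset End, string Mode)>();

        for (int i = 0; i < ordered.Count; i++)
        {
            // "Clip" the open-ended lifetime at the next status change, if there is one.
            DateTimeOffset end = i + 1 < ordered.Count
                ? ordered[i + 1].At
                : DateTimeOffset.MaxValue;
            result.Add((ordered[i].At, end, ordered[i].Mode));
        }

        return result;
    }
}
```

    With status changes at minutes 3 (normalMode), 6 (ecoMode), and 15 (normalMode), the model yields [3, 6), [6, 15), and an open-ended lifetime starting at 15. Those are the intervals hard-coded in the question's first sample, except that the last status has no known end yet.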

    You can see this by dumping the new referenceStream. However, in order to do that, you need to set its AdvanceTimeSettings to IncreasingStartTime, because it is not joined with the data stream (where the CTIs come from) until later. You do need a join with the CTI source stream before any ToQuery() operation, and To[Shape]Enumerable() is, under the hood, a ToQuery() operation.


    DevBiker (aka J Sawyer)

  • Wednesday, April 4, 2012 12:23

    OK, I understand how you artificially create the edge in the referenceStream. I also understand why I got an exception in LINQPad when I simulated it by altering the event duration: it was because of the imported stream settings...

    So I get the result I expect now.

    Now for the next part: I will try to implement it in the real application and make it work :)

    Thank you very much, and sorry to bother you with my questions. I hope this will help someone else!