Get sum of payload values over a variable length tumbling or snapshot window

    Question

  • Hi, I need to summarise the values in my event payload when the events overlap with interval events of varying length:

    |----interval 1-----|---interval 2-------|------...

    |1|    |2|   |3|       |4|        |5|    |6|   |7| ...  


     - I need the sum of a payload field from events 1, 2 & 3, and from events 4, 5 & 6.
     - The lengths of interval 1 and interval 2 vary.
     - One or more events occur during each interval.

    I tried creating a tumbling window over each interval event but can't set the length dynamically, so I looked at snapshot windows, but these generate a window on each change of points (1, 2, 3) during the interval I'm trying to summarise.

    How can I create a variable length tumbling window (this would allow me to easily summarise the points contained in the window) or alter the snapshot to capture all the events in an interval at once instead of on each change?

    Thanks in advance.

     

    Monday, September 30, 2013 4:12 PM

All replies

  • You are correct that tumbling windows are not dynamic.

    Off the top of my head, you need to determine which events should be summed by grouping them. You can do this by joining the interval stream and the point stream. Then I would project the result into a new payload type that has some kind of group identifier on it (a timestamp or a unique identifier from the interval event, maybe). This new stream/streamable would be fed into a User-Defined Stream Operator (UDSO). This UDSO would then sum up the values from each payload with the same group identifier. Once the group identifier changes, you would yield the result in whatever payload you need and reset the sum in the UDSO to zero.
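
    To make the grouping idea concrete, here is a minimal sketch in plain in-memory C# LINQ. It is not StreamInsight code: the `Interval`, `PointValue` and `IntervalTotal` types, the `SumPerInterval` helper and the sample data are all hypothetical names made up for illustration. It only shows the logic of tagging each point with the interval that contains it and then summing per interval.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payload types, for illustration only.
public class Interval { public int Id; public DateTime Start; public DateTime End; }
public class PointValue { public DateTime Time; public int Value; }
public class IntervalTotal { public int IntervalId; public DateTime Start; public DateTime End; public int Sum; }

public static class IntervalSums
{
    // Pair every point with the interval whose [Start, End) range contains it,
    // then sum the values per interval. Intervals without points produce no row.
    public static List<IntervalTotal> SumPerInterval(
        IEnumerable<Interval> intervals, IEnumerable<PointValue> points)
    {
        return (from i in intervals
                from p in points
                where p.Time >= i.Start && p.Time < i.End
                group p.Value by i into g
                orderby g.Key.Start
                select new IntervalTotal
                {
                    IntervalId = g.Key.Id,
                    Start = g.Key.Start,
                    End = g.Key.End,
                    Sum = g.Sum()
                }).ToList();
    }

    // Made-up sample data: two intervals of different length, three points in each.
    public static List<IntervalTotal> Demo()
    {
        var t0 = new DateTime(2013, 9, 30, 12, 0, 0);
        var intervals = new[]
        {
            new Interval { Id = 1, Start = t0, End = t0.AddMinutes(10) },
            new Interval { Id = 2, Start = t0.AddMinutes(10), End = t0.AddMinutes(25) }
        };
        var minutes = new[] { 1, 4, 8, 12, 18, 23 };
        // Point n (1-based) carries the value n, as in the diagram in the question.
        var points = minutes.Select((m, n) => new PointValue { Time = t0.AddMinutes(m), Value = n + 1 });
        return SumPerInterval(intervals, points);
    }

    public static void Main()
    {
        foreach (var t in Demo())
            Console.WriteLine("interval {0}: sum = {1}", t.IntervalId, t.Sum);
        // interval 1: sum = 6
        // interval 2: sum = 15
    }
}
```

    Each result row keeps the start and end of its interval, which is the shape of output asked for in the question. In StreamInsight itself the same pairing would be done by a temporal join rather than an explicit time comparison.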

    • Edited by TXPower125 Monday, September 30, 2013 7:18 PM Formatting
    Monday, September 30, 2013 7:17 PM
  • Thanks for the reply, will try what you suggest. With a UDSO, won't the event returned have the timestamp of the last event processed by the UDSO? I'm trying to get the summary of a value for all the events inside one of the intervals, with the start and end times of the interval. Sounds simple, but I've really struggled with this one.

    Monday, September 30, 2013 8:24 PM
  • What is the use case that you are trying to solve here? There may be other ways to do it but if we don't have the use case - just the proposed solution - we can't give you something else that might work.

    As for the snapshot windows, to get multiple points in the window, you would alter their event duration, not the snapshot window. Then, each event that is still current will be in the snapshot window.

    See this example, where we have a snapshot window that includes all events from the previous 5 minutes:

        // Create some source data - note the time gap between 4:15 and 4:19
        var sourceData = Application.DefineEnumerable(() => new []
        {
            new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 12, 0) },
            new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 13, 0) },
            new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 15, 0) },
            new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 19, 0) },
            new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 20, 0) },
            new { Status = 1, TimeStamp = new DateTime(2009, 10, 23, 4, 21, 0) },
            new { Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 23, 0) },
        });
        
        var source = sourceData.ToPointStreamable(
            ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
            AdvanceTimeSettings.StrictlyIncreasingStartTime);
        
        var snapshotWindow = from s in source.AlterEventDuration(ef => TimeSpan.FromMinutes(5)).SnapshotWindow()
                             select s.Count();
        
        snapshotWindow.ToIntervalEnumerable().Dump(); 
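
    To see why stretching the event duration puts several events into one snapshot, the following plain C# sketch (no StreamInsight involved) imitates the idea: every start or end of an event opens a new snapshot, and each snapshot counts the events alive in it. `SnapshotCount`, `CountPerSnapshot` and `Demo` are hypothetical names, and the real engine's output also depends on CTIs, so treat this as an approximation of the semantics rather than the engine's algorithm.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical result type: one row per snapshot.
public class SnapshotCount { public DateTime Start; public DateTime End; public int Count; }

public static class SnapshotSketch
{
    // Give every point event a fixed lifetime, cut the timeline at every
    // event start and end, and count the events alive in each piece.
    public static List<SnapshotCount> CountPerSnapshot(IEnumerable<DateTime> pointTimes, TimeSpan lifetime)
    {
        var events = pointTimes.Select(t => new { Start = t, End = t + lifetime }).ToList();
        var edges = events.SelectMany(e => new[] { e.Start, e.End })
                          .Distinct()
                          .OrderBy(t => t)
                          .ToList();

        var windows = new List<SnapshotCount>();
        for (int i = 0; i + 1 < edges.Count; i++)
        {
            DateTime ws = edges[i], we = edges[i + 1];
            // An event is in the snapshot if its [Start, End) range overlaps [ws, we).
            int alive = events.Count(e => e.Start < we && e.End > ws);
            if (alive > 0)
                windows.Add(new SnapshotCount { Start = ws, End = we, Count = alive });
        }
        return windows;
    }

    // Same timestamps as the sample above, with a 5 minute lifetime.
    public static List<SnapshotCount> Demo()
    {
        var minutes = new[] { 12, 13, 15, 19, 20, 21, 23 };
        var times = minutes.Select(m => new DateTime(2009, 10, 23, 4, m, 0));
        return CountPerSnapshot(times, TimeSpan.FromMinutes(5));
    }

    public static void Main()
    {
        foreach (var w in Demo())
            Console.WriteLine("{0:HH:mm} - {1:HH:mm}: {2}", w.Start, w.End, w.Count);
        // The busiest snapshot is 04:23 - 04:24, where four events overlap.
    }
}
```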


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Tuesday, October 01, 2013 12:53 AM
    Moderator
  • I agree with DevBiker that some more clarification about what you are trying to accomplish would make it easier to give you a better answer.

    From MSDN (User-Defined Stream Operators):
    In contrast, an output event is simply an event payload. There is no opportunity to timestamp output events or inject CTIs into the output stream. Output events are generated as point events, with timestamps that are based on the timestamps of the corresponding input events.

    Wednesday, October 02, 2013 2:40 PM

  • Having stared at this for ages, I don't know if I'm looking at this in the right way. I realise my question is a bit vague too, sorry.

    I have an input whose payload has a time stamp, a status, and the duration of the status in minutes (plus some other data, e.g. distance, not included for brevity).

    Some statuses need to be concatenated into longer events (I used the fold pairs macro for this), but I need totals from the events that were concatenated together included in the payload of the new, longer concatenated event.

    I tried the UDSO to sum up a value from the payload while the status doesn't change (see the LINQPad example below) but can't see how to combine this with the status 'signal', so that I have one signal that combines the values from the UDSO totals and changesSignal streams.

    I plan on querying the changesSignal stream over different windowed periods to get the totals of different statuses and their duration within the window, e.g.:
    What was the total time spent at status X each day?
    What was the sum of [some payload value] while in status X today?
    Was status X longer than X hours?

    Perhaps there is a more succinct way to do this? It feels like a lot of effort to get the durations of events from a temporal processing engine.

    void Main()
    {
    	var sourceData = Application.DefineEnumerable(() => new []
    	{
    		new Event {Id=1, Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 0, 0), Value = 4},
    		new Event {Id=1, Status = 3, TimeStamp = new DateTime(2009, 10, 23, 4, 4, 0), Value = 2},
    		new Event {Id=1, Status = 2, TimeStamp = new DateTime(2009, 10, 23, 4, 6, 0), Value = 6},
    		new Event {Id=1, Status = 3, TimeStamp = new DateTime(2009, 10, 23, 4, 12, 0), Value = 1},
    		new Event {Id=1, Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 13, 0), Value = 2},
    		new Event {Id=1, Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 15, 0), Value = 2},
    		new Event {Id=1, Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 19, 0), Value = 4},
    		new Event {Id=1, Status = 3, TimeStamp = new DateTime(2009, 10, 23, 4, 20, 0), Value = 1},
    		new Event {Id=1, Status = 2, TimeStamp = new DateTime(2009, 10, 23, 4, 21, 0), Value = 1},
    		new Event {Id=1, Status = 0, TimeStamp = new DateTime(2009, 10, 23, 4, 23, 0), Value = 2},
    	});
    	
    	var source = sourceData.ToPointStreamable(
    		ev => PointEvent.CreateInsert<Event>(ev.TimeStamp.ToLocalTime(), ev),
    		AdvanceTimeSettings.StrictlyIncreasingStartTime);
    	    
    		//collapse consecutive statuses to one point per change of status (2&3 are equivalent)
    		var changes = FoldPairs(source, (a, b) => a.Id == b.Id, TimeSpan.MaxValue,
    								(a, b) => new Event
    								{
    									Id = b.Id, 
    									TimeStamp = b.TimeStamp,
    									Status = b.Status,
    									Changed = !((b.Status == 2 && a.Status == 3) || (b.Status == 3 && a.Status == 2) || (b.Status == a.Status)),
    									Value = 0
    								});
    		
    		//just the events where a change has occurred 
    		changes = changes.Where(e=>e.Changed);
    		
    		//convert to interval covering whole of status period							
    		var changesSignal = changes
    				.AlterEventDuration(e => TimeSpan.MaxValue)
    				.ClipEventDuration(changes, (e1, e2) => true);
    
    		changesSignal.ToIntervalEnumerable().Where(e => e.EventKind == EventKind.Insert).Dump("changes as signal"); 							
    		
    		
    		
    		//get summary values from source stream - sum of values in payload when statuses are the same 
    		var totals = source.Scan(()=> new StatusTotals());
    		totals.ToIntervalEnumerable().Where(e => e.EventKind == EventKind.Insert).Dump("totals");
    		
    		// ? Combine the totals and changes so we have the summary value
    		// ? from the totals included in the payload of the changesSignal
    		
    
    }
    public class Event
    {
    	public int Id;
    	public int Status;
    	public DateTime TimeStamp;
    	public int Value;
    	public bool Changed;
    }
    
    public static IQStreamable<TResult> FoldPairs<TStream, TResult>(
    	IQStreamable<TStream> input,
    	Expression<Func<TStream, TStream, bool>> predicate,
    	TimeSpan timeout,
    	Expression<Func<TStream, TStream, TResult>> resultSelector)
    {
    	var signal = input
    					.AlterEventDuration(e => timeout)
    					.ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));
    
    	return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
    		   from r in input
    		   where predicate.Compile()(l, r)
    		   select resultSelector.Compile()(l, r);
    }
    	
    [DataContract]
    public class StatusTotals : Microsoft.ComplexEventProcessing.Extensibility.CepPointStreamOperator<Event, Event>
    {
    	[DataMember]
    	int _runningSum;
    	[DataMember]
    	int _lastStatus = -1;
    	[DataMember]
    	DateTime _timeStamp=DateTime.MinValue;
    	[DataMember]
    	int _id;
    
    	public StatusTotals()
    	{
    		_runningSum = 0;
    	}
    
    	public override bool IsEmpty
    	{
    		get { return _runningSum == 0; }
    	}
    
    	public override IEnumerable<Event> ProcessEvent(PointEvent<Event> inputEvent)
    	{
    		if (inputEvent.EventKind == EventKind.Insert)
    		{
    			if(!((_lastStatus ==2 && inputEvent.Payload.Status==3) || (_lastStatus==3 && inputEvent.Payload.Status==2) || _lastStatus==inputEvent.Payload.Status))
    			{
    				int sum = _runningSum;
    				int state= _lastStatus;
    				int id=_id;
    				DateTime timeStamp = _timeStamp;
    				
    				_runningSum = 0;
    				_lastStatus = inputEvent.Payload.Status;
    				_id=inputEvent.Payload.Id;
    				_timeStamp=inputEvent.Payload.TimeStamp;
    
    				yield return new Event{ Status= state, TimeStamp = timeStamp, Value = sum, Id=id};
    			}
    			_runningSum += inputEvent.Payload.Value;
    		}
    	}
    }

    Thursday, October 03, 2013 12:59 PM
  • This is getting clearer now.

    First off, you should change this:

    var changesSignal = changes
    	.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(changes, (e1, e2) => true);

    to this:

    var changesSignal = changes
    	.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(changes, (e1, e2) => e1.Id == e2.Id);

    You need that predicate in the second argument to match up your events correctly for a proper "ToSignal" pattern.
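
    As a rough illustration of why the predicate matters, here is a plain in-memory C# sketch of the "to signal" idea: each point lasts until the next point with the same Id. This is not StreamInsight code, and `StatusPoint`, `StatusSignal`, `ToSignal` and the two-device sample data are hypothetical. With a predicate that always returns true, the first Id 1 event in this sample would be cut short at 4:02 by the unrelated Id 2 event instead of running until 4:10.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types, for illustration only.
public class StatusPoint { public int Id; public int Status; public DateTime Time; }
public class StatusSignal { public int Id; public int Status; public DateTime Start; public DateTime End; }

public static class SignalSketch
{
    // Stretch each point until the next point that has the same Id.
    // The last point per Id stays open-ended (DateTime.MaxValue).
    public static List<StatusSignal> ToSignal(IEnumerable<StatusPoint> points)
    {
        return points
            .GroupBy(p => p.Id)
            .SelectMany(g =>
            {
                var ordered = g.OrderBy(p => p.Time).ToList();
                return ordered.Select((p, n) => new StatusSignal
                {
                    Id = p.Id,
                    Status = p.Status,
                    Start = p.Time,
                    End = n + 1 < ordered.Count ? ordered[n + 1].Time : DateTime.MaxValue
                });
            })
            .OrderBy(s => s.Start)
            .ToList();
    }

    // Made-up sample: two Ids whose events interleave in time.
    public static List<StatusSignal> Demo()
    {
        var day = new DateTime(2009, 10, 23);
        var points = new[]
        {
            new StatusPoint { Id = 1, Status = 0, Time = day.AddHours(4) },
            new StatusPoint { Id = 2, Status = 1, Time = day.AddHours(4).AddMinutes(2) },
            new StatusPoint { Id = 1, Status = 2, Time = day.AddHours(4).AddMinutes(10) },
            new StatusPoint { Id = 2, Status = 0, Time = day.AddHours(4).AddMinutes(12) }
        };
        return ToSignal(points);
    }

    public static void Main()
    {
        foreach (var s in Demo())
            Console.WriteLine("Id {0}, status {1}: {2:HH:mm} - {3:HH:mm}", s.Id, s.Status, s.Start, s.End);
    }
}
```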

    You can combine the two streams with a join as follows:

    var combined = from change in changesSignal
    	join total in totals
    	on change.Id equals total.Id
    	select new
    	{
    		change,
    		total
    	};

    When you say "summary value" are you talking about a sum of values or projecting the changes and totals into a new summary event payload?

    There can be a good amount of legwork to get the durations of the events, but, with LINQ's composable nature, we can take a divide-and-conquer approach to solving the problem.

    Thursday, October 03, 2013 3:18 PM
  • The "summary values" I'm after are the totals from the UDSO: the sum of payload values while the status doesn't change.

    These UDSO values are emitted at the end of each status period, and I want them available as part of the changesSignal. If I shift them so they overlap with the changesSignal and alter the join so it matches on Id and TimeStamp, I get the changesSignal intervals with the totals from the UDSO:

    		//get summary values from source stream - sum of values in payload when statuses are the same 
    		var totals = source.Scan(()=> new StatusTotals());
    		
    		//alter the totals lifespan so it overlaps with corresponding changeSignal event (no span is ever >1 week)
    		totals = totals.ShiftEventTime(e=>TimeSpan.FromDays(-7)).AlterEventDuration(e=>TimeSpan.MaxValue);
    		
    		//  Combine the totals and changes so we have the summary value
    		//  from the totals included in the payload of the changesSignal
    		var combined = 	from change in changesSignal
    						join total in totals
    						on new {change.Id, change.TimeStamp} equals new {total.Id, total.TimeStamp}
    						select new Event {
    							Changed=false, Id=change.Id, Status=change.Status, TimeStamp=change.TimeStamp, Value=total.Value
    								};

    The results from this are what I was after, but I'm not sure that this is a good approach. It still feels like a lot of work for a fairly straightforward request.

    Insert 23/10/2009 04:04:00 23/10/2009 04:13:00  
        Id 1 
        Status 3 
        TimeStamp 23/10/2009 04:04:00 
        Value 9 
        Changed False 
     
    Insert 23/10/2009 04:13:00 23/10/2009 04:20:00 
        Id 1 
        Status 0 
        TimeStamp 23/10/2009 04:13:00 
        Value 8 
        Changed False 
     
    Insert 23/10/2009 04:20:00 23/10/2009 04:23:00 
        Id 1 
        Status 3 
        TimeStamp 23/10/2009 04:20:00 
        Value 2 
        Changed False 
     

    DevBiker and TXPower - couldn't have got this far without your help. I would have abandoned StreamInsight by now and be doing hacky batch processing in SQL to get the data instead.

     
    Friday, October 04, 2013 10:37 AM
  • Here's an alternative method that gives you the sum when the status changes. It starts with "normalizing" the status - since 2 and 3 are the same - to simplify things a bit and allow for grouping. See if this meets your needs:

    //First, normalize the status. 2 and 3 are the same so mark them both as "2". 
    var normalized = from b in source 
    				select new Event{
    					Id = b.Id, 
    					TimeStamp = b.TimeStamp,
    					Status = b.Status == 3 ? 2 : b.Status,
    					Changed = b.Changed,
    					Value = b.Value
    				};
    
    //Convert to signal. All events with the same status should overlap so clip to new status. 
    var signal = normalized
    			.AlterEventDuration(ef => TimeSpan.MaxValue)
    			.ClipEventDuration(normalized, (e1, e2) => e1.Id == e2.Id && e1.Status != e2.Status); 
    
    //Snapshot window, aggregate. 
    var aggregate = from s in signal
    				group s by new {Id = s.Id, Status = s.Status} into statusGroup 
    				from sg in statusGroup.SnapshotWindow()
    				select new {
    					Id = statusGroup.Key.Id,
    					Status = statusGroup.Key.Status, 
    					Value = sg.Sum(ef => ef.Value), 
    					Count = sg.Count()
    				};
    
    //aggregate.ToIntervalEnumerable().Dump(); 
    
    //only those where the next event from the (normalized) source is different. 
    var changed = from a in aggregate
    				.ShiftEventTime(e => TimeSpan.FromTicks(1))
    			  from s in normalized 
    			  where a.Id == s.Id 
    			  where a.Status != s.Status 
    			  select a; 
    			  
    changed.ToIntervalEnumerable().Dump(); 
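
    For checking the numbers, the same normalize-then-sum logic can be sketched in plain in-memory C#, outside StreamInsight. `Reading`, `RunTotal`, `SumRuns` and `Demo` are hypothetical names, and the sketch assumes a single Id, as in the sample data posted earlier in the thread. It treats status 3 as status 2 and sums Value over each run of consecutive readings with the same status.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types, for illustration only.
public class Reading { public int Status; public DateTime Time; public int Value; }
public class RunTotal { public int Status; public DateTime Start; public DateTime End; public int Sum; public int Count; }

public static class RunSums
{
    // Treat status 3 as status 2, then sum Value over each run of
    // consecutive readings that share the same normalized status.
    public static List<RunTotal> SumRuns(IEnumerable<Reading> readings)
    {
        var runs = new List<RunTotal>();
        RunTotal current = null;
        foreach (var r in readings.OrderBy(x => x.Time))
        {
            int status = r.Status == 3 ? 2 : r.Status;
            if (current == null || current.Status != status)
            {
                // A status change closes the previous run and opens a new one.
                if (current != null) current.End = r.Time;
                current = new RunTotal { Status = status, Start = r.Time, End = DateTime.MaxValue };
                runs.Add(current);
            }
            current.Sum += r.Value;
            current.Count++;
        }
        return runs;
    }

    // The ten sample readings from earlier in the thread, as
    // (minute past 04:00, status, value) triples.
    public static List<RunTotal> Demo()
    {
        var rows = new[]
        {
            new[] { 0, 0, 4 }, new[] { 4, 3, 2 }, new[] { 6, 2, 6 }, new[] { 12, 3, 1 },
            new[] { 13, 0, 2 }, new[] { 15, 0, 2 }, new[] { 19, 0, 4 }, new[] { 20, 3, 1 },
            new[] { 21, 2, 1 }, new[] { 23, 0, 2 }
        };
        var readings = rows.Select(x => new Reading
        {
            Time = new DateTime(2009, 10, 23, 4, x[0], 0), Status = x[1], Value = x[2]
        });
        return SumRuns(readings);
    }

    public static void Main()
    {
        foreach (var run in Demo())
            Console.WriteLine("status {0}: {1:HH:mm} - {2:HH:mm}, sum {3}, count {4}",
                run.Status, run.Start, run.End, run.Sum, run.Count);
    }
}
```

    On that sample the three middle runs sum to 9, 8 and 2, matching the intervals posted earlier in the thread. The first run (sum 4) and the still-open last run (sum 2) appear here as well.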


    DevBiker (aka J Sawyer)

    Sunday, October 06, 2013 6:14 PM
    Moderator
  • Thanks for the alternative, think this temporal querying stuff is starting to make sense to me now and I get how the snapshot will give me the figures I'm after at last (couldn't "see" how it worked before now).

    Of the two methods (UDSO vs. group/snapshot), which is the better option? The snapshot looks more concise, but will it perform better? (I'll try to benchmark them later.)

    Monday, October 07, 2013 12:38 PM
  • If I had to guess, I'd say that the snapshot window is a touch more efficient. Not because UDSOs aren't efficient (they are) but because it has, I think, fewer operators overall. You should make sure, however, that the temporal properties of the output events match what you are looking for and trying to do.

    DevBiker (aka J Sawyer)

    Tuesday, October 08, 2013 3:46 AM
    Moderator