none
How can StreamInsight be used in this scenario?

    Soru

  • Hi,

    I have a problem scenario for which I would like to design a StreamInsight based solution. I have gone through many SI blogs/post and also the SI examples on LINQPad, but I am not sure how StreamInsight can be used in my scenario.

    The scenario is as below:

    I have lot of variables (may be thousands) whose value change over time. Each of these variables can have multiple threshold limits defined. A threshold limit defines the threshold value, type (max or min) and holding period. A holding period is a time span for which if a threshold is violated, an alarm has to be raised.

    For example, lets take two variables, each having two thresholds:

    Variable1

    -- Threshold1, value = 90, type = max, holding period = 10s

    -- Threshold2, value = 95, type = max, holding period = 12s

    Variable2

    -- Threshold3, value = 80, type = max, holding period = 5s

    -- Threshold4, value = 10, type = min, holding period = 8s

    Now, I need to raise an output event, if Variable1 value stays above 90 for 10 seconds or above 95 for 12 seconds etc. So no alarm is raised in case the value returns to below 90 (or 95) within the holding period of 10 seconds (or 12 seconds).

    Please let me know if SI can be applied in this scenario. It seems similar to the LINQPad example "Alarm Floods and Transients", but here the holding period is not fixed and depends on the threshold.

     

    Regards,

    Jayanta

    10 Ocak 2012 Salı 12:17

Yanıtlar

  • Yes, StreamInsight can be used in this scenario. And ... honestly ... your scenario is more common than a fixed "holding period" and a scenario that my team and I have implemented several times. The Alarm Floods and Transients is, like you said, very similar but the problem with that method is that you can't access the payload to change the start time in AlterEventDuration or ShiftEventTime ... something that I do wish would be added to the API. But, we can still do it ... a couple of ways. Here is 1 way (from LinqPad):

    void Main()
    {
    	Func<int, DateTimeOffset> t = 
    		(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
    	var values = new []
    	{
    		//Good data
    		new {Item="Variable1", Value=92, Timestamp=0},
    		new {Item="Variable2", Value=60, Timestamp=0},
    		new {Item="CTI", Value=60, Timestamp=0},
    		new {Item="Variable1", Value=93, Timestamp=2},
    		new {Item="Variable2", Value=75, Timestamp=2},
    		new {Item="CTI", Value=60, Timestamp=2},
    		//non-repeating violation for Variable 1
    		new {Item="Variable1", Value=88, Timestamp=3},
    		//repeating violation for Variable 2
    		new {Item="Variable2", Value=81, Timestamp=3},
    		new {Item="CTI", Value=60, Timestamp=3},
    		new {Item="Variable1", Value=93, Timestamp=5},
    		new {Item="Variable2", Value=82, Timestamp=5},
    		new {Item="CTI", Value=60, Timestamp=5},
    		new {Item="Variable1", Value=92, Timestamp=8},
    		new {Item="Variable2", Value=82, Timestamp=8},
    		new {Item="CTI", Value=60, Timestamp=8},
    		new {Item="Variable1", Value=92, Timestamp=12},
    		new {Item="Variable2", Value=82, Timestamp=12},
    		new {Item="CTI", Value=60, Timestamp=12},
    		//End of violation for Variable 2
    		new {Item="Variable1", Value=92, Timestamp=15},
    		new {Item="Variable2", Value=60, Timestamp=15},
    		new {Item="CTI", Value=60, Timestamp=15},
    		new {Item="Variable1", Value=92, Timestamp=18},
    		new {Item="Variable2", Value=60, Timestamp=18},
    		new {Item="CTI", Value=60, Timestamp=18},
    		new {Item="Variable1", Value=92, Timestamp=20},
    		new {Item="Variable2", Value=60, Timestamp=20},
    		new {Item="CTI", Value=60, Timestamp=20},
    	};
    	//Threshold reference stream. 
    	//See http://windowsazurecat.com/2011/08/sql-server-reference-data-streaminsight-query/
    	var thresholds = new []{
    		new {Item="Variable1", Value=90, ThresholdType=MIN_VALUE, Timeout=10},
    		new {Item="Variable1", Value=95, ThresholdType=MAX_VALUE, Timeout=12},
    		new {Item="Variable2", Value=80, ThresholdType=MAX_VALUE, Timeout=5},
    		new {Item="Variable2", Value=10, ThresholdType=MIN_VALUE, Timeout=8}
    	};
    	
    	var sourceData = values.ToPointStream(Application, 
    		e => e.Item != "CTI" ? 
    			PointEvent.CreateInsert(t(e.Timestamp), new Payload(){Item=e.Item, Value=e.Value}):
    			PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1))); 
    			
    	//Create the threshold stream
    	var thresholdSource = thresholds.ToPointStream(Application, 
    		e=> PointEvent.CreateInsert(t(0), e), AdvanceTimeSettings.IncreasingStartTime); 
    	
    	//And use ToSignal to get last value. 
    	//This allows you to use a Sql input adapter that 
    	//has a refresh period so you can change thresholds.
    	var thresholdRef = thresholdSource.ToSignal((e1, e2) => 
    		e1.Item == e2.Item && e1.ThresholdType == e2.ThresholdType); 
    	
    	var thresholdEval = from s in sourceData
    							 from tr in thresholdRef 
    							 where s.Item == tr.Item 
    							 select new{
    							 	Value=s, 
    								Threshold=tr,
    								IsViolation=EvalThreshold(s.Value, tr.Value, tr.ThresholdType)
    							};
    
    	var violation = thresholdEval.Where(e=> e.IsViolation); 
    	violation.Where(e=> e.Value.Item == "Variable1").ToIntervalEnumerable().Dump("Violations"); 
    	
    	//Any repeats in the timeout period? 
    	//We need to shift the violation stream so it doe
    	var nonTransient =  from v in violation
    				  from ext in violation
    				  	//Alter to timeout
    				  	.AlterEventDuration(e=> TimeSpan.FromSeconds(e.Payload.Threshold.Timeout))
    					//Shift so a single violation doesn't join itself!! 
    					.ShiftEventTime(e=>e.StartTime.AddTicks(1))
    				  where v.Value.Item == ext.Value.Item 
    				  where v.Threshold.ThresholdType == ext.Threshold.ThresholdType
    				  select v; 
    	
    	nonTransient.ToIntervalEnumerable().Dump("Non-transient alarms");
    	
    	//de-flood so we only get 1.
    	// Expand all alarm events to the timeout and count over snapshots
    	var counts = from nonTrans in nonTransient
    				 group nonTrans by new {nonTrans.Value.Item, nonTrans.Threshold.ThresholdType} into grouped
    				 from win in grouped
    					.AlterEventDuration(e => TimeSpan.FromSeconds(e.Payload.Threshold.Timeout))
    					.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
    				select new { count = win.Count(), Item = grouped.Key.Item, ThresholdType = grouped.Key.ThresholdType };
    	
    	// Those snapshots with a count of 1 belong to the initial alarms.
    	// reduce to points and join with original stream.
    	var result = from c in counts
    					.Where(e => e.count == 1)
    					.ToPointEventStream()
    				from e in nonTransient
    				where c.Item == e.Value.Item 
    				where c.ThresholdType == e.Threshold.ThresholdType
    				select e;
    				
    	result.ToIntervalEnumerable().Dump("Final"); 
    	
    }
    
    public static bool EvalThreshold(int value, int thresholdValue, int thresholdType){
    		switch (thresholdType) 
    		{
    			case MIN_VALUE:
    				return value < thresholdValue; 
    			case MAX_VALUE: 
    				return value > thresholdValue; 
    			default:
    				return false; 
    		}
    	}
    
    
    public const int MIN_VALUE = 0; 
    public const int MAX_VALUE=1; 
    
    public static class MacroExtensions{
    	public static CepStream<T> ToSignal<T>(this CepStream<T> inputstream, 
    		Expression<Func<T, T, bool>> matchExpression)
    	{
    		return inputstream
    			.AlterEventDuration(e => TimeSpan.MaxValue)
    			.ClipEventDuration(inputstream, (e1, e2) => matchExpression.Compile()(e1, e2));
    	}
     }
     
     public struct Payload{
     	public string Item; 
    	public int Value; 
     }
    

     


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14

    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.
    • Yanıt Olarak İşaretleyen Stephanie Lv 17 Ocak 2012 Salı 06:59
    11 Ocak 2012 Çarşamba 04:19
  • Keep in mind that this is one way to do it ... often there are a couple of different ways to tweak the queries to do very similar things. In this case, if there is any repeat of a violation within the window, the alarm will trigger. An alternative would be to only trigger if all items in a window were violations (1 good value would cancel). In this case, you'd do a Left-Anti-SemiJoin between the violation stream and a non-violation stream.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14

    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.
    • Yanıt Olarak İşaretleyen Stephanie Lv 17 Ocak 2012 Salı 07:00
    11 Ocak 2012 Çarşamba 21:20

Tüm Yanıtlar

  • Yes, StreamInsight can be used in this scenario. And ... honestly ... your scenario is more common than a fixed "holding period" and a scenario that my team and I have implemented several times. The Alarm Floods and Transients is, like you said, very similar but the problem with that method is that you can't access the payload to change the start time in AlterEventDuration or ShiftEventTime ... something that I do wish would be added to the API. But, we can still do it ... a couple of ways. Here is 1 way (from LinqPad):

    void Main()
    {
    	Func<int, DateTimeOffset> t = 
    		(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
    	var values = new []
    	{
    		//Good data
    		new {Item="Variable1", Value=92, Timestamp=0},
    		new {Item="Variable2", Value=60, Timestamp=0},
    		new {Item="CTI", Value=60, Timestamp=0},
    		new {Item="Variable1", Value=93, Timestamp=2},
    		new {Item="Variable2", Value=75, Timestamp=2},
    		new {Item="CTI", Value=60, Timestamp=2},
    		//non-repeating violation for Variable 1
    		new {Item="Variable1", Value=88, Timestamp=3},
    		//repeating violation for Variable 2
    		new {Item="Variable2", Value=81, Timestamp=3},
    		new {Item="CTI", Value=60, Timestamp=3},
    		new {Item="Variable1", Value=93, Timestamp=5},
    		new {Item="Variable2", Value=82, Timestamp=5},
    		new {Item="CTI", Value=60, Timestamp=5},
    		new {Item="Variable1", Value=92, Timestamp=8},
    		new {Item="Variable2", Value=82, Timestamp=8},
    		new {Item="CTI", Value=60, Timestamp=8},
    		new {Item="Variable1", Value=92, Timestamp=12},
    		new {Item="Variable2", Value=82, Timestamp=12},
    		new {Item="CTI", Value=60, Timestamp=12},
    		//End of violation for Variable 2
    		new {Item="Variable1", Value=92, Timestamp=15},
    		new {Item="Variable2", Value=60, Timestamp=15},
    		new {Item="CTI", Value=60, Timestamp=15},
    		new {Item="Variable1", Value=92, Timestamp=18},
    		new {Item="Variable2", Value=60, Timestamp=18},
    		new {Item="CTI", Value=60, Timestamp=18},
    		new {Item="Variable1", Value=92, Timestamp=20},
    		new {Item="Variable2", Value=60, Timestamp=20},
    		new {Item="CTI", Value=60, Timestamp=20},
    	};
    	//Threshold reference stream. 
    	//See http://windowsazurecat.com/2011/08/sql-server-reference-data-streaminsight-query/
    	var thresholds = new []{
    		new {Item="Variable1", Value=90, ThresholdType=MIN_VALUE, Timeout=10},
    		new {Item="Variable1", Value=95, ThresholdType=MAX_VALUE, Timeout=12},
    		new {Item="Variable2", Value=80, ThresholdType=MAX_VALUE, Timeout=5},
    		new {Item="Variable2", Value=10, ThresholdType=MIN_VALUE, Timeout=8}
    	};
    	
    	var sourceData = values.ToPointStream(Application, 
    		e => e.Item != "CTI" ? 
    			PointEvent.CreateInsert(t(e.Timestamp), new Payload(){Item=e.Item, Value=e.Value}):
    			PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1))); 
    			
    	//Create the threshold stream
    	var thresholdSource = thresholds.ToPointStream(Application, 
    		e=> PointEvent.CreateInsert(t(0), e), AdvanceTimeSettings.IncreasingStartTime); 
    	
    	//And use ToSignal to get last value. 
    	//This allows you to use a Sql input adapter that 
    	//has a refresh period so you can change thresholds.
    	var thresholdRef = thresholdSource.ToSignal((e1, e2) => 
    		e1.Item == e2.Item && e1.ThresholdType == e2.ThresholdType); 
    	
    	var thresholdEval = from s in sourceData
    							 from tr in thresholdRef 
    							 where s.Item == tr.Item 
    							 select new{
    							 	Value=s, 
    								Threshold=tr,
    								IsViolation=EvalThreshold(s.Value, tr.Value, tr.ThresholdType)
    							};
    
    	var violation = thresholdEval.Where(e=> e.IsViolation); 
    	violation.Where(e=> e.Value.Item == "Variable1").ToIntervalEnumerable().Dump("Violations"); 
    	
    	//Any repeats in the timeout period? 
    	//We need to shift the violation stream so it doe
    	var nonTransient =  from v in violation
    				  from ext in violation
    				  	//Alter to timeout
    				  	.AlterEventDuration(e=> TimeSpan.FromSeconds(e.Payload.Threshold.Timeout))
    					//Shift so a single violation doesn't join itself!! 
    					.ShiftEventTime(e=>e.StartTime.AddTicks(1))
    				  where v.Value.Item == ext.Value.Item 
    				  where v.Threshold.ThresholdType == ext.Threshold.ThresholdType
    				  select v; 
    	
    	nonTransient.ToIntervalEnumerable().Dump("Non-transient alarms");
    	
    	//de-flood so we only get 1.
    	// Expand all alarm events to the timeout and count over snapshots
    	var counts = from nonTrans in nonTransient
    				 group nonTrans by new {nonTrans.Value.Item, nonTrans.Threshold.ThresholdType} into grouped
    				 from win in grouped
    					.AlterEventDuration(e => TimeSpan.FromSeconds(e.Payload.Threshold.Timeout))
    					.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
    				select new { count = win.Count(), Item = grouped.Key.Item, ThresholdType = grouped.Key.ThresholdType };
    	
    	// Those snapshots with a count of 1 belong to the initial alarms.
    	// reduce to points and join with original stream.
    	var result = from c in counts
    					.Where(e => e.count == 1)
    					.ToPointEventStream()
    				from e in nonTransient
    				where c.Item == e.Value.Item 
    				where c.ThresholdType == e.Threshold.ThresholdType
    				select e;
    				
    	result.ToIntervalEnumerable().Dump("Final"); 
    	
    }
    
    public static bool EvalThreshold(int value, int thresholdValue, int thresholdType){
    		switch (thresholdType) 
    		{
    			case MIN_VALUE:
    				return value < thresholdValue; 
    			case MAX_VALUE: 
    				return value > thresholdValue; 
    			default:
    				return false; 
    		}
    	}
    
    
    public const int MIN_VALUE = 0; 
    public const int MAX_VALUE=1; 
    
    public static class MacroExtensions{
    	public static CepStream<T> ToSignal<T>(this CepStream<T> inputstream, 
    		Expression<Func<T, T, bool>> matchExpression)
    	{
    		return inputstream
    			.AlterEventDuration(e => TimeSpan.MaxValue)
    			.ClipEventDuration(inputstream, (e1, e2) => matchExpression.Compile()(e1, e2));
    	}
     }
     
     public struct Payload{
     	public string Item; 
    	public int Value; 
     }
    

     


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14

    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.
    • Yanıt Olarak İşaretleyen Stephanie Lv 17 Ocak 2012 Salı 06:59
    11 Ocak 2012 Çarşamba 04:19
  • Thanks J Sawyer for the detailed response.

    I shall try your approach and shall come back with my comments.

    Regards,

    Jayanta

     

     

     

     

    11 Ocak 2012 Çarşamba 07:27
  • Keep in mind that this is one way to do it ... often there are a couple of different ways to tweak the queries to do very similar things. In this case, if there is any repeat of a violation within the window, the alarm will trigger. An alternative would be to only trigger if all items in a window were violations (1 good value would cancel). In this case, you'd do a Left-Anti-SemiJoin between the violation stream and a non-violation stream.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14

    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.
    • Yanıt Olarak İşaretleyen Stephanie Lv 17 Ocak 2012 Salı 07:00
    11 Ocak 2012 Çarşamba 21:20