# How can StreamInsight be used in this scenario?

• Tuesday, January 10, 2012 12:17

Hi,

I have a problem scenario for which I would like to design a StreamInsight-based solution. I have gone through many SI blogs and posts, as well as the SI examples in LINQPad, but I am not sure how StreamInsight can be used in my scenario.

The scenario is as below:

I have a lot of variables (maybe thousands) whose values change over time. Each of these variables can have multiple threshold limits defined. A threshold limit defines the threshold value, the type (max or min) and the holding period. The holding period is the time span for which a threshold must stay violated before an alarm has to be raised.

For example, let's take two variables, each having two thresholds:

Variable1

-- Threshold1, value = 90, type = max, holding period = 10s

-- Threshold2, value = 95, type = max, holding period = 12s

Variable2

-- Threshold3, value = 80, type = max, holding period = 5s

-- Threshold4, value = 10, type = min, holding period = 8s

Now, I need to raise an output event if the Variable1 value stays above 90 for 10 seconds, or above 95 for 12 seconds, and so on. No alarm is raised if the value returns to below 90 (or 95) within the holding period of 10 seconds (or 12 seconds).

Please let me know if SI can be applied in this scenario. It seems similar to the LINQPad example "Alarm Floods and Transients", but here the holding period is not fixed and depends on the threshold.

Regards,

Jayanta

### All Replies

• Wednesday, January 11, 2012 04:19

Yes, StreamInsight can be used in this scenario. Honestly, your scenario is more common than a fixed "holding period", and it is one that my team and I have implemented several times. The Alarm Floods and Transients sample is, like you said, very similar, but the problem with that method is that you can't access the payload to change the start time in AlterEventDuration or ShiftEventTime, something that I do wish would be added to the API. We can still do it, though, in a couple of ways. Here is one way (from LINQPad):

```csharp
void Main()
{
Func<int, DateTimeOffset> t =
(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
var values = new []
{
//Good data
new {Item="Variable1", Value=92, Timestamp=0},
new {Item="Variable2", Value=60, Timestamp=0},
new {Item="CTI", Value=60, Timestamp=0},
new {Item="Variable1", Value=93, Timestamp=2},
new {Item="Variable2", Value=75, Timestamp=2},
new {Item="CTI", Value=60, Timestamp=2},
//non-repeating violation for Variable 1
new {Item="Variable1", Value=88, Timestamp=3},
//repeating violation for Variable 2
new {Item="Variable2", Value=81, Timestamp=3},
new {Item="CTI", Value=60, Timestamp=3},
new {Item="Variable1", Value=93, Timestamp=5},
new {Item="Variable2", Value=82, Timestamp=5},
new {Item="CTI", Value=60, Timestamp=5},
new {Item="Variable1", Value=92, Timestamp=8},
new {Item="Variable2", Value=82, Timestamp=8},
new {Item="CTI", Value=60, Timestamp=8},
new {Item="Variable1", Value=92, Timestamp=12},
new {Item="Variable2", Value=82, Timestamp=12},
new {Item="CTI", Value=60, Timestamp=12},
//End of violation for Variable 2
new {Item="Variable1", Value=92, Timestamp=15},
new {Item="Variable2", Value=60, Timestamp=15},
new {Item="CTI", Value=60, Timestamp=15},
new {Item="Variable1", Value=92, Timestamp=18},
new {Item="Variable2", Value=60, Timestamp=18},
new {Item="CTI", Value=60, Timestamp=18},
new {Item="Variable1", Value=92, Timestamp=20},
new {Item="Variable2", Value=60, Timestamp=20},
new {Item="CTI", Value=60, Timestamp=20},
};
//Threshold reference stream.
//See http://windowsazurecat.com/2011/08/sql-server-reference-data-streaminsight-query/
var thresholds = new []{
new {Item="Variable1", Value=90, ThresholdType=MIN_VALUE, Timeout=10},
new {Item="Variable1", Value=95, ThresholdType=MAX_VALUE, Timeout=12},
new {Item="Variable2", Value=80, ThresholdType=MAX_VALUE, Timeout=5},
new {Item="Variable2", Value=10, ThresholdType=MIN_VALUE, Timeout=8}
};

var sourceData = values.ToPointStream(Application,
e => e.Item != "CTI" ?

//Create the threshold stream
var thresholdSource = thresholds.ToPointStream(Application,

//And use ToSignal to get last value.
//This allows you to use a Sql input adapter that
//has a refresh period so you can change thresholds.
var thresholdRef = thresholdSource.ToSignal((e1, e2) =>
e1.Item == e2.Item && e1.ThresholdType == e2.ThresholdType);

var thresholdEval = from s in sourceData
from tr in thresholdRef
where s.Item == tr.Item
select new{
Value=s,
Threshold=tr,
IsViolation=EvalThreshold(s.Value, tr.Value, tr.ThresholdType)
};

var violation = thresholdEval.Where(e=> e.IsViolation);
violation.Where(e=> e.Value.Item == "Variable1").ToIntervalEnumerable().Dump("Violations");

//Any repeats in the timeout period?
//We need to shift the violation stream so it doesn't join with itself.
var nonTransient =  from v in violation
from ext in violation
//Alter to timeout
//Shift so a single violation doesn't join itself!!
where v.Value.Item == ext.Value.Item
where v.Threshold.ThresholdType == ext.Threshold.ThresholdType
select v;

nonTransient.ToIntervalEnumerable().Dump("Non-transient alarms");

//de-flood so we only get 1.
// Expand all alarm events to the timeout and count over snapshots
var counts = from nonTrans in nonTransient
group nonTrans by new {nonTrans.Value.Item, nonTrans.Threshold.ThresholdType} into grouped
from win in grouped
.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { count = win.Count(), Item = grouped.Key.Item, ThresholdType = grouped.Key.ThresholdType };

// Those snapshots with a count of 1 belong to the initial alarms.
// reduce to points and join with original stream.
var result = from c in counts
.Where(e => e.count == 1)
.ToPointEventStream()
from e in nonTransient
where c.Item == e.Value.Item
where c.ThresholdType == e.Threshold.ThresholdType
select e;

result.ToIntervalEnumerable().Dump("Final");

}

// A MIN threshold is violated when the value drops below the limit,
// a MAX threshold when the value rises above it.
public static bool EvalThreshold(int value, int thresholdValue, int thresholdType){
    switch (thresholdType)
    {
        case MIN_VALUE:
            return value < thresholdValue;
        case MAX_VALUE:
            return value > thresholdValue;
        default:
            return false;
    }
}

public const int MIN_VALUE = 0;
public const int MAX_VALUE = 1;

public static class MacroExtensions{
    // Stretch every event to "forever", then clip it at the next event that
    // matches, so each value stays current until a newer one replaces it.
    public static CepStream<T> ToSignal<T>(this CepStream<T> inputstream,
        Expression<Func<T, T, bool>> matchExpression)
    {
        return inputstream
            .AlterEventDuration(e => TimeSpan.MaxValue)
            .ClipEventDuration(inputstream, (e1, e2) => matchExpression.Compile()(e1, e2));
    }
}

public string Item;
public int Value;
}
```
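
If the ToSignal step is hard to picture, here is a minimal plain C# sketch of the idea, with no StreamInsight involved: each threshold event stays current until a later event for the same item and threshold type replaces it. The type names (`ThresholdPoint`, `ThresholdInterval`, `SignalSketch`) and the threshold change at 30 seconds are made up for illustration; this only mimics the stretch-then-clip behaviour and is not the engine's API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; they are not StreamInsight types.
public sealed class ThresholdPoint
{
    public string Item;
    public int ThresholdType;   // 0 = min, 1 = max, as in the query above
    public int Value;
    public int Start;           // seconds since the start of the stream
}

public sealed class ThresholdInterval
{
    public ThresholdPoint Point;
    public int End;             // exclusive; int.MaxValue means "still current"
}

public static class SignalSketch
{
    // Each point lasts until the next point with the same Item and
    // ThresholdType, or forever if nothing replaces it.
    public static List<ThresholdInterval> ToSignal(IEnumerable<ThresholdPoint> points)
    {
        var ordered = points.OrderBy(p => p.Start).ToList();
        return ordered.Select(p => new ThresholdInterval
        {
            Point = p,
            End = ordered
                .Where(q => q.Item == p.Item
                         && q.ThresholdType == p.ThresholdType
                         && q.Start > p.Start)
                .Select(q => q.Start)
                .DefaultIfEmpty(int.MaxValue)
                .Min()
        }).ToList();
    }
}

public static class SignalDemo
{
    public static void Main()
    {
        // Assumed data: the Variable2 max threshold is raised from 80 to 85 at 30s.
        var updates = new[]
        {
            new ThresholdPoint { Item = "Variable2", ThresholdType = 1, Value = 80, Start = 0 },
            new ThresholdPoint { Item = "Variable2", ThresholdType = 1, Value = 85, Start = 30 }
        };

        foreach (var s in SignalSketch.ToSignal(updates))
        {
            var until = s.End == int.MaxValue ? "further notice" : s.End + "s";
            Console.WriteLine(s.Point.Item + " limit " + s.Point.Value
                + " from " + s.Point.Start + "s until " + until);
        }
    }
}
```

A reading that arrives at 10s would be checked against 80 and one at 40s against 85, which is why a refreshing reference source can change thresholds without restarting the query.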

DevBiker (aka J Sawyer)

• Wednesday, January 11, 2012 07:27

Thanks J Sawyer for the detailed response.

I shall try your approach and shall come back with my comments.

Regards,

Jayanta

• Wednesday, January 11, 2012 21:20

Keep in mind that this is one way to do it. There are often a couple of different ways to tweak the queries to do very similar things. With the query above, the alarm will trigger if there is any repeat of a violation within the window. An alternative would be to trigger only if all items in a window were violations (one good value would cancel). For that, you'd do a Left Anti Semi Join between the violation stream and a non-violation stream.
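
To show what the stricter "one good value cancels" behaviour means, here is a rough plain C# sketch without StreamInsight. It is not the Left Anti Semi Join query itself, just the same rule written as a loop, and the names (`Reading`, `Threshold`, `HoldingPeriod`) are invented for the example. It also assumes sampled readings, so the alarm is detected at the first reading on or after the end of the holding period, not at the exact instant the period elapses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum ThresholdKind { Min, Max }

// Hypothetical types for illustration only.
public sealed class Threshold
{
    public string Item;
    public int Value;
    public ThresholdKind Kind;
    public int HoldSeconds;
}

public sealed class Reading
{
    public string Item;
    public int Value;
    public int Second;
}

public static class HoldingPeriod
{
    public static bool IsViolation(int value, Threshold t)
    {
        return t.Kind == ThresholdKind.Max ? value > t.Value : value < t.Value;
    }

    // Returns the seconds at which an alarm fires for one threshold: the first
    // reading in each unbroken run of violations that lies at least HoldSeconds
    // after the start of that run. Any good reading resets the run.
    public static List<int> Alarms(IEnumerable<Reading> readings, Threshold t)
    {
        var alarms = new List<int>();
        int? runStart = null;   // start of the current violation run, if any
        bool raised = false;    // at most one alarm per run

        foreach (var r in readings.Where(x => x.Item == t.Item).OrderBy(x => x.Second))
        {
            if (!IsViolation(r.Value, t))
            {
                runStart = null;
                raised = false;
                continue;
            }
            if (runStart == null) runStart = r.Second;
            if (!raised && r.Second - runStart.Value >= t.HoldSeconds)
            {
                alarms.Add(r.Second);
                raised = true;
            }
        }
        return alarms;
    }
}

public static class HoldingPeriodDemo
{
    public static void Main()
    {
        // Variable2 readings copied from the sample data earlier in the thread.
        var values = new[] { 60, 75, 81, 82, 82, 82, 60, 60, 60 };
        var seconds = new[] { 0, 2, 3, 5, 8, 12, 15, 18, 20 };
        var readings = values
            .Zip(seconds, (v, s) => new Reading { Item = "Variable2", Value = v, Second = s })
            .ToList();

        var threshold = new Threshold
        {
            Item = "Variable2", Value = 80, Kind = ThresholdKind.Max, HoldSeconds = 5
        };

        // Prints: Alarm for Variable2 at t=8s
        foreach (var s in HoldingPeriod.Alarms(readings, threshold))
            Console.WriteLine("Alarm for " + threshold.Item + " at t=" + s + "s");
    }
}
```

With the sample data, Variable2 goes above 80 at 3s and is still above it at 8s, so the 5 second holding period is met there. The single dip of Variable1 to 88 at 3s recovers at 5s, so a 10 second holding period on it never fires.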

DevBiker (aka J Sawyer)