none
join - unexpected temporal behaviour (beginner question)

    Question

  • Hi,

    I'm new to StreamInsight and trying to implement a simple scenario:

    I've got two sensors a and b (later more) whose values are available as doubles. The values change occasionally. But the time of the change is not predictable.
    If both of the sensor's values are above a threshold an alarm is triggered. I modeled this with the following query using a cross join with a where clause:

    var alarmQ = from a in sourceA
                         from b in sourceB
                         where a > 10 && b > 5
                         select new Alarm() { /* ...*/ };

    My problem is that the alarm is not triggered immediately. Instead alarm is triggered after both sensor values have changed at a later time. Sometimes alarms are triggered in a bulk. I created a console application and set the sensor values through the command prompt (e.g. "a=20" sets the value of sensor a to 20). Here is an example:

    
    a=20
    20.09.2013 14:30:12 value changed:a=20
    b=100
    20.09.2013 14:30:16 value changed:b=100
    a=19
    20.09.2013 14:30:20 value changed:a=19
    20.09.2013 14:30:21 alarm ausgelöst: Alarm! a=20 b=100
    a=18
    20.09.2013 14:30:28 value changed:a=18
    b=99
    20.09.2013 14:30:34 value changed:b=99
    20.09.2013 14:30:34 alarm ausgelöst: Alarm! a=19 b=100
    20.09.2013 14:30:34 alarm ausgelöst: Alarm! a=18 b=100
    b=98
    20.09.2013 14:30:50 value changed:b=98
    a=11
    20.09.2013 14:30:59 value changed:a=11
    20.09.2013 14:30:59 alarm ausgelöst: Alarm! a=18 b=99
    20.09.2013 14:30:59 alarm ausgelöst: Alarm! a=18 b=98
    

    This is what I would have expected (a and b are initialized with 0):

    a=20
    b=100 => alarm
    a=19 => alarm
    a=18=> alarm
    b=99=>alarm
    ...

    Here is my complete code:

    using System; using System.Collections.Generic; using System.Linq; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects; using System.ServiceModel; using System.Text; using System.Threading.Tasks; using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq; using Microsoft.ComplexEventProcessing.ManagementService; namespace MyServer { class Program { static Dictionary<string, IQbservable<double>> sources = new Dictionary<string, IQbservable<double>>(); static IQbservable<double> GetSource(string id) { return sources[id]; } private static Dictionary<string, Sensor> _Sensors = new Dictionary<string, Sensor>(); static void Main(string[] args) { using (var server = Microsoft.ComplexEventProcessing.Server.Create("Default")) { var myApp = server.CreateApplication("alarmApp"); var sourceA = createSensorSource(myApp, "a"); var sourceB = createSensorSource(myApp, "b"); var alarmQ = from a in sourceA from b in sourceB where a > 10 && b > 5 select new Alarm() { Id = "Alarm1", Message = "Alarm! a=" + a.ToString() + " b=" + b.ToString(), TimeStamp = DateTime.Now }; var alarmSink = myApp.DefineObserver(() => Observer.Create<Alarm>(x => Console.WriteLine("{1} alarm ausgelöst: {0}", x.Message, x.TimeStamp.ToString()))); var alarmProc = alarmQ.Bind(alarmSink).Run("alarmProcess"); Console.WriteLine("----------------------------------------------------------------"); Console.WriteLine("StreamInsightServer is running, type \"exit\" to stop the server"); Console.WriteLine("----------------------------------------------------------------"); Console.WriteLine(" "); string line; while ((line = Console.ReadLine()) != "exit") { try { var segments = line.Split('='); var sensorId = segments[0]; var sensorValue = double.Parse(segments[1]); var sensor = _Sensors[sensorId]; sensor.Value = sensorValue; } catch (Exception e) { Console.WriteLine(e.ToString()); } } alarmProc.Dispose(); } } private static IQStreamable<double> createSensorSource(Application myApp, string id) //private static IQbservable<double> createSensorSource(Application myApp, string id) { _Sensors.Add(id, new Sensor(id)); var source = myApp.DefineObservable(() => Observable.Create<double>(observer => registerObserver(id, observer))); sources.Add(id, source); source.Deploy(id + "_process"); var streamableSource = source.ToPointStreamable(val => PointEvent.CreateInsert(DateTime.Now, val), AdvanceTimeSettings.StrictlyIncreasingStartTime); streamableSource = streamableSource.AlterEventDuration(e => TimeSpan.MaxValue).ClipEventDuration(streamableSource, (v, c) => true); // use a sink to gibe feedback when a value changes var sink = myApp.DefineObserver(() => Observer.Create<double>(x => Console.WriteLine("{2} value changed:{0}={1}", id, x, DateTime.Now))); sink.Deploy("sensor_" + id + "_sink"); source.Bind(sink).Run(); return streamableSource; //return source; } private static Action registerObserver(string id, IObserver<double> observer) { var sensor = _Sensors[id]; SensorValueChangedHandler handler = (s, v) => observer.OnNext(v); sensor.ValueChanged += handler; return () => sensor.ValueChanged -= handler; } } public delegate void SensorValueChangedHandler(Sensor sensor, double newValue); public class Sensor { public Sensor(string id) { this.Id = id; } public string Id { get; private set; } private double _Value; public double Value { get { return _Value; } set { if (_Value != value) { _Value = value; OnValueChanged(); } } } public event SensorValueChangedHandler ValueChanged; private void OnValueChanged() { if (ValueChanged != null) { ValueChanged(this, Value); } } }

    public class Alarm
    {
    public string Id { get; set; }
    public string Message { get; set; }
    public DateTime TimeStamp { get; set; }
    }
    }

    I hope you can help me solve this problem. 

    Friday, September 20, 2013 1:08 PM

Answers

  • Hi,

    This problem is caused by StreamInsight temporal model. The joined streams use one time incrementing. The stream a  process event, when event come from b stream with later time-stamp.

    If StreamInsight process the event immediately a stream lose events, when event come from b stream with earlier time-stamp.

    Friday, September 20, 2013 2:38 PM
  • First, why are you creating separate streams for each sensor? This will seriously limit your scalability and how many sensors you can monitor. You'd be better off having them as a single stream, especially since you expect them to be in the same timeline.

    Second, Szakal's response is close to the mark ... the behavior that you are seeing is due to the temporal model but not because "the joined streams use one time incrementing". It's because joined streams sync to the slowest stream ... that is, the joined stream doesn't move forward until both source streams have issues CTIs past a particular point in the timeline. So if sensor A is sending readings every second and sensor B is sending readings every 10 seconds, the joined stream will produce results every 10 seconds - based on sensor B. If both sensors are in the same stream, however, you won't see this issue since you won't need to join and it's the same timeline. :-) An alternative is to import the CTIs from sensor A stream into sensor B stream, make sensor B stream "move" at the same speed as sensor A. This might cause some issues, however, if they have different latency or are off by so much as a tick or two - you'd need to add delay for the import.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Sunday, September 22, 2013 1:30 PM
    Moderator

All replies

  • Hi,

    This problem is caused by StreamInsight temporal model. The joined streams use one time incrementing. The stream a  process event, when event come from b stream with later time-stamp.

    If StreamInsight process the event immediately a stream lose events, when event come from b stream with earlier time-stamp.

    Friday, September 20, 2013 2:38 PM
  • First, why are you creating separate streams for each sensor? This will seriously limit your scalability and how many sensors you can monitor. You'd be better off having them as a single stream, especially since you expect them to be in the same timeline.

    Second, Szakal's response is close to the mark ... the behavior that you are seeing is due to the temporal model but not because "the joined streams use one time incrementing". It's because joined streams sync to the slowest stream ... that is, the joined stream doesn't move forward until both source streams have issues CTIs past a particular point in the timeline. So if sensor A is sending readings every second and sensor B is sending readings every 10 seconds, the joined stream will produce results every 10 seconds - based on sensor B. If both sensors are in the same stream, however, you won't see this issue since you won't need to join and it's the same timeline. :-) An alternative is to import the CTIs from sensor A stream into sensor B stream, make sensor B stream "move" at the same speed as sensor A. This might cause some issues, however, if they have different latency or are off by so much as a tick or two - you'd need to add delay for the import.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Sunday, September 22, 2013 1:30 PM
    Moderator