How to join a slow and fast stream and output at the slowest frequency?

    Question

  • I am having a great deal of trouble joining a slow stream (one event at a variable interval of 1-2 minutes) with a fast stream (one event every 20 milliseconds).
    I would like the joined streams to output one aggregated result at the frequency of the slow stream. Below is an example that simulates my scenario at different frequencies.

    var streamableSlow = from s in slowSubject
    	.ToPointStreamable(e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e), AdvanceTimeSettings.IncreasingStartTime)
    	select s;
    
    var streamableFast = from s in fastSubject
    	.ToPointStreamable(e => PointEvent<long>.CreateInsert(new DateTimeOffset(DateTime.UtcNow), e), AdvanceTimeSettings.IncreasingStartTime)
    	select s;
    
    
    var signalSlow = streamableSlow.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(streamableSlow, (e1, e2) => (true));
    
    
    var signalFast = streamableFast.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(streamableSlow, (e1, e2) => (true));
    
    
    
    var queryJoin = from s in signalSlow
    				join f in signalFast
    					on s equals f
    				select new LongPair
    				{
    					slow = s,
    					fast = f
    				};
    
    
    var query = from win in queryJoin.SnapshotWindow()
    				 select new LongPair
    					 {
    						 slow = win.Max(a => a.slow),
    						 fast = win.Count()
    					 };

    In the example above I have 2 input streams that both emit a long. The slow stream emits at a 2-second interval and the fast stream at a 50-millisecond interval. The fast stream emits a random number between 1 and 4.
    I first convert both inputs into point streams, and then alter and clip the event durations. I then join the streams together into a class LongPair and finally aggregate using a snapshot window.

    Here is an example of my output:
    slow=1, fast count=1   time=08:27:28
    slow=1, fast count=2   time=08:27:28
    slow=1, fast count=3   time=08:27:28
    slow=1, fast count=4   time=08:27:28
    slow=1, fast count=5   time=08:27:28
    slow=1, fast count=6   time=08:27:28
    slow=1, fast count=7   time=08:27:28
    slow=1, fast count=8   time=08:27:28
    slow=1, fast count=9   time=08:27:28
    slow=2, fast count=1   time=08:27:30
    slow=2, fast count=2   time=08:27:30
    slow=3, fast count=1   time=08:27:32
    slow=3, fast count=2   time=08:27:32
    slow=4, fast count=1   time=08:27:34
    slow=4, fast count=2   time=08:27:34
    slow=4, fast count=3   time=08:27:34
    slow=4, fast count=4   time=08:27:34
    slow=4, fast count=5   time=08:27:34
    slow=1, fast count=1   time=08:27:36
    slow=1, fast count=2   time=08:27:36
    slow=1, fast count=3   time=08:27:36
    slow=2, fast count=1   time=08:27:38
    slow=2, fast count=2   time=08:27:38
    slow=3, fast count=1   time=08:27:40
    slow=3, fast count=2   time=08:27:40
    slow=3, fast count=3   time=08:27:40
    slow=3, fast count=4   time=08:27:40
    slow=4, fast count=1   time=08:27:42
    slow=4, fast count=2   time=08:27:42
    slow=4, fast count=3   time=08:27:42
    slow=4, fast count=4   time=08:27:42

    This is the output that I would like to produce:
    slow=1, fast count=9   time=08:27:28
    slow=2, fast count=2   time=08:27:30
    slow=3, fast count=2   time=08:27:32
    slow=4, fast count=5   time=08:27:34
    slow=1, fast count=3   time=08:27:36
    slow=2, fast count=2   time=08:27:38
    slow=3, fast count=4   time=08:27:40
    slow=4, fast count=4   time=08:27:42

    As you can see, the query emits a result for every fast event, but I only want to output one aggregated event at the speed of the slow stream.
    The crux of my problem is that in my real scenario the reference stream has a variable frequency (it is itself a snapshot window on each event received), so I can't use a hopping, tumbling or count window to calculate my fast count aggregate: the slow stream has neither a fixed timespan nor a fixed event count!

    Is there a method of aggregating the fast stream over a variable timespan or variable number of fast events per slow stream event?



    • Edited by JeremyCupit Wednesday, July 03, 2013 8:38 PM
    Wednesday, July 03, 2013 8:41 AM

All replies

  • Update: I have been trying to work around this problem for 2 weeks now without any success.

    This seems like a real problem with the StreamInsight functionality. Basically there appears to be no way of joining two streams and aggregating the high frequency stream based upon the events of the low frequency stream!

    In my scenario I have a fast stream at 50 Hz and a slow stream with a variable interval of 1-2 minutes.

    Still hoping for a solution,

    Jeremy

    Monday, July 15, 2013 3:55 PM
  • I must have missed this over the 4th of July holiday. My apologies.

    So ... every query is trying to answer a question. I'm struggling with trying to understand the "question" that you are asking.

    It seems like you want to know how many events in StreamA occur in the valid time period of events in StreamB. The valid time period for StreamB is determined by new events arriving in the stream. Or ... perhaps to put it another way, how many events did I get in StreamA between different events in StreamB?

    Is this correct?
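
    To make that restated question concrete, here is a minimal sketch in plain C# (LINQ to Objects, no StreamInsight). It only shows the intended result: for each StreamB (slow) timestamp, the number of StreamA (fast) timestamps that fall before the next StreamB timestamp. The class and method names are hypothetical, and treating each interval as half-open [start, next start) is an assumption about the semantics wanted.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    // Hypothetical helper, for illustration only.
    public static class SlowFastSketch
    {
        // For each slow timestamp, count the fast timestamps in [slow[i], slow[i + 1]).
        // The last slow event has no successor, so its interval is left open-ended.
        // Assumes `slow` is sorted in increasing order.
        public static List<KeyValuePair<DateTime, int>> CountPerSlowEvent(
            IList<DateTime> slow, IList<DateTime> fast)
        {
            var result = new List<KeyValuePair<DateTime, int>>();
            for (int i = 0; i < slow.Count; i++)
            {
                DateTime start = slow[i];
                DateTime end = i + 1 < slow.Count ? slow[i + 1] : DateTime.MaxValue;
                int count = fast.Count(t => t >= start && t < end);
                result.Add(new KeyValuePair<DateTime, int>(start, count));
            }
            return result;
        }
    }
    ```

    Each returned pair corresponds to one output line of the form "slow event, fast count", which is the one-result-per-slow-event shape asked for in the question.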


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Monday, July 15, 2013 6:53 PM
  • Assuming that I understood what you were looking for on my previous question, try this LinqPad query out:

    void Main()
    {
    	//Define our source observables. 
    	var slowSource = Application.DefineObservable(() => new EventProducer(1000, 5000)); 
    	var fastSource = Application.DefineObservable(() => new EventProducer(100, 200)); 
    	
    	//Create streams from them. 
    	var fastStream = fastSource.ToPointStreamable(e => PointEvent<int>.CreateInsert(DateTimeOffset.Now, e), 
    		AdvanceTimeSettings.StrictlyIncreasingStartTime); 
    		
    	var slowStream = slowSource.ToPointStreamable(e => PointEvent<int>.CreateInsert(DateTimeOffset.Now, e), 
    		AdvanceTimeSettings.StrictlyIncreasingStartTime); 
    		
    	//Convert the slow stream to a signal.
    	var slowSignal = slowStream.AlterEventDuration(e => TimeSpan.MaxValue).ClipEventDuration(slowStream, (e1, e2) => e1 + 1 == e2); 
    	
    	//Join with the fast stream. 
    	//This ties the slow stream "current" value with the fast stream value. 
    	var joinedStream = from f in fastStream
    					   from s in slowSignal
    					   select new {
    					   		FastValue = f, 
    							SlowValue = s
    					   }; 
    	
    	//var expandedStream = joinedStream.AlterEventDuration(ef => TimeSpan.MaxValue).ClipEventDuration(slowStream, (e1, e2) => e1.SlowValue + 1 == e2);
    	
    	//Move each joined event's start to *before* the earliest possible start of its slow signal event
    	//and extend its end past the latest possible end (15 seconds on each side here).
    	//The buffer must cover the longest slow event; a smaller buffer reduces latency.
    	var foreverStream = joinedStream.AlterEventLifetime(e => e.StartTime.AddSeconds(-15), e => (e.EndTime - e.StartTime) + TimeSpan.FromSeconds(30)); 
    	
    	//Joining to the slowSignal stream effectively "clips" the "foreverStream" to the entire lifetime
    	//of the signal from the slow stream. 
     	var mergedStream = from fs in foreverStream 
    					   from ss in slowSignal
    					   where fs.SlowValue == ss
    					   select fs;
    	
    	//NOW we can do a snapshot. 
    	//Min and Max for the "SlowValue" are added to show they are the same. 
    	var snapshot = from win in mergedStream.SnapshotWindow()
    					select new {
    						SlowValue1 = win.Max(ef => ef.SlowValue), 
    						SlowValue2 = win.Min(ef => ef.SlowValue), 
    						Count = win.Count()	
    					};
    	
    	snapshot.ToIntervalObservable().Dump(); 
    }
    
    public class EventProducer:IObservable<int>{
    
    	private System.Timers.Timer _timer; 
    	private System.Random _nextIntervalRandomizer = new Random(); 
    	private IObserver<int> _observer; 
    
    	private int _minTime, _maxTime; 
    
    	private int _counter = 0; 
    
    
    	public EventProducer(int minTime, int maxTime)
    	{
    		_minTime = minTime; 
    		_maxTime = maxTime; 
    	}
    	
    	public IDisposable Subscribe(IObserver<int> observer)
    	{
    		_observer = observer;
    	
    		_timer = new System.Timers.Timer();
    		_timer.Elapsed += this.ProduceResult; 
    		_timer.AutoReset=false; 
    		ProduceResult(null, null); 
    		
    		return System.Reactive.Disposables.Disposable.Create(() => this.StopObservable()); 
    	}
    	
    	private void ProduceResult(object source, System.Timers.ElapsedEventArgs e){
    		if(_observer != null){
    			_observer.OnNext(_counter++); 
    			_timer.Interval = _nextIntervalRandomizer.Next(_minTime, _maxTime); 
    			_timer.Enabled = true; 
    		}
    		
    		
    		
    	}
    	
    	private void StopObservable(){
    		_timer.Enabled = false; 
    		_observer.OnCompleted(); 
    	}
    }
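
    The lifetime arithmetic in the query above can be checked with a small sketch in plain C#. This is not StreamInsight code: `Interval`, `Widen` and `Intersect` are hypothetical names, and the sketch assumes that a join only produces output where the lifetimes of both inputs overlap. `Widen` mirrors the two lambdas passed to AlterEventLifetime, and `Intersect` stands in for the second join against slowSignal.

    ```csharp
    using System;

    // Hypothetical helper, for illustration only.
    public static class LifetimeSketch
    {
        // Half-open interval [Start, End), the shape of an event lifetime.
        public struct Interval
        {
            public DateTime Start;
            public DateTime End;
            public Interval(DateTime start, DateTime end) { Start = start; End = end; }
        }

        // Move the start back by `buffer` and lengthen the duration by 2 * buffer,
        // so the end moves forward by `buffer`.
        public static Interval Widen(Interval e, TimeSpan buffer)
        {
            DateTime start = e.Start - buffer;
            TimeSpan duration = (e.End - e.Start) + buffer + buffer;
            return new Interval(start, start + duration);
        }

        // Overlap of two lifetimes; null when they do not overlap.
        public static Interval? Intersect(Interval a, Interval b)
        {
            DateTime start = a.Start > b.Start ? a.Start : b.Start;
            DateTime end = a.End < b.End ? a.End : b.End;
            return start < end ? new Interval(start, end) : (Interval?)null;
        }
    }
    ```

    If the buffer is at least as long as the slow event, widening a fast point event and intersecting it with its slow signal event gives back exactly the slow event's lifetime. Every fast event belonging to one slow event then shares the same start and end, which appears to be why the snapshot window produces a single result per slow event.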


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Monday, July 15, 2013 10:01 PM
  • Hi DevBiker,

    Thanks for your solution above. It works well, but I do have one additional complication.

    My code is below. My slow signal stream (called signalTiming) has overlapping events, and this causes the merged streams to return what appears to be a Cartesian join (or at least that's what the result looks like).

    What is the best way to write the final merge join so that it handles overlapping signal events?

    var streamTiming = from s in inputTimingSubject.ToPointStreamable(e => PointEvent<LoopTimingModel>.CreateInsert(new DateTimeOffset(e.StartLoop.EventTime), e), AdvanceTimeSettings.IncreasingStartTime)
    				  select s;
    
    var streamData = from s in inputDataSubject.ToPointStreamable(e => PointEvent<CarAggregateModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e), AdvanceTimeSettings.IncreasingStartTime)
    				  select s;
    
    
    var signalTiming = streamTiming.AlterEventDuration(e => TimeSpan.MaxValue)
    							   .ClipEventDuration(streamTiming, (e1, e2) => (e1.RacingNumber == e2.RacingNumber));
    
    // Join the streams together
    var queryJoin = from t in signalTiming
    				from d in streamData
    				where d.RacingNumber == t.RacingNumber
    				select new CarAggregateTimingModel
    					{
    						StartLoop = new LoopModel
    							{
    								Id = t.StartLoop.Id,
    								EventTime = t.StartLoop.EventTime,
    							},
    						EndLoop = new LoopModel
    							{
    								Id = t.EndLoop.Id,
    								EventTime = t.EndLoop.EventTime,
    							},
    						RacingNumber = t.RacingNumber,
    						TLA = t.TLA,
    						Data = d.Data
    					};
    
    
    var signalData = queryJoin.AlterEventLifetime(e => e.StartTime.AddMilliseconds(-bufferLength), e => (e.EndTime - e.StartTime) + TimeSpan.FromMilliseconds(bufferLength * 2));
    
    // Merge the buffered stream back onto the timing stream
    var streamMerge = from t in signalTiming
    				 from d in signalData
    				 where d.RacingNumber == t.RacingNumber &&
    					   d.StartLoop.Id == t.StartLoop.Id && d.StartLoop.EventTime == t.StartLoop.EventTime &&
    					   d.EndLoop.Id == t.EndLoop.Id && d.EndLoop.EventTime == t.EndLoop.EventTime
    				 select new CarAggregateTimingModel
    					 {
    						 StartLoop = new LoopModel
    							 {
    								 Id = t.StartLoop.Id,
    								 EventTime = t.StartLoop.EventTime,
    							 },
    						 EndLoop = new LoopModel
    							 {
    								 Id = t.EndLoop.Id,
    								 EventTime = t.EndLoop.EventTime,
    							 },
    						 RacingNumber = t.RacingNumber,
    						 TLA = t.TLA,
    						 Data = d.Data
    					 };



    • Edited by JeremyCupit Tuesday, July 16, 2013 4:28 PM typos
    Tuesday, July 16, 2013 4:26 PM
  • Hi DevBiker,

    I can confirm that the AlterEventLifetime approach above does not seem to work correctly when the signal's clip predicate compares payload fields rather than always returning true.

    This works just great:

    var signalTiming = streamTiming.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(streamTiming, (e1, e2) => true);

    This does not:

    var signalTiming = streamTiming.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(streamTiming, (e1, e2) => (e1.RacingNumber == e2.RacingNumber));

    When using the second method to produce the signal the results after the merge query are wrong. Any ideas?
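
    For reference, the difference between the two signals can be sketched in plain C# (no StreamInsight, and not a diagnosis of the wrong results). The sketch assumes that ClipEventDuration ends each event at the start of the next later event accepted by the predicate. `Signal`, `ToSignal` and `MaxConcurrent` are hypothetical names.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    // Hypothetical helper, for illustration only.
    public static class ClipSketch
    {
        public class Signal
        {
            public int RacingNumber;
            public DateTime Start;
            public DateTime End;
        }

        // Each point (racing number, time) becomes an interval that ends at the start
        // of the next later point accepted by `matches`; with no such point it stays
        // open (DateTime.MaxValue).
        public static List<Signal> ToSignal(
            IList<KeyValuePair<int, DateTime>> points, Func<int, int, bool> matches)
        {
            return points.Select(p => new Signal
            {
                RacingNumber = p.Key,
                Start = p.Value,
                End = points
                    .Where(q => q.Value > p.Value && matches(p.Key, q.Key))
                    .Select(q => q.Value)
                    .DefaultIfEmpty(DateTime.MaxValue)
                    .Min()
            }).ToList();
        }

        // Largest number of signal intervals that are live at the same instant.
        public static int MaxConcurrent(IList<Signal> signals)
        {
            return signals.Max(s => signals.Count(o => o.Start <= s.Start && s.Start < o.End));
        }
    }
    ```

    With the always-true predicate at most one signal event is live at any instant. With the RacingNumber predicate there can be one live signal event per car, so every later join against the signal sees several overlapping signal events at once.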

    Thanks,

    Jeremy


    • Edited by JeremyCupit Thursday, July 18, 2013 4:14 PM typos
    Thursday, July 18, 2013 4:14 PM
  • Hi Jeremy,

    Do you mean ClipEventDuration? Both do work correctly, but I'd need to see a trace or some sample data in a LinqPad query to understand what's going on with your implementation.

    You can post a trace to http://sdrv.ms/19dmHak and let me know when you've posted it so I can remove the permissions. You'll want to use trace.cmd from the command line rather than the event flow debugger to do the trace; it'll capture all of the queries as well as any events at startup.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Monday, July 22, 2013 1:52 PM