Combining Aggregate stream with source stream ..


  • Thursday, April 26, 2012 9:41 AM
     

    var l = from e1 in left
            join e2 in right on e1.ID equals e2.ID
            select new { X = e1.LogX, Y = e2.LogY };

    I have recently been trying out StreamInsight and have the following problem: how does one join an aggregate computed over a Tumbling Window back to the source events? In the above simplified code, a log stream is produced by a join and carries the fields X and Y.

    var s = from l in logs.TumblingWindow(TimeSpan.FromSeconds(30))
            select new
            {
                Avg = l.MovingAverage(e => e.X),
            };

    A new stream, s, based on l, provides a time-dependent moving average using a custom UDA. The question is: how do I combine both streams (l and s) so that every event in l contains the aggregate s.Avg?

    PS - I have tried using a Snapshot window with AlterEventDuration on l. This does not seem to work with custom UDAs, and I also cannot join, since l does not contain key IDs.

All replies

  • Friday, April 27, 2012 11:42 AM
     
     Answered

    First, I don't understand why you need a custom UDA ... how is it different than the regular average that StreamInsight already calculates?

    Second, when joining back to the original stream, you need to keep in mind that all of your joins are temporal: the events must overlap in time as well as match on the join key. So you need to make them overlap. The easiest way to do this is to specify your HoppingWindowOutputPolicy as ClipToWindowEnd. The default, PointAlignToWindowEnd, gives you a point event with a start time at the hopping/tumbling window end. ClipToWindowEnd gives you an interval that starts at the window start and ends at the window end, so it overlaps every source event that fell inside that window.

    Your aggregate would look something like the following:

    var s = from l in logs
            group l by l.ID into logGroups
            from logWin in logGroups.TumblingWindow(
                TimeSpan.FromSeconds(30),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                ID = logGroups.Key,
                // Aggregate over the window (logWin); l is out of scope after "into".
                Avg = logWin.MovingAverage(e => e.X),
            };
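
To see why the output policy matters for the join back, here is a minimal sketch that imitates the temporal semantics with plain LINQ to Objects. It does not use the StreamInsight API. The types LogEvent and AvgEvent, the helper names, the sample data, and the choice to place the point output exactly at the window end are all assumptions made for illustration, and a built-in average stands in for the custom MovingAverage UDA.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for stream events: a payload plus a lifetime [Start, End).
public sealed class LogEvent
{
    public int ID; public double X; public DateTime Start; public DateTime End;
}

public sealed class AvgEvent
{
    public int ID; public double Avg; public DateTime Start; public DateTime End;
}

public static class TemporalJoinSketch
{
    static readonly DateTime Origin = new DateTime(2012, 4, 26, 0, 0, 0, DateTimeKind.Utc);

    // A point event is modeled with a lifetime of a single tick.
    public static LogEvent Point(int id, double x, double second)
    {
        var start = Origin.AddSeconds(second);
        return new LogEvent { ID = id, X = x, Start = start, End = start.AddTicks(1) };
    }

    // Per-ID average over tumbling windows; clipToWindow selects the output lifetime.
    public static List<AvgEvent> Aggregate(IEnumerable<LogEvent> logs, TimeSpan size, bool clipToWindow)
    {
        return logs
            .GroupBy(e => new { e.ID, Window = (e.Start - Origin).Ticks / size.Ticks })
            .Select(g =>
            {
                var winStart = Origin.AddTicks(g.Key.Window * size.Ticks);
                var winEnd = winStart + size;
                return new AvgEvent
                {
                    ID = g.Key.ID,
                    Avg = g.Average(e => e.X),
                    // Interval spanning the window, or (simplified) a point at the window end.
                    Start = clipToWindow ? winStart : winEnd,
                    End = clipToWindow ? winEnd : winEnd.AddTicks(1),
                };
            })
            .ToList();
    }

    // Temporal equi-join: keys must match AND lifetimes must overlap.
    // The real query would presumably look like (untested assumption):
    //   from e in logs join a in s on e.ID equals a.ID select new { e.X, e.Y, a.Avg }
    public static List<string> JoinBack(IEnumerable<LogEvent> logs, IEnumerable<AvgEvent> avgs)
    {
        return (from e in logs
                join a in avgs on e.ID equals a.ID
                where e.Start < a.End && a.Start < e.End
                select string.Format("ID={0} X={1} Avg={2}", e.ID, e.X, a.Avg)).ToList();
    }

    public static void Main()
    {
        var logs = new[] { Point(1, 10, 5), Point(1, 20, 15), Point(2, 7, 20), Point(1, 40, 35) };
        var window = TimeSpan.FromSeconds(30);

        var clipped = JoinBack(logs, Aggregate(logs, window, true));
        var pointAligned = JoinBack(logs, Aggregate(logs, window, false));

        Console.WriteLine("Interval output: " + clipped.Count + " joined events");
        clipped.ForEach(Console.WriteLine);
        Console.WriteLine("Point output: " + pointAligned.Count + " joined events");
    }
}
```

With the sample data, the interval-shaped aggregates pair up with all four source events, while the point-shaped aggregates pair up with none, because no source event is alive at the instant of the window end. That is the behaviour the answer describes: the aggregate only reaches the source events once its lifetime covers the window they arrived in.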


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    • Marked as answer by 2deviate Tuesday, May 8, 2012 10:54 PM