Combining Aggregate stream with source stream ..

  • April 26, 2012 09:41
     

    var l = from e1 in left join e2 in right on e1.ID equals e2.ID select new { X = e1.LogX, Y = e2.LogY, };

    I have recently been trying out StreamInsight and have the following problem: how does one join an aggregate computed over a tumbling window back to the source events? In the simplified code above, a log stream exists and, based on a join, produces events with fields X and Y.

    var s = from l in logs.TumblingWindow(TimeSpan.FromSeconds(30))                             
    select new 
    {
             Avg = l.MovingAverage(e => e.X),
    };

    A new stream, s, based on l, provides a time-dependent moving average using a custom UDA. The question is: how do I combine both streams (l and s) so that every event in l contains the aggregate s.Avg?

    PS - I have tried using a snapshot window with AlterEventDuration on l. This does not seem to work with custom UDAs, and I also cannot join, since l does not contain key IDs.

All replies

  • April 27, 2012 11:42
     
     Answer

    First, I don't understand why you need a custom UDA ... how is it different than the regular average that StreamInsight already calculates?

    Second, when joining back to the original stream, keep in mind that all of your joins are temporal: the events must overlap in time as well as match on the join key. So you need to make them overlap. The easiest way to do this is to specify your HoppingWindowOutputPolicy as ClipToWindowEnd. The default, PointAlignToWindowEnd, gives you a point event whose start time is the hopping/tumbling window end. ClipToWindowEnd gives you an interval that starts at the window start and ends at the window end.

    Your aggregate would look something like the following:

    var s = from l in logs
            group l by l.ID into logGroups
            from logWin in logGroups.TumblingWindow(TimeSpan.FromSeconds(30),
                    HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                ID = logGroups.Key,
                // Aggregate over the window; l is out of scope after "into".
                Avg = logWin.MovingAverage(e => e.X),
            };
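
To illustrate why the output policy matters when joining back, here is a minimal sketch in plain LINQ to Objects. It is not StreamInsight API: `LogEvent`, `WindowAvg`, `TumblingAverages` and `JoinBack` are hypothetical names, a regular average stands in for the custom UDA, and the temporal overlap that the engine applies implicitly is written out as an explicit `where` clause. It assumes C# 9 or later for records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain LINQ-to-Objects simulation of the temporal join; no StreamInsight types are used.
public static class TemporalJoinSketch
{
    // Hypothetical source event: a point event at Time.
    public record LogEvent(int ID, DateTime Time, double X);

    // Hypothetical aggregate result with lifetime [Start, End).
    public record WindowAvg(int ID, DateTime Start, DateTime End, double Avg);

    // Per-ID average over tumbling windows of the given size.
    // clipToWindowEnd = true  : result spans the whole window (interval).
    // clipToWindowEnd = false : result is a one-tick point event at the window end.
    public static List<WindowAvg> TumblingAverages(
        IEnumerable<LogEvent> events, TimeSpan size, bool clipToWindowEnd)
    {
        return events
            .GroupBy(e => (e.ID, Slot: e.Time.Ticks / size.Ticks))
            .Select(g =>
            {
                var start = new DateTime(g.Key.Slot * size.Ticks);
                var end = start + size;
                var avg = g.Average(e => e.X);
                return clipToWindowEnd
                    ? new WindowAvg(g.Key.ID, start, end, avg)
                    : new WindowAvg(g.Key.ID, end, end.AddTicks(1), avg);
            })
            .ToList();
    }

    // Join each source event to the aggregate with the same ID whose
    // lifetime contains the event's timestamp (the "overlap" condition).
    public static List<(LogEvent Event, double Avg)> JoinBack(
        IEnumerable<LogEvent> events, IEnumerable<WindowAvg> windows)
    {
        return (from e in events
                join w in windows on e.ID equals w.ID
                where w.Start <= e.Time && e.Time < w.End
                select (Event: e, Avg: w.Avg)).ToList();
    }

    public static void Main()
    {
        var t0 = new DateTime(2012, 4, 26, 9, 41, 0);
        var events = new List<LogEvent>
        {
            new LogEvent(1, t0.AddSeconds(5), 10),
            new LogEvent(1, t0.AddSeconds(20), 20),
            new LogEvent(1, t0.AddSeconds(40), 60),
        };
        var size = TimeSpan.FromSeconds(30);

        // Interval output: every source event falls inside its window's result.
        var clipped = JoinBack(events, TumblingAverages(events, size, true));
        Console.WriteLine(clipped.Count); // 3

        // Point output at the window end: no source event overlaps it.
        var points = JoinBack(events, TumblingAverages(events, size, false));
        Console.WriteLine(points.Count); // 0
    }
}
```

In StreamInsight itself the overlap check is implicit, so the join back would presumably have the same shape as the join in the question: `logs` joined with `s` on `ID`, assuming the source events carry an `ID` field.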


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    • Marked as answer by 2deviate, May 8, 2012 22:54