Merging data back into a HoppingWindow


  • Thursday, January 31, 2013 9:38 PM
     
     

    One of my primary uses for StreamInsight relies on HoppingWindows that hold the historical data needed to make decisions about incoming events; those events are then merged into the window and become history for subsequent events. I assume this is a typical use case, since it is useful for many kinds of real-time analytics. However, I am running into two problems.  

    1.)   While StreamInsight gives me the tools to maintain continuity within these windows, the payload handed to my UDO is an IEnumerable. When the payload contains almost half a million objects with no index, visiting each one to make a decision is prohibitively slow. That forces me to apply an indexing scheme, but it appears this can't be done until the window's data is presented to my UDO, so I have to re-index the entire payload on every call instead of updating only the few entries that are added and expired at each interval.

    2.)    I have historical data in a HoppingWindow that I use to evaluate events as they come in. I set a flag on the objects, hoping to merge them back into the HoppingWindow and use the aggregate to make decisions about subsequent events via a UDO. However, the changes my UDO makes to these values are not reflected in the HoppingWindow's payload on subsequent calls to the UDO. Clearly I am updating copies of the objects, not the objects in the source over which the HoppingWindow is created. How can I accomplish this in version 2.0?
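    For point 1, here is a minimal sketch of the incremental indexing described there. It is plain C#, not StreamInsight code, so it only illustrates the bookkeeping: it assumes each click is reduced to a hypothetical (Url, Ip) key plus an end time, and that an index may be kept alive between calls. On each hop, entering clicks are added and clicks whose lifetimes have ended are removed, instead of re-indexing the whole window. `Add`, `Expire` and `CountMatches` are illustrative names, not StreamInsight APIs; `PriorityQueue` requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical incremental index: (Url, Ip) -> number of live historical clicks.
var index = new Dictionary<(string Url, string Ip), int>();

// Clicks ordered by end time, so expired ones can be dropped without a full scan.
var expiry = new PriorityQueue<(string Url, string Ip), DateTime>();

// Called for each click entering the window.
void Add(string url, string ip, DateTime end)
{
    var key = (url, ip);
    index[key] = index.TryGetValue(key, out var n) ? n + 1 : 1;
    expiry.Enqueue(key, end);
}

// Called once per hop: remove every click whose half-open lifetime [start, end) is over.
void Expire(DateTime now)
{
    while (expiry.TryPeek(out var key, out var end) && end <= now)
    {
        expiry.Dequeue();
        if (--index[key] == 0) index.Remove(key);
    }
}

// Average O(1) dictionary lookup instead of visiting every click in the window.
int CountMatches(string url, string ip) =>
    index.TryGetValue((url, ip), out var n) ? n : 0;

var t0 = new DateTime(2013, 1, 31, 0, 0, 0, DateTimeKind.Utc);
Add("/home", "10.0.0.1", t0.AddSeconds(10));
Add("/home", "10.0.0.1", t0.AddSeconds(20));
Add("/cart", "10.0.0.2", t0.AddSeconds(20));

Expire(t0.AddSeconds(10)); // the first /home click has ended
Console.WriteLine(CountMatches("/home", "10.0.0.1")); // 1
Console.WriteLine(CountMatches("/cart", "10.0.0.2")); // 1
```

    The cost per hop is proportional to the number of clicks entering and leaving, not to the 350,000 clicks in the window.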

    It is entirely possible that I am approaching this incorrectly or am unaware of some vital aspect of StreamInsight, as this is my first attempt at using it. Any help or advice would be appreciated.


    • Edited by DEKER Thursday, January 31, 2013 9:40 PM

All Replies

  • Tuesday, February 05, 2013 6:34 PM
     

    How are you utilizing the historical aggregate data? Are you doing it via query? One way to do this is to create your aggregate query/stream and then alter the event duration of the resulting events. This extends the lifetime of the events so that they can then be used to compare to historical data. You would take the extended duration aggregates and then join them back to the current data stream. Since the aggregate events have an extended duration, they will join back to the originals. You won't have to worry about the indexing or anything like that - the StreamInsight engine will handle that for you.

    Here is an example that uses the queries from the LinqPad sample "Aggregates with Groups (Categories)":

    // Per-station averages over 2-hour tumbling windows
    var query = from e in weather
                group e by e.StationCode into stationGroups
                from win in stationGroups.TumblingWindow(TimeSpan.FromHours(2))
                select new
                {
                    StationCode = stationGroups.Key,
                    AvgTempPerStation = win.Avg(e => e.Temperature),
                    AvgWindPerStation = win.Avg(e => e.WindSpeed)
                };

    // Extend each aggregate indefinitely, then clip it at the next aggregate for the
    // same station, so the latest average stays valid until it is replaced.
    // Join the result back to the raw readings to compute each reading's difference.
    var changeFromAverage = from q in query
                                .AlterEventDuration(ef => TimeSpan.MaxValue)
                                .ClipEventDuration(query, (e1, e2) => e1.StationCode == e2.StationCode)
                            from w in weather
                            where q.StationCode == w.StationCode
                            select new
                            {
                                WindDeltaFromAverage = w.WindSpeed - q.AvgWindPerStation,
                                TempDeltaFromAverage = w.Temperature - q.AvgTempPerStation
                            };
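    The AlterEventDuration/ClipEventDuration pair above is a common StreamInsight pattern for keeping the latest value alive until the next one arrives. As a rough model of the resulting lifetimes only (plain C#, no StreamInsight types), here is what happens to hypothetical per-station aggregate events that start at 2-hour window boundaries: each is stretched to an open-ended lifetime and then cut off where the next aggregate for the same station begins. The last aggregate per station stays open, shown here as DateTime.MaxValue. Station B has no aggregate at the 2-hour mark (assumed: no readings in that window), so its first average lives until the 4-hour mark.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical aggregate start times per station (window starts, 2 hours apart).
var t0 = new DateTime(2013, 2, 5, 0, 0, 0, DateTimeKind.Utc);
var aggregates = new List<(string Station, DateTime Start)>
{
    ("A", t0), ("B", t0), ("A", t0.AddHours(2)), ("A", t0.AddHours(4)), ("B", t0.AddHours(4)),
};

// AlterEventDuration(ef => TimeSpan.MaxValue) stretches each event "forever";
// ClipEventDuration then cuts it at the start of the next event for the same station.
var lifetimes = aggregates
    .GroupBy(a => a.Station)
    .SelectMany(g =>
    {
        var starts = g.Select(a => a.Start).OrderBy(s => s).ToList();
        return starts.Select((s, i) =>
            (Station: g.Key, Start: s, End: i + 1 < starts.Count ? starts[i + 1] : DateTime.MaxValue));
    })
    .ToList();

foreach (var l in lifetimes)
{
    var end = l.End == DateTime.MaxValue ? "open" : l.End.ToString("HH:mm");
    Console.WriteLine($"{l.Station}: {l.Start:HH:mm} -> {end}");
}
```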


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    • Marked As Answer by Iric Wen (Moderator) Wednesday, February 06, 2013 8:21 AM
    • Unmarked As Answer by DEKER Monday, April 01, 2013 6:56 PM
  • Tuesday, April 02, 2013 3:19 PM
     
     

    Unfortunately I wasn't able to continue this thread because I had to finish the project it was meant to address. I did resolve the issue in the end, but I am unhappy with my solution; it was forged in a confluence of desperation and inexperience. I also think there is a misunderstanding about my scenario, and the fault is probably mine for not explaining it thoroughly.

    I have a stream of clicks coming from a web application. The payload that represents these clicks in StreamInsight consists of, among other things, the client IP, UserAgent and Url. These clicks are sampled at a configurable interval (currently every 10 seconds). Each is inserted into StreamInsight as an IntervalEvent whose start is the end of the previous sampling interval and whose end is the start of the current interval plus the configurable period (again, currently 10 seconds).

    The initial load starts at the current date and goes back 10 days, pulling in 350,000 IntervalEvents on average. From then on, a continuous feed of IntervalEvents enters the query every 10 seconds. On top of that, a hopping window spanning 10 days' worth of data, measured back from the end of the last interval, advances 10 seconds at a time. For each new IntervalEvent I insert, I have to search the previous 10 days' worth of events for clicks with similar characteristics (same Url, same IP address, etc.). This is where my problem begins. I used a CepTimeSensitiveOperator:

    from w in inputStream.HoppingWindow(config.Input.History, new TimeSpan(0, 0, config.Input.Period), start.ToUniversalTime())
    select w.CpcCEPUDO(config.Input);

    // In the UDO class (a CepTimeSensitiveOperator<CpcCEPInput, CpcCEPOutput>)
    public override IEnumerable<IntervalEvent<CpcCEPOutput>> GenerateOutput(
        IEnumerable<IntervalEvent<CpcCEPInput>> payloads,
        WindowDescriptor windowDescriptor)

    This means two things:

    1.)  I have to split the payload into the historical portion and the current portion, since each window contains both.

    2.)  When I compare the current clicks against the click history, I am doing so against an IEnumerable, not an IQueryable, so every lookup is a linear scan: for every new click I have to visit each of the prior 350,000+ clicks.
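    Both steps can be done with a hash-based lookup instead of a linear scan. The sketch below is plain C# (LINQ to Objects), not StreamInsight code. It assumes a simplified, hypothetical payload of (Start, Url, Ip) tuples in place of CpcCEPInput, and treats "current" as any event that starts within the last hop (10 seconds) before the window end; the real split rule may differ. Building the lookup is one pass over the window; after that, each current click is matched by key rather than by visiting every historical click. Note that it still rebuilds the lookup for every window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In the UDO the window end would presumably come from windowDescriptor.EndTime.
var windowEnd = new DateTime(2013, 4, 2, 12, 0, 0, DateTimeKind.Utc);
var hop = TimeSpan.FromSeconds(10);

// Hypothetical, simplified window payload: (Start, Url, Ip).
var payload = new List<(DateTime Start, string Url, string Ip)>
{
    (windowEnd.AddDays(-3),    "/home",  "10.0.0.1"),
    (windowEnd.AddHours(-1),   "/home",  "10.0.0.1"),
    (windowEnd.AddHours(-2),   "/cart",  "10.0.0.2"),
    (windowEnd.AddSeconds(-5), "/home",  "10.0.0.1"), // current
    (windowEnd.AddSeconds(-3), "/about", "10.0.0.3"), // current
};

// Step 1: split into current (started within the last hop) and history (everything older).
var currentStart = windowEnd - hop;
var current = payload.Where(e => e.Start >= currentStart).ToList();
var history = payload.Where(e => e.Start < currentStart);

// Step 2: index the history once by the characteristics being compared...
var byKey = history.ToLookup(e => (e.Url, e.Ip));

// ...so each current click is a hash lookup instead of a scan of all history.
foreach (var click in current)
{
    int similar = byKey[(click.Url, click.Ip)].Count();
    Console.WriteLine($"{click.Url} from {click.Ip}: {similar} similar earlier clicks");
}
```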

    I resorted to using i4o (Index for Objects) which meant I had to maintain an IndexSet<CpcCEPInput> of the historical events within the class that implements the TimeSensitiveUdo. It works but I am horrified by how inelegant this is.

    My question: is there a better way to address the poor performance of using an IEnumerable rather than an IQueryable?


    • Edited by DEKER Tuesday, April 02, 2013 3:22 PM
  • Wednesday, April 03, 2013 2:31 PM
     
     

    From the looks of it, you need to alter the event duration of your historical data so that it overlaps in time with the current (live) stream. Your historical data would be treated much like reference data, just more time-limited. Essentially, after you aggregate the history, you alter the event duration so it stays live for another 10 days on the timeline. That way, the aggregates for the last 10 days' worth of data are valid at any point in time, and historical events older than 10 days are invalidated for you by the query operator.
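    To make the timeline argument concrete, here is a small model in plain C# (not StreamInsight code) of what changing the duration does. It assumes half-open [Start, End) lifetimes, as StreamInsight uses for interval events, and treats "can be joined" as "lifetimes overlap"; `WithDuration` is a hypothetical stand-in for the effect of AlterEventDuration (keep the start, replace the duration). A click from three days ago does not overlap a live click until its duration is set to 10 days, while a click older than 10 days still drops out.

```csharp
using System;

// Half-open lifetimes [Start, End): two events overlap when each starts before the other ends.
bool Overlaps((DateTime Start, DateTime End) a, (DateTime Start, DateTime End) b) =>
    a.Start < b.End && b.Start < a.End;

// Models the effect of AlterEventDuration: keep the start time, replace the duration.
(DateTime Start, DateTime End) WithDuration((DateTime Start, DateTime End) e, TimeSpan duration) =>
    (e.Start, e.Start + duration);

var now = new DateTime(2013, 4, 3, 12, 0, 0, DateTimeKind.Utc);
var tenSeconds = TimeSpan.FromSeconds(10);
var tenDays = TimeSpan.FromDays(10);

var oldClick   = (Start: now.AddDays(-3),  End: now.AddDays(-3) + tenSeconds);  // 3 days old
var staleClick = (Start: now.AddDays(-11), End: now.AddDays(-11) + tenSeconds); // 11 days old
var liveClick  = (Start: now, End: now + tenSeconds);

Console.WriteLine(Overlaps(oldClick, liveClick));                          // False: already ended
Console.WriteLine(Overlaps(WithDuration(oldClick, tenDays), liveClick));   // True: extended into the present
Console.WriteLine(Overlaps(WithDuration(staleClick, tenDays), liveClick)); // False: older than 10 days
```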

    Make sense?


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



  • Wednesday, April 03, 2013 2:45 PM
     
     

    That answers the first half of my question (which, honestly, you did before). I understand the principle, though I'll have to work out the mechanics. However, I don't believe this addresses the performance concern of working with an IEnumerable. Even if I modify the event duration, I am still stuck querying 350,000 records through an IEnumerable.

     
  • Wednesday, April 03, 2013 3:50 PM
     
     

    No ... not at all. Once you have the stream timeline the way you need it, you can use the StreamInsight engine to do the query. Rather than using an IEnumerable, you simply use a stream (IQStreamable) query. Then you have the SI engine doing the heavy lifting.

    You will have to separate your stream into two streams for querying: one that is historical and one that is "current". On the historical stream, you'd apply AlterEventDuration so its events overlap the current ones; the current stream would be ... well ... current, with no AlterEventDuration. You would then join them on your desired matching parameters. That lets you compute aggregates that predict, based on previous click-throughs, the most "likely" next step ... or a list of "next steps" with percentages.
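    The two-stream shape can be modeled in plain C# to show what the engine would compute; in StreamInsight itself this would be a join between two IQStreamable queries, with the engine doing the matching, not LINQ to Objects. The sketch assumes a hypothetical history record that already carries the page the visitor went to next (NextUrl); how that is derived from the raw clicks is not covered in this thread. History events get a 10-day lifetime (the AlterEventDuration step), the current click keeps its own 10-second lifetime, and a history event matches when the Url is the same and the lifetimes overlap. Grouping the matches by NextUrl gives the "next step" percentages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var now = new DateTime(2013, 4, 3, 12, 0, 0, DateTimeKind.Utc);
var tenDays = TimeSpan.FromDays(10);

// Hypothetical history records: a click on Url, followed by a click on NextUrl.
var historyRaw = new List<(DateTime Start, string Url, string NextUrl)>
{
    (now.AddDays(-1),  "/home", "/cart"),
    (now.AddDays(-2),  "/home", "/cart"),
    (now.AddDays(-4),  "/home", "/about"),
    (now.AddDays(-12), "/home", "/about"), // older than 10 days: must not count
};

// Historical stream: every event gets a 10-day lifetime (the AlterEventDuration step).
var history = historyRaw
    .Select(h => (h.Start, End: h.Start + tenDays, h.Url, h.NextUrl))
    .ToList();

// Current stream: a live click keeps its own short lifetime.
var current = (Start: now, End: now.AddSeconds(10), Url: "/home");

// Temporal equi-join: same Url and overlapping half-open lifetimes.
var matches = history
    .Where(h => h.Url == current.Url && h.Start < current.End && current.Start < h.End)
    .ToList();

// Aggregate the matches into "likely next step" percentages.
var nextSteps = matches
    .GroupBy(h => h.NextUrl)
    .Select(g => (NextUrl: g.Key, Percent: 100.0 * g.Count() / matches.Count))
    .OrderByDescending(x => x.Percent)
    .ToList();

foreach (var (nextUrl, percent) in nextSteps)
    Console.WriteLine($"{nextUrl}: {percent:F1}%");
```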


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)

