StreamInsight - Hopping Window Query

    Question

  • I want to send a window of payloads to my output adapter. I need all of them to go to my output adapter at the same time, and not one by one.

    So, so far I have:

    var filteredCepStream =
        from window in cepStream.HoppingWindow(
            TimeSpan.FromMinutes(outputConfig.WindowLength),
            TimeSpan.FromMinutes(outputConfig.WindowHopSize),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select window;
    

    But StreamInsight won't let me send over a window. Is there any way to write a user-defined aggregate or something similar to convert this to a list of payloads to send to the output adapter? Am I going about this the wrong way? I really need ALL of the payloads in my window to be sent to the output adapter at the same time.

    Thank you!!

    Dina

    Tuesday, July 12, 2011 7:38 PM

Answers

  • This sounds like an interesting problem. Ideally, StreamInsight can do the counting and the reset in a query; after all, that's what it is supposed to do :). I understand that a reset based on non-occurring events is not trivial, so you tried to put some of the logic in the output adapter and keep a simpler query that just "batches" the events in each window. Let's look at this simple query first:

    An important aspect of your windows is that they overlap, so you expect input events to show up in more than one window. If your windows were non-overlapping, this query would be a relatively simple event lifetime modification: snap the events to the next 10-minute boundary so that they all line up and are flushed simultaneously (assuming appropriate CTIs). But this won't work with overlapping windows; here you need a true StreamInsight window operator. However, you don't actually want to aggregate, just to get all events in the window, which is a somewhat unusual operation. StreamInsight has the TopK operator (Take in LINQ), which picks K events out of a window and returns them into the stream, with their lifetimes set to the window size. You could use this construct with a K that's high enough to always include all events. See here how to write a TopK on top of a hopping window.
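
    To make the overlap concrete, here is a minimal sketch in plain C# (no StreamInsight types) that lists the hopping windows containing a single event timestamp. It assumes windows are half-open intervals whose start times are aligned to multiples of the hop size; the helper name WindowsContaining and the sample timestamp are made up for illustration. It is written as a top-level program, so it needs C# 9 or later.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: enumerate the hopping windows [start, start + windowSize)
// that contain timestamp t. Assumes window starts are aligned to the hop size.
static List<(DateTime Start, DateTime End)> WindowsContaining(
    DateTime t, TimeSpan windowSize, TimeSpan hopSize)
{
    var result = new List<(DateTime, DateTime)>();
    // Latest aligned window start at or before t.
    long lastStartTicks = t.Ticks - (t.Ticks % hopSize.Ticks);
    // Walk backwards while the window still covers t.
    for (long s = lastStartTicks; s > t.Ticks - windowSize.Ticks; s -= hopSize.Ticks)
    {
        var start = new DateTime(s);
        result.Add((start, start + windowSize));
    }
    result.Reverse(); // oldest window first
    return result;
}

var size = TimeSpan.FromMinutes(10);
var hop = TimeSpan.FromMinutes(1);
var eventTime = new DateTime(2011, 7, 12, 19, 38, 30); // hypothetical event time

var windows = WindowsContaining(eventTime, size, hop);
Console.WriteLine($"Event at {eventTime:HH:mm:ss} falls into {windows.Count} windows:");
foreach (var w in windows)
    Console.WriteLine($"  [{w.Start:HH:mm}, {w.End:HH:mm})");
```

    With a 10-minute window and a 1-minute hop, each event lands in 10 windows, which is why simply shifting event lifetimes to one boundary cannot reproduce the result.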

    Having said that, I still recommend expressing as much of your logic as possible in a StreamInsight query, instead of splitting it between a declarative standing query and programmatic code in the adapter. But before we go there, let me ask some more questions:

    • You say "reset", as if the count was increasing over time - this sounds different from just counting over the last 10 minutes every minute, without "memory" beyond the 10 minutes.
    • Is it correct that you want to produce a 0 count only for those distinct payloads that you have already seen with a count >0? Otherwise, how do you know which payloads you have not seen? Is there another piece of reference data that tells you which payloads you want to count over?

    Regards,
    Roman


    MS StreamInsight Team

    Disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights.
    Friday, July 15, 2011 5:44 PM
  • I had the same problem and solved it using TopK(int.MaxValue). I also track when each counter was last updated to see whether it has expired. If it has expired, I use 0 for it.
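
    The expiry tracking could look roughly like the sketch below. It is plain C# and an assumption about how such tracking might work, not this poster's actual code: the string keys, the ApplyWindow helper, and the sample data are hypothetical. A counter that was seen in the previous window but is missing from the current one is written as 0 once and then forgotten.

```csharp
using System;
using System.Collections.Generic;

// Keys that had a non-zero count in the previous window.
var known = new HashSet<string>();

// Returns the values to write for this window: the fresh counts, plus 0 for
// every previously seen key that is absent from the current window.
Dictionary<string, int> ApplyWindow(Dictionary<string, int> currentCounts)
{
    var updates = new Dictionary<string, int>(currentCounts);
    foreach (var key in known)
    {
        if (!currentCounts.ContainsKey(key))
            updates[key] = 0; // expired: no events in the latest window
    }
    // Remember only the keys that are still active.
    known.Clear();
    known.UnionWith(currentCounts.Keys);
    return updates;
}

// Hypothetical per-window counts, keyed by "category/counter/instance".
var first = ApplyWindow(new Dictionary<string, int>
{
    ["Orders/Failures/web01"] = 2,
    ["Orders/Timeouts/web02"] = 1,
});
var second = ApplyWindow(new Dictionary<string, int>
{
    ["Orders/Failures/web01"] = 1,
});
var third = ApplyWindow(new Dictionary<string, int>());

foreach (var kv in second)
    Console.WriteLine($"{kv.Key} = {kv.Value}");
```

    This only resets counters that were seen before, so no separate list of every possible performance counter is needed.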
    Thursday, July 21, 2011 7:58 AM

All replies

  • Before suggesting an approach with fully ordered queries, I'm curious about the ultimate goal: getting all of the events in a window to the output adapter as a set is only an intermediate step on the way to another destination. Could you describe the end-to-end data flow you're looking to implement?
    Tuesday, July 12, 2011 9:37 PM
  • Yeah, sure. Every minute I need to get the last 10 minutes of data and update a set of performance counters from the output adapter. I need the whole window because every minute I have to reset the values of the performance counters based on what is in the window. If I take the window and just pass the individual payloads along one at a time, my output adapter has no concept of when to reset the performance counter values, unless there is a way to mark the start or end of a window.

    If this really isn't possible, then I could aggregate the items in the window into the values that I want to reset the performance counters to. I attempted this but wasn't able to do it. Maybe you could help me with this. Here is what my payload looks like:

    public class MonitoringPayload
    {
        public string CategoryName { get; set; }
        public string PerformanceCounterName { get; set; }
        public string InstanceName { get; set; }
    }


    Every minute I need to get the last 10 minutes of payloads in the stream and store a simple count of the number of times each distinct payload appears, along with the information in the payload.

    The only problem with computing this aggregate and updating the performance counters from it is that if a payload doesn't appear in the 10-minute window, its performance counter value has to be reset to 0. The performance counters always need to reflect the past 10 minutes of data. Without sending the whole window, it is not possible to know which performance counters did not appear in the 10-minute window (unless I store a reference to every single performance counter, which I don't want to do).

    Optimally I would like to send the whole window of data to my output adapter, reset all of the performance counters and then go through each payload and increment the performance counter that it corresponds to.
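
    The per-window count described above can be sketched in plain C# as follows. This is only an illustration of the intended result, not a StreamInsight query; the tuple stands in for MonitoringPayload, and the CountPerCounter helper and the sample values are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Count how often each distinct (category, counter, instance) triple
// occurs among the payloads of one window.
static Dictionary<(string Category, string Counter, string Instance), int> CountPerCounter(
    IEnumerable<(string Category, string Counter, string Instance)> windowPayloads)
{
    return windowPayloads
        .GroupBy(p => p)
        .ToDictionary(g => g.Key, g => g.Count());
}

// Hypothetical sample data standing in for one 10-minute window.
var window = new[]
{
    ("Orders", "Failures", "web01"),
    ("Orders", "Failures", "web01"),
    ("Orders", "Timeouts", "web02"),
};

foreach (var entry in CountPerCounter(window))
    Console.WriteLine($"{entry.Key.Category}/{entry.Key.Counter}/{entry.Key.Instance}: {entry.Value}");
```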

    Sorry, this is kinda confusing. Please let me know if you need any more information from me. 

    Thursday, July 14, 2011 12:31 AM
  • Nope, not confusing at all.  Just wanted to confirm that I knew what you were trying to do before suggesting an approach.  In a windowed query output you'll see a mixture of CTI and Insert events:

    CTI

    Insert

    Insert

    Insert

    CTI

    The Insert events between the two CTI events will all belong to the same window.  What I'd do in the output adapter is:

    • Queue events until a CTI is received
    • "Flush" the queue to your destination (doing union and intersects against the "expected" set of performance counters).
    Note that if you queue the raw UntypedEvent objects (or TypedEvent<T> objects), don't forget to release each one with ReleaseEvent(ref evt) after flushing.
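
    The queue-and-flush steps above can be sketched like this. It is a plain C# stand-in for the adapter's dequeue loop and uses no StreamInsight types: the (IsCti, Payload) tuples, the BatchByCti helper, and the sample stream are hypothetical, and it assumes, as described above, that the Insert events between two CTIs belong to the same window.

```csharp
using System;
using System.Collections.Generic;

// Groups Insert payloads into batches, closing a batch whenever a CTI arrives.
// CTI items carry an empty payload that is ignored.
static List<List<string>> BatchByCti(IEnumerable<(bool IsCti, string Payload)> events)
{
    var batches = new List<List<string>>();
    var pending = new List<string>();
    foreach (var (isCti, payload) in events)
    {
        if (!isCti)
        {
            pending.Add(payload); // queue Insert events
            continue;
        }
        if (pending.Count > 0)
        {
            batches.Add(pending); // CTI received: flush the queue as one batch
            pending = new List<string>();
        }
    }
    return batches; // Inserts after the last CTI stay unflushed
}

// Hypothetical output sequence: CTI, three Inserts, CTI, one Insert, CTI.
var stream = new[]
{
    (true, ""), (false, "a"), (false, "b"), (false, "c"),
    (true, ""), (false, "d"), (true, ""),
};

var flushed = BatchByCti(stream);
foreach (var batch in flushed)
    Console.WriteLine(string.Join(", ", batch));
```

    In a real adapter the flush step is where the performance counters would be reset and rewritten. This sketch skips empty batches; a window with no events at all would need its own handling if counters must drop to 0.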
    Thursday, July 14, 2011 1:40 AM