Who Runs Standing Queries in StreamInsight?

    Question

  • Hi, I have a basic question about running queries in StreamInsight.

    We give the StreamInsight engine the following:

    InputAdapter: gets data from, say, sensors every second. This code belongs to the application, and the logic is probably that the adapter runs every second (using a timer), pulls the data from the sensor, and enqueues it into StreamInsight. We also create an InputStream on the InputAdapter.

    StandingQueries: queries that read data from the InputStream (which is created from the InputAdapter).

    OutputAdapter: its only job is to dequeue the events and use them, or pass them to some other application for further processing. The developer can use a timer to dequeue the data from StreamInsight every second.

    So, as you can see, the input and output adapters are actually triggered by user code (in this case a timer that calls the functions every second).

    I am wondering when and how the standing queries run. It may be internal to StreamInsight, but I just want to understand: on what basis is the query run, who runs it, and when?

    My query is simple and looks like this:

     

    var filtered = from i in inputStream where i.Value > 0.5 select i;

    Thanks

     
    Venkat
    Monday, December 19, 2011 2:23 PM

Answers

  • Yes, you can have multiple standing queries for the same set of events ... this is, in fact, a very common use case. You would do this using dynamic query composition ... not by creating multiple input streams with CepStream<T>.Create(). If you use CepStream<T>.Create() to create multiple source streams attached to the same input adapter, you'll wind up with multiple instances of the input adapter. Typically, this is not what you want to do. Mark Simms has an excellent blog post on this. What you would do ... and this is very commonly what I do in my queries ... is to create an initial source stream that is attached to the input adapter, immediately call ToQuery() to get a query from it and then ToStream<T>() on the query. This second stream is used as the base stream for all additional queries. You cannot use the adapter factory to "share" multiple instances of the same input adapter with 2 streams ... only the last one will get the events. There are ways around this but DQC is far, far simpler and fits most use cases.

    This then leads into your next question ... creating filters on top of existing, standing queries. This is certainly possible ... check out my blog post for some tips on how to do this as well as a sample. It's built on top of DQC. More complex dynamic queries ... with logic in addition to simple filtering ... will require dynamically compiling the query and then running it. Colin Meeks wrote a blog post about doing this.
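
    To make the idea of one shared base stream feeding several filtered queries concrete, here is a minimal sketch in plain C#. It is a toy model, not the StreamInsight API and not dynamic query composition itself: ToySharedSource, AddFilteredQuery and Publish are hypothetical names chosen for illustration. It only shows the shape of the approach: a single source instance, with filters attached on top of it, including while events are already flowing.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: hypothetical names, not the StreamInsight API.
// One source instance fans events out to every query attached to it.
public sealed class ToySharedSource<T>
{
    private readonly List<Action<T>> _queries = new List<Action<T>>();

    public int QueryCount { get { return _queries.Count; } }

    // Attach a filtered "query" on top of the single shared stream.
    // This can be called at any time, also after events have started flowing.
    public void AddFilteredQuery(Func<T, bool> filter, Action<T> sink)
    {
        _queries.Add(item => { if (filter(item)) { sink(item); } });
    }

    // Called once per incoming event; each attached query sees the same event.
    public void Publish(T item)
    {
        foreach (Action<T> query in _queries.ToArray())
        {
            query(item);
        }
    }
}
```

    In this model a second filter added later only sees events published after it was attached, which is the behaviour you would usually expect from a query started against a live stream.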

     

     


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14

    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.
    Tuesday, January 03, 2012 3:32 PM

All replies

  • Queries run in the StreamInsight engine and are scheduled by the StreamInsight scheduler. Once you start them, they run ... and run ... and run. Hence, they are standing queries - they run until stopped. In your question, you are trying to look at a StreamInsight query the same way that we view traditional SQL queries - you issue the query, it executes and then you get results. You go to the data. StreamInsight queries don't work that way at all. You issue the query and it then continuously evaluates live data as it passes through the query. The data comes to you. Your CTIs provide the heartbeat to let StreamInsight know when all data for a particular time span has arrived and can be processed, advancing the timeline of the application.

    This is part of how StreamInsight is different from data as we have typically worked with it and it can take a little time to get your mind fully wrapped around the concept.
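
    The difference between "you go to the data" and "the data comes to you" can be sketched in a few lines of plain C#. This is a toy model under simplifying assumptions, not how the StreamInsight engine is implemented: ToyStandingQuery, OnEvent and the sink callback are hypothetical names. The point is only that the query is registered once and then evaluated each time an event arrives, until it is stopped.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: hypothetical names, not the StreamInsight API.
// The query is created once and stays active; events are pushed into it.
public sealed class ToyStandingQuery<T>
{
    private readonly Func<T, bool> _predicate;
    private readonly Action<T> _sink;
    private bool _running;

    public ToyStandingQuery(Func<T, bool> predicate, Action<T> sink)
    {
        _predicate = predicate;
        _sink = sink;
    }

    public void Start() { _running = true; }

    public void Stop() { _running = false; }

    // Called by the source whenever a new event arrives.
    // Nothing polls the query; the arrival of data drives the evaluation.
    public void OnEvent(T item)
    {
        if (_running && _predicate(item))
        {
            _sink(item);
        }
    }
}
```

    With a predicate such as v => v > 0.5, every value pushed through OnEvent between Start and Stop is tested as it arrives, and only the matching ones reach the sink.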


    DevBiker (aka J Sawyer)
    Tuesday, December 20, 2011 1:07 AM
  • Thank you, DevBiker, for your prompt and detailed reply. I have another question, though; please check this.

    So the workflow is like this:

    The InputAdapter enqueues events into StreamInsight.

    The OutputAdapter dequeues events from StreamInsight.

    Both the input and output adapters run on a custom scheduling mechanism in the application (for example, a timer).

    When do the so-called standing query and its execution come into the picture? Immediately after the event is enqueued? And when the query executes, where does it keep the results?

    You said: "Your CTIs provide the heartbeat to let StreamInsight know when all data for a particular time span has arrived and can be processed"

    But I get the event only when I dequeue it using the output adapter, and when to dequeue is scheduled by me using a timer.

    Could you please give me the detailed workflow of input adapters, queries, and output adapters? I am asking because both the input and output adapters are scheduled by me, and I am trying to understand how the standing queries interact with these adapters. Do queries get executed when I call Dequeue? Sorry, I am confused here; please clarify.

     


    Venkat
    Tuesday, December 20, 2011 5:15 AM
  • First, your output adapter shouldn't be on a timer. Start and Resume will tell you when you have events waiting for dequeuing ... both should call a method that dequeues events until the output queue is empty and then return. Start, of course, gets you going. Resume tells you that you have stuff waiting for you after you emptied the queue.
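
    The Start/Resume pattern described above can be sketched as follows. This is a plain C# toy model, not a real StreamInsight adapter: ToyOutputAdapter, the string-based queue and the WaitingForResume flag are assumptions made for illustration. Both entry points call the same drain method, which dequeues until the queue is empty and then returns, with no timer involved.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: hypothetical names, not StreamInsight adapter base classes.
public sealed class ToyOutputAdapter
{
    private readonly Queue<string> _outputQueue;   // filled by the "engine"

    public List<string> Consumed { get; } = new List<string>();

    // True once the adapter has emptied the queue and is waiting to be called again.
    public bool WaitingForResume { get; private set; }

    public ToyOutputAdapter(Queue<string> outputQueue)
    {
        _outputQueue = outputQueue;
    }

    // Called once when the query starts.
    public void Start() { Drain(); }

    // Called when new events are waiting after the queue was emptied.
    public void Resume() { Drain(); }

    private void Drain()
    {
        WaitingForResume = false;
        while (_outputQueue.Count > 0)
        {
            Consumed.Add(_outputQueue.Dequeue());
        }
        // Queue is empty: return and wait for the next Resume call.
        WaitingForResume = true;
    }
}
```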

    Now ... as for your input adapter, the implementation of it being on a timer or not is an implementation detail, not something that is tied to the architecture of StreamInsight. You seem to be trying to force your view of StreamInsight into a schedule/batch/poll paradigm and that's going to really limit your possibilities. This is something that I see over and over again with developers ... because it's an easy and familiar paradigm ... and they miss much of StreamInsight's really unique capabilities.

    That said, in your scenario ... your input adapter is polling some data source (I would first question this ... is this really the proper thing to do? Perhaps it is but polling is not ideal if you have other options). Events from each poll are then enqueued. When do you enqueue your CTI? Are you using IDeclareAdvanceTimeSettings? The CTIs, as mentioned before, tell StreamInsight that it has all data up to a specific point in the application timeline and to process all queries/windows/etc. before that point in the application timeline. In your scenario, I would assume that you are enqueuing a CTI after each poll?

    And ... by the way ... this scenario is about the simplest that you can have. StreamInsight handles - very well - much more complex temporal logic and, using CTIs, can synchronize data streams from different sources with different latencies. And CTIs can also be done in such a way that you can actually handle late-arriving events when in a more push scenario where you won't have the luxury of all events coming in a consistent temporal order. But that's outside of your scenario.

    Also ... note that CTIs - and events - do not need to be enqueued with the current system time. For example, in one project, we were testing algorithms and functionality based on several months worth of log data that we replayed to validate the queries. We enqueued the log events with the original date (March through June 2011). And we enqueued the events at 6000x recorded speed. Yet ... all of the temporal windows were at the application time spanning four months.

    Back to your scenario ... assuming that you are enqueuing a CTI after each poll, valid events are pushed through the queries. The results of those queries are then sent to the output adapters. Your input adapter is scheduling the poll and the CTI and that's what controls the rest of it ... but that's just your scenario. Once your query has been scheduled and the events processed and evaluated, any output events are then queued for the output adapter to get. The output adapter does not pull the results through ... instead, the input adapter pushes the events through. If your output adapter never dequeues the events, they will still be sitting there in the queue and the queue will eventually fill up. Events are "released" to the output adapters on CTI.
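
    A rough way to picture this flow is the plain C# sketch below. It is a simplified model, not the engine's actual implementation: ToyEngine, ToyEvent and the rule "release every pending event whose start time is before the CTI" are assumptions for illustration. It shows the two separate stages: events wait on the input side until a CTI arrives, and results that pass the filter then sit in a separate output queue until something dequeues them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model only: hypothetical names, not the StreamInsight API.
public sealed class ToyEvent
{
    public DateTime Start { get; }
    public double Value { get; }

    public ToyEvent(DateTime start, double value)
    {
        Start = start;
        Value = value;
    }
}

public sealed class ToyEngine
{
    private readonly List<ToyEvent> _pending = new List<ToyEvent>();
    private readonly Func<double, bool> _filter;

    // Results wait here until an output adapter dequeues them.
    public Queue<ToyEvent> OutputQueue { get; } = new Queue<ToyEvent>();

    public int PendingCount { get { return _pending.Count; } }

    public ToyEngine(Func<double, bool> filter)
    {
        _filter = filter;
    }

    // Input side: events are buffered, nothing is released yet.
    public void Enqueue(DateTime start, double value)
    {
        _pending.Add(new ToyEvent(start, value));
    }

    // A CTI says "everything before this time has arrived": process and release it.
    public void EnqueueCti(DateTime cti)
    {
        List<ToyEvent> ready = _pending.Where(e => e.Start < cti).OrderBy(e => e.Start).ToList();
        _pending.RemoveAll(e => e.Start < cti);
        foreach (ToyEvent e in ready)
        {
            if (_filter(e.Value))
            {
                OutputQueue.Enqueue(e);
            }
        }
    }
}
```

    In this model, events that fail the filter simply disappear once the CTI has passed them, while events that pass stay in OutputQueue for as long as nobody dequeues them.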

    I would also suggest that you try working with the StreamInsight Event Flow Debugger ... run some simple queries and capture a trace. Follow the events through the queries. Start with something simple but add on some additional queries that use things like AlterEventDuration and AlterEventLifetime as well as hopping windows. You can then run everything through step by step.


    DevBiker (aka J Sawyer)
    Tuesday, December 20, 2011 6:58 AM
  • "First, your output adapter shouldn't be on a timer." --> I agree with you on this. StreamInsight somehow makes sure that it calls the registered output adapter through its Start and Resume methods. Once these methods are called, the adapter code needs to dequeue all events (maybe in a while loop) until no events are left in the queue.

    "Input adapter polling is really the proper thing to do?" --> The input adapter has two choices: either poll the source or wait for a push from it. This depends on how the source exposes the data, so we have to choose either the poll or the push mechanism.

    "When do you enqueue your CTI" --> I am not actually calling the EnqueueCtiEvent method. I am just setting the event start time to DateTime.Now.

    "Are you using IDeclareAdvanceTimeSettings?" --> No, I have yet to understand the purpose of this.

    "In your scenario, I would assume that you are enqueuing a CTI after each poll?" --> Actually, I am trying out StreamInsight with a downloaded sample and, at the same time, trying to relate it to our usage scenarios. I will send you the sample by e-mail. The sample InputAdapter just generates values randomly. So in this sample I am not calling the EnqueueCtiEvent method at any time.

    "Back to your scenario ... assuming that you are enqueuing a CTI after each poll, valid events are pushed through the queries." --> What do you mean by valid events? Do you mean the events that satisfy the query criteria? So are the events that I see in the "Event Flow Debugger" the events before the query runs? When the query runs and the output adapter is not available, where do the query results get stored? As soon as the query runs, is the output adapter's Resume method called to pick up the results of the query?

    "Once your query has been scheduled and the events processed and evaluated, any output events are then queued for the output adapter to get." --> Great! This is what I am trying to understand. So the queue that holds the events enqueued by the input adapter is different from the queue from which the output adapter dequeues the data.

    "Events are "released" to the output adapters on CTI." --> In my sample I am not using any CTI events, but I am still able to dequeue. OK, so if there is no CTI and no output adapter to dequeue, events will pile up in StreamInsight. Is there no expiry time for the events (no automatic deletion of events by StreamInsight)? Look at my use case of a temperature sensor InputAdapter: it can send a lot of events in a day.

    Also, assume that, based on the query logic, I am dequeuing only 5 of the 1000 events enqueued. What happens to the events that did not pass the query criteria (995 events)? Will they get deleted automatically? If so, when?


    Venkat
    Tuesday, December 20, 2011 12:18 PM
  • First, if you have events to dequeue, you have CTIs. You'll even "see" the CTIs when you dequeue. If you are using the RandomDataGenerator sample input adapter, the CTIs are enqueued for you as its adapter factory implements IDeclareAdvanceTimeSettings.

    Please see http://msdn.microsoft.com/en-us/library/ff518502.aspx.

    As mentioned on a separate thread, events that are no longer valid for the application timeline are removed from memory. When? When no longer valid according to the CTIs.

     


    DevBiker (aka J Sawyer)
    Tuesday, December 20, 2011 1:42 PM
  • Hi,

    Yes, you are right about the CTI events. They are enqueued by the factory, as the factory implements ITypedDeclareAdvanceTimeProperties.

    So if I enqueue 1000 events and only 5 of them satisfy my standing query criteria, only 5 events will be available to dequeue.

    Let me restate what you said:

    Whether events are in the enqueued area or the dequeue area, they will be removed automatically by StreamInsight "when those events are no longer valid according to the CTIs".

    Am I correct?

    A CTI is only a current time increment, just one timestamp. I am not sure how StreamInsight uses this timestamp to determine that an event no longer needs to be available. Could you point me to an article on this so I can understand it in depth? Please note that Events are not violating CTI, but they are not just satisfying Query criteria.

     


    Venkat
    • Edited by VenkatABC Wednesday, December 21, 2011 10:02 AM
    Wednesday, December 21, 2011 6:04 AM
  • Let's take this one step at a time.

    First, you have a stream, called streamA. This is a stream of point events (a duration of 1 tick). A CTI is enqueued at 10:00:01 AM, an event with ID of "A" is enqueued at 10:00:02 and then another CTI at 10:00:03. Your event with ID "A" is available to queries and the engine in the CTI span between 10:00:01 AM and 10:00:03 AM. After that, the event is removed from memory (it's no longer valid).

    With this same stream, you have the following statement:

    var extendedStream = streamA.AlterEventDuration(e => TimeSpan.FromMinutes(1));
    
    This is a stream of events with a duration of 1 minute. Our event above, with an ID of "A", has the same start time of 10:00:02 but an end time of 10:01:02. This event will be available for queries, joins, unions, etc. until the last CTI span that contains the event. So ... if there is a CTI at 10:01:00, this event is still valid and will be included in any query processing. If the next CTI is at 10:01:03, it will still be available as that CTI span is from 10:01:00 to 10:01:03. After that, it will be removed from memory. Note, though, that it will still join only where there is overlap between start and end. So, if there is another event in the CTI span between 10:01:00 and 10:01:03 but with a start time of 10:01:02.5 and an end time of 10:01:03, they will not join as they don't overlap in time even though they are valid within the same CTI span.
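
    The two rules in this example, overlap for joins and removal once the CTIs have moved past the event, can be written down as a small plain C# sketch. It is a simplified reading of the description above, not engine code: ToyInterval, Overlaps and CanBeDroppedAfter are hypothetical names, the end time is treated as exclusive, and the retention rule (drop once a CTI has reached or passed the end time) is an assumption inferred from the example.

```csharp
using System;

// Toy model only: hypothetical names, not the StreamInsight API.
public sealed class ToyInterval
{
    public DateTime Start { get; }
    public DateTime End { get; }   // treated as exclusive in this sketch

    public ToyInterval(DateTime start, DateTime end)
    {
        Start = start;
        End = end;
    }

    // Two events can join only if their lifetimes overlap in time.
    public bool Overlaps(ToyInterval other)
    {
        return Start < other.End && other.Start < End;
    }

    // Assumed retention rule: once a CTI has reached or passed the end time,
    // the event can no longer affect any result and can be dropped.
    public bool CanBeDroppedAfter(DateTime latestCti)
    {
        return End <= latestCti;
    }
}
```

    Using the times from the example, event "A" (10:00:02 to 10:01:02) and the later event (10:01:02.5 to 10:01:03) do not overlap, so they would not join; "A" is still retained after the CTI at 10:01:00 and becomes droppable after the CTI at 10:01:03.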

    Again, the best way to really understand this is to work with it and use the event flow debugger. LinqPad is also essential.

     


    DevBiker (aka J Sawyer)
    • Proposed as answer by Peja Tao Thursday, December 22, 2011 5:52 AM
    Wednesday, December 21, 2011 4:48 PM
  • OK,

    "Please note that Events are not violating CTI, but they are not just satisfying Query criteria." --> So events will stay in StreamInsight for as long as they are valid according to the current CTI. The same logic applies even to the events that have passed through the standing queries. So if some events pass the standing query criteria and the output adapter does not dequeue them within the CTI, these events will be removed automatically.

    But I think, as per our earlier understanding, the moment an event passes through the standing query, the output adapter's Resume gets called and the event can be dequeued immediately. So only events that do not pass the standing query criteria will be removed from StreamInsight automatically after the CTI.

    I am closing this post with this understanding. Please update the post if you have any comments on this.

     

     


    Venkat
    • Marked as answer by VenkatABC Thursday, December 22, 2011 5:56 AM
    • Edited by VenkatABC Thursday, December 22, 2011 5:57 AM format change
    • Unmarked as answer by VenkatABC Friday, December 23, 2011 3:13 PM
    Thursday, December 22, 2011 5:56 AM
  • First ... you haven't really answered the question so marking your own response as the "answer" isn't really accurate and doesn't help others that come back and are trying to find similar answers. If posts are helpful, you should mark them as such also ... again, this is to help others that come through the forum with the same or very similar questions.

    Second ... you still aren't quite understanding this correctly. When the event is no longer valid, it is removed from memory and from query processing. It is not removed from any output queue for an output adapter to dequeue. It will not, however, be put into an output adapter's queue again.

    What this means is this: in our previous example, an event was valid in a given period and, let's say, it wasn't filtered out of the event stream and was sent to your output adapter. But ... your output adapter, for whatever reason, is only dequeuing every 10 seconds. You have an event that was put into the queue for that output adapter but that event is no longer valid according to the CTIs. Will that event be dequeued the next time the output adapter tries to dequeue events? Yes, it will be. It just won't be processed in any queries.

    It appears to me that you are trying to fit StreamInsight into the more traditional request-response database paradigm that we, as developers, are used to and have been working with for a long, long time. I don't mean this as an insult ... this is very typical and I've seen it with every developer that has started on StreamInsight ... including myself. But StreamInsight isn't like the traditional request-response database paradigm. It is inherently different because a) it's not going to a set of existing data with a query but the data is coming to the query and b) you have an additional dimension to all of your queries that you don't have in traditional databases - the dimension of time (this is an attribute in traditional databases, not a dimension). It requires a different thought process and you'll get that by a) working with the samples in LinqPad b) reading the documentation and blog posts c) working with your own scenarios and d) using the query debugger to trace what's happening with your events as they come through your queries.


    DevBiker (aka J Sawyer)
    Thursday, December 22, 2011 7:26 AM
  • Thanks, DevBiker, for your comments.

    Yes, I agree that the thinking should be different when using StreamInsight. I am getting there slowly.

    OK, another point:

    I submitted a standing query and adapters to StreamInsight (Query.Start()).

    Now I want to add another standing query for the same set of data/events. Is this possible in StreamInsight?

    The other point is:

    In our applications we allow users to write filters for data (I am talking about SQL query builders). Can I do the same in StreamInsight? This is quite a natural use case.

    So I will give the customer an application to monitor, for example, a temperature sensor. As part of my deployment, I will provide an input adapter, an output adapter, and a query (with simple logic). After deployment, if the customer wants to change the query logic without any code change or development effort, is that possible? (In my current case I can offer a query builder, where the customer can choose his own filters at any time.)

    Any comments on this?


    Venkat
    Friday, December 23, 2011 3:24 PM