Running Total Query

    Question

  • Hello,

    I am trying to write a query that will give a running total over one of the fields in the payload.

    I would like an output event to be created immediately after each input event is received.

    I am using the query shown below (adapted from another forum response by Alan Mitchell)

    var query = from e in inputStream
                    // Stretch each point event so that it lasts until the next midnight.
                    .AlterEventDuration(e => TimeSpan.FromTicks(TimeSpan.TicksPerDay - (e.StartTime.Ticks % TimeSpan.TicksPerDay)))
                group e by new { Instrument = e.InstrumentName, Sequence = e.SequenceName } into instGroups
                // Each snapshot sums the volumes of this group's events that are active in it.
                from win in instGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                select new
                {
                    Instrument = instGroups.Key.Instrument,
                    Sequence = instGroups.Key.Sequence,
                    TotalVolume = win.Sum(x => x.Volume)
                };
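    The `AlterEventDuration` call stretches each point event so that it lasts until the next midnight, which is what lets each day's snapshots include all of that day's earlier trades. The duration arithmetic can be checked on its own with plain .NET types. This is a standalone sketch rather than StreamInsight code; the `DurationToEndOfDay` helper and the sample timestamp are hypothetical.

```csharp
using System;

// Hypothetical helper: the same expression used inside AlterEventDuration above.
// DateTimeOffset.Ticks counts from 0001-01-01 00:00 in the offset's own clock time,
// so Ticks % TicksPerDay is the time elapsed since that day's midnight.
TimeSpan DurationToEndOfDay(DateTimeOffset eventStart) =>
    TimeSpan.FromTicks(TimeSpan.TicksPerDay - (eventStart.Ticks % TimeSpan.TicksPerDay));

var start = new DateTimeOffset(2012, 3, 1, 16, 36, 0, TimeSpan.Zero);
var duration = DurationToEndOfDay(start);
var end = start + duration;

Console.WriteLine($"start {start}, duration {duration}, end {end}");
```

    An event that starts exactly at midnight gets a full 24-hour duration, because `Ticks % TicksPerDay` is zero.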

    It is almost working, except that my outputs seem to lag one event behind my inputs.

    E.g.

    1st Input Event (Instrument A, Sequence B, Volume 10) -> No output

    2nd Input Event (Instrument A, Sequence B, Volume 15) -> (Instrument A, Sequence B, Volume 10)

    3rd Input Event (Instrument A, Sequence B, Volume 20) -> (Instrument A, Sequence B, Volume 25)

    Can anyone help me understand why this is happening? What I want to see is:

    1st Input Event (Instrument A, Sequence B, Volume 10) -> (Instrument A, Sequence B, Volume 10)

    2nd Input Event (Instrument A, Sequence B, Volume 15) -> (Instrument A, Sequence B, Volume 25)

    3rd Input Event (Instrument A, Sequence B, Volume 20) -> (Instrument A, Sequence B, Volume 45)
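    The desired output is a running sum per (Instrument, Sequence) key, emitted once for every input. As a reference for what the query should produce, here is a plain C# sketch of those semantics outside StreamInsight; the tuple shape stands in for the `Trade` payload and is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the Trade payload (hypothetical shape: only the fields the query uses).
var inputs = new List<(string Instrument, string Sequence, int Volume)>
{
    ("A", "B", 10),
    ("A", "B", 15),
    ("A", "B", 20),
};

// Running total per (Instrument, Sequence) key, updated and emitted for every input.
var totals = new Dictionary<(string, string), int>();
var outputs = new List<(string Instrument, string Sequence, int TotalVolume)>();

foreach (var (instrument, sequence, volume) in inputs)
{
    var key = (instrument, sequence);
    totals.TryGetValue(key, out var current); // 0 when the key is new
    totals[key] = current + volume;
    outputs.Add((instrument, sequence, totals[key]));
    Console.WriteLine($"({instrument}, {sequence}, {volume}) -> ({instrument}, {sequence}, {totals[key]})");
}
```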

    Other info:

    I am modeling my events as points.

    I am basing my code on one of the CodePlex samples and have these lines (which I don't entirely understand):

    var advanceTimeGenerationSettings = new AdvanceTimeGenerationSettings(1, TimeSpan.Zero, true);

    var advanceTimeSettings = new AdvanceTimeSettings(advanceTimeGenerationSettings, null, AdvanceTimePolicy.Adjust);

    Thanks for any help you can offer!

    Regards,

    Tom

    March 1, 2012 16:36


All Replies

  • When you use AdvanceTimeSettings, the CTI isn't enqueued after the (in this case) first event is enqueued; it's enqueued just before the second event. So what you are seeing is by design, and it is correct behavior: when the CTI is enqueued, there is only one event in the appropriate CTI span. Take a look at this blog article ... while it relates to using LinqPad, it also details how the AdvanceTimeGenerationSettings actually generate CTIs.

    As an alternative, to have the results sent to the output adapter immediately after each event is enqueued, you can enqueue the CTIs from your input adapter rather than using AdvanceTimeGenerationSettings. Since, in this case, you control the CTIs, you determine when the engine schedules the query for execution.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    March 1, 2012 17:13
  • Hello,

    Thanks for your response.

    I have changed where I instantiate my InputStream so that it no longer specifies any advance time settings, and I have modified my input adapter to enqueue a CTI after enqueuing each payload event.

    I must be doing something wrong, as now I don't get any output at all. Do I still need to specify some advance time settings?

    Thanks for your help,

    Tom

    March 1, 2012 17:40
  • If your adapter is enqueuing CTIs then no, you don't need to specify any advance time settings. Can you post the code where you are enqueuing?


    DevBiker (aka J Sawyer)

    March 1, 2012 18:47
  • Yes, sure. It is adapted from the CsvInputAdapter sample.

    Code below.

    if (AdapterState.Stopping == AdapterState)
    {
        Stopped();
        return;
    }
    try
    {
        // Create and fill event structure with data from text file line.
        currentEvent = this.CreateEventFromTrade(trade);
        // In case we just went into the stopping state.
        if (currentEvent == null)
        {
            continue;
        }
    }
    catch (Exception e)
    {
        // The trade couldn't be transformed into an event.
        // Just ignore it, and release the event's memory.
        if (currentEvent != null)
        {
            ReleaseEvent(ref currentEvent);
        }
        Console.WriteLine("Trade {0} could not be read into a CEP event: {1}", trade, e.Message);
        continue;
    }
    var start = currentEvent.StartTime;
                        
    if (EnqueueOperationResult.Full == Enqueue(ref currentEvent))
    {
        Console.WriteLine("Enqueue operation returned FULL");
        // If the enqueue was not successful, we keep the event.
        // It is good practice to release the event right away and
        // not hold on to it.
        ReleaseEvent(ref currentEvent);
        // We are suspended now. Tell the engine we are ready to be resumed.
        Ready();
        // Leave loop to wait for call into Resume().
        return;
    }
    EnqueueCtiEvent(start);

    Also, could the ordering setting make a difference?

    Query q = query.ToQuery(application, "TestQuery", "Testing", typeof(TracerFactory), outputConfig, EventShape.Point, StreamEventOrder.FullyOrdered);

    Thanks,

    Tom


    March 2, 2012 9:28
  • The ordering setting does make a difference, but I don't think that it'll matter in this case. For your CTI, try EnqueueCtiEvent(start.AddTicks(1)). In your output adapter, make sure that the tracer is set to include CTIs ... they are really the key to everything.

    Additionally, are you joining with any other streams? What does your stream creation statement look like?


    DevBiker (aka J Sawyer)

    March 2, 2012 13:02
  • Hello,

    EnqueueCtiEvent(start.AddTicks(1)) has solved my problem! It is now outputting events as I would expect. Thank you!! I don't quite understand why, though.
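    One way to see why the extra tick helps: a CTI with timestamp t only promises that no more events will start before t. A point event starting exactly at t could still arrive and change the snapshot that begins at t, so that snapshot's result is held back until a later CTI. The sketch below is a simplified model assumed for illustration, not StreamInsight's implementation: each trade is followed by a CTI at its start time plus `ctiOffsetTicks` (a hypothetical parameter), and a snapshot result is released once a CTI is strictly later than the snapshot's start. With an offset of 0 it reproduces the one-event lag from the original post; with 1 tick (the `start.AddTicks(1)` fix) each result is released right after its own input.

```csharp
using System;
using System.Collections.Generic;

// Simplified release model (an assumption, not StreamInsight internals).
// Returns, for each input, the running totals released by the CTI that follows it.
List<List<int>> Simulate(DateTimeOffset[] eventStarts, int[] eventVolumes, long ctiOffsetTicks)
{
    var releasedPerInput = new List<List<int>>();
    var pending = new List<(DateTimeOffset SnapshotStart, int Total)>();
    var runningTotal = 0;

    for (var i = 0; i < eventStarts.Length; i++)
    {
        // The new point event opens a snapshot whose running total includes it.
        runningTotal += eventVolumes[i];
        pending.Add((eventStarts[i], runningTotal));

        // The adapter enqueues a CTI right after the event.
        var cti = eventStarts[i].AddTicks(ctiOffsetTicks);

        // Only snapshots that start strictly before the CTI can no longer change.
        var ready = pending.FindAll(p => p.SnapshotStart < cti);
        pending.RemoveAll(p => p.SnapshotStart < cti);
        releasedPerInput.Add(ready.ConvertAll(p => p.Total));
    }
    return releasedPerInput;
}

var t0 = new DateTimeOffset(2012, 3, 1, 9, 0, 0, TimeSpan.Zero);
var starts = new[] { t0, t0.AddSeconds(1), t0.AddSeconds(2) };
var volumes = new[] { 10, 15, 20 };

var lagging = Simulate(starts, volumes, 0);   // CTI at the event's own start time
var immediate = Simulate(starts, volumes, 1); // CTI one tick later, like start.AddTicks(1)

for (var n = 0; n < starts.Length; n++)
{
    Console.WriteLine($"input {n + 1}: offset 0 -> [{string.Join(", ", lagging[n])}], offset 1 tick -> [{string.Join(", ", immediate[n])}]");
}
```

    This model is also consistent with the later observation in this thread that a delay of `TimeSpan.FromTicks(-1)` makes results appear immediately.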

    Am I right in thinking that, as I am enqueuing my own CTI events, they MUST be in order? In my scenario, it is possible that trades could come in out of order, and I think that will cause an exception. Does it make sense for me to do EnqueueCtiEvent(DateTime.Now)? Or does that not make sense?

    Thanks again for your help,

    Tom

    P.S. I am not joining with any other streams. My stream creation statement looks like this:

    var inputStream = CepStream<Trade>.Create("TestStream", typeof(TradeInputFactory), configInfo, EventShape.Point);


    March 2, 2012 14:08
  • When you are enqueuing your CTIs, you have complete control over the application time. You can enqueue a CTI with any timestamp. If, for example, you look at the AdvanceTimeGenerationSettings, you'll see that you have an option for a delay. This means that the CTIs are enqueued with an earlier timestamp ... for example, DateTimeOffset.Now.AddMinutes(-2) would be a 2-minute delay. Also, with AdvanceTimeSettings, you have an AdvanceTimePolicy (Drop or Adjust) that controls what happens to an event that comes in after the last-issued CTI with a start time before it. In your case, since you are controlling the CTIs (which I actually prefer to do ... I'm something of a control freak, I guess), you need to decide how to handle those scenarios. How long will you want to wait for late-arriving trades? If those events come in after your last CTI, should you drop or adjust them? You won't be able to enqueue an event that starts before the last-issued CTI, so you'll want to keep track of the timestamp of your last-issued CTI so that you can adjust or drop accordingly.

    And your CTIs need to be in order but the events do not. They just need to follow the rule that they cannot have a start time before your last-issued CTI.
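    If the adapter issues its own CTIs, the bookkeeping described here (remember the last-issued CTI, never let it move backwards, and decide what to do with trades that start before it) could look roughly like the following. This is a plain C# sketch under stated assumptions: `Admit`, `NextCti`, and `adjustLateEvents` are hypothetical names, not StreamInsight APIs, and the adjust branch simply moves a late trade's start time up to the last-issued CTI, which is only one possible business rule.

```csharp
using System;

// Hypothetical adapter-side bookkeeping; none of these names are StreamInsight APIs.
DateTimeOffset lastCti = DateTimeOffset.MinValue;

// Decide what to do with an incoming trade before enqueuing it.
// Returns the start time to use, or null when the trade should be dropped.
DateTimeOffset? Admit(DateTimeOffset eventStart, bool adjustLateEvents)
{
    if (eventStart >= lastCti) return eventStart;               // on time: unchanged
    return adjustLateEvents ? lastCti : (DateTimeOffset?)null;  // late: move up, or drop
}

// Compute the next CTI: one tick after the event, optionally held back by a delay.
// Returns null when the CTI would not move application time forward (skip it).
DateTimeOffset? NextCti(DateTimeOffset eventStart, TimeSpan delay)
{
    var candidate = eventStart.AddTicks(1) - delay;
    if (candidate <= lastCti) return null;
    lastCti = candidate;
    return candidate;
}

var t0 = new DateTimeOffset(2012, 3, 2, 14, 0, 0, TimeSpan.Zero);

// An on-time trade at 14:00:10 is admitted unchanged and followed by a CTI one tick later.
var first = Admit(t0.AddSeconds(10), adjustLateEvents: true);
var cti1 = NextCti(first.Value, TimeSpan.Zero);

// A trade stamped 14:00:05 arrives afterwards, so it is late relative to the last CTI.
var adjusted = Admit(t0.AddSeconds(5), adjustLateEvents: true);  // moved up to lastCti
var dropped = Admit(t0.AddSeconds(5), adjustLateEvents: false);  // null means drop
var skipped = NextCti(t0.AddSeconds(5), TimeSpan.Zero);           // would go backwards

Console.WriteLine($"CTI {cti1}, adjusted start {adjusted}, dropped {dropped is null}, CTI skipped {skipped is null}");
```

    Passing a non-zero `delay` gives the same effect as the AdvanceTimeGenerationSettings delay: CTIs trail the events, leaving room for slightly late trades before the adjust/drop decision kicks in.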

    As for what's happening ... have you looked at analyzing it with the query debugger? Try attaching and recording the events or, better yet, run the trace.cmd utility in the tools folder of your StreamInsight installation.


    DevBiker (aka J Sawyer)

    March 2, 2012 15:39
  • Just tried using trace.cmd. Very useful!!

    Am I right in saying that enqueuing a CTI event is a bit like telling the StreamInsight engine: "You won't get any more events with a timestamp less than this value, therefore you can flush any events you have (if their timestamp is less than the CTI timestamp)"?

    I have tried changing my advance time settings to this:

    var advanceTimeGenerationSettings = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);
    var advanceTimeSettings = new AdvanceTimeSettings(advanceTimeGenerationSettings, null, AdvanceTimePolicy.Adjust);

    So now every input event seems to get a timestamp one tick earlier than its CTI, and my events are popping out immediately.

    However, I was expecting that, because my AdvanceTimePolicy is Adjust, any trades that came in late would have their timestamps adjusted to the current CTI time minus one tick and would therefore still be processed. This doesn't seem to be happening, though: looking at the trace output, the events seem to be discarded in the Cleanse Input operator.

    Is this expected behaviour?

    Apologies for all the questions - still trying to get my head around how things are working!

    Thanks again,

    Tom


    March 2, 2012 16:29
  • No problem with the questions ... that's what the forum is for, right?

    You are correct in your understanding of a CTI. The only thing I would suggest you keep in mind is that it is application time that is being moved forward, and it may not be tied to the system time. This becomes very useful in testing when you are replaying past events to test algorithms and functionality ... you can replay them with their original timestamps. And the aggregate windows (hopping, tumbling) are based on the movement of the application time, not the system time. For example, if you are replaying events from a database whose timestamps are 5 minutes apart, enqueuing them at, say, 10 per second with a CTI after each, and you have a TumblingWindow with a window size of 20 minutes, you'll get the aggregate results every 0.8 seconds of clock time. It's like fast forward on steroids.
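    The fast-forward arithmetic can be checked quickly. The sketch assumes that the "10 per second" rate counts each event and the CTI that follows it as separate enqueues (so 5 events per second), which is the reading that produces the 0.8-second figure; at 10 events per second the interval would be 0.4 seconds instead.

```csharp
using System;

// Application-time spacing of the replayed events and the tumbling window size.
var eventSpacing = TimeSpan.FromMinutes(5);
var windowSize = TimeSpan.FromMinutes(20);

// Assumption: 10 enqueues per second, each event followed by its own CTI.
const double enqueuesPerSecond = 10;
const int enqueuesPerEvent = 2; // the event plus its CTI
var eventsPerSecond = enqueuesPerSecond / enqueuesPerEvent;

// Each window spans windowSize / eventSpacing events of application time...
var eventsPerWindow = windowSize.Ticks / eventSpacing.Ticks;
// ...which take this long to replay in wall-clock time.
var secondsPerWindow = eventsPerWindow / eventsPerSecond;

Console.WriteLine($"{eventsPerWindow} events per window, one result every {secondsPerWindow} s of clock time");
```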

    Now, if your AdvanceTimePolicy is Adjust, any late-arriving events (events with a start time before the last-issued CTI) will be given a start time equal to the last-issued CTI, thereby fitting them into the current CTI span. At least, that's what I would expect to happen. I just ran a quick mock-up in LinqPad and didn't get that result. Very strange ... the event is removed in the InputAdapter cleanse operator. Let me look into this further to see if I can figure out what's going on.

    That said ... if you are issuing your CTIs yourself, that's not an issue ... your adapter would keep track of the last-issued CTI and do the adjustment accordingly.


    DevBiker (aka J Sawyer)

    March 2, 2012 17:44
  • Found the answer ... point events can't be adjusted. See http://msdn.microsoft.com/en-us/library/ff518502.aspx.

    It is possible for an event source to violate CTI semantics by sending events with an earlier timestamp than the inserted CTIs. The advance time settings allows for the specification of a policy to handle such occurrences. The policy can have the following two values:
    - Drop
       Events that violate the inserted CTI are dropped and are not enqueued into the query.
    - Adjust
       Events that violate the inserted CTI are modified if their lifetime overlaps with the CTI timestamp. That is, the start timestamp of the events is set to the most recent CTI timestamp, such that those events become valid. If both start and end time of an event fall before the CTI timestamp, then the event is dropped.

    Because a point event's lifetime is a single tick, a point event that starts before the CTI can never overlap the CTI timestamp, so late point events are always dropped. Again, the way to resolve this is to handle your CTIs in your adapter; you can do any adjustments based on your business requirements in there.
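    The quoted Adjust rule can be modelled in a few lines, which makes it clear why late point events never survive it. This plain C# sketch treats an event as a half-open lifetime [start, end) and represents a point event as lasting a single tick; `ApplyAdjust` is a hypothetical helper that mirrors the documented rule, not the engine's code.

```csharp
using System;

// Mirrors the documented Adjust policy for an event checked against a CTI:
// if its lifetime [start, end) reaches past the CTI, its start is moved up to the CTI;
// if the whole lifetime lies before the CTI, the event is dropped (returns null).
(DateTimeOffset Start, DateTimeOffset End)? ApplyAdjust(
    DateTimeOffset eventStart, DateTimeOffset eventEnd, DateTimeOffset ctiTime)
{
    if (eventStart >= ctiTime) return (eventStart, eventEnd); // not late at all
    if (eventEnd > ctiTime) return (ctiTime, eventEnd);       // overlaps the CTI: adjusted
    return null;                                             // entirely before the CTI: dropped
}

var cti = new DateTimeOffset(2012, 3, 2, 16, 0, 0, TimeSpan.Zero);
var lateStart = cti.AddSeconds(-30);

// A late interval event that is still alive at the CTI gets adjusted...
var interval = ApplyAdjust(lateStart, lateStart.AddMinutes(5), cti);
// ...but a late point event (lifetime of one tick) can never overlap the CTI, so it is dropped.
var point = ApplyAdjust(lateStart, lateStart.AddTicks(1), cti);

Console.WriteLine(interval is null ? "interval: dropped" : $"interval: adjusted to start at {interval.Value.Start}");
Console.WriteLine(point is null ? "point: dropped" : "point: adjusted");
```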


    DevBiker (aka J Sawyer)

    March 2, 2012 18:21