Point stream join stops at 100,000 consumed events

    Question

  • Hi,

    I have an observer that joins two point streams together. One stream in the join receives high-frequency data; the other receives data very slowly (i.e. no data at all, or only rarely). Using EventFlowDebugger (see screenshot), I can see that the Total Consumed Events Count stops first, at 100,000, while the Total Incoming Events Count continues to increment.

    Then, when the Total Incoming Events Count reaches 300,000, it stops too. After this limit is reached, no more events are produced or consumed until the observer is stopped and restarted.

        // Create the point streams for timing and car data
        var streamTiming = from s in inputTimingSubject.ToPointStreamable(
                               e => PointEvent<TimingModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e),
                               AdvanceTimeSettings.IncreasingStartTime)
                           select s;

        var streamData = from s in inputDataSubject.ToPointStreamable(
                             e => PointEvent<CarAggregateModel>.CreateInsert(new DateTimeOffset(e.MessageDateTime), e),
                             AdvanceTimeSettings.IncreasingStartTime)
                         select s;

        // Stretch each timing event to maxDuration seconds, then clip it at the
        // next timing event for the same driver
        var signal = streamTiming.AlterEventDuration(e => TimeSpan.FromSeconds(maxDuration))
                                 .ClipEventDuration(streamTiming, (e1, e2) => e1.Driver.RacingNumber == e2.Driver.RacingNumber);

        // Join the timing events to the data events
        var query = from t in signal
                    from d in streamData
                    where t.Driver.RacingNumber == d.RacingNumber
                    select new DriverGearModel
                    {
                        SessionId = t.SessionId,
                        StartLoop = t.StartLoop,
                        // ... (rest of the query was cut off in the original post)
                    };

    Any idea how I can prevent StreamInsight from stopping at the limit of 100,000 Total Consumed Events? I can't see any reason why this should be happening.

    Because of the nature of my data (one side of the join is high frequency and the other low frequency), the system can potentially stop processing data after about 6 minutes.

    Thanks,

    Jeremy


    Friday, May 16, 2014 1:26 PM

All replies

  • When you join/union two different streams, StreamInsight will sync to the slowest stream and that's what you're seeing here. The slow-moving data stream is preventing your resulting stream from moving forward and, because of that, the input queue is filling up.

    What you need to do is introduce CTIs into your slow-moving stream. You can do this either by importing CTIs from the other (fast-moving) stream or by adding, say, a counter to artificially produce CTIs in the stream regardless of whether there are events being enqueued. IncreasingStartTime (and, in fact, any CTI generation option) only adds CTIs to the stream when events are being enqueued.
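To make the "sync to the slowest stream" behavior concrete, here is a deliberately simplified, self-contained C# model. It is not StreamInsight code: the class JoinCommitModel and its members are hypothetical, and the model only tracks when events become committed, not how join results are computed. It assumes that a CTI at time T on an input means no further events starting before T will arrive on that input, and that the join's output can only advance to the smaller of its two inputs' CTIs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model (not the StreamInsight API): a two-input
// join can only commit output up to the smaller of its inputs' latest CTIs.
public sealed class JoinCommitModel
{
    private DateTimeOffset _fastCti = DateTimeOffset.MinValue;
    private DateTimeOffset _slowCti = DateTimeOffset.MinValue;
    private readonly Queue<DateTimeOffset> _pending = new Queue<DateTimeOffset>();

    // Events waiting in the "traffic jam".
    public int Pending => _pending.Count;

    // Events whose place on the timeline is settled.
    public int Committed { get; private set; }

    // Output time is limited by the slowest input.
    public DateTimeOffset OutputCti => _fastCti < _slowCti ? _fastCti : _slowCti;

    // A high-frequency event arrives on the fast input.
    public void EnqueueFast(DateTimeOffset startTime)
    {
        _pending.Enqueue(startTime);
        Flush();
    }

    public void FastCti(DateTimeOffset t)
    {
        if (t > _fastCti) _fastCti = t;
        Flush();
    }

    // A CTI on the slow input, e.g. produced by a timer or counter.
    public void SlowCti(DateTimeOffset t)
    {
        if (t > _slowCti) _slowCti = t;
        Flush();
    }

    private void Flush()
    {
        // Pending events that start before the output CTI can no longer change.
        while (_pending.Count > 0 && _pending.Peek() < OutputCti)
        {
            _pending.Dequeue();
            Committed++;
        }
    }
}
```

With no SlowCti calls, OutputCti stays at DateTimeOffset.MinValue and Pending grows with every fast event, which is the queue build-up described above. Calling SlowCti periodically (the "counter" approach) lets the committed count catch up with the fast stream.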


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)


    Ruminations of J.net


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    Friday, May 16, 2014 2:17 PM
  • Hi DevBiker,
    I'm looking at how you implemented your AdvanceTimeImporter example and how I can fit that into my project.

    I would also be interested to know whether the 100,000 and 300,000 incoming event limits can be increased in StreamInsight. As I mentioned, with the data streams I am working with this gives only about 6 minutes in the best case.

    Thanks,
    Jeremy

    Monday, May 19, 2014 3:48 PM
  • Hi Jeremy,

    Thank you for your feedback. I am trying to involve someone more familiar with this topic to take a further look at this issue. Some delay might be expected while the issue is transferred.

    Your patience is greatly appreciated. 

    Regards,


    Elvis Long
    TechNet Community Support

    Tuesday, May 20, 2014 8:14 AM
    The issue that you are running into is the result of events being queued before being processed. They aren't being processed because your reference stream's timeline (as defined by its CTIs) isn't moving forward. Once your reference stream's timeline does move forward, you won't have this issue. Essentially, you have a traffic jam. From the looks of it, your reference stream isn't enqueuing any CTIs at all: your Last Produced (outbound) CTI Timestamp is DateTimeOffset.MinValue. This is likely due to your reference stream (again) ... I'm going to guess that the start times for all of your reference events are the same. IncreasingStartTime won't enqueue a generated CTI until you have an event with a new timestamp ... so no new timestamps, no CTI.
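The "no new timestamps, no CTI" point can be illustrated with a small model. This is a simplified approximation of the behavior described in the reply, not StreamInsight's implementation; the class IncreasingStartTimeModel is hypothetical. It assumes that a generated CTI carries the latest event start time, and that a CTI is only produced when that start time is later than the previous CTI.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of CTI generation driven purely by incoming events:
// a CTI is produced only when an event's start time moves past the last CTI.
public sealed class IncreasingStartTimeModel
{
    public DateTimeOffset LastCti { get; private set; } = DateTimeOffset.MinValue;
    public List<DateTimeOffset> GeneratedCtis { get; } = new List<DateTimeOffset>();

    public void OnEvent(DateTimeOffset startTime)
    {
        // Same start time as before: application time does not advance.
        // (Earlier start times would be CTI violations; this model ignores them.)
        if (startTime <= LastCti) return;

        LastCti = startTime;
        GeneratedCtis.Add(startTime);
    }
}
```

A reference stream that never receives events never calls OnEvent, so LastCti stays at DateTimeOffset.MinValue, matching the Last Produced CTI Timestamp seen in the debugger. A stream whose events all share one start time gets stuck after its first CTI.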

    Once you get CTIs into the reference stream, your issues will disappear. Entirely. I've seen StreamInsight handle over 100K events/second with no queuing. If it takes 6 minutes for your queue to fill, you're running at about 30 events/sec. Not a problem. 
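The figures quoted in this thread do not fully agree with each other, so a quick check of the arithmetic may help when reading the replies. Taking them at face value (300,000 incoming events within roughly 6 minutes, versus a data stream of about 30 events/sec):

```latex
\[
\frac{300\,000\ \text{events}}{6 \times 60\ \text{s}} \approx 833\ \text{events/s},
\qquad
\frac{300\,000\ \text{events}}{30\ \text{events/s}} = 10\,000\ \text{s} \approx 2.8\ \text{h}
\]
```

Whichever figure is closer to reality, the rate is far below the 100K events/second mentioned above, so the conclusion that raw throughput is not the problem still holds.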

    And no, the max input queue size can't be increased.


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Tuesday, May 20, 2014 10:50 AM
  • Hi Jeremy,

    I am the SQL Server support engineer who is working with you on this issue.

    Based on my research, the main reason you are experiencing this issue is most likely that events are being queued due to a CTI violation.

    During query processing, application time is driven by current time increment (CTI) events. A CTI is a punctuation event that is a central component of the StreamInsight temporal model. CTIs are used to commit sequences of events and release computed results to the query output by asserting to the StreamInsight server that certain parts of the timeline will not change anymore. Hence, it is crucial to enqueue CTIs along with events into the input event stream in order to produce any result and to flush the state of stateful operators.

    Note that when specifying a frequency for CTI generation through AdvanceTimeSettings, end edges are not taken into account. They are also not considered when using a duration as a frequency. Only start edges are considered in the case of edge events for both frequency and duration.
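The start-edge rule can be sketched with a hypothetical model of a count-based CTI frequency. EdgeKind and CountBasedCtiModel are illustrations, not StreamInsight types, and the sketch assumes each generated CTI carries the start time of the start edge that completed the count. Only start edges count towards the frequency, so end edges on their own never trigger a CTI.

```csharp
using System;
using System.Collections.Generic;

public enum EdgeKind { Start, End }

// Hypothetical model: emit a CTI after every `frequency` start edges.
// End edges are ignored for counting, mirroring the note above.
public sealed class CountBasedCtiModel
{
    private readonly int _frequency;
    private int _startEdgesSinceCti;

    public List<DateTimeOffset> GeneratedCtis { get; } = new List<DateTimeOffset>();

    public CountBasedCtiModel(int frequency)
    {
        if (frequency <= 0) throw new ArgumentOutOfRangeException(nameof(frequency));
        _frequency = frequency;
    }

    public void OnEdge(EdgeKind kind, DateTimeOffset startTime)
    {
        if (kind == EdgeKind.End) return; // end edges never count

        _startEdgesSinceCti++;
        if (_startEdgesSinceCti == _frequency)
        {
            // Assumption: the CTI carries the start time of the latest start edge.
            GeneratedCtis.Add(startTime);
            _startEdgesSinceCti = 0;
        }
    }
}
```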

    I strongly recommend reading the article below; it explains how to generate CTIs and how to advance application time.

    http://technet.microsoft.com/en-us/library/ff518502.aspx 

    Thank you for your time.

    Keep us posted.

    Halin Huang


    Wednesday, May 21, 2014 2:13 AM
  • Hi Halin,

    Thanks for the response. Yes I have read the article you linked regarding advancing application time.

    The only issue with this article is that the example in the section titled "Synchronizing with another Stream" uses the AdvanceTimeImportSettings class, which as far as I can see is not compatible with StreamInsight version 2.1 and later. I do not think there is a way to reference a "stream" by name in version 2.1.

    This makes using the AdvanceTimeSettings class to synchronize 2 streams quite difficult.

    I will continue to investigate solutions to this problem.

    Thanks,

    Jeremy

    Friday, May 23, 2014 2:55 PM
  • Hi DevBiker,

    You're right: in my scenario I'm not receiving any events in the reference stream, so it is, as you put it, an event "traffic jam". There is no way for me to know when a reference record will arrive, and yes, my data stream is running at approx. 30 events/sec.

    I was hoping there was a simple method of synchronizing CTIs to the reference stream, because it's when a reference event arrives that I want to produce the result combined from the reference and data events.

    I have looked at your AdvanceTimeImporter example but I don't think injecting reference events at given time intervals will produce the results I am looking for in the output.

    Jeremy


    Friday, May 23, 2014 3:04 PM
  • You are correct, Jeremy. The Reactive model in 2.1 doesn't have a way to import CTIs from other streams. There's the CTI Importer that you've already found on my blog -or- you can merge Observables before creating the temporal stream. However, if you just merge, you don't have any logic to handle CTI violations on the slower-moving stream, which is bad.
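The risk with a plain merge can be shown with a small model. Assume, hypothetically, that the merged sequence gets its CTIs only from the fast stream, so a late event from the slow stream may start before the last CTI. The MergedInputModel class and ViolationRule enum below are illustrations, not StreamInsight code. The Drop rule corresponds in spirit to StreamInsight's AdvanceTimePolicy.Drop; the ClampToCti rule is a simpler hypothetical variant and does not reproduce AdvanceTimePolicy.Adjust.

```csharp
using System;
using System.Collections.Generic;

public enum ViolationRule { Drop, ClampToCti }

// Hypothetical model of merging two raw sources before building a temporal
// stream, where CTIs come only from the fast source.
public sealed class MergedInputModel
{
    private readonly ViolationRule _rule;

    public DateTimeOffset LastCti { get; private set; } = DateTimeOffset.MinValue;
    public List<(string Source, DateTimeOffset Start)> Accepted { get; } =
        new List<(string Source, DateTimeOffset Start)>();
    public int Dropped { get; private set; }

    public MergedInputModel(ViolationRule rule) => _rule = rule;

    // CTIs from the fast stream move application time forward.
    public void OnCti(DateTimeOffset t)
    {
        if (t > LastCti) LastCti = t;
    }

    public void OnEvent(string source, DateTimeOffset start)
    {
        if (start >= LastCti)
        {
            Accepted.Add((source, start));
        }
        else if (_rule == ViolationRule.ClampToCti)
        {
            // Late event: pretend it started at the current CTI.
            Accepted.Add((source, LastCti));
        }
        else
        {
            // Late event: discard it.
            Dropped++;
        }
    }
}
```

Either way, the late event from the slow stream is not joined at its original time, which is why a plain merge needs some explicit policy for CTI violations.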

    One other thing that I've done with reference streams in 2.1 is to have the observable simply tick off a CTI every second or so. This keeps the streams lively and allows you to update/refresh the reference data while the application is running. It works when your data stream's timeline is tied to, or very close to, the system clock; it doesn't work in replay scenarios.
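The heartbeat idea, and why it breaks in replay, can be modeled with an injectable clock. HeartbeatModel and its members are hypothetical; in a real 2.1 application the ticks would come from a timer observable feeding the reference stream, which this sketch does not show.

```csharp
using System;

// Hypothetical model: the reference stream emits a CTI stamped with the
// current clock time on every heartbeat tick, whether or not it has events.
public sealed class HeartbeatModel
{
    private readonly Func<DateTimeOffset> _clock;

    public DateTimeOffset LastCti { get; private set; } = DateTimeOffset.MinValue;

    public HeartbeatModel(Func<DateTimeOffset> clock) => _clock = clock;

    // Called on every timer tick (e.g. once per second).
    public void Tick()
    {
        var now = _clock();
        if (now > LastCti) LastCti = now;
    }

    // A data event fits the timeline only if it does not start before the heartbeat CTI.
    public bool IsOnTime(DateTimeOffset eventStart) => eventStart >= LastCti;
}
```

With live data, event timestamps stay close to the clock, so heartbeat CTIs keep the join moving. With replayed data, every historical event starts before the heartbeat CTI and would be treated as a CTI violation.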


    DevBiker (aka J Sawyer)
    Microsoft MVP - Sql Server (StreamInsight)



    Friday, May 23, 2014 7:51 PM
  • Hi Jeremy,

    From a support perspective this is really beyond what we can do here in the forums. If you cannot determine your answer here or on your own, consider opening a support case with us. Visit this link to see the various support options that are available to better meet your needs:

      http://support.microsoft.com/default.aspx?id=fh;en-us;offerprophoneB

    Best regards,

    Halin

    Thursday, May 29, 2014 9:56 AM