Strange memory usage and eventual termination of StreamInsight instance

    Question

  • I'm experiencing memory usage problems when running a query. Memory consumption is normal for a good few hours after query start, with garbage collection clearly visible, but after a while it begins to climb without any apparent garbage collection. Eventually memory usage exceeds what is available and the StreamInsight service is killed.

    The purpose of the query is to receive events, categorise them by joining on a reference stream and output the count of each category over a window.

    If a received event doesn't match an event in the known event reference stream it is categorised as unknown by the query.

    The reference stream is updated every minute and enqueues around 28,000 events each time. The point events from this stream are being stretched out until they receive a corresponding event at a later point in time.

    The event input stream usually handles anything from a couple of hundred to several thousand events an hour and enqueues a CTI event every 2 seconds. The query still runs out of memory even if no input events are received, which points the finger at the reference stream.

    I'm pretty sure the problem lies with memory management of the reference stream, as memory increases consistently upon enqueuing of the updated events. This is as expected, and a short time later the memory for the expired events is released. At least to begin with...

    The first image below is the normal memory usage experienced with the clear garbage collection of expired events in the reference stream. It has been running for around 8 hours at this point.

    The blue is memory usage, yellow is CTI count for the reference stream, green is CTI count for the event input stream, red is CPU, purple is events in the reference stream input queue, turquoise is events in the input stream queue.


    Below is the usage around five and a half hours after the first, with a considerable rise in memory visible and no apparent garbage collection.

    The query survived another 8 hours before StreamInsight was terminated. Memory usually rises to about 1.5 GB before failing.

    The query is running on a virtualised development machine with 1 core and 2 GB of RAM, which should be sufficient for this low throughput.

    Below is the code. If needed I can lift the code from the bigger solution and upload it as a buildable solution.

    Query

    	    const string eventTraceStreamName = "EventTraceStream";
    
                CepStream<TraceEvent> eventTraceStream = CepStream<TraceEvent>.Create(eventTraceStreamName,
                    typeof (InputAdapterFactory), new InputAdapterConfig()
                    {
                        adapterType = InputAdapterType.TraceEventPayload,
                        stopPollingPeriod = 1000,
                        ctiPeriod = 2000
                    },
                    EventShape.Point);
    
                var timeImportSettings = new AdvanceTimeSettings(null,
                    new AdvanceTimeImportSettings(eventTraceStreamName),
                    AdvanceTimePolicy.Adjust);
    
                // Create a reference stream using the datastream as a time reference
                var knownEventStream = CepStream<Viagogo.Cep.Adapters.Database.GetKnownEventsResult>.Create("knownEventStream",
                    typeof(InputAdapterFactory), new InputAdapterConfig()
                    {
                        adapterType = InputAdapterType.TraceEventCategory,
                    },
                    EventShape.Point, timeImportSettings);
    
                var knownEvents = from e in knownEventStream
                                      .AlterEventDuration(e => TimeSpan.MaxValue)
                                      .ClipEventDuration(knownEventStream, (e1, e2) => (e1.EventID == e2.EventID && e1.ApplicationID == e2.ApplicationID)) // this line appears to overwrite the line above
                                  select e;
    
    
                var unknownEventStream = from e in eventTraceStream
                                         where (from ke in knownEvents where ke.EventID == e.id && ke.ApplicationID == e.applicationID select ke).IsEmpty()
                                         select new CategorisedTraceEvent()
                                                    {
                                                        id = e.id,
                                                        applicationId = e.applicationID,
                                                        eventType = e.eventType,
                                                        source = e.source,
                                                        message = e.message,
                                                        categoryName = "Unknown",
                                                        categoryID = 1,
                                                        timestamp = e.timestamp
                                                    };
    
                var categorisedEventStream = from e in eventTraceStream
                                             join ke in knownEvents
                                             on new KnownTraceEvent() { EventID = e.id, ApplicationID = e.applicationID, CategoryID = -1 }
                                             equals // have to stub out CategoryID as we're doing the join in the first place to determine that
                                             new KnownTraceEvent() { EventID = ke.EventID, ApplicationID = ke.ApplicationID, CategoryID = -1 }
                                             select new CategorisedTraceEvent()
                                                        {
                                                            id = e.id,
                                                            applicationId = e.applicationID,
                                                            eventType = e.eventType,
                                                            source = e.source,
                                                            message = e.message,
                                                            categoryName = ke.CategoryName,
                                                            categoryID = ke.EventLogCategoryID,
                                                            timestamp = e.timestamp
                                                        };
    
                var unionedEventStream = unknownEventStream.Union(categorisedEventStream);
    
                var eventCategoryCount =    from e in unionedEventStream
                                            group e by new ApplicationCategory() { category = e.categoryID, application = e.applicationId }
                                            into perEventCategory
                                            from w in perEventCategory.TumblingWindow(TimeSpan.FromSeconds(windowSize),
                                                                                     HoppingWindowOutputPolicy.ClipToWindowEnd)
                                            select new WindowedCount()
                                            {
                                                id = perEventCategory.Key.category,
                                                source = perEventCategory.Key.application,
                                                count = (int) w.Count(),
                                                startTime = w.LowerTime(te => te.timestamp).ToString("o"),
                                                endTime =  w.UpperTime(te => te.timestamp).ToString("o")
                                            };
    
                var eventCategoryCountQuery = eventCategoryCount.ToQuery(
                    application,
                    "EventCategoryCount",
                    "Count the number of events received in a rolling window",
                    typeof (OutputAdapterFactory),
                    new OutputAdapterConfig() {adapterType = OutputAdapterType.TraceEventCategoryCount},
                    EventShape.Point,
                    StreamEventOrder.FullyOrdered);

    Reference stream input adapter

    public class KnownTraceEventsPointInputAdapter : TypedPointInputAdapter<GetKnownEventsResult>
        {
            private readonly Timer resetCache;
    
            public KnownTraceEventsPointInputAdapter(InputAdapterConfig config)
            {
                // Poll the known event table to insert updated events into the stream
                resetCache = new Timer(60000); // every 1 minute
                resetCache.Elapsed += new ElapsedEventHandler(ResetCache);
            }
    
            public void ResetCache(object sender, ElapsedEventArgs e)
            {
                SyncKnownEvents();
            }
    
            public void SyncKnownEvents()
            {
                var knownEvents = default(List<GetKnownEventsResult>);
                using (var eventLog = new EventLogDataContext())
                {
                    knownEvents = eventLog.GetKnownEvents(false).ToList();
                }
    
                foreach (var knownEvent in knownEvents)
                {
                    PointEvent<GetKnownEventsResult> pointEvent = CreateInsertEvent();
    
                    if (null == pointEvent)
                    {
                        Ready();
                        return;
                    }
    
                    if (AdapterState.Stopping == AdapterState)
                    {
                        Stopped();
                        return;
                    }
    
                    try
                    {
                        pointEvent.StartTime = DateTime.UtcNow;
                        pointEvent.Payload = knownEvent;
    
                        if (Enqueue(ref pointEvent) == EnqueueOperationResult.Full)
                        {
                            Log(DateTime.UtcNow + ": " + GetType().ToString() + " input queue full");
                            Ready();
                            return;
                        }
                    }
                    finally
                    {
                        if (null != pointEvent)
                        {
                            ReleaseEvent(ref pointEvent);
                        }
                    }
                }
    
                EnqueueCtiEvent(DateTime.UtcNow);
            }
    
            public override void Start()
            {
                Log(DateTime.UtcNow + ": " + GetType().ToString() + " started");
                SyncKnownEvents();
                resetCache.Start();
            }
    
            public override void Resume()
            {
                resetCache.Start();
            }
    
            public override void Stop()
            {
                Log(DateTime.UtcNow + ": " + GetType().ToString() + " stopped");
                resetCache.Stop();
                base.Stop();
                Stopped();
            }
    
            protected static void Log(string message)
            {
                using (var file = new System.IO.StreamWriter(@"C:\Cep\Logs\InputAdapters.txt", true))
                {
                    file.WriteLine(message);
                }
            }
        }

    Input stream input adapter (receives its events over a WCF duplex connection)

    [CallbackBehavior(UseSynchronizationContext = false, ConcurrencyMode = ConcurrencyMode.Single, IncludeExceptionDetailInFaults = true)]
        public class EventTracePointInputAdapter : ViagogoPointInputAdapter<TraceEvent>, IEventTraceSourceSubscriptionManagerCallback
        {
            private EventTraceSourceSubscriptionManagerClient client;
    
            public EventTracePointInputAdapter(InputAdapterConfig config) : base(config)
            {
                Listen();
            }
    
            public void Listen()
            {
                client = new EventTraceSourceSubscriptionManagerClient(new InstanceContext(this), "NetTcpBinding_IEventTraceSourceSubscriptionManager");
                client.Open();
                client.SubscribeToSource();
    
                ((ICommunicationObject)client).Faulted += new EventHandler(HandleClientFault);
            }
    
            public void HandleClientFault(object sender, EventArgs e)
            {
                Log(DateTime.UtcNow + " " + this.GetType() + " callback channel faulted, adapter is renewing connection");
                client.Abort();
                Listen();
            }
    
            public void ObserveSource(TraceEvent payload)
            {
                if (null == payload)
                {
                    throw new ArgumentNullException("payload");
                }
    
                PointEvent<TraceEvent> pointEvent = base.CreateInsertEvent();
    
                if (null == pointEvent)
                {
                    Ready();
                    return;
                }
    
                try
                {
                    var eventTime = payload.timestamp; // This is actually the server's Utc timestamp
    
                    latestEvent = eventTime;
                    pointEvent.StartTime = eventTime;
                    pointEvent.Payload = payload;
                    pointEvent.Payload.message = "removed to prevent message size exceeding streaminsight max event size";
    
                    if (Enqueue(ref pointEvent) == EnqueueOperationResult.Full)
                    {
                        Log(DateTime.UtcNow + ": " + GetType().ToString() + " input queue full");
    
                        Ready();
                        return;
                    }
                }
                catch (Exception e)
                {
                    // probably tried to enqueue an event before the last CTI
                    Log(DateTime.UtcNow + ": " + GetType() + " threw an exception\n" + e.ToString() + "\n");
                }
                finally
                {
                    if (null != pointEvent)
                    {
                        ReleaseEvent(ref pointEvent);
                    }
                }
            }
    
            public override void Stop()
            {
                ((ICommunicationObject)client).Faulted -= new EventHandler(HandleClientFault);
    
                try
                {
                    client.UnsubscribeFromSource();
                    client.Close();
                }
                catch (Exception e)
                {
                    client.Abort();
                }
    
                base.Stop();
                Stopped();
            }
    
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    ((IDisposable)client).Dispose();
                }
                base.Dispose(disposing);
            }
        }

    Input adapter base class which enqueues CTI events (may generate CTIs using AdvanceTimeSettings at a later date)

    public abstract class ViagogoPointInputAdapter<T> : TypedPointInputAdapter<T>
        {
            private readonly Timer stopTimer;
            private readonly Timer ctiTimer;
            private readonly object sync = new object();
            private InputAdapterConfig config;
            protected DateTime latestEvent = DateTime.MinValue;
            protected DateTime lastCti = DateTime.MinValue;
    
            protected ViagogoPointInputAdapter(InputAdapterConfig config)
            {
                this.config = config;
    
                // Poll the adapter to determine when it is time to stop.
            stopTimer = new Timer(config.stopPollingPeriod);
                stopTimer.Elapsed += new ElapsedEventHandler(CheckStopping);
    
                // Poll the adapter to inject cti events into the stream
                ctiTimer = new Timer(config.ctiPeriod);
                ctiTimer.Elapsed += new ElapsedEventHandler(InjectCtiEvent);
            }
    
            public override void Start()
            {
                ctiTimer.Start();
                stopTimer.Start();
                Log(DateTime.UtcNow + ": " + GetType().ToString() + " started");
            }
    
            public override void Resume()
            {
                ctiTimer.Start();
                stopTimer.Start();
                Log(DateTime.UtcNow + ": " + GetType().ToString() + " resumed");
            }
    
            public override void Stop()
            {
                ctiTimer.Stop();
                stopTimer.Stop();
    
                Log(DateTime.UtcNow + ": " + GetType().ToString() + " stopped");
    
                base.Stop();
            }
    
            private void InjectCtiEvent(object source, ElapsedEventArgs e)
            {
                // Allow for a delay of 3 seconds
                var ctiTime = DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(3));
    
                EnqueueCtiEvent(ctiTime);
                lastCti = ctiTime;
            }
    
            private void CheckStopping(object source, ElapsedEventArgs e)
            {
                lock (this.sync)
                {
                    if (AdapterState == AdapterState.Stopping)
                    {
                        Stop();
                    }
                }
            }
    
            protected static void Log(string message)
            {
                using (var file = new System.IO.StreamWriter(@"C:\Viagogo.Cep\Logs\InputAdapters.txt", true))
                {
                    file.WriteLine(message);
                }
            }
        }

    Thanks in advance for taking time to look over this.

    Marcus

    09 February 2012 12:21

Answers

  • The CTIs not being imported until an actual join takes place makes sense. However, when I enqueue these updated reference events I also enqueue a CTI into the reference stream at the end of the update. Would this not tell the engine that new events have arrived and time has advanced, causing old events to expire? Or does the use of AlterEventDuration().ClipEventDuration() and the use of the join change this behaviour?

    ...

    Shouldn't the enqueuing of a CTI in the reference stream after every update cause the old events to expire, even if no CTIs are coming from the fast stream during quiet periods?

    Marcus

    No, the enqueuing of a CTI in the reference stream won't do that. The various stream operators are processed with the query ... the entire query has to move forward. If you aren't getting CTIs or events from the data stream to keep it moving forward, the intermediary stream operators won't be processed. Changing your reference stream to enqueue updates only, though, is a good thing to do, regardless of the memory issues. :-)

    Looking over your queries again, I see that a tumbling window is the last operator in the stream and I think that's why you need events ... rather than just CTIs ... to push the events through. Windows don't process without events. Try using a query before creating the window to make sure that it moves forward. An example is below:

    var unionedEvents = unknownEventStream.Union(categorisedEventStream);
    
    var unionedEventQuery = unionedEvents.ToQuery(application,
                                                        "UnionedEvents",
                                                        "Categorized and unknown events",
                                                        EventShape.Point,
                                                        StreamEventOrder.FullyOrdered);
    
    var unionedEventStream = unionedEventQuery.ToStream<CategorisedTraceEvent>();
    
    var eventCategoryCount = from e in unionedEventStream
                                group e by new ApplicationCategory() { category = e.categoryID, application = e.applicationId }
                                    into perEventCategory
                                    from w in perEventCategory.TumblingWindow(TimeSpan.FromSeconds(windowSize),
                                                                            HoppingWindowOutputPolicy.ClipToWindowEnd)
                                    select new WindowedCount()
                                    {
                                        id = perEventCategory.Key.category,
                                        source = perEventCategory.Key.application,
                                        count = (int)w.Count(),
                                        startTime = w.LowerTime(te => te.timestamp).ToString("o"),
                                        endTime = w.UpperTime(te => te.timestamp).ToString("o")
                                    };
    //Start the unionedEventQuery right before the event Category Count Query

    This takes your unioned and categorized events and will push the associated stream operators forward independently of the window. You can also then reuse the unionedEventStream in other queries without worrying about creating multiple instances of your input adapters (which is usually a Very Bad Thing). See this blog post for more details on the how's and why's of dynamic query composition (DQC).


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    • Marked as answer by Peja Tao, 21 February 2012 7:02
    15 February 2012 12:39
    Moderator
  • I’d speculate that at least during the quiet time Ctis from your regular input (not the reference input), do not reach the join and therefore the reference events never expire.

    You could use query diagnostics to get a handle on which operator in your query is consuming the memory (assuming the culprit is the query operator). You can get to the detailed diagnostics in the event flow debugger. Those diagnostics also report the last Cti timestamp processed by each operator. This will help to verify if Ctis are actually flowing through the query.

    Keep in mind that there is a slight twist with point events and the advance time policy “adjust”. From the documentation:

    CTI Violation Policies

    It is possible for an event source to violate CTI semantics by sending events with an earlier timestamp than the inserted CTIs. The advance time settings allow for the specification of a policy to handle such occurrences. The policy can have the following two values:

    • Drop: Events that violate the inserted CTI are dropped and are not enqueued into the query.
    • Adjust: Events that violate the inserted CTI are modified if their lifetime overlaps with the CTI timestamp. That is, the start timestamp of the events is set to the most recent CTI timestamp, such that those events become valid. If both start and end time of an event fall before the CTI timestamp, then the event is dropped.

    Since you are dealing with point events, all events with start time less than the Cti do not overlap the Cti and are therefore dropped. I’m not sure if this is relevant here (I couldn’t find how you actually generate timestamps for the input events), but it is worth checking up on.
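    To make that drop behaviour concrete, here is a tiny sketch (not from the original post; the method name is illustrative) of the effective rule for point events under the Adjust policy:

    ```csharp
    // Illustrative only: a point event has zero duration, so its lifetime can
    // never overlap a later CTI timestamp. Under AdvanceTimePolicy.Adjust it
    // is therefore dropped rather than adjusted.
    static bool IsDroppedUnderAdjust(DateTime pointStartTime, DateTime lastCtiTime)
    {
        // Point lifetime is [start, start + 1 tick); if start < CTI, the whole
        // lifetime falls before the CTI and the event is dropped.
        return pointStartTime < lastCtiTime;
    }
    ```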



    Peter Zabback

    • Suggested as answer by Peja Tao, 13 February 2012 4:46
    • Marked as answer by Peja Tao, 21 February 2012 7:02
    11 February 2012 1:42

All Replies

  • I agree. It does seem to point to the reference stream as being the culprit. It seems reasonable to assume the code in the input adapter is where the leak is happening, although we can't rule out (yet) the leak occurring in the query engine, I guess.

    Taking a quick look, the only thing I see in the input adapter that appears to be different from the input stream input adapter is this:

                using (var eventLog = new EventLogDataContext())
                {
                    knownEvents = eventLog.GetKnownEvents(false).ToList();
                }

    Finding memory leaks is always very difficult, but you have to start somewhere, so I would focus initially on this part. What happens, for example, if you stub this out and just generate a fixed event (or a fixed few hundred events). I would be interested to see if this changes the behaviour. If it doesn't, you look somewhere else. If it does, bingo.
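    As a rough sketch of that experiment (the payload field values are assumptions; the enqueue loop from the original adapter stays unchanged), the database call could be swapped for a fixed in-memory list:

    ```csharp
    // Hypothetical stub: replaces the LINQ to SQL call with a fixed result set
    // so the database layer is removed from the equation. If memory still
    // climbs with this in place, the leak is elsewhere.
    public void SyncKnownEvents()
    {
        var knownEvents = Enumerable.Range(1, 500)
            .Select(i => new GetKnownEventsResult
            {
                EventID = i,            // illustrative values only
                ApplicationID = 1,
                EventLogCategoryID = 1,
                CategoryName = "Stub"
            })
            .ToList();

        // ... enqueue loop and EnqueueCtiEvent(DateTime.UtcNow) as before ...
    }
    ```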


    09 February 2012 15:15
  • Question for you - is it necessary to use TimeSpan.MaxValue to extend the events before clipping? Would it be reasonable to have a timeout - no matching event in x amount of time and the reference event goes away?

    Even if not, try setting the event duration to something less than TimeSpan.MaxValue ... say, TimeSpan.FromHours(2) ... and see if that changes the behavior. Unfortunately, it may take longer to verify the solution.
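    A minimal sketch of that change against the original query (only the duration argument differs):

    ```csharp
    // Cap the reference event lifetime at 2 hours instead of TimeSpan.MaxValue,
    // so events that are never clipped still expire on their own.
    var knownEvents = from e in knownEventStream
                          .AlterEventDuration(e => TimeSpan.FromHours(2))
                          .ClipEventDuration(knownEventStream,
                              (e1, e2) => e1.EventID == e2.EventID
                                       && e1.ApplicationID == e2.ApplicationID)
                      select e;
    ```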

    Finally, have you recorded any of the events using the debugger or trace.cmd? I'm thinking that you may have an issue with a join that isn't apparent to us here and you should be able to identify that by using EFD.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    09 February 2012 20:34
    Moderator
  • Something unrelated to your issue but I did notice in your input adapters that you aren't synchronizing your enqueue and your stop. This can cause you issues with shutdown ... from my initial glance, most likely an ObjectDisposedException. One of your adapters would catch it (but it wouldn't be a CTI violation exception ...)

    But ... it's also possible to be an ObjectDisposedException. Here's what could happen ... your input adapter is happily enqueuing events and StreamInsight calls stop. Now, you are in the middle of an enqueue operation, you've already checked for the AdapterState == Stopping ... and you are about to enqueue. In your Stop(), you call Stopped(). This will dispose the input adapter. The enqueue thread then proceeds to call Enqueue and ... boom! Depending on the volume of events that you have, you may not see it very often and it'll be one of those Heisenbugs that's really, really tough to reproduce - the timing has to be exactly wrong.

    Also, StreamInsight will call Stop() for you ... you don't need to check the state and call Stop() yourself. From what I've been able to determine in Reflector, the setting of the adapter state to Stopping and the actual call to Stop() occur on different threads so it seems to be possible that your AdapterState may not yet be Stopping when Stop() is called (though I've not been able to actually reproduce this). So your CheckStopping() would call Stop() and the engine would call Stop(). Both of these calls would be on separate threads and they aren't being synched ... so another possible ObjectDisposedException.
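    One possible shape of that synchronization (a sketch, not a drop-in fix): take one lock around both the enqueue path and the stop/dispose path so the adapter can't be disposed mid-enqueue. A sync object like the one already in the adapter base class would serve.

    ```csharp
    // Sketch: serialize enqueue and shutdown on the same lock.
    public void ObserveSource(TraceEvent payload)
    {
        lock (sync)
        {
            if (AdapterState == AdapterState.Stopping)
            {
                return; // shutdown in progress; drop the event
            }
            // ... CreateInsertEvent / Enqueue / ReleaseEvent as before ...
        }
    }

    public override void Stop()
    {
        lock (sync)
        {
            // No enqueue can be in flight while we hold the lock.
            base.Stop();
            Stopped();
        }
    }
    ```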

    Question .. have you seen any issues with shutdown? Hangs? Exceptions?


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    10 February 2012 4:16
    Moderator
  • A leak on the result from the database call seems a bit strange; the initial drop in memory usage after each increase appears to be garbage collection of the initial list. I tried calling Clear on the list before it goes out of scope, as a thread on Stack Overflow suggested, but not surprisingly it didn't make any difference.

    With regards to extending the event duration before the clipping, I just followed this tutorial: http://windowsazurecat.com/2011/08/sql-server-reference-data-streaminsight-query/. I've toyed with this previously, because the first duration extension seems redundant after the second, and concluded that the clipping overwrites the previous infinite extension, but I may be wrong...

    I've still kept the initial extension but, as you suggested, changed events to expire after 2 minutes instead of never. I ran the query again overnight and, as I thought, it didn't make any difference - memory usage still begins to surge. However, I've spotted a pattern which I'll explain below.

    I've used the event flow debugger to record events and check that the query is functioning correctly, which it does. Can that or trace.cmd help identify where a leak is occurring?

    You're spot on, DevBiker, about the ObjectDisposedException. I have seen that before when shutting the service down and assumed it was to do with the WCF connection, but even with all the exception catching it was still throwing the error. I'll try what you suggest and get back to you.

    Back to the issue at hand... After another night of monitoring the query, I've spotted a pattern which causes the build-up and then a sudden release of memory, which is quite strange.

    While the input stream is receiving events which need categorising, the query releases memory on cue; however, during periods of inactivity the memory begins to creep up. I spotted this last night while witnessing almost exponential growth in memory consumption and noticing that there were no input events in the build-up to this rapid consumption of memory. This lack of events isn't a problem with the input adapter; it's just that our source didn't produce any interesting events during the night.

    I manually triggered some events to see if the query was still alive even with the high memory usage and, after a slight delay, sure enough the output event popped out the other side and in the process flushed the memory, bringing it back down to normal levels!

    So the question is, why is StreamInsight not releasing memory from the reference stream unless it's receiving events on the input stream which need categorising?

    The input stream is enqueuing CTIs every 2 seconds even if there are no incoming events, and the reference stream is tied to this fast-moving stream. The reference stream is still successfully enqueuing an updated copy of the known events, and the query is expiring the old events (a quick update of the known events to switch the categorisation of some events does indeed change the query output on the fly).

    Have I missed something, or is this a problem with the StreamInsight engine?

    Thanks for the help so far!

    Marcus



    10 February 2012 11:07
  • How about trying this:

    Instead of having the input adapter inject CTIs whether you have events or not, use the AdvanceTimeSettings when you create the stream. While implementing IDeclareAdvanceTimeProperties would do the same thing, I think it'd be easier to simply remove the CTI injection and add the AdvanceTimeSettings where you create the stream. Take a look at this blog post - while it's about controlling your CTIs with LINQPad, it does go into how CTIs are enqueued with the AdvanceTimeSettings.

    Now ... some things to keep in mind ... AdvanceTimeSettings won't enqueue CTIs like clockwork (the way yours does) ... it will only enqueue CTIs when you actively have events being enqueued. So ... in that period of inactivity, you wouldn't have any CTIs. I'm wondering if all of the empty CTI spans are at the root of what you are seeing.
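    For illustration, declaring the CTIs at stream creation might look something like this (the exact generation parameters are assumptions; check them against your scenario):

    ```csharp
    // Emit a CTI after every enqueued event, with the CTI timestamp trailing
    // the event by 3 seconds - roughly mirroring the manual timer's delay.
    var generationSettings = new AdvanceTimeGenerationSettings(
        1, TimeSpan.FromSeconds(3));

    var advanceTimeSettings = new AdvanceTimeSettings(
        generationSettings, null, AdvanceTimePolicy.Adjust);

    CepStream<TraceEvent> eventTraceStream = CepStream<TraceEvent>.Create(
        eventTraceStreamName,
        typeof(InputAdapterFactory),
        new InputAdapterConfig { adapterType = InputAdapterType.TraceEventPayload },
        EventShape.Point,
        advanceTimeSettings);
    ```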


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    10 February 2012 14:08
    Moderator
  • I've implemented your suggestions with regard to the stopping situation. I'll keep an eye on that...

    I've also added AdvanceTimeSettings.StrictlyIncreasingStartTime to my query, and CTIs with a timestamp of one tick in the future get enqueued only when there are input events.

    I think I actually tried something like this a while back, but the output events are not quite as you'd expect, so I went down the manual route, which makes everything happen like clockwork. This unpredictable behaviour is still the case: when you enqueue an event, it's not included in the output until another event arrives some time in the future (even with the CTI being one tick in the future).

    Anyway, even with this change in CTI tactics, memory usage continues to rise during inactive periods and is flushed when the first event comes along again.

    As can be seen, the memory (in blue) rises continually while no CTIs are being produced. Upon production of CTIs by the engine (green), the memory is freed!

    So the build-up of CTI events doesn't appear to be the problem...


    10 February 2012 17:00
  • Let me first say that my initial suggestion ... to look at using AdvanceTimeSettings ... was silly and, as you can see, just plain wrong. But it didn't hit me until I saw your followup.

    The CTI events don't get imported into the stream until you join with the source stream.

    Here's what's happening. You have a data stream and a reference stream. You are importing the CTIs from the data stream into the reference stream. This is exactly how you should do it. But those CTIs don't get imported into the stream ... and, therefore, the reference data isn't actually pushed through the AlterEventDuration().ClipEventDuration() until the join. And if there is no data in the data stream, there is nothing to join. So the reference data doesn't get pushed through. Because it doesn't get pushed through, all of the reference events are queued and your memory increases and increases and increases ...
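    [Editor's note] For readers unfamiliar with the pattern being discussed, it is usually written roughly like this (a sketch with assumed payload fields EventId and CategoryId; this is not the poster's actual query):

```csharp
// Reference-data pattern: stretch each reference point event to infinity,
// then clip it with the next reference event that has the same key, so
// exactly one version per key is "current" at any application time.
var refData = refStream
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(refStream, (older, newer) => older.EventId == newer.EventId);

// The join is where the data stream's CTIs finally meet the reference data;
// with no data events flowing, nothing on this path advances, and expired
// reference events accumulate in memory.
var categorised = from d in dataStream
                  join r in refData
                      on d.EventId equals r.EventId
                  select new { d.EventId, r.CategoryId };
```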

    I remember running into this ... though in a different way ... early in my StreamInsight life ... and Roman set me straight.

    While I'm sure that there is another solution, the only thing that I can think of right now is to have a timeout in your data input adapter that enqueues a "junk" event that won't make it past the join to keep things moving.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    11 February 2012 1:31
    Moderator
  • I'd speculate that, at least during the quiet time, CTIs from your regular input (not the reference input) do not reach the join and therefore the reference events never expire.

    You could use query diagnostics to get a handle on which operator in your query is consuming the memory (assuming the culprit is a query operator). You can get to the detailed diagnostics in the event flow debugger. Those diagnostics also report the last CTI timestamp processed by each operator, which will help verify whether CTIs are actually flowing through the query.

    Keep in mind that there is a slight twist with point events and the advance time policy “adjust”. From the documentation:

    CTI Violation Policies

    It is possible for an event source to violate CTI semantics by sending events with an earlier timestamp than the inserted CTIs. The advance time settings allow for the specification of a policy to handle such occurrences. The policy can have one of two values:

    • Drop: Events that violate the inserted CTI are dropped and are not enqueued into the query.
    • Adjust: Events that violate the inserted CTI are modified if their lifetime overlaps with the CTI timestamp. That is, the start timestamp of the events is set to the most recent CTI timestamp, such that those events become valid. If both start and end time of an event fall before the CTI timestamp, then the event is dropped.

    Since you are dealing with point events, all events with a start time less than the CTI do not overlap the CTI and are therefore dropped. I'm not sure if this is relevant here (I couldn't find how you actually generate timestamps for the input events), but it is worth checking up on.
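    [Editor's note] The documented "Adjust" rule can be written out as plain code to make the point-event consequence concrete (illustrative only, not StreamInsight API):

```csharp
// Apply the documented "Adjust" CTI violation policy to an event lifetime
// [start, end). Returns the (possibly adjusted) start time, or null if the
// event is dropped.
static DateTime? ApplyAdjustPolicy(DateTime cti, DateTime start, DateTime end)
{
    if (start >= cti) return start;  // no CTI violation: keep as-is
    if (end > cti) return cti;       // lifetime overlaps the CTI: clamp start
    return null;                     // entirely before the CTI: dropped
}
```

    A point event's end time is its start time plus one tick, so whenever its start falls before the CTI, its end cannot be past the CTI either, and the event is always dropped rather than adjusted, which is exactly the behaviour described above.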



    Peter Zabback

    • Suggested as answer by Peja Tao, 13 February 2012 4:46
    • Marked as answer by Peja Tao, 21 February 2012 7:02
    11 February 2012 1:42
  • You may also be able to use a Left Anti Semi-Join between your reference stream and your data stream ... that should force it to push through. You would still have to publish it as a query but don't hook it up to an output adapter ... or hook it up to a NullOutputAdapter that simply dequeues and drops (that's what I do).
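    [Editor's note] A sketch of this suggestion, assuming stream and field names (refData, dataStream, EventId, NullOutputAdapterFactory are placeholders); in StreamInsight LINQ, a left anti-semi-join is expressed as a "where not exists" subquery using IsEmpty():

```csharp
// LASJ sketch: reference events with no matching data event at the same
// application time. Keeping this query running forces the reference branch
// of the plan to keep advancing even when the data stream is quiet.
var unmatchedRef = from r in refData
                   where (from d in dataStream
                          where d.EventId == r.EventId
                          select d).IsEmpty()
                   select r;

// Bind it to a "null" output adapter that simply dequeues and discards.
var keepAliveQuery = unmatchedRef.ToQuery(
    application,
    "RefKeepAlive",
    "Keeps the reference branch moving",
    typeof(NullOutputAdapterFactory),  // placeholder null adapter factory
    null,
    EventShape.Point,
    StreamEventOrder.FullyOrdered);
```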

    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    11 February 2012 3:35
    Moderator
  • The CTIs not being imported until an actual join takes place makes sense. However, when I enqueue these updated reference events, I also enqueue a CTI into the reference stream at the end of the update. Would this not tell the engine that new events have arrived and time has advanced, causing the old events to expire? Or does the use of AlterEventDuration().ClipEventDuration() and the join change this behaviour?

    What we've actually done for the moment is switch to a push approach rather than a pull. Instead of polling the known events table and pulling everything in every 60 seconds regardless, we now only update the reference stream if those categories have changed (this is invoked via WCF).

    Our production servers also appear to provide a fairly continual flow of input events, making sure there is always data going through the join. Even though this appears to work for us, we're still relying on these conditions holding to avoid eating up all the memory.

    Shouldn't the enqueuing of a CTI in the reference stream after every update cause the old events to expire, even if no CTIs are coming from the fast stream during quiet periods?

    Marcus

    15 February 2012 10:54
  • The CTIs not being imported until an actual join takes place makes sense. However, when I enqueue these updated reference events, I also enqueue a CTI into the reference stream at the end of the update. Would this not tell the engine that new events have arrived and time has advanced, causing the old events to expire? Or does the use of AlterEventDuration().ClipEventDuration() and the join change this behaviour?

    ...

    Shouldn't the enqueuing of a CTI in the reference stream after every update cause the old events to expire, even if no CTIs are coming from the fast stream during quiet periods?

    Marcus

    No, the enqueuing of a CTI in the reference stream won't do that. The various stream operators are processed with the query ... the entire query has to move forward. If you aren't getting CTIs or events from the data stream to keep it moving forward, the intermediary stream operators won't be processed. Changing your reference stream to enqueue updates only, though, is a good thing to do regardless of the memory issues. :-)

    Looking over your queries again, I see that a tumbling window is the last operator in the stream, and I think that's why you need events ... rather than just CTIs ... to push the events through. Windows don't process without events. Try publishing a query before creating the window to make sure that it moves forward. An example is below:

    var unionedEvents = unknownEventStream.Union(categorisedEventStream);

    var unionedEventQuery = unionedEvents.ToQuery(
        application,
        "UnionedEvents",
        "Categorized and unknown events",
        EventShape.Point,
        StreamEventOrder.FullyOrdered);

    var unionedEventStream = unionedEventQuery.ToStream<CategorisedTraceEvent>();

    var eventCategoryCount =
        from e in unionedEventStream
        group e by new ApplicationCategory() { category = e.categoryID, application = e.applicationId }
            into perEventCategory
        from w in perEventCategory.TumblingWindow(
            TimeSpan.FromSeconds(windowSize),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new WindowedCount()
        {
            id = perEventCategory.Key.category,
            source = perEventCategory.Key.application,
            count = (int)w.Count(),
            startTime = w.LowerTime(te => te.timestamp).ToString("o"),
            endTime = w.UpperTime(te => te.timestamp).ToString("o")
        };

    // Start the unionedEventQuery right before the eventCategoryCount query.
    This takes your unioned and categorized events and will push the associated stream operators forward independently of the window. You can also then reuse unionedEventStream in other queries without worrying about creating multiple instances of your input adapters (which is usually a Very Bad Thing). See this blog post for more details on the how's and why's of DQC.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    • Marked as answer by Peja Tao, 21 February 2012 7:02
    15 February 2012 12:39
    Moderator