locked
StreamInsight 2.1 Query hangs after 20 days RRS feed

  • Question

  • Hi All

    I have a stream insight instance created as a stand alone server(Hosted in windows service). and I am running 16 queries. But i have an issue after few days of running query the some of the query stop processing events thought the event are present. I have seen symptoms like the query start piling event in the output event queue. I have observer by watching #Event in Output Queue performance counter

    I am using StreamInsight 2.1

    Please find the output adapter code

    namespace Dummy.Services.Realtime.Analytics.Adapters
    {
        public class UserContactOutputAdapter : TypedPointOutputAdapter<UserContactPayload>
        {
            public readonly object LockObject = new Object();
    
            StreamWriter sw = null; bool streamOpen = false;
            public UserContactOutputAdapter()
            {
               
    
                //sw = new StreamWriter();
                //sw.Flush();
            }
            public override void Resume()
            {
                ConsumeEvents();
                //new Thread(this.ConsumeEvents).Start();
                //throw new NotImplementedException();
            }
    
            public override void Start()
            {
                //Initilize();
                ConsumeEvents();
                //new Thread(this.ConsumeEvents).Start();
                //throw new NotImplementedException();
            }
    
            private void ConsumeEvents()
            {
                PointEvent<UserContactPayload> currentEvent = default(PointEvent<UserContactPayload>);
                sw = AdapterHelper.OpenFile(ConfigurationManager.AppSettings["Dummy.OutputAdapter.UserContact.Filepath"]);
                lock (LockObject)
                {
    
    
    
                    streamOpen = true;
                    try
                    {
                        if (sw != null && streamOpen == true)
                        {
                            while (true)
                            {
                                if (AdapterState.Stopping == AdapterState)
                                {
                                    this.sw.Flush();
                                    this.sw.Close(); streamOpen = false;
    
                                    Stopped();
    
                                    return;
                                }
    
                                if (DequeueOperationResult.Empty == Dequeue(out currentEvent))
                                {
                                    Ready();
                                    this.sw.Flush();
                                    this.sw.Close(); streamOpen = false;
                                    return;
                                }
                                if (currentEvent.EventKind == EventKind.Insert)
                                {
    
                                     
                                    string accession_number = currentEvent.Payload.TimeStamp + "-" + currentEvent.Payload.CommonThread + "-" + currentEvent.Payload.Name + "-" + currentEvent.Payload.Qualified + "-" + currentEvent.Payload.Passes + "-" + currentEvent.Payload.Mrn + Environment.NewLine;
                                   
                                    sw.Write(accession_number);
                                }
                              
                                sw.Flush();
    
                                // Every received event needs to be released.
                                ReleaseEvent(ref currentEvent);
                            }
    
                        }
                    }
                    catch (Exception ex)
                    {
                        AdapterHelper.LogError(ex);
                        try { if (currentEvent != null) ReleaseEvent(ref currentEvent); } catch (Exception x) { }
                        if (sw != null && streamOpen == true) { sw.Close(); streamOpen = false; }
                        
                    }
                    finally
                    {
                        try { if (currentEvent != null) ReleaseEvent(ref currentEvent); } catch (Exception x) { }
                    }
                }
    
            }
            protected override void Dispose(bool disposing)
            {
                if (sw != null && streamOpen == true) { sw.Close(); streamOpen = false; }
                base.Dispose(disposing);
            }
        }
    }

    Query code

      //USER CONTACT
                inputConfig.Config = "USERCONTACT";
                outputConfig.Config = "USERCONTACT";
                UserContactInputAdapter = application.CreateInputAdapter<CommonInputAdapterFactory>("UserContactInputAdapter", "Description...");
                UserContactOutputAdapter = application.CreateOutputAdapter<CommonOutputAdapterFactor>("UserContactOutputAdapter", "Description...");
    
                UserContactCepStreamInput = CepStream<UserContactPayload>.Create("UserContactProducer", typeof(CommonInputAdapterFactory), inputConfig, EventShape.Point);
    
                //Create Main Stream
                MainUserContactQuery = UserContactCepStreamInput.ToQuery(application, "MainUserContactQuery", "Main Source Query for contact", EventShape.Point, StreamEventOrder.FullyOrdered);
                MainUserContactQuery.Start();
    
                normalUserContactEventStream = MainUserContactQuery.ToStream<UserContactPayload>();
    
    
                UserContactilterCepStream = from e in normalUserContactEventStream
                                                select e;
    
                userContactQuery = UserContactilterCepStream.ToQuery("UserContactQuery", "User Contact Query", typeof(CommonOutputAdapterFactor), outputConfig, EventShape.Point, StreamEventOrder.FullyOrdered);
                userContactQuery.Start();


    Wednesday, November 30, 2016 3:52 PM