Asked by:
StreamInsight 2.1 Query hangs after 20 days

Question
-
Hi All
I have a stream insight instance created as a stand alone server(Hosted in windows service). and I am running 16 queries. But i have an issue after few days of running query the some of the query stop processing events thought the event are present. I have seen symptoms like the query start piling event in the output event queue. I have observer by watching #Event in Output Queue performance counter
I am using StreamInsight 2.1
Please find the output adapter code
namespace Dummy.Services.Realtime.Analytics.Adapters { public class UserContactOutputAdapter : TypedPointOutputAdapter<UserContactPayload> { public readonly object LockObject = new Object(); StreamWriter sw = null; bool streamOpen = false; public UserContactOutputAdapter() { //sw = new StreamWriter(); //sw.Flush(); } public override void Resume() { ConsumeEvents(); //new Thread(this.ConsumeEvents).Start(); //throw new NotImplementedException(); } public override void Start() { //Initilize(); ConsumeEvents(); //new Thread(this.ConsumeEvents).Start(); //throw new NotImplementedException(); } private void ConsumeEvents() { PointEvent<UserContactPayload> currentEvent = default(PointEvent<UserContactPayload>); sw = AdapterHelper.OpenFile(ConfigurationManager.AppSettings["Dummy.OutputAdapter.UserContact.Filepath"]); lock (LockObject) { streamOpen = true; try { if (sw != null && streamOpen == true) { while (true) { if (AdapterState.Stopping == AdapterState) { this.sw.Flush(); this.sw.Close(); streamOpen = false; Stopped(); return; } if (DequeueOperationResult.Empty == Dequeue(out currentEvent)) { Ready(); this.sw.Flush(); this.sw.Close(); streamOpen = false; return; } if (currentEvent.EventKind == EventKind.Insert) { string accession_number = currentEvent.Payload.TimeStamp + "-" + currentEvent.Payload.CommonThread + "-" + currentEvent.Payload.Name + "-" + currentEvent.Payload.Qualified + "-" + currentEvent.Payload.Passes + "-" + currentEvent.Payload.Mrn + Environment.NewLine; sw.Write(accession_number); } sw.Flush(); // Every received event needs to be released. ReleaseEvent(ref currentEvent); } } } catch (Exception ex) { AdapterHelper.LogError(ex); try { if (currentEvent != null) ReleaseEvent(ref currentEvent); } catch (Exception x) { } if (sw != null && streamOpen == true) { sw.Close(); streamOpen = false; } } finally { try { if (currentEvent != null) ReleaseEvent(ref currentEvent); } catch (Exception x) { } } } } protected override void Dispose(bool disposing) { if (sw != null && streamOpen == true) { sw.Close(); streamOpen = false; } base.Dispose(disposing); } } }
Query code
//USER CONTACT inputConfig.Config = "USERCONTACT"; outputConfig.Config = "USERCONTACT"; UserContactInputAdapter = application.CreateInputAdapter<CommonInputAdapterFactory>("UserContactInputAdapter", "Description..."); UserContactOutputAdapter = application.CreateOutputAdapter<CommonOutputAdapterFactor>("UserContactOutputAdapter", "Description..."); UserContactCepStreamInput = CepStream<UserContactPayload>.Create("UserContactProducer", typeof(CommonInputAdapterFactory), inputConfig, EventShape.Point); //Create Main Stream MainUserContactQuery = UserContactCepStreamInput.ToQuery(application, "MainUserContactQuery", "Main Source Query for contact", EventShape.Point, StreamEventOrder.FullyOrdered); MainUserContactQuery.Start(); normalUserContactEventStream = MainUserContactQuery.ToStream<UserContactPayload>(); UserContactilterCepStream = from e in normalUserContactEventStream select e; userContactQuery = UserContactilterCepStream.ToQuery("UserContactQuery", "User Contact Query", typeof(CommonOutputAdapterFactor), outputConfig, EventShape.Point, StreamEventOrder.FullyOrdered); userContactQuery.Start();
Wednesday, November 30, 2016 3:52 PM