Answered StreamInsight Threads Number Increase

  • February 3, 2012 9:59
     
     

    Hi,

     

    I am new to StreamInsight and I am facing a big problem in my application.

    Here is the issue:

    I send real-time data from my data source to the StreamInsight engine using the "Enqueue" method. I noticed that with every "Enqueue" call, a thread is added to my application's thread count. The number of threads keeps increasing until the application crashes!

    New data arrives and is enqueued every second, and a single batch can contain 10,000 items or more. The data is enqueued to the StreamInsight engine through the input adapter. Has anyone faced this problem, and how can it be resolved? Does the StreamInsight adapter method "Enqueue" have problems receiving a lot of data continuously without stopping?

    Any advice will be much appreciated.

    Thank you very much for your help.

     

     

     

All Replies

  • February 3, 2012 10:11
     
     

    StreamInsight is designed to handle a lot of data continuously without stopping, so that is most likely not where your problem lies.

    I would look into your input adapter's source classes to see if they are spawning threads each time they "produce insert events" (enqueue the events). Typically (in my experience), the input adapter has a class that is responsible for acquiring the data, which it then uses to build the event payload and enqueue as an insert event. How that class actually acquires the data is up to the person implementing it (i.e. you). Is it possible that your implementation spawns a thread each time it builds a new event payload?

    Perhaps you can include some code snippets from your input adapter?
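
    One way to narrow this down, sketched under the assumption that a single pass of the suspect code can be wrapped in a delegate: compare the process thread count before and after the call. ThreadCountProbe and MeasureThreadDelta are hypothetical helper names, not part of StreamInsight; Process.GetCurrentProcess().Threads.Count is the standard .NET way to read the count. The number can fluctuate because of thread-pool and runtime threads, so look for a steady upward trend rather than an exact value.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper for illustration; not a StreamInsight type.
static class ThreadCountProbe
{
    // Returns the number of OS threads currently owned by this process.
    public static int CurrentThreadCount()
    {
        using (Process process = Process.GetCurrentProcess())
        {
            return process.Threads.Count;
        }
    }

    // Runs an action and reports how many threads were added while it ran.
    public static int MeasureThreadDelta(Action action)
    {
        int before = CurrentThreadCount();
        action();
        return CurrentThreadCount() - before;
    }
}

static class Program
{
    static void Main()
    {
        // Stand-in for "one pass of the adapter's produce/enqueue code".
        int delta = ThreadCountProbe.MeasureThreadDelta(() =>
        {
            var worker = new Thread(() => Thread.Sleep(500));
            worker.IsBackground = true;
            worker.Start();
        });
        Console.WriteLine("Threads added by this call: " + delta);
    }
}
```

    If the delta stays positive on every call of the real code path, that call is a good candidate for the leak.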

  • February 3, 2012 10:34
     
     Answered

    Hi Farnham,

    Thank you for the response. Here is how I handle the data in my input adapter.

     

    // This is the event handler I use to get the data from my data sources. New data arrives every second.

    private void GetDataFromDataSource(int transactionID, int groupHandleClient, int masterQuality,
                                       int masterError, int[] HandleClient, object[] DataValue,
                                       long[] TimeStamp, short[] Quality, int[] Error,
                                       int GrpHandle, int SrvHandle)
    {
        if (AdapterState.Stopping == AdapterState)
        {
            try
            {
                Stopped();
            }
            catch (Exception e)
            {
            }

            return;
        }

        int countError = 0;

        for (int i = 0; i < HandleClient.Length; i++)
        {
            try
            {
                EventAttributes MyEvent = new EventAttributes();
                MyEvent.svrHost = "";
                MyEvent.SrvHandle = "";
                MyEvent.Quality = Quality[i];
                MyEvent.HandleClient = svrAlias;
                MyEvent.DataValue = DataValue[i];

                // Save the received data in a buffer.
                EventsBuffer.Add(MyEvent);

                // If the buffer exceeds the "_bufferSize" specified by the user, move the first
                // "_bufferSize" items to the list that is sent to the StreamInsight engine.
                if (EventsBuffer.Count > _bufferSize && EventsBufferTemp.Count == 0)
                {
                    EventsBufferTemp.AddRange(EventsBuffer.GetRange(0, _bufferSize));
                    EventsBuffer.RemoveRange(0, _bufferSize);
                }
            }
            catch (Exception ex)
            {
            }
        }

        if (!operationRslt)
        {
            if (countError >= _fiablilityLevel)
            {
                countError = 0;
                continue;
            }
            else
            {
                i = currentElement - 1;
            }
        }
    }
           
    // Now the method of the thread that enqueues the data into the StreamInsight engine.
    private void SendStreamedData()
    {
        EnqueueOperationResult result = EnqueueOperationResult.Full;
        PointEvent<DAEvent> currEvent = default(PointEvent<DAEvent>);

        while (!_needToStop)
        {
            try
            {
                if (EventsBufferTemp.Count > 0)
                {
                    // Loop and enqueue the data to StreamInsight.
                    for (int i = 0; i < EventsBufferTemp.Count; i++)
                    {
                        currEvent = CreateInsertEvent();

                        currEvent.StartTime = DateTime.Now;
                        currEvent.Payload = new DAEvent();
                        string servHost1 = EventsBufferTemp[i].svrHost;
                        string servHandle1 = EventsBufferTemp[i].SrvHandle;
                        string HandleClient1 = EventsBufferTemp[i].HandleClient;
                        short quality1 = EventsBufferTemp[i].Quality;
                        object value1 = EventsBufferTemp[i].DataValue;

                        (currEvent.Payload as IDAEvent).Init(servHost1, servHandle1, HandleClient1, quality1, value1);

                        result = Enqueue(ref currEvent);

                        if (result == EnqueueOperationResult.Full)
                        {
                            ReleaseEvent(ref currEvent);
                            Ready();
                        }
                    }
                    EventsBufferTemp.Clear();
                }
            }
            catch (Exception e)
            {
            }
            Thread.Sleep(StreamRate);
        }
    }
          

    Could you please tell me if I am missing something?

    Thanks!

    • Marked as answer by Juliette_RF, February 3, 2012 11:23
  • February 3, 2012 11:36
     
     

    Hi Juliette

    I see the Thread.Sleep(StreamRate) statement at the end of your SendStreamedData method, but your code snippet doesn't show how the thread is created. Can you include more code to show how you create the thread?

    I suspect this is where your problem is: the piece that spawns this thread may be the culprit, doing so repeatedly.

    The patterns I use never involve spawning a thread to send streamed data to the StreamInsight engine. The code snippet below shows the basic pattern I use (sorry, it may not run as is, but the general idea is there).

    I have a DataSource class that is responsible for getting data, packaging it as a payload and then calling back to the InputAdapter to enqueue the payload. I use a timer to control how often the data source "does its work". The InputAdapter callback is set up by the input adapter itself (see further down below).

    Again, can you send the code that spawns the thread in your pattern so we can see whether it is being invoked repeatedly? It seems it must be if you are running out of threads.

        public sealed class DataSource : IDisposable
        {
            private readonly VPTagsSourceConfig m_Config;
            private System.Timers.Timer m_Timer;

            /// <summary>
            /// Initializes a new instance of the DataSource class.
            /// </summary>
            public DataSource(VPTagsSourceConfig config)
            {
                // Store the configuration so the timer interval can be read from it.
                m_Config = config;
                m_Timer = new System.Timers.Timer(m_Config.EventUpdateInterval.TotalMilliseconds);
                m_Timer.Elapsed += new System.Timers.ElapsedEventHandler(this.ProduceData);
                m_Timer.AutoReset = true;
                m_Timer.Enabled = false;
            }

            /// <summary>
            /// Callback type to push new data to.
            /// </summary>
            /// <param name="data">New data item to push to the callback.</param>
            /// <param name="timestamp">Timestamp of the data.</param>
            public delegate void EventCallback(XMLPayload data, DateTimeOffset timestamp);

            /// <summary>
            /// Gets or sets the callback.
            /// </summary>
            public EventCallback Callback { get; set; }


            /// <summary>
            /// Starts the data generation.
            /// </summary>
            public void Start()
            {
                m_Timer.Enabled = true;
            }

            /// <summary>
            /// Stops the data generation.
            /// </summary>
            public void Stop()
            {
                m_Timer.Enabled = false;
            }

            /// <summary>
            /// Dispose method.
            /// </summary>
            public void Dispose()
            {
                Stop();
                m_Timer.Close();
                m_Timer.Dispose();
            }

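            private void ProduceData(object sender, System.Timers.ElapsedEventArgs e)  // supports Float so far...
            {
                // EventCallback takes a payload and a timestamp; using the current time here is an assumption.
                this.Callback(new XMLPayload(GetDataFromSource()), DateTimeOffset.Now);
            }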
        }


        public class InputAdapterPoint : PointInputAdapter 
        {
            /// <summary>
            /// Event source.
            /// </summary>
            private DataSource dataSource;

            /// <summary>
            /// Initializes a new instance of the InputAdapterPoint class.
            /// </summary>
            /// <param name="config">The configuration passed from the adapter factory.</param>
            public InputAdapterPoint(VPTagsSourceConfig config, CepEventType cepEventType)
            {
                this.dataSource = new DataSource(config);

                // set up the DataSource callback to point at this class's ProduceEvent method, which will enqueue the event
                this.dataSource.Callback = new DataSource.EventCallback(this.ProduceEvent); 
            }

            /// <summary>
            /// The engine will call the Start() method upon starting the adapter. This is where
            /// the event enqueueing is supposed to be kicked off.
            /// </summary>
            public override void Start()
            {
                this.dataSource.Start();
            }

            /// <summary>
            /// The engine will call the Resume() method when resuming the suspended
            /// adapter.
            /// </summary>
            public override void Resume()
            {
                this.dataSource.Start();
            }

            protected override void Dispose(bool disposing)
            {
                dataSource.Dispose();
                base.Dispose(disposing);
            }

            private void ProduceEvent(XMLPayload data, DateTimeOffset timestamp)
            {
                // Check for the stopping state and stop if necessary.
                if (AdapterState.Stopping == AdapterState)
                {
                    Stopped();
                    return;
                }

                // Since this method is called from a thread that is independent of
                // Start() and Resume(), we need to make sure that the adapter is
                // actually running.
                if (AdapterState.Running != AdapterState)
                {
                    // Throw away the current event.
                    return;
                }

                if (data == null)
                {
                    // enqueue a cti event manually to pump the last interval event (if any). Use the time of the last interval event in the CTI
                    EnqueueCtiEvent(timestamp);
                    return;
                }

                PointEvent currEvent = CreateInsertEvent();
                if (null == currEvent)
                {
                    return;  // Throw away the current event.
                }

                currEvent.StartTime = timestamp; 
                try
                {
                    object value = data.Xml;  //  Convert.ChangeType(data.FQN, evtField.Type.ClrType, this.cultureInfo);
                    currEvent.SetField(0, value);
                }
                catch (Exception e)
                {
                    s_Log.Error("Error: " + e.Message);
                }

                if (previousEvent != currEvent)
                {
                    if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
                    {
                        ReleaseEvent(ref currEvent);
                        ReleaseEvent(ref previousEvent);
                        Ready();
                        return;
                    }
                    previousEvent = currEvent;
                }
            }
        }

     

     

  • February 3, 2012 13:06
     
     

     

    Hi Farnham,

    Thanks again for the response.
    To simplify my tests, I have implemented another sample, as follows:


    In this sample I generate random data and enqueue it to StreamInsight, and I get an abnormal increase in the number of threads in my application. Please take a look and tell me if there is something wrong in my code:

           
        private void LoadConnection()
        {
            // A thread to generate random data.
            Thread test = new Thread(myTest);
            test.Start();

            // This is the thread that calls the SendData method, which enqueues the data to the StreamInsight engine.
            Thread th = new Thread(new ThreadStart(SendData));
            th.Start();
        }

        private void ProduceEvents()
        {
            _needToStop = false;
            LoadConnection();
        }

        public override void Start()
        {
            // A queue in which I will save my data.
            _currEvents = new Queue();
            ProduceEvents();
        }

        public override void Resume()
        {
            if (!_needToStop) ProduceEvents();
        }

        // Send the data to the StreamInsight engine.
        private void SendData()
        {
            EnqueueOperationResult result = EnqueueOperationResult.Full;
            PointEvent<DAEvent> currEvent = default(PointEvent<DAEvent>);
            while (!_needToStop)
            {
                while (_currEvents.Count != 0)
                {
                    try
                    {
                        currEvent = (PointEvent<DAEvent>)_currEvents.Dequeue();

                        result = Enqueue(ref currEvent);

                        if (EnqueueOperationResult.Full == result)
                        {
                            ReleaseEvent(ref currEvent);
                            Ready();
                        }
                    }
                    catch (Exception e)
                    {
                    }
                }

                Thread.Sleep(100);
            }
        }

        // A method that adds data to my queue "_currEvents" once per second.
        void myTest()
        {
            while (true)
            {
                // Enqueuing the data.
                addToBuffer(125, "", "", "", 122, "");
                System.Threading.Thread.Sleep(1000);
            }
        }

     

     

        // Adding data to the buffer (the queue "_currEvents").
        private bool EnqueueData(long val1, string val2, string val3, string val4, short Val5, object Val6)
        {
            PointEvent<DAEvent> currEvent = default(PointEvent<DAEvent>);
            EnqueueOperationResult result = EnqueueOperationResult.Full;

            try
            {
                currEvent = CreateInsertEvent();

                if (currEvent == null)
                {
                    return false;
                }

                currEvent.StartTime = DateTime.Now;
                currEvent.Payload = new DAEvent();

                (currEvent.Payload as IOPCDAEvent).Init("hh", "", "ff", 1, "kk");

                _currEvents.Enqueue(currEvent);

                return true;
            }
            catch (Exception e)
            {
            }

            return false;
        }

     

     

    Thanks :)

  • February 3, 2012 13:07
     
     
    Sorry, the "addToBuffer" call is the "EnqueueData" method that I added at the end of the code.
  • February 3, 2012 15:44
     
     

    Hi Juliette

    The architecture of your StreamInsight application is, sorry to have to say this, not quite right.

    I suggest you work through some of the samples available online to get a feel for how input adapters, adapter factories, output adapters and the LINQ engine hang together.

    Here is a link that may be useful:

    http://msdn.microsoft.com/en-us/library/ff518472.aspx

    Once you have this demo working, you can go ahead and move your DA logic into the input adapter.
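
    One thing that may be worth checking in the sample above (a guess from reading the snippet, not a confirmed diagnosis): Resume() calls ProduceEvents(), which calls LoadConnection(), which creates two new threads. The engine calls Resume() after the adapter has called Ready(), and SendData calls Ready() every time Enqueue returns Full, so each such cycle could leave two more threads running. Below is a minimal sketch of a start-once guard. WorkerOwner is a hypothetical stand-in for the adapter, and none of the names are StreamInsight APIs.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the adapter; not a StreamInsight type.
sealed class WorkerOwner
{
    private int _started;          // 0 = worker not created yet, 1 = created
    private int _threadsCreated;   // how many worker threads were ever created
    private volatile bool _needToStop;
    private Thread _worker;

    public int ThreadsCreated { get { return _threadsCreated; } }

    // Called once when the adapter starts.
    public void Start() { EnsureWorker(); }

    // May be called many times; must not create additional threads.
    public void Resume() { EnsureWorker(); }

    public void Stop()
    {
        _needToStop = true;
        if (_worker != null) _worker.Join();
    }

    private void EnsureWorker()
    {
        // Only the first caller flips 0 -> 1 and creates the thread.
        if (Interlocked.CompareExchange(ref _started, 1, 0) != 0) return;
        Interlocked.Increment(ref _threadsCreated);
        _worker = new Thread(SendLoop);
        _worker.IsBackground = true;
        _worker.Start();
    }

    private void SendLoop()
    {
        // Placeholder for the dequeue-and-enqueue work.
        while (!_needToStop) Thread.Sleep(10);
    }
}

static class Program
{
    static void Main()
    {
        var owner = new WorkerOwner();
        owner.Start();
        for (int i = 0; i < 1000; i++) owner.Resume();
        Console.WriteLine(owner.ThreadsCreated); // prints 1
        owner.Stop();
    }
}
```

    With a guard like this, Resume() only signals that sending may continue; the worker thread itself is created once, however many times the engine resumes the adapter.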

     

  • February 3, 2012 16:41
     
     

    Hi,

     

    Can you tell me what is wrong with my input adapter code?

    As a further clarification: it is StreamInsight that is increasing the number of threads, and when it reaches 1000 threads, StreamInsight stops reading data from the input adapter.

     

     

    Thanks

  • February 3, 2012 22:10
     
     

    Juliette,

    Why are you loading the data into a buffer at all in your input adapter?

    When I write my input adapters, as soon as data is received or retrieved, I enqueue the event into StreamInsight.
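
    As a rough illustration of that approach, here is a minimal sketch that enqueues on the thread that delivers the data, with no intermediate buffer and no extra thread. FakeEngineQueue and DirectAdapter are hypothetical stand-ins, not StreamInsight types; in a real adapter the corresponding calls would be Enqueue, Ready and Resume. The sketch is single-threaded and assumes it is acceptable to drop data while the engine is full.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the engine's bounded input queue; not a StreamInsight type.
sealed class FakeEngineQueue
{
    private readonly Queue<int> _items = new Queue<int>();
    private readonly int _capacity;

    public FakeEngineQueue(int capacity) { _capacity = capacity; }

    public int Count { get { return _items.Count; } }

    // Returns false when the queue is full, mirroring an Enqueue call that reports "Full".
    public bool TryEnqueue(int value)
    {
        if (_items.Count >= _capacity) return false;
        _items.Enqueue(value);
        return true;
    }

    // Simulates the engine consuming everything it was given.
    public void Drain() { _items.Clear(); }
}

// Hypothetical adapter that pushes each item as soon as it arrives.
sealed class DirectAdapter
{
    private readonly FakeEngineQueue _engine;
    private bool _paused;

    public int Dropped { get; private set; }

    public DirectAdapter(FakeEngineQueue engine) { _engine = engine; }

    // Data-source callback: enqueue on the caller's thread.
    public void OnData(int value)
    {
        if (_paused || !_engine.TryEnqueue(value))
        {
            _paused = true;   // stop pushing until the engine resumes the adapter
            Dropped++;
        }
    }

    // Called when the engine can accept events again.
    public void Resume() { _paused = false; }
}

static class Program
{
    static void Main()
    {
        var engine = new FakeEngineQueue(3);
        var adapter = new DirectAdapter(engine);
        for (int i = 0; i < 5; i++) adapter.OnData(i);
        Console.WriteLine(engine.Count + " enqueued, " + adapter.Dropped + " dropped"); // prints "3 enqueued, 2 dropped"
    }
}
```

    The point of the design is that a full queue pauses the adapter instead of starting anything new; resuming only clears the pause flag.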

  • February 6, 2012 8:25
     
     

    TXPower125,

     

    I am loading the data into a buffer because when I enqueue the data directly from my ReceiveData event, the number of threads increases in a strange way.