How to cleanly stop/remove input and output adapters?

  • February 28, 2012 15:48
     
     

    Hello,

    I have an application running with queries and adapters, and I want to remove them from the application and create new ones. Is there a good way to do this?

    I am currently stopping and deleting all the queries, but the adapters seem to keep running. This generates an error saying that it cannot access the deleted IOutputAdapter<TEvent>.

    I am using the SqlOutputAdapter from the StreamInsightProductTeamSamples.

    Thank you very much for your help.

All replies

  • February 28, 2012 16:24
     
     

    I've never seen this. Can you post the code to stop/delete the queries?

    It's possible that there is a condition where your queries are stopped/deleted before your output adapter calls Stopped(), and the adapter still tries to dequeue events. But ... I use the same type of method to shut down the StreamInsight instance and, as I said, I've not seen this before.


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

  • February 28, 2012 16:47
     
    public Dictionary<String, QueryTemplate> ListQueriesQT;
    protected Dictionary<String, Query> ListQueries;
    
    public void CleanUp()
    {
        foreach (var query in ListQueries)
        {
            // stop and delete each query
            query.Value.Stop();
            query.Value.Delete();
        }

        foreach (var queryQt in ListQueriesQT)
        {
            // the dictionary yields key/value pairs, so delete the template via .Value
            queryQt.Value.Delete();
        }
    }

    And here is the code in the adapter:

            private void ConsumeEvents()
            {
                TEvent currentEvent = default(TEvent);

                while (true)
                {
                    try
                    {
                        // if the engine asked the adapter to stop
                        if (this.outputAdapter.AdapterState == AdapterState.Stopping)
                        {
                            // clean up state
                            this.Cleanup();

                            // inform the engine that the adapter has stopped
                            this.outputAdapter.Stopped();

                            // and exit worker thread
                            return;
                        }

                        // NOTE: at any point in time during the execution of the code block below, the engine
                        // could change the Adapter state to Stopping. Then the engine will resume the adapter
                        // (i.e. call Resume()) just one more time, and the stopping condition will be
                        // trapped at the check above.

                        // Dequeue the event
                        DequeueOperationResult result = this.outputAdapter.Dequeue(out currentEvent);

                        // if the engine does not have any events, the adapter is Suspended; so do this ..
                        if (result == DequeueOperationResult.Empty)
                        {
                            // optionally invoke a method here for any housekeeping

                            // inform the engine that adapter is ready to be resumed
                            this.outputAdapter.Ready();

                            // exit the worker thread
                            return;
                        }

                        // write out event to output table
                        this.CreateRowFromEvent(currentEvent);
                    }
                    catch (Exception ex)
                    {
                        Logger.Log.Instance.Warn(ex.Message);
                    }
                    finally
                    {
                        // IMPORTANT: Release the event always
                        if (currentEvent != null)
                        {
                            this.outputAdapter.ReleaseEvent(ref currentEvent);
                        }
                    }
                }
            }

    this.outputAdapter.Stopped() throws an exception, which is caught and written to my logger.

    Is Stopped() supposed to be called before I call the CleanUp() function? Should I call it explicitly before I stop the query?

  • February 28, 2012 18:22
     
     

    First, does this happen every time or just once in a while?

    On what line do you get the exception?

    What happens if you put a Thread.Sleep(1000) (1 second sleep) between the stop and the delete?

    I have a theory and your answers may confirm (or not) that theory. I'd rather have confirmation before I send you down a potential rabbit hole as I won't have an opportunity to set this up and run it myself until later.


    DevBiker (aka J Sawyer)

  • February 29, 2012 9:47
     

    Yes, it happens every time.

    I always get the exception on

      this.outputAdapter.Stopped();

    The exception is still thrown even if I put a Thread.Sleep(1000) between the stop and the delete.

    I have also tried something else, and it didn't work either:

    public void CleanUp()
    {
        foreach (var query in ListQueries)
        {
            // stop all queries first
            query.Value.Stop();
        }

        foreach (var query in ListQueries)
        {
            // then delete them
            query.Value.Delete();
        }

        foreach (var queryQt in ListQueriesQT)
        {
            // the dictionary yields key/value pairs, so delete the template via .Value
            queryQt.Value.Delete();
        }
    }

    Actually, the queries are composed queries, so the first one may depend on the second one, and so on.

    The problem happens when I stop the final query, which is bound to the adapter.


  • February 29, 2012 20:04
     
     

    Let me guess ... it's an ObjectDisposedException, right?

    I get that also. In a lot of cases, the Sql Output Adapter in the samples will call Stopped() twice. The first time is OK ... but when you call Stopped(), internally StreamInsight disposes the object. And, as you would expect and as a good Disposable object should do, the second time you get an ObjectDisposedException.
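
    The failure mode described here can be reproduced without StreamInsight. What follows is a minimal sketch, not the engine's code: FakeOutputAdapter and DoubleStopDemo are hypothetical stand-ins that only mimic the behavior described above, where the first Stopped() call disposes the object and any later call throws.

```csharp
using System;

// Hypothetical stand-in for an output adapter; NOT a StreamInsight type.
// It only mimics the behavior described in this thread: the first Stopped()
// call disposes the object, and any later call throws.
public sealed class FakeOutputAdapter
{
    private bool disposed;

    public void Stopped()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(nameof(FakeOutputAdapter));
        }

        this.disposed = true;
    }
}

public static class DoubleStopDemo
{
    // Returns true when the second Stopped() call throws ObjectDisposedException.
    public static bool SecondCallThrows()
    {
        var adapter = new FakeOutputAdapter();
        adapter.Stopped(); // first call: accepted

        try
        {
            adapter.Stopped(); // second call: the object is already disposed
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```

    Note that in the ConsumeEvents loop posted earlier, the catch block logs the exception and the while (true) loop carries on, so a throwing Stopped() call would not end the worker by itself.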


    DevBiker (aka J Sawyer)

  • March 1, 2012 7:33
     
     

    Yes, this is the exception I got.

    So this is not a bug? Should I catch this kind of exception and exit the ConsumeEvents function? Or is there a way to prevent the adapter from calling Stopped() twice? If I do nothing, the adapter keeps trying to call Stopped() and never seems to be deleted.

    What I want is to completely remove the StreamInsight objects (queries, query templates, binders, streams, and adapters) in order to create new ones.


  • March 1, 2012 16:13
     

    Oh, it's a bug all right, don't get me wrong. But it's not a StreamInsight bug ... the bug is in the output adapter ... it's calling Stopped() twice, and that's going to guarantee you an ObjectDisposedException. Now, this may have been different in 1.0 or 1.1 - I honestly don't remember - but this is how it is in 1.2.

    I've posted a fixed version of the Sql Output adapter ... as well as a detailed description/walkthrough of output adapter lifetime on my blog. There have been several questions on the board related to adapter lifetime so I figured that I'd go ahead and get the details out there.
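
    The fixed adapter from the blog post is not reproduced in this thread. As a rough sketch of one possible way to guarantee a single Stopped() call, assuming every stop path in the adapter can be routed through one helper: StopOnceGuard and the notifyStopped delegate are hypothetical names, and the delegate stands in for this.outputAdapter.Stopped().

```csharp
using System;
using System.Threading;

// Hypothetical helper; not part of StreamInsight or the product team samples.
public sealed class StopOnceGuard
{
    private readonly Action notifyStopped;
    private int stopped; // 0 = not yet signalled, 1 = already signalled

    public StopOnceGuard(Action notifyStopped)
    {
        this.notifyStopped = notifyStopped ?? throw new ArgumentNullException(nameof(notifyStopped));
    }

    // Invokes the delegate on the first call only; later calls are no-ops.
    // Interlocked.Exchange makes the check-and-set atomic across threads.
    // Returns true if this call performed the notification.
    public bool TryStop()
    {
        if (Interlocked.Exchange(ref this.stopped, 1) == 1)
        {
            return false;
        }

        this.notifyStopped();
        return true;
    }
}
```

    With a guard like this, the Stopping branch in ConsumeEvents would call guard.TryStop() instead of calling Stopped() directly, so a second pass through that branch would do nothing rather than throw.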


    DevBiker (aka J Sawyer)

  • April 2, 2012 13:37
     
     

    Sorry it took me so long to answer; I was busy with other issues and wasn't able to test this earlier. I tried the method you suggested, and it prevents the exception from being raised. Thanks to your blog, I also understand the problem and the way an adapter works much better now. Thank you very much!