Answered clear the queue?

  • Tuesday, April 10, 2007 2:34 PM
     
     
    Is there a way to clear the message queue on a port? For example, if you subscribe to a service but can't (or don't want to) handle every notification, the messages get backlogged and you eventually end up working with old data. Is there a way for the subscriber to clear this queue on a periodic basis?

    I think an analogous question is whether there is a way to access messages in LIFO order instead of FIFO.

    (Side question: can this backlogged queue break anything? And if so, what is the best way to detect and debug this in a large web of services?)

    thanks,
    -Ben

All Replies

  • Tuesday, April 10, 2007 7:12 PM
     
     Answered

    We actually have something that should really help you with this on notification ports: pass a TaskExecutionPolicy value when you create a DispatcherQueue to specify the behavior you want. Unfortunately there is a bug in the discard behavior (described below).

    More specifically, in the service handler where you subscribe and activate a receiver on a notification port, you can create a new DispatcherQueue and use it when you activate your notification receiver:

    Code Snippet

     

    DispatcherQueue notificationTaskQueue = new DispatcherQueue("notify queue",
        this.TaskQueue.Dispatcher, // use same dispatcher as your service
        TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
        1); // maximum queue depth; tasks beyond this limit are discarded

    // persistent receiver (true) scheduled on the depth-limited queue
    Arbiter.Activate(notificationTaskQueue, Arbiter.Receive(true, notificationPort, NotificationHandler));

     

     

     

    Unfortunately there is a bug (fixed in the next CTP): when the queue is full and a new message comes in, we keep the OLDEST messages and drop the new one, instead of discarding the oldest first (FIFO order). Even with this bug, the policy limits you to N pending notifications, so your queue will not grow large.
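
    To make the two discard behaviors concrete, here is a small stand-alone sketch in plain C#. It does not use the CCR at all: the class DepthLimitDemo and its Run method are hypothetical names invented for this illustration, and an ordinary Queue<int> stands in for the task queue. It only models "queue is full, which item gets dropped?" and should not be read as how DispatcherQueue is implemented.

```csharp
using System;
using System.Collections.Generic;

// Stand-alone illustration only; this is NOT the CCR DispatcherQueue.
public static class DepthLimitDemo
{
    // Feeds items one by one into a queue capped at maxDepth and returns what is left.
    // keepNewest = true : evict the oldest entry to make room (the intended behavior).
    // keepNewest = false: drop the incoming entry when full (the behavior described as a bug).
    public static List<int> Run(IEnumerable<int> items, int maxDepth, bool keepNewest)
    {
        if (maxDepth < 1)
        {
            throw new ArgumentOutOfRangeException("maxDepth");
        }

        var queue = new Queue<int>();
        foreach (int item in items)
        {
            if (queue.Count < maxDepth)
            {
                queue.Enqueue(item);
            }
            else if (keepNewest)
            {
                queue.Dequeue();      // discard the oldest entry
                queue.Enqueue(item);  // keep the newest one
            }
            // otherwise the queue is full and the incoming item is discarded
        }
        return new List<int>(queue);
    }
}
```

    With notifications 1 through 5 and a depth of 2, Run(..., 2, true) leaves 4 and 5 in the queue, while Run(..., 2, false) leaves 1 and 2, which is why a subscriber hitting the bug still sees a short queue but not the freshest data.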

     

    Another option, and you can find this example in Traxster/SerializerServices/Drive.cs:

     

    Activate a *one-time* receiver on your notification port.

     

    Code Snippet

     

    Arbiter.Activate(notificationTaskQueue, Arbiter.Receive(false, notificationPort, NotificationHandler));

     

     In your notification handler, throw away any pending messages using

    Code Snippet

     

    while (notificationPort.Test());

     

    Then activate the one-time receiver again at the end of your notification handler:

     

    Code Snippet

     

    Arbiter.Activate(notificationTaskQueue, Arbiter.Receive(false, notificationPort, NotificationHandler));
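
    The drain step in this pattern can be sketched without the CCR. In the stand-alone example below, a ConcurrentQueue<T> stands in for the notification port and TryDequeue plays the role of Test; BacklogDrain and DrainKeepLatest are hypothetical names. Note that this is a variation on the snippet above: instead of throwing every backlogged message away, it hands back the most recent one, which is one way to approximate the "newest first" access the original question asks about.

```csharp
using System.Collections.Concurrent;

// Stand-alone illustration only; ConcurrentQueue<T> stands in for the notification port.
public static class BacklogDrain
{
    // Removes everything currently queued.
    // Returns the number of items removed and hands back the most recent one via 'latest'.
    // If nothing was queued, returns 0 and 'latest' is default(T).
    public static int DrainKeepLatest<T>(ConcurrentQueue<T> pending, out T latest)
    {
        latest = default(T);
        int removed = 0;
        T item;
        while (pending.TryDequeue(out item))
        {
            latest = item;  // overwritten until the last (newest) item remains
            removed++;
        }
        return removed;
    }
}
```

    A handler could call this first, act on the latest value only, and then re-activate its one-time receiver. The returned count is also a cheap measure of how far the subscriber has fallen behind.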

     

     

     

     Here are more details on the task execution policy (it's in our help):

     

    Code Snippet
    public enum TaskExecutionPolicy
    {
        /// <summary>
        /// Default behavior, all tasks are queued with no constraints
        /// </summary>
        Unconstrained = 0,
        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation)
        /// and discards tasks enqueued after the limit is reached
        /// </summary>
        ConstrainQueueDepthDiscardTasks,
        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation)
        /// but does not discard any tasks. It forces the thread posting any tasks after the limit is reached, to
        /// sleep until the queue depth falls below the limit
        /// </summary>
        ConstrainQueueDepthThrottleExecution,
        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and discards tasks enqueued after the current scheduling rate is above the specified rate
        /// </summary>
        ConstrainSchedulingRateDiscardTasks,
        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and forces the thread posting tasks to sleep until the current rate of task scheduling falls below
        /// the specified average rate
        /// </summary>
        ConstrainSchedulingRateThrottleExecution
    }

     

  • Tuesday, April 17, 2007 7:57 PM
     
     

    Regarding the following code snippet:

     

    Code Snippet

    while (notificationPort.Test());

     

     

    Wouldn't this break the contract of any inter-service communication?  All DsspOperation<...> types require sending back a response.  I think the code could be improved to:

     

    Code Snippet

    ResponseType defaultResponse = ...
    MessageType msg;

    // drain the backlog, answering each discarded message
    while (notificationPort.Test(out msg))
    {
        msg.ResponsePort.Post(defaultResponse);
    }

     

     

    This sends back a valid response to every queued message. However, you should probably also track this situation and flag a warning if the average number of notifications waiting is greater than <small-magic-number/> over an extended period of time -- such a condition is likely the symptom of a design flaw or feedback loop.
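
    One possible way to do that tracking is a moving average over the last few backlog counts. The sketch below is plain C# with no CCR dependency; BacklogMonitor, its window size, and its threshold are all hypothetical choices made for illustration, and the count you feed it could simply be the number of messages discarded in the loop above.

```csharp
using System;
using System.Collections.Generic;

// Stand-alone illustration only; names and thresholds here are hypothetical.
public sealed class BacklogMonitor
{
    private readonly Queue<int> samples = new Queue<int>();
    private readonly int windowSize;
    private readonly double warnThreshold;
    private long sum;

    public BacklogMonitor(int windowSize, double warnThreshold)
    {
        if (windowSize < 1)
        {
            throw new ArgumentOutOfRangeException("windowSize");
        }
        this.windowSize = windowSize;
        this.warnThreshold = warnThreshold;
    }

    // Average backlog over the samples currently in the window (0 if none yet).
    public double Average
    {
        get { return samples.Count == 0 ? 0.0 : (double)sum / samples.Count; }
    }

    // Records how many notifications were waiting when the handler ran.
    // Returns true once the window is full and its average exceeds the threshold.
    public bool Record(int waitingCount)
    {
        samples.Enqueue(waitingCount);
        sum += waitingCount;
        if (samples.Count > windowSize)
        {
            sum -= samples.Dequeue();  // slide the window forward
        }
        return samples.Count == windowSize && Average > warnThreshold;
    }
}
```

    Requiring a full window before warning means a single burst does not trigger it; only a backlog that stays high across the whole window does, which matches the "over an extended period of time" condition.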

     

    #aaron