none
How do I execute FILO Task Processing?

    Întrebare

  • Recently, I have found myself thinking more and more about how much better some of my modules would be if I could process Tasks in a FILO order instead of FIFO. Has someone already created a 'DispatcherStack' extension to the CCR? If not, why not? Again, such a dispatcher would work great in some cases, wouldn't it? (is the dispatcherQueue the only thing that would need to be substituted or is it somehow more complicated than that?)

    If nobody has already created one, I think I'll take a crack at doing it. That said, any design advice or guidance would be much appreciated. I promise to turn whatever working code I come up with back over to the community if I can get some help with a design.

    8 februarie 2012 03:25

Răspunsuri

  • I chatted with an internal developer for CCR/DSS at length.  In short, the best solution we came up with was: a) utilize the Drain Port method or b) inject a task at the top of the queue or c) rewrite vast sub-portions of CCR.

    • Marcat ca răspuns de Kobe1815 29 februarie 2012 18:35
    29 februarie 2012 01:22

Toate mesajele

  • I've not heard anyone request exactly this pattern before. Could you elaborate on what the motivation is?

    Nick.

    17 februarie 2012 22:28
  • Sure, here's an entire blog post on some of the when's and why's someone would want FILO task processing

    http://beachfrontcoding.wordpress.com/2012/02/20/why-i-think-fifo-is-not-always-the-right-task-processing-strategy-for-ccr-ports/

    Anyone have any thoughts?

    20 februarie 2012 06:05
  • Interesting post, sure looks like you're the author. :)  But, kudos to the creative story.  I particularly like the HungryGF class.  Below should be the code to get you pointed in the right direction.

    using System; using System.Collections.Generic; using System.ComponentModel; using Microsoft.Ccr.Core; using Microsoft.Dss.Core.Attributes; using Microsoft.Dss.ServiceModel.Dssp; using Microsoft.Dss.ServiceModel.DsspServiceBase; using W3C.Soap; using Microsoft.Robotics; namespace Microsoft.Robotics.HungryGF {     publicsealedclassContract     {         [DataMember]         publicconststring Identifier = "http://schemas.microsoft.com/robotics/2012/02/hungrygf.html";     }              [DataContract]     publicclassHungryGFState     {     }          [ServicePort]     publicclassHungryGFOperations : PortSet<DsspDefaultLookupDsspDefaultDropGetGetFoodFILOProcess>     {     }     [DataContract]     publicclassFILOProcessRequest     {     }     publicclassFILOProcess : Update<FILOProcessRequestPortSet<GetFoodRequestFault>>     {     }     publicclassGetFoodOperationsPort : Port<GetFood>     {     }     [DataContract]     publicclassGetFoodRequest     {         [DataMember]         publicstring FoodType { getset; }     }     publicclassGetFood : Update<GetFoodRequestDsspResponsePort<DefaultUpdateResponseType>>     {     }          publicclassGet : Get<GetRequestTypePortSet<HungryGFStateFault>>     {         public Get()         {         }                  public Get(GetRequestType body)             : base(body)         {         }                  public Get(GetRequestType body, PortSet<HungryGFStateFault> responsePort)             : base(body, responsePort)         {         }     } }

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Threading;
    using Microsoft.Ccr.Core;
    using Microsoft.Dss.Core.Attributes;
    using Microsoft.Dss.ServiceModel.Dssp;
    using Microsoft.Dss.ServiceModel.DsspServiceBase;
    using W3C.Soap;
     
    namespace Microsoft.Robotics.HungryGF
    {
        [Contract(Contract.Identifier)]
        [DisplayName("HungryGF")]
        [Description("HungryGF service (no description provided)")]
        class HungryGFService : DsspServiceBase
        {
            [ServiceState]
            HungryGFState state = new HungryGFState();
     
            [ServicePort("/HungryGF", AllowMultipleInstances = true)]
            HungryGFOperations mainPort = new HungryGFOperations();
     
            GetFoodOperationsPort foodPort = new GetFoodOperationsPort();
     
            public HungryGFService(DsspServiceCreationPort creationPort)
                : base(creationPort)
            {
            }
     
            protected override void Start()
            {
                // Initialize state, if no saved configuration was found
                if (this.state == null)
                {
                    this.state = new HungryGFState()
                    {
                        //default service state
                    };
                }
     
                // Call the base class to complete initialization, including service registration
                base.Start();
     
                //for easy testing purposes - normally, you would have a separate service posting this messages
                //and processing the responses
                for (int i = 0; i < 100; i++)
                {
                    this.mainPort.Post(new GetFood() { Body = new GetFoodRequest() { FoodType = i.ToString() } });
                }
     
                //for testing purposes just Thread.Sleep
                Thread.Sleep(5000);
     
                this.mainPort.Post(new FILOProcess());
            }
     
            [ServiceHandler(ServiceHandlerBehavior.Exclusive, QueueDepthLimit = 100)]
            public IEnumerator<ITask> GetFoodHandler(GetFood getFood)
            {
                this.foodPort.Post(getFood);
                getFood.ResponsePort.Post(DefaultUpdateResponseType.Instance);
                yield break;
            }
     
            [ServiceHandler(ServiceHandlerBehavior.Exclusive, QueueDepthLimit = 1)]
            public IEnumerator<ITask> FILOProcessHandler(FILOProcess filoProcess)
            {
                if (this.foodPort.ItemCount < 1)
                {
                    //there's no default appropriate behavior to take if your GF hasn't
                    //told you what to get
                    filoProcess.ResponsePort.Post(Fault.FromException(new NotImplementedException()));
                }
     
                while (this.foodPort.ItemCount > 1)
                {
                    //Remove all messages except the most recent in a thread-safe manner
                    yield return Arbiter.Receive(falsethis.foodPort, EmptyHandler);
                }
     
                GetFood mostRecentRequest = null;
                yield return this.foodPort.Receive(
                    delegate(GetFood getFood)
                    {
                        mostRecentRequest = getFood;
                        Console.WriteLine("The FILO Processed decision was: {0}", getFood.Body.FoodType);
                    });
     
                filoProcess.ResponsePort.Post(mostRecentRequest.Body);
     
                yield break;
            }
        }
    }
    
                          

    20 februarie 2012 23:27
  •  

     

    Luke Thompson - MSFT, I did write that blog post, I just started replying to gunnn then realized it was such a long response it'd clog the thread so I just created a blog post. I'm glad you liked it though.

    I think your response is a good one. It helped me realize that in the simple case where we only care about the single most recent message (and not the n most recent messages), we could simply set QueueDepthLimit = 1. Though I am not sure if high posting rates would prevent any one task from being executed since in that case we'd constantly be adding then removing the task as new messages come in. Any thoughts on that approach?

    Your response also helped me realize a very serious danger of FILO task execution that I had not previously considered. Namely, that with FILO task execution it is possible that a task could get created, queued, then 'buried'. That is, a task could be queued but so many other tasks queued on top of it that the task never gets executed. So posting a task and waiting for a response would always require a timeout when using FILO task execution. Furthermore, the scope in-which FILO task execution should be used in a single program should be limited so that fewer tasks get buried.

    That said, I am afraid the suggestion you have given is not sufficient for general use. First of all, it seems like too much work. With your approach I'll have to create and coordinate two ports every time I want to do FILO task execution. To me, FILO task execution is so important and so useful that it should be as easy to do as FIFO task execution. Secondly, in a more general case it is possible that items are being posted to the foodPort faster than you are removing them in the FILOProcessHandler. At very least you may have to iterate through a large number of tasks on the foodport before you get to the last one. In the best case, this is a major performance concern and in the worse case it results in an infinite while-loop. Lastly, if there are multiple senders of the FILOProcess message and they both post at nearly the same time then one will get a response but the other will get a Fault because by the time the second message gets turned into a task and executed there may not be any more data left on the foodPort because the processing of the first message removed all of it.

    The example that you gave in your response was very DSS-centric. I am posting to the CCR forum and thinking in a very CCR way because, as the 'Hungry Girlfriend' blog post explains, I am trying to support FILO task execution by extending the CCR itself. I'd like to extend the CCR by adding a 'DispatcherStack' or some other primitive that is capable of implementing FILO task execution at the scheduling level. I am afraid that approaching this problem at a higher level will only mean more work(code) every time FILO task execution is needed.

    21 februarie 2012 15:59
  • I need to chat with Nick more in-depth about the performance implications of the code below, but I believe this is more in-line with what you're looking for.  Note, I followed the same coding paradigm as I had used for the DispatcherQueueExtensions, which is now part of CCR.  It's actually pretty straightforward to add Class Extensions to CCR.

    I believe all the interesting code is in the DispatcherQueue Class Extensions.

    namespace Microsoft.Ccr.Core
    {
        using System;
        using System.Collections.Generic;
     
        /// <summary>
        /// CCR DispatcherQueue Class Extensions
        /// </summary>
        public static class DispatcherQueueExtensions
        {
            /// <summary>
            /// Invoke an iterator-based message handler asynchronously, supplying any arguments explicitly.
            /// This implementation explicitly conducts FILO processing, pushing every new task to the top of the queue.
            /// </summary>
            /// <param name="TaskQueue">DispatcherQueue for this extension to be applied.</param>
            /// <typeparam name="T0">The 0th parameter type to the message handler.</typeparam>
            /// <param name="t0">The 0th parameter to the message handler.</param>
            /// <param name="handler">The message handler to invoke.</param>
            public static void FILOSpawnIterator<T0>(this DispatcherQueue TaskQueue, T0 t0, IteratorHandler<T0> handler)
            {
                Queue<ITask> temp = new Queue<ITask>();
                temp.Enqueue(new IterativeTask<T0>(t0, handler));
     
                while (TaskQueue.Count > 0)
                {
                    ITask current = null;
                    bool deque = TaskQueue.TryDequeue(out current);
                    if (current != null)
                    {
                        temp.Enqueue(current);
                    }
                }
     
                foreach (ITask x in temp)
                {
                    TaskQueue.Enqueue(x);
                }
            }
        }
    }
     
     
    namespace HungryGF
    {
        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Text;
        using Microsoft.Ccr.Core;
        using System.Threading;
     
        class HungryGF
        {
            public Dispatcher dispatcher;
            public DispatcherQueue queue;
     
            public IEnumerator<ITask> FILOTask(int index)
            {
                Console.WriteLine(index.ToString());
                yield break;
            }
     
            public void Execute()
            {
                for (int i = 0; i < 100; ++i)
                {
                    queue.FILOSpawnIterator<int>(i, this.FILOTask);
                }
            }
        }
     
        class Program
        {
            public static void Main(string[] args)
            {
                HungryGF g = new HungryGF();
                g.dispatcher = new Dispatcher();
                g.queue = new DispatcherQueue("hungrygf", g.dispatcher);
                g.Execute();
                Console.ReadKey();
            }
        }
    }
     
    22 februarie 2012 09:23
  • What about something even shorter and simpler. Something that uses a FILO collection of ITasks. What do you think of replacing the use of Dispatcher queue in some places with something like this:

    public class DispatcherStack : DispatcherQueue
        {
            public DispatcherStack()
                : base()
            {
            
            }
            
            public override bool TryDequeue(out ITask task)
            {
                task = null;

                try
                {
                    if (_tasks.Count < 1)
                    {
                        return false;
                    }

                    lock (this._tasks)
                    {
                        task = this._tasks.Pop();
                    }                
                }
                catch
                {
                    return false;
                }            
                return true;
            }

            public override bool Enqueue(ITask task)
            {
                try
                {
                    lock (this._tasks)
                    {
                        this._tasks.Push(task);    
                    }
                    base.SignalDispatcher();
                }
                catch
                {
                    return false;
                }
                return true;
            }

            protected override void Dispose(bool disposing)
            {
                this._tasks.Clear();
                this._tasks = null;
                base.Dispose(disposing);
            }
            
            Stack<ITask> _tasks = new Stack<ITask>();

        }

    If I can figure out how to properly inherit from the DispatcherQueue class then this DispatcherStack could make FILO task execution just as easy, and almost syntactically identical to FIFO task execution. It's a big 'if' but to me, getting this to work would be great.

    Maybe my original question should have been, "is it ok to inherit from DispatcherQueue?" It is not a sealed class, so there is nothing explicitly preventing me from doing so. At the same time though, the rest of the CCR API uses DispatcherQueue instead of some higher-level abstraction like ITaskQueue or some  abstract base class with a name that does not indicate the type of collection used to hold the ITask instances. Even if the DispatcherQueue class had a different name that made it seem less like a Queue<> must be used, I'd feel a lot better about inheriting from it. Basically, any class name that doesn't imply a Queue is (or *must* be) being used would make me feel more certain that what I am proposing is logically a good idea. 

    Syntactically, the first line of the code above feels silly, doesn't it? It mixes the words Stack and Queue in a way that makes the intent of the class unclear.

    Semantically, I am not sure I have overridden the DispatcherQueue methods properly nor am I sure I have overriden all the methods that I need to override. I suppose I could do a bunch of testing to figure it out for some cases but it'll be tough to know I have it right for all cases (or at least all the same cases that the CCR team currently tests the DispatcherQueue itself against).

    Can anyone advise me on any problems they see here? Obviously I need to go back and put in explicit support for TaskExecutionPolicies and schedulingRate so my question is more meant to ask for help on the code that is already there or that is not there but needs to be to get this simple base class working.

    23 februarie 2012 00:16
  • Update: I have tried and tried. It looks like there are a lot of private and/or internal methods on the DispatcherQueue class that makes it impossible to sub-class it in any useful way. Specifically, it looks like the DispatcherQueue class has an internal method called Test() that is used to get ITask items from the DispatcherQueue and run them on a thread from the thread pool managed by a Dispatcher. That's just a guess.

    I must say, I am disapointed. It would have been really cool to be able to create custom DispatcherQueues. At this point I am hoping I am missing something here and it really is possible.

    24 februarie 2012 03:44
  • I chatted with an internal developer for CCR/DSS at length.  In short, the best solution we came up with was: a) utilize the Drain Port method or b) inject a task at the top of the queue or c) rewrite vast sub-portions of CCR.

    • Marcat ca răspuns de Kobe1815 29 februarie 2012 18:35
    29 februarie 2012 01:22
  • can you elaborate on how to "inject a task at the top of the queue"?
    29 februarie 2012 18:34