Load balancing with refusal

  • 25 May 2012 09:36
     
     

    So I have what I'd like to think is a really trivial problem to solve, and thus this may be a stupid question :)

    I have work items to process, each represented by a message in an external queue. I want to create a simple dataflow that pulls down a message and passes it to a load-balanced set of consumers, most likely scaling according to some availability criterion. Critically, however, I want messages to be pulled only when a consumer is available (i.e. the source waits while the consumers are fully loaded).

    I tried to build this either with a single ActionBlock set to maximum parallelism with async delegates, or with a BufferBlock linked to multiple bounded ActionBlocks with sync delegates. In both cases I hit the same issue: buffering meant that more messages were pulled down than could be processed. Here is an example output with 2 ActionBlocks linked:

    • Pulled message 1
    • Processing message 1
    • Pulled message 2
    • Processing message 2
    • Pulled message 3
    • Pulled message 4

    At this point the SendAsync call into the input buffer blocks (and I am waiting on it). What I want is for message 3 to be pulled down from the external queue only when one of the consumers is ready to process it. As I understand it, ActionBlock has both a processing queue and an accepting queue, and it is the latter that is tripping me up here. Am I effectively asking for ActionBlock to be entirely synchronous?

    Is the Dataflow API the right choice? I could probably put this together with Tasks directly but the simplicity & reuse of an existing API at a higher level of abstraction has appeal.

    Thanks in advance for help & suggestions.

    (Please note that I don't have access to async and await: .NET 4 and C# 4 only.)


  • 25 May 2012 13:29
     
     Answer

    You can create one ActionBlock, set its MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded and then set its BoundedCapacity to some small number. This way, the block will only consume as many items as you specify and never more.
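A minimal sketch of that setup, written without async/await because of the C# 4 constraint in the question. The external queue is simulated by a plain loop counter (a hypothetical stand-in for the real queue client), and `BoundedConsumerDemo`, `Run`, `count` and `capacity` are illustrative names only. Because the producer waits on `SendAsync` before pulling the next message, at most `BoundedCapacity` messages are inside the block, plus the single message the producer is currently trying to hand over.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class BoundedConsumerDemo
{
    // Pulls `count` messages from a simulated external queue and returns the largest
    // number of messages that were pulled but not yet fully processed at any moment.
    public static int Run(int count, int capacity)
    {
        int inFlight = 0, maxInFlight = 0;
        object gate = new object();

        var consumer = new ActionBlock<int>(
            message =>
            {
                Thread.Sleep(20); // simulate work
                lock (gate) inFlight--;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
                // items being processed + items waiting in the input queue never exceed this
                BoundedCapacity = capacity
            });

        for (int message = 1; message <= count; message++)
        {
            // "Pull" the next message (hypothetical stand-in for the external queue).
            lock (gate) maxInFlight = Math.Max(maxInFlight, ++inFlight);

            // Blocks the producer until the block has room for this message.
            consumer.SendAsync(message).Wait();
        }

        consumer.Complete();
        consumer.Completion.Wait();
        return maxInFlight;
    }

    static void Main()
    {
        Console.WriteLine("Max pulled but unfinished: {0}", Run(20, 2));
    }
}
```

Setting `capacity` to the number of consumers you want gets close to "pull only when a consumer is free"; the remaining gap is the one message held by the pending `SendAsync`.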

    Unfortunately for you, you can only set BoundedCapacity, which limits the sum of the items being processed and the items in the block's input queue (and the items in its output queue, but ActionBlock doesn't have one). I think what you want is to set the size of the input queue to zero, but that is not possible.

    But I think setting BoundedCapacity should work well enough, although it means you are effectively setting an upper limit on the degree of parallelism as well.
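The capping effect can be checked directly with `Post`, which returns `false` instead of buffering once a bounded block is full. In this sketch (the `ManualResetEventSlim` only keeps the accepted items busy, and `CapacityDemo`/`PostThree` are illustrative names) `MaxDegreeOfParallelism` is unbounded, yet only `BoundedCapacity` items are accepted at the same time:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class CapacityDemo
{
    // Posts three items to a block with BoundedCapacity = 2 while the items
    // already accepted are still being processed; returns each Post result.
    public static bool[] PostThree()
    {
        using (var release = new ManualResetEventSlim(false))
        {
            var block = new ActionBlock<int>(
                item => release.Wait(), // keep every accepted item "in processing"
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
                    BoundedCapacity = 2
                });

            var results = new bool[3];
            for (int i = 0; i < results.Length; i++)
                results[i] = block.Post(i); // declined once 2 items are in the block

            release.Set();
            block.Complete();
            block.Completion.Wait();
            return results;
        }
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", PostThree())); // True, True, False
    }
}
```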

  • 25 May 2012 17:22
     

    Another option might be to create a block that behaves the way you want. To achieve this, you could use the fact that when a message is offered to a block, the block can postpone it and consume it at some later time. Then, as long as there are offered messages, you always keep one Task waiting to be started. That Task consumes a message only when it actually runs, so the load balancing done by the TaskScheduler decides how many messages are processed at once.

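    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Target block with unbounded parallelism but no input queue: every offer is postponed,
    // and a message is consumed from its source only when a Task actually runs, so the
    // TaskScheduler decides how many messages are in flight.
    class UnboundedActionBlockWithoutQueue<T> : ITargetBlock<T>
    {
        private readonly Action<T> m_action;
        private readonly TaskFactory m_taskFactory;

        // Postponed offers: the source that offered each message and its header.
        private readonly ConcurrentQueue<Tuple<ISourceBlock<T>, DataflowMessageHeader>> m_offeredMessages =
            new ConcurrentQueue<Tuple<ISourceBlock<T>, DataflowMessageHeader>>();

        // true when no Task is currently waiting to be started
        private bool m_shouldStartNewTask = true;

        private bool m_completed = false;

        // Tasks that were scheduled but have not finished yet; counted from the moment they
        // are scheduled, so that Completion cannot fire while a Task is still pending.
        private int m_runningTasks;

        private readonly TaskCompletionSource<bool> m_completionSource = new TaskCompletionSource<bool>();

        private readonly object m_lock = new object();

        public UnboundedActionBlockWithoutQueue(Action<T> action, TaskScheduler scheduler = null)
        {
            if (action == null)
                throw new ArgumentNullException("action");

            m_action = action;
            m_taskFactory = new TaskFactory(scheduler ?? TaskScheduler.Current);
        }

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
        {
            // we need source to postpone
            if (source == null)
                return DataflowMessageStatus.Declined;

            lock (m_lock)
            {
                // after Complete() no new messages are accepted
                if (m_completed)
                    return DataflowMessageStatus.DecliningPermanently;

                Console.WriteLine("Enqueued {0}.", messageValue);
                m_offeredMessages.Enqueue(Tuple.Create(source, messageHeader));

                if (m_shouldStartNewTask)
                {
                    m_shouldStartNewTask = false;
                    StartNewTask();
                }

                return DataflowMessageStatus.Postponed;
            }
        }

        private void TaskIntro()
        {
            lock (m_lock)
            {
                Console.WriteLine("Started Task, {0} Tasks scheduled or running.", m_runningTasks);

                // More messages are waiting: keep one more Task ready to be started.
                if (m_offeredMessages.Count > 1)
                    StartNewTask();
                else
                {
                    Console.WriteLine("Queue is almost empty.");
                    m_shouldStartNewTask = true;
                }
            }
        }

        private void TaskOutro()
        {
            lock (m_lock)
            {
                m_runningTasks--;

                if (m_runningTasks == 0 && m_completed)
                    m_completionSource.TrySetResult(true);

                Console.WriteLine("Task finished, {0} Tasks scheduled or running.", m_runningTasks);
            }
        }

        // Must be called while holding m_lock.
        private void StartNewTask()
        {
            m_runningTasks++;
            Console.WriteLine("Starting Task.");

            m_taskFactory.StartNew(
                () =>
                {
                    try
                    {
                        TaskIntro();
                        ProcessQueue();
                    }
                    finally
                    {
                        // runs even if m_action throws (the exception itself is not handled here)
                        TaskOutro();
                    }
                });
        }

        private void ProcessQueue()
        {
            Tuple<ISourceBlock<T>, DataflowMessageHeader> tuple;
            while (m_offeredMessages.TryDequeue(out tuple))
            {
                // Consume the postponed message only now that a Task is free to process it.
                bool consumed;
                T value = tuple.Item1.ConsumeMessage(tuple.Item2, this, out consumed);
                if (consumed)
                    m_action(value);
            }
        }

        public void Complete()
        {
            lock (m_lock)
            {
                m_completed = true;

                // nothing scheduled or running: complete right away
                if (m_runningTasks == 0)
                    m_completionSource.TrySetResult(true);
            }
        }

        public void Fault(Exception exception)
        {
            throw new NotSupportedException();
        }

        public Task Completion
        {
            get { return m_completionSource.Task; }
        }
    }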

    This code is only a basic sketch; you might still need to add support for Fault() or MaxMessagesPerTask.

    If you use this with something like SendAsync() in a loop, Tasks will be created and completed all the time. If that is a problem for you (it could affect performance if processing each item is fast), consider creating a custom source block that offers multiple items at once. If you do that, the queue of offered items will never become empty, so the Tasks will never stop. On the other hand, every Task that starts while more than one message is queued schedules yet another Task, so the default TaskScheduler would be asked to schedule new Tasks over and over, which is not a good idea. This is exactly where MaxMessagesPerTask could be useful.
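The whole approach rests on the postponement handshake: the target answers `OfferMessage` with `Postponed`, keeps the source and message header, and later calls `ConsumeMessage` on that source. The sketch below isolates that handshake with a deliberately tiny, hypothetical `PostponingTarget` (not part of TPL Dataflow) driven by `SendAsync`. It is meant to show that the task returned by `SendAsync` stays incomplete until the target actually consumes the message, which is what keeps a sending loop from pulling the next item too early.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical minimal target: postpones every offer and consumes it only on demand.
class PostponingTarget<T> : ITargetBlock<T>
{
    private readonly TaskCompletionSource<bool> m_completion = new TaskCompletionSource<bool>();
    private ISourceBlock<T> m_source;
    private DataflowMessageHeader m_header;

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        // Without a source there is nobody to come back to, so decline;
        // SendAsync then offers again with a source of its own.
        if (source == null)
            return DataflowMessageStatus.Declined;

        m_source = source;
        m_header = messageHeader;
        return DataflowMessageStatus.Postponed;
    }

    // Consumes the last postponed message; this is the moment the sender is released.
    public bool TryConsume(out T value)
    {
        bool consumed = false;
        value = default(T);
        if (m_source != null)
            value = m_source.ConsumeMessage(m_header, this, out consumed);
        m_source = null;
        return consumed;
    }

    public void Complete() { m_completion.TrySetResult(true); }
    public void Fault(Exception exception) { m_completion.TrySetException(exception); }
    public Task Completion { get { return m_completion.Task; } }
}

static class HandshakeDemo
{
    // Returns (was the send complete before consuming?, consumed value, SendAsync result).
    public static Tuple<bool, int, bool> Run()
    {
        var target = new PostponingTarget<int>();
        Task<bool> send = target.SendAsync(42);
        bool completedEarly = send.IsCompleted; // the offer was only postponed

        int value;
        target.TryConsume(out value);
        return Tuple.Create(completedEarly, value, send.Result); // Result waits for completion
    }

    static void Main()
    {
        Console.WriteLine(Run()); // (False, 42, True)
    }
}
```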

  • 6 June 2012 08:48
     
     
    Much obliged for the suggestions, and thank you for the comprehensive answer!