BroadcastBlock with guaranteed delivery and back pressure?

  • Question

  • In my application, I start with a single stream of data (blocks of samples read from a WAV file) and I send this through several forms of processing. Some of these are sequential (e.g., I feed the data through one transform and then feed the output of that through another). But some are not - in some cases, I want to apply several independent forms of processing to one stream. (E.g., I might want to generate two different versions of the input signal, built by applying two different filters - I'm effectively building two independent chains of processing.)

    On the face of it, BroadcastBlock sounds like it should be the solution. However, there's a problem.

    My processing is typically CPU-bound - the signal processing usually proceeds more slowly than the data can be read off disk. So if I use unbounded queues for all of my nodes, I will run out of memory, because some of my input files are many GB long. If I just let the input code feed data into the graph as fast as it can, my program will crash with an out of memory exception on large files. (And even on smaller files, I'll use a lot more memory than necessary, because I'm loading data into memory way in advance of needing it.)

    So I want back pressure. The way to achieve this with the TPL appears to be to use bounded queues. If every block has a bounded capacity, my reader code can know that it has already got (say) 100 blocks ahead of where the processing is at, so it should hold off from reading any more data until the processing code has worked through the backlog a little.
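
    To make that concrete, here's the shape of the reader-side throttling I mean (a minimal sketch - the block, type, and helper names here are made up for illustration):

    var pipelineHead = new TransformBlock<Sample[], Sample[]>(
        chunk => ApplyFilter(chunk),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

    foreach (Sample[] chunk in ReadChunksFromWavFile(path))
    {
        // SendAsync's task completes only once the block has accepted the
        // item, so blocking on it (or awaiting it) holds the reader back
        // whenever the processing falls 100 chunks behind.
        pipelineHead.SendAsync(chunk).Wait();
    }
    pipelineHead.Complete();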

    That works fine when I've got a simple single linear chain of processing. But it's not clear how to achieve this when splitting the input to feed it into multiple chains for further processing. As far as I can tell, the BroadcastBlock simply doesn't support this scenario.

    The problem is that if the BroadcastBlock encounters back pressure (i.e., if any of the blocks it's feeding into has a bounded capacity and a full input queue) it simply drops messages rather than propagating that back pressure to its input queue. This happens because, by design, BroadcastBlock offers each input to all of its downstream blocks, and once it has offered it to them, it considers its work to be done. Even if a downstream block asks to defer handling, the BroadcastBlock will just drop the message as soon as the next one arrives.
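
    Here's a tiny illustration of that dropping behaviour (a sketch - the names and numbers are arbitrary):

    var broadcast = new BroadcastBlock<int>(x => x);
    var slowTarget = new ActionBlock<int>(
        x => { Thread.Sleep(100); Console.WriteLine(x); },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    broadcast.LinkTo(slowTarget);

    // BroadcastBlock accepts all 100 immediately, but holds only the latest
    // undelivered value; the busy target typically sees far fewer than 100.
    for (int i = 0; i < 100; i++) broadcast.Post(i);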

    What I want is something that has the clone/multi-delivery semantics of the built-in BroadcastBlock, but which handles deferral differently. If any downstream block says "Yes, I will handle that, but not yet" I want the BroadcastBlock to consider that item still to be in its input queue. Only when all downstream blocks have accepted a message should that message be considered to have been handled.

    Am I right in thinking that a) you can't get the BroadcastBlock to do that and b) there's no built-in block that works this way?

    There's a way to do this if you write a bit of code of course. You can write an ad hoc ActionBlock - here's one that splits things two ways, for example:

    var splitter = new ActionBlock<int>(
      i =>
        {
          // Offer the item to both targets and wait until both have
          // accepted it; with BoundedCapacity = 1, the splitter won't
          // take its next message until then.
          var t1 = action.SendAsync(i);
          var t2 = slidingWindowHandler.SendAsync(i);
          Task.WaitAll(t1, t2);
        },
      new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    (That's using individual ints, because I wrote that as part of an experiment. My real application is working with considerably larger chunks of data. That snippet is just a simple illustration of the basic idea.)

    But if there's a built-in way to do that, I'd prefer to use that rather than an ad hoc ActionBlock. (If not, I'm contemplating writing a custom block to handle this in a slightly more general way. I'd rather plumb things together with LinkTo like I can when BroadcastBlock happens to meet my needs.)
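
    Roughly the sort of generalization I have in mind (an untested sketch - CreateGuaranteedBroadcast is just a placeholder name, and it doesn't deal with completion propagation):

    static ITargetBlock<T> CreateGuaranteedBroadcast<T>(params ITargetBlock<T>[] targets)
    {
        return new ActionBlock<T>(
            item =>
            {
                // Offer the item to every target and wait until all of them
                // have accepted it before taking the next message.
                var sends = targets.Select(t => t.SendAsync(item)).ToArray();
                Task.WaitAll(sends);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    }

    var splitter = CreateGuaranteedBroadcast(action, slidingWindowHandler);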

    Monday, July 25, 2011 11:16 AM


All replies

  • Hi Ian,

    What I think will work in your case is a TransformManyBlock<T, KeyValuePair<int, T>> that multiplies each input message into a set of KeyValuePairs addressed to specific pipeline branches. Then you link each branch head with a filter so that only messages addressed to that branch pass:

    var boundedOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1000 };

    Func<T, IEnumerable<KeyValuePair<int, T>>> multiplyFunc = x => new[]
        {
            new KeyValuePair<int, T>(1, x),
            new KeyValuePair<int, T>(2, x),
            new KeyValuePair<int, T>(3, x)
        };
    Func<KeyValuePair<int, T>, T> unwrapFunc = kvp => kvp.Value;

    var multiplyBlock = new TransformManyBlock<T, KeyValuePair<int, T>>(multiplyFunc, boundedOptions);

    var branch1HeadBlock = new TransformBlock<T, KeyValuePair<int, T>>(unwrapFunc, boundedOptions);
    multiplyBlock.LinkTo(branch1HeadBlock, kvp => kvp.Key == 1);

    var branch2HeadBlock = new TransformBlock<T, KeyValuePair<int, T>>(unwrapFunc, boundedOptions);
    multiplyBlock.LinkTo(branch2HeadBlock, kvp => kvp.Key == 2);

    var branch3HeadBlock = new TransformBlock<T, KeyValuePair<int, T>>(unwrapFunc, boundedOptions);
    multiplyBlock.LinkTo(branch3HeadBlock, kvp => kvp.Key == 3);

    There is one caveat I'd like to call out: if one branch postpones a message, all branches with higher indices will get blocked on that message. To mitigate that, link the fastest branch first, then the second fastest, and so on, leaving the slowest branch last. Appropriately sized bounded capacities on the branch heads may smooth out such speed differences.

    Please let us know whether this approach works for you.

    Zlatko Michailov

    Software Development Engineer, Parallel Computing Platform

    Microsoft Corp.


    This posting is provided "AS IS" with no warranties, and confers no rights.
    Thursday, July 28, 2011 9:41 PM
  • Hi Ian-

    Also, another variation on your solution would be to use the Async CTP and language support to take advantage of the same behavior but in a non-blocking manner, e.g.

    var splitter = new ActionBlock<int>(
      async i =>
        {
          var t1 = action.SendAsync(i);
          var t2 = slidingWindowHandler.SendAsync(i);
          await TaskEx.WhenAll(t1, t2);
        },
      new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    The ActionBlock will functionally behave the same as in your example, not retrieving and processing another message from its source until the current message is accepted by all of its targets, but it'll do so without blocking a thread.

    Saturday, July 30, 2011 12:40 AM
  • I think you have the type arguments for the TransformBlocks the wrong way round, don't you? You have them in the same order as on the TransformManyBlock, but I think it needs to be TransformBlock<KeyValuePair<int, T>, T> for those various branchNHeadBlocks, doesn't it?

    Anyway, having tried this approach it isn't quite working for me. If I have relatively large bounded capacities it seems to be OK. But if I actually reduce these down to the point where I get back pressure, I find that things appear to hang. Basically, once I stop pushing input through the graph, it seems to stop processing data even when there are items sat in queues waiting to be processed.

    Specifically, consider this scenario:

    Suppose I've got another TransformBlock with a bounded capacity of 1 called copyBlock, and suppose I've called branch1HeadBlock.LinkTo(copyBlock). And suppose I've got an ActionBlock called dest with a BoundedCapacity of 1, and I've called copyBlock.LinkTo(dest).

    When I'm done feeding things in, I can see (with some debug tracepoints) that the TransformManyBlock's transform function is getting called the correct number of times. (4 times, in my simple test setup - I'm only feeding 4 items through the graph while I try to find out what's going on with it.) But the transform function for my copyBlock TransformBlock only seems to get called twice.

    If I raise the BoundedCapacity of dest (the block that copyBlock is linked to) to 10, then it all works. (I'm only sending in 4 blocks, so 10 is enough to queue up everything).

    I understand that once I hit the bounded capacity of a block, it'll stop receiving further input. I thought it was supposed to start deferring offered messages, and I thought the idea was that once it freed up some space in its input queue, it would go and attempt to consume the messages it had deferred. But that doesn't appear to be happening. (Or at least, it's not happening in the 10 seconds I'm giving it. That should be plenty - I see a brief splurge of activity that takes less than a second, and then it just sits there doing nothing for the next 10 seconds.)

    Since the whole point of what I'm attempting here is to enable CPU-bound processing blocks to govern the rate at which data gets fed into the graph, I need BoundedCapacities that will be short enough to cause queues to fill up. And I also need data that was deferred to get picked up eventually - I need to be able to drain the graph once I've finished feeding it with input. So raising the queue lengths isn't a useful workaround for me here.

    Sunday, July 31, 2011 6:06 PM
  • I like this solution, but running the Async CTP version of the C# compiler on the dev box I'm using for this project isn't really an option.

    However, I think I could do the same thing without needing the C# language support - all I need is a function that returns a Task, so in principle, I could delete the "async" from your example, and replace "await" with "return", and it should work. (The C# compiler might need a helping hand with a suitable cast, as it seems to get confused by the Task/non-Task ctor overloads in the TPL, but aside from that this should work.)
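
    Something like this, I think (an untested sketch of that transformation):

    var splitter = new ActionBlock<int>(
      (Func<int, Task>)(i =>
        {
          var t1 = action.SendAsync(i);
          var t2 = slidingWindowHandler.SendAsync(i);
          // Returning the combined task means the ActionBlock won't take
          // the next message until both sends have completed.
          return TaskEx.WhenAll(t1, t2);
        }),
      new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });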

    So all that's needed is TaskEx.WhenAll - something that can effectively build a Task as a Join of a bunch of other tasks.

    I believe I'd still need the Async CTP library bits though, as the .NET Framework doesn't have anything like TaskEx.WhenAll built in, right? I know the TPL Dataflow bits work without needing the CTP compiler, because you can download those separately from the Async CTP. But can I use the async CTP DLL in conjunction with an otherwise vanilla .NET 4 app in VS2010sp1?

    Sunday, July 31, 2011 6:12 PM
  • Hi Ian-

    .NET 4 does include the Task.Factory.ContinueWhenAll method.  This is a bit different from Task.WhenAll in that, like ContinueWith, ContinueWhenAll executes a delegate when the provided tasks have completed.  You could still use it for this purpose, however, since the Task that's returned from ContinueWhenAll represents both the completion of the antecedent tasks and then also the completion of the delegate that's run when those tasks complete.  Your delegate could be a no-op.
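
    Applied to your splitter, that might look something like this (a sketch):

    var splitter = new ActionBlock<int>(
      (Func<int, Task>)(i =>
        {
          var t1 = action.SendAsync(i);
          var t2 = slidingWindowHandler.SendAsync(i);
          // The returned task completes after both sends have completed
          // and the (empty) continuation delegate has run.
          return Task.Factory.ContinueWhenAll(new[] { t1, t2 }, _ => { });
        }),
      new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });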

    Alternatively, you could also build a relatively simple WhenAll using ContinueWith and TaskCompletionSource, something like:

    static Task WhenAll(params Task[] tasks)
    {
        var tcs = new TaskCompletionSource<bool>();
        int remaining = tasks.Length;

        bool canceled = false;
        ConcurrentStack<Exception> exceptions = null;
        Action<Task> continuation = t =>
        {
            // Record any failure or cancellation from this antecedent.
            if (t.IsFaulted)
                LazyInitializer.EnsureInitialized(ref exceptions).PushRange(t.Exception.InnerExceptions.ToArray());
            else if (t.IsCanceled)
                canceled = true;

            // The last antecedent to complete transitions the proxy task.
            if (Interlocked.Decrement(ref remaining) == 0)
            {
                if (exceptions != null) tcs.TrySetException(exceptions);
                else if (canceled) tcs.TrySetCanceled();
                else tcs.TrySetResult(true);
            }
        };

        foreach (var task in tasks)
            task.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return tcs.Task;
    }

    I hope that helps. 


    Monday, August 1, 2011 4:17 AM
  • Ah - awesome. For some reason, ContinueWhenAll had escaped my notice until now.

    Now I look, I notice that TaskEx.WhenAllCore appears to be implemented on top of ContinueWhenAll in the current CTP. Is there a benefit in the implementation you've shown? It looks like yours will end up with N continuations being invoked when waiting for N tasks, whereas a ContinueWhenAll approach looks like it will create just one.

    Monday, August 1, 2011 12:27 PM
  • Hi Ian-

    ContinueWhenAll will very likely be more efficient than my quick and dirty WhenAll above.  ContinueWhenAll still needs to hook up a continuation to each task, but in .NET 4 the object it creates for that purpose internally is smaller and lighter than Task, so it ends up being a more efficient operation than what's possible through the public ContinueWith.  In .NET vNext, both WhenAll and ContinueWhenAll share the same internal code paths, and the operation has been made significantly more efficient, in many cases bringing it down to O(1) allocations rather than O(N) in the number of tasks supplied to the WhenAll.

    Tuesday, August 2, 2011 2:31 PM
  • I encountered the same problem as IanG and wanted the same solution: BroadcastBlock with guaranteed delivery and back pressure.  But, since I need to support ad hoc construction of the dataflow network, I also want to use LinkTo to manage the subscriptions.  So, in the absence of my desired solution, I was intrigued by Zlatko's approach.  I had the same experience as IanG when I tried it.  After some experimentation I found that eliminating the BoundedCapacity from the TransformManyBlock lets the data flow.  If I establish any BoundedCapacity, the data flow freezes after that many items have been processed.  TransformManyBlock seems to treat BoundedCapacity as if it were MaxMessagesPerTask.
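
    Roughly the shape of my repro (a sketch - Process stands in for the real branch work):

    var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
    var multiply = new TransformManyBlock<int, KeyValuePair<int, int>>(
        x => new[]
        {
            new KeyValuePair<int, int>(1, x),
            new KeyValuePair<int, int>(2, x),
        },
        options); // removing this BoundedCapacity is what lets the data flow
    var branch1 = new ActionBlock<KeyValuePair<int, int>>(kvp => Process(kvp.Value), options);
    var branch2 = new ActionBlock<KeyValuePair<int, int>>(kvp => Process(kvp.Value), options);
    multiply.LinkTo(branch1, kvp => kvp.Key == 1);
    multiply.LinkTo(branch2, kvp => kvp.Key == 2);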

    The other point of this post is to reinforce IanG's implicit request for optional guaranteed-delivery and back-pressure behaviors in the BroadcastBlock.


    Rob



    • Edited by mount77 Friday, April 6, 2012 10:13 PM I realized that the filter expressions are working correctly so I've deleted a large part of the post that dealt with that.
    Friday, April 6, 2012 6:15 PM
  • UPDATE

    In fact, the problem is NOT a bug in TPL Dataflow. There were two problems in my network.

    1) From a TransformBlock delegate, I was posting to a grandparent block. But Post blocks the thread if the BoundedCapacity is full (i.e., if the number of messages the block has buffered, in either its input or output queue, has reached the bound). This led to a deadlock in the network.

    2) In a TransformBlock, I dynamically add child blocks with predicates. These blocks/predicates are computed from the current message.

    But because of a bug, none of the child blocks' predicates passed, so the output queue of this TransformBlock became full and the network froze, as expected.

    To examine the buggy messages, each time a child block is added, I also dynamically re-add (remove, then add) the 'catch-all' action block (i.e., an action block with no predicate), which logs the message.

    There is no bug in Dataflow! Only missing information! A "buffer full" property would be nice, as well as a "no child to deliver this message to" signal (e.g., a counter property called OutputWithNoTargetCount, or an event/delegate).

    --------------------

    mount77 wrote:

     After some experimentation I found that eliminating the BoundedCapacity from the TransformManyBlock lets the data flow

    I had the same problem. The network was freezing, and as the blocks don't provide enough information about their queues and states, it is a pain to find where and why. You should add InputQueueSize, OutputQueueSize, and RunningActionCount to all blocks, either as perf counters or as public properties; it would make debugging much easier.

    I removed the BoundedCapacity on the only TransformManyBlock, as mount77 suggested (it was set to 1), and ... it works, BUT only for longer (5 minutes instead of 2). I've created an action counter which counts when an action starts and stops for all blocks in the network. When the network is frozen, this counter is at 0 for all blocks. So it is not a bug in my own code.

    I've also removed another BoundedCapacity (it was set to 10) on an async embedded TransformBlock with an action in it. Could it be my implementation of OfferMessage on this block? Post is synchronous, and the block's BoundedCapacity was 10:

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, IEmailMeta messageValue, ISourceBlock<IEmailMeta> source, bool consumeToAccept)
    {
        if (consumeToAccept && source != null) // Never called
        {
            Console.WriteLine("!!!!! Offermessage called with consumeToAccept");
            bool isConsumed;
            messageValue = source.ConsumeMessage(messageHeader, this, out isConsumed);
            if (!isConsumed) return DataflowMessageStatus.NotAvailable;
        }

        Interlocked.Increment(ref inputQueueSize);
        block.Post(messageValue);
        return DataflowMessageStatus.Accepted;
    }


    Still searching ...

    (using .NET 4.5 beta)





    • Edited by Softlion Monday, May 14, 2012 3:08 PM
    Sunday, May 13, 2012 2:57 PM
  • I'm finding that the ActionBlock/SendAsync approach takes 10 seconds in my test while a plain TransformBlock takes about 7 seconds... it sure would be nice to have an actual GuaranteedBroadcastBlock.

    I have posted a similar question on StackOverflow: http://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow

    And also posted some code that uses the ActionBlock/SendAsync technique here: http://www.brianrice.com/downloads/permanent/test2.zip

    Tuesday, March 4, 2014 12:05 AM
  • To implement this scenario, I recommend using a BlockingCollection pipeline. This option is easier to implement and supports the back-pressure strategy.
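
    For example, something along these lines (a sketch - ProcessA, ProcessB, and ReadInput are placeholders):

    var branch1 = new BlockingCollection<int>(boundedCapacity: 100);
    var branch2 = new BlockingCollection<int>(boundedCapacity: 100);

    var consumer1 = Task.Run(() => { foreach (var item in branch1.GetConsumingEnumerable()) ProcessA(item); });
    var consumer2 = Task.Run(() => { foreach (var item in branch2.GetConsumingEnumerable()) ProcessB(item); });

    foreach (var item in ReadInput())
    {
        branch1.Add(item); // blocks while branch 1's queue is full
        branch2.Add(item); // blocks while branch 2's queue is full
    }
    branch1.CompleteAdding();
    branch2.CompleteAdding();
    Task.WaitAll(consumer1, consumer2);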
    Thursday, March 21, 2019 4:05 PM