Task-based ActionBlock, SendAsync, and Ordering

    Question

  • I've encountered a behaviour in the April 2011 TPL Dataflow CTP (which I believe is still the latest?) which might be a bug, or it might be my misunderstanding.
    I've reduced it down to a simple program that reproduces the behaviour. The program no longer makes much sense - there are much simpler ways of doing what I'm doing, but I have reasons for doing it this way in the real program.
    In my repro, I have three blocks: an ActionBlock ('inputBlock'), a TransformBlock ('transformBlock'), and a second ActionBlock ('targetBlock'). The block data type in all three cases is 'int'. (In my original program it was something rather more complex, but int is the simplest way to illustrate the problem.)
    My program pushes a bunch of integers into the first block:
        for (int i = 0; i < 100000; ++i)
        {
            inputBlock.SendAsync(i).Wait();
        }
    

    The basic idea is that the integers flow through all three blocks. The slightly funky thing is that ActionBlock is usually a 'leaf' - it receives data items but wouldn't typically pass them on to downstream blocks. But in my case, it does. Here's the action for inputBlock:
        private static Task Send(int item)
        {
            if (_nextExpectedIdForSend != item)
            {
                Console.WriteLine("Send expected {0}, got {1}", _nextExpectedIdForSend, item);
            }
            _nextExpectedIdForSend += 1;
    
            return _sendTo.SendAsync(item);
        }
    

    As you can see, this action returns a Task, so the action does not logically complete until the Task it returns completes. And it's returning the Task provided by SendAsync, meaning that this action completes once it has delivered the item onto whatever block _sendTo refers to. _sendTo is an ITargetBlock<int>, and I can configure my program in one of two ways. The first is to set it to targetBlock, bypassing the transformBlock completely. Here's targetBlock's action:
        private static void Target(int item)
        {
            if (_nextExpectedIdForTarget != item)
            {
                Console.WriteLine("Target expected {0}, got {1}", _nextExpectedIdForTarget, item);
            }
    
            _nextExpectedIdForTarget += 1;
        }
    

    Here, _nextExpectedIdForTarget is just an int. This method expects to see the items in order, and prints out a message if items appear out of order. When _sendTo is set to targetBlock, it all appears to work just as I expect: the items are delivered to the target block in the order in which they are sent. (The reason I'm expecting this is that my first ActionBlock is not concurrent, so the n+1th call to Send shouldn't happen until the task for the nth one has completed. And the task for the nth one shouldn't complete until the nth item has been accepted onto the downstream block's input queue.) So in this configuration, that Console.WriteLine never runs, which is what I'd hope.
    However, I see something different if I set _sendTo to refer to my transformBlock. Here's the transform method for that block:
        private static int DoTransform(int item)
        {
            if (_nextExpectedIdForTransform != item)
            {
                Console.WriteLine("TX expected {0}, got {1}", _nextExpectedIdForTransform, item);
            }
            _nextExpectedIdForTransform += 1;
    
            return item;
        }
    

    As you can see, this also keeps track of the expected order (with a separate variable). I'm expecting the items to arrive at the transform in order for the same reasons I stated above: none of these blocks enables concurrency, and the inputBlock shouldn't be able to start sending the n+1th item until the downstream block (transformBlock in this scenario) has accepted the nth item onto its queue. I was also under the impression that both ActionBlock and TransformBlock work on their items in the order in which they arrive at the queue. (Perhaps I'm wrong to presume that? But if so, that's a major problem for me, because order is significant in my data.)
    To use this block, I have the following code in my setup:
        transformBlock.LinkTo(targetBlock);
        _sendTo = transformBlock;
    

    So the action for the inputBlock will send items into the transformBlock (using the explicit SendAsync call shown above - that code doesn't change, although _sendTo now refers to a different block), and the transformBlock is linked to the targetBlock, so all its output will be implicitly delivered to the targetBlock by the TPL. And that works, but things no longer happen in order. I see this sort of output:
        TX expected 26879, got 26880
        TX expected 26880, got 26879
        Target expected 26879, got 26880
        Target expected 26880, got 26879
        TX expected 27934, got 27935
        TX expected 27935, got 27934
        Target expected 27934, got 27935
        Target expected 27935, got 27934
        TX expected 31745, got 31746
        TX expected 31746, got 31745
        Target expected 31745, got 31746
        Target expected 31746, got 31745

    The vast majority of the data items are handled in order - in this case, the first 26879 items were in the expected order. But occasionally, adjacent items get transposed. As you can see, this transposition is visible by the time the items reach the transform. (And as shown earlier, I don't see these transpositions when using the exact same code in the first action block, but feeding directly into the second ActionBlock instead of the TransformBlock.)
    Now an obvious way to deal with this would be not to use an ActionBlock - I could make that first block a TransformBlock, and link it to the second TransformBlock instead of manually calling SendAsync. However, although that would work for this simple example, I can't do that in my real app. In my real code, that first ActionBlock actually delivers the message to multiple target blocks. (My application is performing signal processing, and I often need to send a single stream of data into multiple targets. For example, I take my input - a stream of audio samples - and run that raw input into two different FFT-based filters to produce two differently-filtered copies of the original.)
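    A minimal sketch of that fan-out, for illustration only. It assumes the released System.Threading.Tasks.Dataflow package (not the April 2011 CTP) and a C# compiler with async/await and tuples; the names FanOutSketch, splitter, targetA and targetB are hypothetical and do not come from the real application. It shows the intended shape of the pattern and makes no claim about how the CTP behaves.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutSketch
{
    // Sends 0..count-1 through a non-concurrent ActionBlock that forwards each
    // item to two bounded targets, and returns what each target observed.
    public static async Task<(List<int> First, List<int> Second)> RunAsync(int count)
    {
        var first = new List<int>();
        var second = new List<int>();

        // Small bounded queues so that a slow target pushes back on the sender.
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };
        var targetA = new ActionBlock<int>(item => first.Add(item), bounded);
        var targetB = new ActionBlock<int>(item => second.Add(item), bounded);

        // The action does not complete until both targets have accepted the item,
        // so the next item is not forwarded before the current one is queued.
        var splitter = new ActionBlock<int>(
            async item =>
            {
                await targetA.SendAsync(item);
                await targetB.SendAsync(item);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        for (int i = 0; i < count; ++i)
        {
            // Waits here whenever the splitter's own queue is full.
            await splitter.SendAsync(i);
        }

        // Drain the splitter first, then the targets it feeds by hand.
        splitter.Complete();
        await splitter.Completion;
        targetA.Complete();
        targetB.Complete();
        await Task.WhenAll(targetA.Completion, targetB.Completion);

        return (first, second);
    }
}
```

    Because the splitter awaits each SendAsync before its action completes, a full target stalls the splitter, which in turn stalls the producer loop. That is the back pressure the rest of this post relies on.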
    I originally tried using the BroadcastBlock to do this splitting, but unfortunately, it just gives up if the target blocks aren't ready. And that's a problem for me, because I'm relying on back pressure: I can usually read sound samples off disk faster than I can process them, so I need to have bounded capacities on all my blocks, and I need my file reading loop just to wait if the dataflow graph is backed up. BroadcastBlock cannot help in that scenario, because it will simply lose data items unless either it is feeding into unbounded blocks (which means you get no back pressure) or the data source is producing data more slowly than it can be processed (which isn't true in my scenario).
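    A minimal sketch of the BroadcastBlock behaviour described above, again assuming the released System.Threading.Tasks.Dataflow package rather than the CTP. BroadcastDropSketch and slowTarget are hypothetical names, and the 1 ms delay is an arbitrary stand-in for slow processing. How many items are lost depends on timing, so no expected count is given.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDropSketch
{
    // Posts 0..count-1 into a BroadcastBlock linked to one slow, bounded target
    // and returns the items that the target actually processed.
    public static async Task<List<int>> RunAsync(int count)
    {
        var received = new List<int>();

        // A bounded target that is slower than the producer.
        var slowTarget = new ActionBlock<int>(
            async item =>
            {
                await Task.Delay(1);
                received.Add(item);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // BroadcastBlock keeps only the most recent item; the delegate is the
        // cloning function, which can return the value itself for an int.
        var broadcast = new BroadcastBlock<int>(item => item);
        broadcast.LinkTo(slowTarget, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; ++i)
        {
            // An unbounded BroadcastBlock accepts every post, so nothing here
            // slows the producer down when the target is busy.
            broadcast.Post(i);
        }

        broadcast.Complete();
        await slowTarget.Completion;
        return received;
    }
}
```

    The producer loop never waits, and items that arrive while the target is full are overwritten by newer ones, so the returned list will typically be shorter than the number of items posted.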
    (An alternative would, I suppose, be not to rely on the TPL Dataflow library to provide me with back pressure. I could in principle set up everything with unbounded capacity, and then perform my own regulation by observing the rate at which processing completes. That would be a significant upheaval for my code, but might be a tolerable workaround, although if what I'm seeing here really is a bug, it's conceivable that that workaround wouldn't necessarily help.)
    I'm running on a Core 2 Quad (Q9550, 2.84GHz) desktop machine, running 64-bit Windows 7. Here's the full code for my repro. (To take the TransformBlock out of the picture, uncomment the line about half way down the Main method, and comment out the two statements after that before the for loop.)
    namespace TplDataflowOrderingRepo
    {
        using System;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;
    
        internal class Program
        {
            private static ITargetBlock<int> _sendTo;
    
            private static int _nextExpectedIdForSend;
            private static int _nextExpectedIdForTransform;
            private static int _nextExpectedIdForTarget;
    
            private static void Main(string[] args)
            {
                var inputBlock = new ActionBlock<int>(
                    (Func<int, Task>) Send,
                    new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
    
                var transformBlock = new TransformBlock<int, int>(
                    (Func<int, int>) DoTransform,
                    new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    
                var targetBlock = new ActionBlock<int>(
                    (Action<int>) Target,
                    new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
    
                // If we just do this (and don't use the TransformBlock) the items seem to be
                // processed in order.
                //_sendTo = targetBlock;
    
                // But with this, the items arrive out of order.
                transformBlock.LinkTo(targetBlock);
                _sendTo = transformBlock;
    
    
                for (int i = 0; i < 100000; ++i)
                {
                    inputBlock.SendAsync(i).Wait();
                }
    
                Console.WriteLine("Done");
            }
    
            private static Task Send(int item)
            {
                if (_nextExpectedIdForSend != item)
                {
                    Console.WriteLine("Send expected {0}, got {1}", _nextExpectedIdForSend, item);
                }
                _nextExpectedIdForSend += 1;
    
                return _sendTo.SendAsync(item);
            }
    
            private static int DoTransform(int item)
            {
                if (_nextExpectedIdForTransform != item)
                {
                    Console.WriteLine("TX expected {0}, got {1}", _nextExpectedIdForTransform, item);
                }
                _nextExpectedIdForTransform += 1;
    
                return item;
            }
    
            private static void Target(int item)
            {
                if (_nextExpectedIdForTarget != item)
                {
                    Console.WriteLine("Target expected {0}, got {1}", _nextExpectedIdForTarget, item);
                }
    
                _nextExpectedIdForTarget += 1;
            }
        }
    }
    
    
    11 September 2011 8:39

Answer

  • Hi Ian,

    Thank you for the clean repro. This is a product defect. The correct behavior is exactly what you expect. So stick with TPL Dataflow.

    We have fixed the bug, and the fix will be included in our next TPL Dataflow-only CTP. (It is not included in the current .NET 4.5 CTP.) I’ll let you know as soon as that CTP becomes available.

    Zlatko Michailov
    Software Development Engineer, Parallel Computing Platform
    Microsoft Corp.


    This posting is provided "AS IS" with no warranties, and confers no rights.
    • Marked as answer by IanGMVP, 15 September 2011 19:18
    15 September 2011 17:48

All replies