Linking dynamically created ActionBlocks to a BufferBlock

  • Friday, September 28, 2012 9:00 PM
     
     
    I'm not sure if this is possible, but if it is, I'm probably not doing something right. Let's suppose I have one shared buffer that is linked to many consumers (ActionBlocks). Each consumer should consume data that satisfies a predicate used to link it to the buffer. For example, ActionBlock1 should consume numbers that satisfy `x => x % 5 == 0`, ActionBlock2 should consume only `x => x % 5 == 1` etc.

    Here's what I've got:


        private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
        {
            var productionQueue = new BufferBlock<int>();
        
            for (int i = 0; i < NumProductionLines; i++)
            {
                ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
        
                productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
            }
            
            return productionQueue;
        }
            
    And then I call:

        

        Random rnd = new Random();
        
        ITargetBlock<int> temp = BuildPipeline(5);

        while (true)
        {
            temp.Post(rnd.Next(255));
        }

    However, this does not work. No output is displayed in the console. If I modify the `BuildPipeline` method as follows:

        private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
        {
            var productionQueue = new BufferBlock<int>();

            ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
            ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
            ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
            ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
            ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));

            productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
            productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
            productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
            productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
            productionQueue.LinkTo(productionLine5, x => x % 5 == 4);

            return productionQueue;
        }

    the code does what it's expected to do.

    Can someone shed light on why dynamically creating and linking action blocks doesn't work?

    P.S. If I break into the code right after `ITargetBlock<int> temp = BuildPipeline(5);`, `temp` does show that 5 targets are linked to the buffer, and the Id of each target is different.

    Thanks in advance

All Replies

  • Saturday, September 29, 2012 2:18 AM
    Owner
     
     Answered

    Hi Darcon77-

    In your first example, the buffer might offer data to a linked target that won't accept it. The data then remains buffered in the queue, and the queue won't offer any other data to any target until someone consumes that initial piece of data (the queue is ordered).

    You can address this by ending your pipeline creation with a link from the queue to `DataflowBlock.NullTarget<int>()` with no filter. That way, any message not consumed by one of the previous action blocks ends up consumed by the null target, which accepts and discards everything offered to it.
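
    A minimal sketch of the catch-all link described above, not the original poster's pipeline. The class name `NullTargetSketch`, the helper `Run`, and the even-numbers filter are made up for illustration, and `PropagateCompletion` is used only so the sketch can wait for the consumer to finish:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks.Dataflow;

    public static class NullTargetSketch
    {
        // Hypothetical helper: posts the inputs and returns the values
        // that reached the filtered consumer.
        public static List<int> Run(IEnumerable<int> inputs)
        {
            var processed = new List<int>();
            var queue = new BufferBlock<int>();

            // With default options an ActionBlock handles one message at a time,
            // so appending to the list from the delegate is safe here.
            var evens = new ActionBlock<int>(num => processed.Add(num));

            var options = new DataflowLinkOptions { PropagateCompletion = true };

            // Filtered link: this consumer only accepts even values.
            queue.LinkTo(evens, options, x => x % 2 == 0);

            // Unfiltered catch-all link, added last. Without it, the first odd
            // value would stay at the head of the queue and block everything after it.
            queue.LinkTo(DataflowBlock.NullTarget<int>());

            foreach (int value in inputs)
            {
                queue.Post(value);
            }

            // Signal that no more data is coming, then wait for the consumer to drain.
            queue.Complete();
            evens.Completion.Wait();
            return processed;
        }

        public static void Main()
        {
            Console.WriteLine(string.Join(", ", Run(new[] { 1, 2, 3, 4 })));
        }
    }
    ```

    Link order matters in this sketch: the buffer offers each message to its targets in the order they were linked, so the null target only sees what the filtered link declined.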

  • Sunday, September 30, 2012 8:31 PM
     

    The first sample uses the same filter expression for all n blocks.

    Did you mean:

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);

    Thanks,

    Alex




  • Monday, October 01, 2012 11:31 AM
     
     
    Yes, I meant that. Thanks for noticing.
  • Monday, October 01, 2012 11:33 AM
     
     

    Stephen, I did mark your reply as answer, thanks for that. But I'm trying to understand why only the last value of i is used in the predicate. I had to copy i to use it in the predicates.
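
    For reference, a small sketch of the capture behaviour in question, using plain delegates rather than Dataflow blocks. The class and method names are made up for illustration. In a C# `for` loop there is a single variable `i` shared by every iteration, so each lambda reads whatever value `i` holds when the lambda finally runs:

    ```csharp
    using System;
    using System.Collections.Generic;

    public static class ClosureCaptureSketch
    {
        // Every lambda captures the same loop variable, so all of them
        // observe the value it holds once the loop has finished.
        public static List<Func<int>> CaptureLoopVariable(int count)
        {
            var funcs = new List<Func<int>>();
            for (int i = 0; i < count; i++)
            {
                funcs.Add(() => i);
            }
            return funcs;
        }

        // Copying to a local declared inside the loop body gives each
        // lambda its own variable, fixed at that iteration's value.
        public static List<Func<int>> CaptureLocalCopy(int count)
        {
            var funcs = new List<Func<int>>();
            for (int i = 0; i < count; i++)
            {
                int copy = i;
                funcs.Add(() => copy);
            }
            return funcs;
        }

        public static void Main()
        {
            foreach (var f in CaptureLoopVariable(3)) Console.Write(f() + " ");
            Console.WriteLine();
            foreach (var f in CaptureLocalCopy(3)) Console.Write(f() + " ");
            Console.WriteLine();
        }
    }
    ```

    Applied to `BuildPipeline`, this would mean that by the time any predicate runs, `i` already equals `NumProductionLines`, so `x % NumProductionLines == i` can never be true and no block accepts anything.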

    Also, is `DataflowBlock.NullTarget` only available for .NET 4.5? I have the Async CTP for .NET 4.0 installed, and it doesn't seem that I have the NullTarget method available.

    EDIT: The System.Threading.Tasks.Dataflow.dll that ships with the Async CTP package is not the most up-to-date version of the library. You need to get it from the TPL Dataflow home page.

  • Monday, October 01, 2012 7:49 PM
    Owner
     
     Proposed