Linking dynamically created ActionBlocks to a BufferBlock
-
Friday, September 28, 2012 9:00 PM
I'm not sure if this is possible, but if it is, I'm probably doing something wrong. Let's suppose I have one shared buffer that is linked to many consumers (ActionBlocks). Each consumer should consume only the data that satisfies the predicate used to link it to the buffer. For example, ActionBlock1 should consume numbers that satisfy `x => x % 5 == 0`, ActionBlock2 should consume only those that satisfy `x => x % 5 == 1`, and so on.
Here's what I've got:
    private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
    {
        var productionQueue = new BufferBlock<int>();
        for (int i = 0; i < NumProductionLines; i++)
        {
            ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
            productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
        }
        return productionQueue;
    }
And then I call:
    Random rnd = new Random();
    ITargetBlock<int> temp = BuildPipeline(5);
    while (true)
    {
        temp.Post(rnd.Next(255));
    }
However, this does not work: no output is displayed in the console. If I modify the `BuildPipeline` method as follows:
    private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
    {
        var productionQueue = new BufferBlock<int>();
        ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
        ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
        ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
        ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
        ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));
        productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
        productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
        productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
        productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
        productionQueue.LinkTo(productionLine5, x => x % 5 == 4);
        return productionQueue;
    }
the code does what it's expected to do.
Can someone shed light on why dynamically creating and linking action blocks doesn't work?
P.S. If I break into the code right after `ITargetBlock<int> temp = BuildPipeline(5);`, temp does show that 5 targets are linked to the buffer, and the Id of each target is different.
Thanks in advance
- Edited by Dimitri Dokadze Monday, October 01, 2012 11:32 AM
All Replies
-
Saturday, September 29, 2012 2:18 AM (Owner)
Hi Darcon77-
In your first example, the buffer might offer data to a linked target that won't accept the data. As such, the data will remain buffered in the queue, and the queue won't offer any other data to any targets until someone consumes that initial piece of data (the queue is ordered).
You can address this by ending your pipeline creation by linking the queue to a DataflowBlock.NullTarget<int>() with no filter... that way, any messages not consumed by any of the previous action blocks will end up getting consumed by the null target, which will accept and dump everything offered to it.
- Proposed As Answer by Stephen Toub - MSFT (Microsoft Employee, Owner) Saturday, September 29, 2012 2:18 AM
- Marked As Answer by Dimitri Dokadze Monday, October 01, 2012 11:31 AM
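The null-target suggestion in the reply above could look roughly like the following. This is only a sketch, not code from the thread: the class name `NullTargetSketch` and the local `line` are made up for illustration, and the loop variable is copied into `line` so that each lambda sees its own value. The unfiltered link is added last, on the assumption that the buffer offers each message to its targets in the order they were linked.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class NullTargetSketch
{
    // Same shape as the pipeline in the question, plus a catch-all sink.
    public static ITargetBlock<int> BuildPipeline(int numProductionLines)
    {
        var productionQueue = new BufferBlock<int>();
        for (int i = 0; i < numProductionLines; i++)
        {
            int line = i; // per-iteration copy (hypothetical name)
            var productionLine = new ActionBlock<int>(
                num => Console.WriteLine("Processed by line {0}: {1}", line + 1, num));
            productionQueue.LinkTo(productionLine, x => x % numProductionLines == line);
        }

        // Linked last and without a filter: it accepts and discards anything
        // that no earlier target took, so a declined message cannot sit at the
        // head of the ordered buffer and block everything behind it.
        productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
        return productionQueue;
    }
}
```

Without the final link, a single message that every filter declines stays at the front of the buffer, and nothing queued after it is offered to any target.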
-
Sunday, September 30, 2012 8:31 PM
The first sample uses the same filter expression for all n blocks.
Did you mean:
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
Thanks,
Alex
- Edited by Alexander Myachin Sunday, September 30, 2012 8:32 PM
-
Monday, October 01, 2012 11:31 AM
Yes, I meant that. Thanks for noticing.
-
Monday, October 01, 2012 11:33 AM
Stephen, I did mark your reply as answer, thanks for that. But I'm trying to understand why only the last value of i is used in the predicate. I had to copy i to use it in the predicates.
Also, is DataflowBlock.NullTarget only available for .NET 4.5? I have the Async CTP for .NET 4.0 installed, and it doesn't seem that I have the NullTarget method available.
EDIT: The System.Threading.Tasks.Dataflow.dll that ships with the Async CTP package is not the most up-to-date version of the library. You need to get it from the TPL Dataflow home page.
- Edited by Dimitri Dokadze Monday, October 01, 2012 11:56 AM
-
Monday, October 01, 2012 7:49 PM (Owner)
Hi Dimitri-
Your predicate is capturing the iteration variable `i` itself, not the value it had when the lambda was created. See http://blogs.msdn.com/b/ericlippert/archive/2009/11/12/closing-over-the-loop-variable-considered-harmful.aspx
- Proposed As Answer by Stephen Toub - MSFT (Microsoft Employee, Owner) Monday, October 01, 2012 7:49 PM
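The capture behaviour described in the reply above can be reproduced without any Dataflow types. The sketch below is illustrative only: `LoopCaptureSketch`, `BuildShared`, `BuildCopied` and `line` are hypothetical names, and the predicates mirror the `x % NumProductionLines == i` filter from the question. In a C# `for` loop all lambdas share the one variable `i`, so once the loop has finished they all compare against its final value.

```csharp
using System;
using System.Collections.Generic;

public static class LoopCaptureSketch
{
    // Every lambda closes over the single loop variable i.
    // After the loop i == n, and x % n is never n for non-negative x,
    // so none of these filters can ever match.
    public static List<Predicate<int>> BuildShared(int n)
    {
        var filters = new List<Predicate<int>>();
        for (int i = 0; i < n; i++)
        {
            filters.Add(x => x % n == i);
        }
        return filters;
    }

    // Each lambda closes over its own copy, declared inside the loop body,
    // so filter k matches exactly the values with x % n == k.
    public static List<Predicate<int>> BuildCopied(int n)
    {
        var filters = new List<Predicate<int>>();
        for (int i = 0; i < n; i++)
        {
            int line = i;
            filters.Add(x => x % n == line);
        }
        return filters;
    }
}
```

With these definitions, `BuildShared(5).Exists(f => f(7))` is false, while `BuildCopied(5)[2](7)` is true. That matches the symptom in the question: every ActionBlock declines every number, so nothing is printed.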

