Unable to process remaining messages in BatchBlock after calling TriggerBatch
-
Tuesday, 12 July 2011 15:19
Hi,
Could anyone help me process the remaining messages in a BatchBlock?
var writeOut = new ActionBlock<string[]>(ss =>
{
    StringBuilder s = new StringBuilder();
    foreach (var item in ss)
    {
        s.Append(item);
        s.Append("-");
    }
    Console.WriteLine(s.ToString());
});

var buffer1 = new BufferBlock<string>();
var bacthBlock = new BatchBlock<string>(3, new GroupingDataflowBlockOptions { Greedy = true });

buffer1.LinkTo(bacthBlock);
bacthBlock.LinkTo(writeOut);

for (int i = 0; i < 10; i++)
{
    buffer1.Post(i.ToString());
}

//buffer1.Completion.ContinueWith(delegate { bacthBlock.Complete(); });
//bacthBlock.Complete();
bacthBlock.TriggerBatch();
//bacthBlock.Complete();

Console.ReadLine();
Thanks in advance
All Replies
-
Tuesday, 12 July 2011 16:34
Hi Thangarajan,
To get your last batch, you need to either tell the BatchBlock no more data is going to arrive by calling Complete() or manually trigger a batch by calling TriggerBatch().
You attempted the second approach but failed to account for the possible race where you call TriggerBatch before the BatchBlock has sent the "6-7-8" batch, leaving your "9" stranded in the block.
A safer way to write your code is to propagate completion through the pipeline and call Complete() on the input buffer block, so that each block completes only after everything upstream of it has been delivered.
For example:
// Requires: using System.Collections.Generic; using System.Linq;
//           using System.Threading.Tasks.Dataflow;
//           using Microsoft.VisualStudio.TestTools.UnitTesting;
[TestMethod]
public void TestBatch()
{
    // Data
    var input = Enumerable.Range(0, 10).Select(i => i.ToString()).ToList();
    var actual = new List<string[]>();

    // Setup
    var buffer1 = new BufferBlock<string>();
    var batchBlock = new BatchBlock<string>(3, new GroupingDataflowBlockOptions { Greedy = true });
    var writeOut = new ActionBlock<string[]>(ss => actual.Add(ss));

    // Link
    buffer1.LinkTo(batchBlock);
    batchBlock.LinkTo(writeOut);

    // Complete batch as soon as input is complete
    buffer1.Completion.ContinueWith(delegate { batchBlock.Complete(); });
    batchBlock.Completion.ContinueWith(delegate { writeOut.Complete(); });

    // Send input
    foreach (var item in input)
        Assert.IsTrue(buffer1.Post(item));
    buffer1.Complete();

    // Wait
    Assert.IsTrue(writeOut.Completion.Wait(100));

    // Verify
    Assert.AreEqual(4, actual.Count); // 4 batches
    CollectionAssert.AreEquivalent(input, actual.SelectMany(ss => ss).ToArray()); // All items
}
Cheers!
Shawn
- Marked as answer by Stephen Toub - MSFT (Microsoft Employee, Owner), Monday, 25 July 2011 15:01
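As a side note on the completion-chaining approach above: in current releases of System.Threading.Tasks.Dataflow the same effect can probably be had without the ContinueWith calls, by passing DataflowLinkOptions with PropagateCompletion = true to LinkTo. The following is only a minimal sketch under that assumption; the class name BatchCompletionSketch and the Run helper are made up for illustration and are not part of the original posts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BatchCompletionSketch
{
    // Hypothetical helper: posts "0".."count-1" through
    // BufferBlock -> BatchBlock -> ActionBlock and returns every batch received.
    public static List<string[]> Run(int count, int batchSize)
    {
        var batches = new List<string[]>();

        var buffer = new BufferBlock<string>();
        var batchBlock = new BatchBlock<string>(batchSize);
        // ActionBlock runs one delegate at a time by default, so the list needs no lock.
        var writeOut = new ActionBlock<string[]>(batch => batches.Add(batch));

        // Completion flows down the links instead of via ContinueWith.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(batchBlock, link);
        batchBlock.LinkTo(writeOut, link);

        for (int i = 0; i < count; i++)
            buffer.Post(i.ToString());

        // Completing the head lets the BatchBlock emit its partial last batch
        // once all buffered items have reached it.
        buffer.Complete();
        writeOut.Completion.Wait();
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run(10, 3))
            Console.WriteLine(string.Join("-", batch));
    }
}
```

With ten items and a batch size of three, this yields four batches, the last one holding only "9". The trade-off raised later in the thread still applies: once completed, the blocks cannot accept further input.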
-
Tuesday, 12 July 2011 18:31
Hi Shawn and Thangarajan,
Shawn’s observation is correct: the call to TriggerBatch() races with the message propagation and may (indeed, is likely to) arrive first. The suggestion to complete the buffer block, however, is of limited use, because once you do that, you’ll have to recreate it if you expect further traffic, which is relatively expensive. Imagine a long-lasting stream where you don’t want to leave stale messages in the BatchBlock. You shouldn’t be completing and re-creating blocks just to flush the BatchBlock.
This is a common problem. To solve it in a better way, you can stick a TransformBlock<T, T> right before the BatchBlock. The transformation function of the TransformBlock resets a timer that calls batch.TriggerBatch(). As long as there is a constant supply of messages, the timer gets reset (and doesn’t trigger). If messages stop arriving, the timer will trigger and batch.TriggerBatch() will be called.
int triggerAfter = 1000;
// Create a deactivated timer that can trigger the BatchBlock<T>
var timer = new Timer(_ => batchBlock.TriggerBatch());
// Create the transformation callback
Func<T, T> resetTimerIdentity = value =>
{
    timer.Change(triggerAfter, Timeout.Infinite);
    return value;
};
// Create the target end and link it to the source end
var timingBlock = new TransformBlock<T, T>(resetTimerIdentity);
timingBlock.LinkTo(batchBlock);
// Link sources to the timing block, not directly to the batch block
buffer1.LinkTo(timingBlock);
An important thing to note here is that this only works for greedy BatchBlocks. (I have a complete demo and a blog post that work for non-greedy BatchBlock as well. Keep an eye on the PFX team blog.)
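For readers who want to try the idea end to end, here is a rough, self-contained sketch of the snippet above with T fixed to string. It is not the demo mentioned in the post; the class name IdleFlushSketch, the Run helper, and the timing values (200 ms idle, 2000 ms wait) are assumptions chosen only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class IdleFlushSketch
{
    // Hypothetical helper: sends `count` items through a timing block into a
    // greedy BatchBlock, never completes any block, and returns the batches
    // that arrived within `waitMs` milliseconds.
    public static string[][] Run(int count, int batchSize, int idleMs, int waitMs)
    {
        var batches = new ConcurrentQueue<string[]>();
        var batchBlock = new BatchBlock<string>(batchSize); // greedy by default
        var writeOut = new ActionBlock<string[]>(b => batches.Enqueue(b));
        batchBlock.LinkTo(writeOut);

        // The timer starts deactivated; it flushes the BatchBlock when it fires.
        using (var timer = new Timer(_ => batchBlock.TriggerBatch()))
        {
            // Identity transform whose only side effect is restarting the countdown.
            var timingBlock = new TransformBlock<string, string>(value =>
            {
                timer.Change(idleMs, Timeout.Infinite);
                return value;
            });
            timingBlock.LinkTo(batchBlock);

            for (int i = 0; i < count; i++)
                timingBlock.Post(i.ToString());

            // Crude wait so the idle timer has time to fire before we read results.
            Thread.Sleep(waitMs);
        }
        return batches.ToArray();
    }

    public static void Main()
    {
        foreach (var batch in Run(10, 3, 200, 2000))
            Console.WriteLine(string.Join("-", batch));
    }
}
```

Because the flush is timer-driven, the exact batch boundaries depend on scheduling; under normal conditions the leftover "9" should appear as a final short batch roughly 200 ms after the last item is posted. TriggerBatch() does nothing when the block holds no items, so a stray firing is harmless.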
Zlatko Michailov
Software Development Engineer, Parallel Computing Platform
Microsoft Corp.
This posting is provided "AS IS" with no warranties, and confers no rights.
- Proposed as answer by Zlatko Michailov - MSFT, Tuesday, 12 July 2011 18:31
- Marked as answer by Stephen Toub - MSFT (Microsoft Employee, Owner), Monday, 25 July 2011 15:01

