Correct use of Complete()
-
April 20, 2012, 8:00 PM
Is the code below the correct way to complete all linked blocks when the source is done posting?
var recordsRead = 0;
var recordsStored = 0;

var sourceBuffer = new BufferBlock<DataRecord>();
var batchBlock = new BatchBlock<DataRecord>(100); // Save records 100 at a time
var storeBlock = new ActionBlock<DataRecord[]>(recs =>
{
    store.StoreAsync(null, recs, r => r["TrackId"]).Wait();
    //recs.ForEach(r => recordsStored++);
    recs.ForEach(rec =>
        "stored record {0} - {1}".format(++recordsStored, rec.ToJSON(false, true)).writeline());
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

sourceBuffer.Completion.ContinueWith(task =>
{
    "Source completed... read {0} records".format(recordsRead).writeline();
    batchBlock.Complete();
});
batchBlock.Completion.ContinueWith(task =>
{
    batchBlock.TriggerBatch();
    storeBlock.Complete();
}); // Flush any remaining records in batch
storeBlock.Completion.ContinueWith(task =>
    "Processing completed... stored {0} records".format(recordsStored).writeline());

sourceBuffer.LinkTo(batchBlock);
batchBlock.LinkTo(storeBlock);

// Post items to source
var records = sourceProvider.OpenReader(new Dictionary<string, object> { { "@entity", 2 } });
records.Cast<DataRecord>().ForEach(rec =>
{
    recordsRead++;
    sourceBuffer.Post(rec);
    //Task.Delay(100);
});

//"Press enter to complete dataflow".writeline();
//Console.ReadLine();
sourceBuffer.Complete();
//Console.ReadLine();
await storeBlock.Completion;

Brad Serbu
All replies
-
April 20, 2012, 9:16 PM
You don't actually need all those ContinueWith() calls. To propagate completion, you can set DataflowLinkOptions.PropagateCompletion:
sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });
You also don't need to call TriggerBatch(). First, it doesn't make sense to call it after that block's Completion task has already completed. Second, a final batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).
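To illustrate the point about completion flowing through the links, here is a minimal, self-contained sketch. It is not the original poster's pipeline: plain int items stand in for DataRecord, and the class and method names (PropagateCompletionSketch, RunAsync) are made up for this example. It only assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionSketch
{
    // Hypothetical helper: pushes `count` integers through
    // BufferBlock -> BatchBlock -> ActionBlock and returns the batch sizes seen.
    public static async Task<List<int>> RunAsync(int count, int batchSize)
    {
        var batchSizes = new List<int>();

        var source = new BufferBlock<int>();
        var batcher = new BatchBlock<int>(batchSize);
        // MaxDegreeOfParallelism defaults to 1, so the list is never
        // modified by two batches at the same time.
        var sink = new ActionBlock<int[]>(batch => batchSizes.Add(batch.Length));

        // Completion flows along the links; no ContinueWith() and no TriggerBatch().
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(batcher, linkOptions);
        batcher.LinkTo(sink, linkOptions);

        for (var i = 0; i < count; i++)
        {
            source.Post(i); // the blocks are unbounded here, so Post() is accepted
        }

        // Completing the head of the pipeline is the only explicit Complete() call.
        source.Complete();
        await sink.Completion;
        return batchSizes;
    }

    public static async Task Main()
    {
        var sizes = await RunAsync(25, 10);
        Console.WriteLine(string.Join(", ", sizes)); // prints "10, 10, 5"
    }
}
```

The last batch of 5 shows that the leftover items are flushed when completion reaches the BatchBlock, and awaiting the final block's Completion is enough to know the whole pipeline has drained.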
- Marked as answer by Brad Serbu, April 20, 2012, 9:51 PM
- Unmarked as answer by Brad Serbu, April 20, 2012, 9:56 PM
- Marked as answer by Brad Serbu, April 20, 2012, 9:58 PM
-
April 20, 2012, 10:00 PM
You're right about PropagateCompletion. That has the desired effect.
However, if I modify the code above to set BoundedCapacity = 10 (from a related post), not all of the records are processed.
Brad Serbu
-
April 20, 2012, 10:36 PM
If you're adding them using Post() without checking the return value, then that is to be expected: Post() returns false immediately when a bounded block is full, and the item is not delivered. Instead of using Post(), you should probably use SendAsync() from an async method.
- Marked as answer by Brad Serbu, April 20, 2012, 10:46 PM
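The difference between Post() and SendAsync() on a bounded block can be seen in a small sketch like the one below. It is only an illustration under assumptions: BoundedCapacity = 10 is set on a single ActionBlock (the thread does not show where the poster set it), the Task.Delay stands in for slow storage, and the names BoundedSendSketch and RunAsync are invented for this example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedSendSketch
{
    // Hypothetical helper: feeds `count` items into a bounded, deliberately
    // slow block and returns how many of them were actually processed.
    public static async Task<int> RunAsync(int count, bool useSendAsync)
    {
        var processed = 0;
        var sink = new ActionBlock<int>(async item =>
        {
            await Task.Delay(5); // stand-in for slow storage
            processed++;         // one item at a time: MaxDegreeOfParallelism defaults to 1
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        for (var i = 0; i < count; i++)
        {
            if (useSendAsync)
            {
                // Asynchronously waits until the block has room for the item.
                await sink.SendAsync(i);
            }
            else
            {
                // Returns false right away when the block is full;
                // ignoring the result silently loses the item.
                sink.Post(i);
            }
        }

        sink.Complete();
        await sink.Completion;
        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine("SendAsync processed: " + await RunAsync(30, true));
        Console.WriteLine("Post processed:      " + await RunAsync(30, false));
    }
}
```

With SendAsync() the producer is throttled to the consumer's pace and every item arrives. With an unchecked Post() the producer outruns the consumer, so the count is typically lower than the number of items posted; the exact figure depends on timing.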

