Correct use of Complete()
-
20 Nisan 2012 Cuma 20:00
Is below the correct method to complete all linked tasks when the source is done posting?
var recordsRead = 0;
var recordsStored = 0;
var sourceBuffer = new BufferBlock<DataRecord>();
var batchBlock = new BatchBlock<DataRecord>(100); // Save records 10 at a time
var storeBlock = new ActionBlock<DataRecord[]>(recs =>
{
store.StoreAsync(null, recs, r => r["TrackId"]).Wait();
//recs.ForEach(r => recordsStored++);
recs.ForEach(rec =>
"stored record {0} - {1}".format(++recordsStored, rec.ToJSON(false, true)).writeline());
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
sourceBuffer.Completion.ContinueWith(task =>
{
"Source completed... read {0} records".format(recordsRead).writeline();
batchBlock.Complete();
});
batchBlock.Completion.ContinueWith(task =>
{
batchBlock.TriggerBatch();
storeBlock.Complete();
}); // Flush any remaining records in batch
storeBlock.Completion.ContinueWith(task =>
"Processing completed... stored {0} records".format(recordsStored).writeline());
sourceBuffer.LinkTo(batchBlock);
batchBlock.LinkTo(storeBlock);
// Post items to source
var records = sourceProvider.OpenReader(new Dictionary<string, object> { { "@entity", 2 } });
records.Cast<DataRecord>().ForEach(rec =>
{
recordsRead++;
sourceBuffer.Post(rec);
//Task.Delay(100);
});
//"Press enter to complete dataflow".writeline();
//Console.ReadLine();
sourceBuffer.Complete();
//Console.ReadLine();
await storeBlock.Completion;Brad Serbu
Tüm Yanıtlar
-
20 Nisan 2012 Cuma 21:16
You don't actually need all those ContinueWith(). To propagate completion, you can set DataflowLinkOPtions.PropagateCompletion:
sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }); batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });And you also don't need to call TriggerBatch(). First, it doesn't make any sense to do that after the Completion task of that block is completed. Second, new batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).
- Yanıt Olarak İşaretleyen Brad Serbu 20 Nisan 2012 Cuma 21:51
- Yanıt İşaretini Geri Alan Brad Serbu 20 Nisan 2012 Cuma 21:56
- Yanıt Olarak İşaretleyen Brad Serbu 20 Nisan 2012 Cuma 21:58
-
20 Nisan 2012 Cuma 22:00
Your right about PropigateCompletion... That has the desired effect.
However, if I modify the code above, then the boundedcapacity = 10 (from related post), not all the records are processed.
Brad Serbu
-
20 Nisan 2012 Cuma 22:36
If you're adding them using Post() without checking the returned value, then that is to be expected. Instead of using Post(), you should probably use SendAsync() from an async method.- Yanıt Olarak İşaretleyen Brad Serbu 20 Nisan 2012 Cuma 22:46