Correct use of Complete()

  • Friday, April 20, 2012 8:00 PM

    Is the code below the correct way to complete all linked blocks when the source is done posting?

                var recordsRead = 0;
                var recordsStored = 0;

                var sourceBuffer = new BufferBlock<DataRecord>();
                var batchBlock = new BatchBlock<DataRecord>(100); // Save records 100 at a time
                var storeBlock = new ActionBlock<DataRecord[]>(recs =>
                {
                    store.StoreAsync(null, recs, r => r["TrackId"]).Wait();

                    //recs.ForEach(r => recordsStored++);
                    
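                // Interlocked: up to 5 batches run in parallel (MaxDegreeOfParallelism = 5), so a plain ++ would race
                recs.ForEach(rec =>
                    "stored record {0} - {1}".format(Interlocked.Increment(ref recordsStored), rec.ToJSON(false, true)).writeline());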
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
                sourceBuffer.Completion.ContinueWith(task =>
                {
                    "Source completed... read {0} records".format(recordsRead).writeline();
                    batchBlock.Complete();
                });
                batchBlock.Completion.ContinueWith(task =>
                {
                    batchBlock.TriggerBatch();
                    storeBlock.Complete();
                }); // Flush any remaining records in batch
                storeBlock.Completion.ContinueWith(task =>
                    "Processing completed... stored {0} records".format(recordsStored).writeline());

                sourceBuffer.LinkTo(batchBlock);
                batchBlock.LinkTo(storeBlock);

                // Post items to source
                var records = sourceProvider.OpenReader(new Dictionary<string, object> { { "@entity", 2 } });
                records.Cast<DataRecord>().ForEach(rec =>
                {
                    recordsRead++;
                    sourceBuffer.Post(rec);
                    //Task.Delay(100);
                });

                //"Press enter to complete dataflow".writeline();
                //Console.ReadLine();
                sourceBuffer.Complete();
                //Console.ReadLine();

                await storeBlock.Completion;

    Brad Serbu

All Replies

  • Friday, April 20, 2012 9:16 PM

    You don't actually need all those ContinueWith() calls. To propagate completion, you can set DataflowLinkOptions.PropagateCompletion:

    sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });
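A minimal end-to-end sketch of the PropagateCompletion approach, assuming plain ints stand in for the poster's DataRecord and that a counter replaces the poster's store.StoreAsync call. Only the head block is completed; completion flows down both links, and awaiting the last block's Completion is the single wait needed. Interlocked.Add is used because MaxDegreeOfParallelism = 5 lets several batches be handled at once. On .NET Framework the System.Threading.Tasks.Dataflow namespace comes from the NuGet package of the same name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Assumption: ints stand in for DataRecord; counting stands in for store.StoreAsync.
int recordsStored = 0;

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var sourceBuffer = new BufferBlock<int>();
var batchBlock = new BatchBlock<int>(100);
var storeBlock = new ActionBlock<int[]>(recs =>
{
    // Several batches may run concurrently, so update the counter atomically.
    Interlocked.Add(ref recordsStored, recs.Length);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

// No ContinueWith chains: each link forwards completion to the next block.
sourceBuffer.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(storeBlock, linkOptions);

for (int i = 0; i < 1050; i++)
    sourceBuffer.Post(i); // unbounded block, so Post always accepts here

// Completing the head is enough; then wait on the tail.
sourceBuffer.Complete();
await storeBlock.Completion;

Console.WriteLine($"Stored {recordsStored} records");
```

The 1050 items deliberately leave a partial batch of 50 at the end; it still reaches storeBlock without any TriggerBatch() call.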

    And you also don't need to call TriggerBatch(). First, it doesn't make sense to call it after the block's Completion task has already completed. Second, a final batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).
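To check the second point, a small sketch (assuming plain int items and the batch size of 100 from the question) posts 250 items to a BatchBlock, calls Complete() without ever calling TriggerBatch(), and records the size of each batch that comes out. With the default greedy mode, the leftover items should appear as one final, smaller batch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<int>(100);
for (int i = 0; i < 250; i++)
    batchBlock.Post(i);

// No TriggerBatch(): completing the block turns the 50 leftover items into a last batch.
batchBlock.Complete();

var batchSizes = new List<int>();
// OutputAvailableAsync returns false once the block is completed and its output is drained.
while (await batchBlock.OutputAvailableAsync())
{
    while (batchBlock.TryReceive(out int[] batch))
        batchSizes.Add(batch.Length);
}
await batchBlock.Completion;

Console.WriteLine(string.Join(", ", batchSizes)); // 100, 100, 50
```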

    • Marked As Answer by Brad Serbu Friday, April 20, 2012 9:51 PM
    • Unmarked As Answer by Brad Serbu Friday, April 20, 2012 9:56 PM
    • Marked As Answer by Brad Serbu Friday, April 20, 2012 9:58 PM
  • Friday, April 20, 2012 10:00 PM

    You're right about PropagateCompletion; that has the desired effect.

    However, if I modify the code above to set BoundedCapacity = 10 (from a related post), not all the records are processed.

    Brad Serbu

  • Friday, April 20, 2012 10:36 PM
    If you're adding them using Post() without checking the returned value, then that is to be expected: when a bounded block is full, Post() returns false instead of waiting, and the item is not added. Instead of using Post(), you should probably use SendAsync() from an async method.
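A sketch of both halves of this answer, assuming int items stand in for DataRecord and a counter stands in for the store. The first part shows Post() returning false once a BufferBlock with BoundedCapacity = 10 is full and nothing is consuming from it. The second feeds a bounded pipeline with SendAsync(), which waits for room instead of dropping items. The thread doesn't say which block the poster bounded, so putting BoundedCapacity on the source and store blocks here is an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// 1) Post() on a full bounded block: nothing is linked, so only 10 items fit.
var bounded = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });
int accepted = 0, rejected = 0;
for (int i = 0; i < 20; i++)
{
    if (bounded.Post(i)) accepted++;
    else rejected++; // the item is lost unless the caller checks the return value
}
Console.WriteLine($"Post: accepted {accepted}, rejected {rejected}");

// 2) SendAsync() into a bounded pipeline: the producer waits for space instead of dropping items.
int stored = 0;
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var source = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });
var batch = new BatchBlock<int>(100);
var store = new ActionBlock<int[]>(recs =>
{
    Interlocked.Add(ref stored, recs.Length);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 5 });

source.LinkTo(batch, linkOptions);
batch.LinkTo(store, linkOptions);

for (int i = 0; i < 1050; i++)
{
    // SendAsync yields true once the item is accepted; false means it was declined for good.
    if (!await source.SendAsync(i))
        throw new InvalidOperationException($"Item {i} was declined");
}
source.Complete();
await store.Completion;

Console.WriteLine($"SendAsync: stored {stored} records");
```

Awaiting each SendAsync() applies back-pressure to the producer loop, which is what the bounded capacity is meant to achieve; checking Post()'s return value alone would only tell you that items were dropped.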
    • Marked As Answer by Brad Serbu Friday, April 20, 2012 10:46 PM