Answered: Correct use of Complete()

  • Friday, April 20, 2012 20:00
     
     

    Is the code below the correct way to complete all of the linked blocks once the source is done posting?

                var recordsRead = 0;
                var recordsStored = 0;

                var sourceBuffer = new BufferBlock<DataRecord>();
                var batchBlock = new BatchBlock<DataRecord>(100); // Save records 100 at a time
                var storeBlock = new ActionBlock<DataRecord[]>(recs =>
                {
                    store.StoreAsync(null, recs, r => r["TrackId"]).Wait();

                    //recs.ForEach(r => recordsStored++);
                    
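                    recs.ForEach(rec =>
                        // Interlocked: up to 5 batches may be stored concurrently, so ++ would race
                        "stored record {0} - {1}".format(Interlocked.Increment(ref recordsStored), rec.ToJSON(false, true)).writeline());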
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
                sourceBuffer.Completion.ContinueWith(task =>
                {
                    "Source completed... read {0} records".format(recordsRead).writeline();
                    batchBlock.Complete();
                });
                batchBlock.Completion.ContinueWith(task =>
                {
                    batchBlock.TriggerBatch();
                    storeBlock.Complete();
                }); // Flush any remaining records in batch
                storeBlock.Completion.ContinueWith(task =>
                    "Processing completed... stored {0} records".format(recordsStored).writeline());

                sourceBuffer.LinkTo(batchBlock);
                batchBlock.LinkTo(storeBlock);

                // Post items to source
                var records = sourceProvider.OpenReader(new Dictionary<string, object> { { "@entity", 2 } });
                records.Cast<DataRecord>().ForEach(rec =>
                {
                    recordsRead++;
                    sourceBuffer.Post(rec);
                    //Task.Delay(100);
                });

                //"Press enter to complete dataflow".writeline();
                //Console.ReadLine();
                sourceBuffer.Complete();
                //Console.ReadLine();

                await storeBlock.Completion;

    Brad Serbu

All Replies

  • Friday, April 20, 2012 21:16
     
     Answered | Has Code

    You don't actually need all those ContinueWith() calls. To propagate completion, set DataflowLinkOptions.PropagateCompletion when you link the blocks:

    sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });

    And you also don't need to call TriggerBatch(). First, it doesn't make any sense to call it after that block's Completion task has already completed. Second, a new batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).
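The flushing behavior can be checked on a BatchBlock alone. This is a small sketch, assuming integers as items and a batch size of 10: 25 items yield two full batches, and Complete() (with no TriggerBatch()) turns the 5 leftovers into a final, smaller batch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<int>(10);
for (int i = 0; i < 25; i++)
    batchBlock.Post(i);

// No TriggerBatch(): completing the block emits the leftovers as one last batch.
batchBlock.Complete();

var sizes = new List<int>();
// OutputAvailableAsync() returns false once the block is completed and drained.
while (await batchBlock.OutputAvailableAsync())
    sizes.Add((await batchBlock.ReceiveAsync()).Length);

await batchBlock.Completion; // completes only after every batch has been received
Console.WriteLine(string.Join(",", sizes)); // 10,10,5
```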

    • Marked as answer by Brad Serbu Friday, April 20, 2012 21:51
    • Unmarked as answer by Brad Serbu Friday, April 20, 2012 21:56
    • Marked as answer by Brad Serbu Friday, April 20, 2012 21:58
  • Friday, April 20, 2012 22:00
     
     

    You're right about PropagateCompletion... That has the desired effect.

    However, if I modify the code above to set BoundedCapacity = 10 (from a related post), not all of the records are processed.
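The symptom can be reproduced in isolation. A minimal sketch, assuming integers stand in for DataRecord and that nothing is linked to the buffer, so it never drains: once 10 items are held, the bounded block declines further items and Post() simply returns false.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Assumption: ints stand in for DataRecord; no consumer is linked.
var sourceBuffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });

int accepted = 0, dropped = 0;
for (int i = 0; i < 25; i++)
{
    if (sourceBuffer.Post(i))
        accepted++;
    else
        dropped++; // the full block declined the item; Post() does not wait for space
}

Console.WriteLine($"accepted {accepted}, dropped {dropped}"); // accepted 10, dropped 15
```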



    Brad Serbu

  • Friday, April 20, 2012 22:36
     
     Answered
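    If you're adding them using Post() without checking the returned value, then that is to be expected: Post() returns false when the block declines the item, which a bounded block does when its buffer is full, so those records are simply dropped. Instead of using Post(), you should probably use SendAsync() from an async method and await the Task<bool> it returns; that task completes only once the block accepts (or permanently declines) the item, so the producer waits for free space instead of losing records.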
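Putting the suggestions in this thread together, here is a minimal sketch of the bounded pipeline. It assumes integers stand in for DataRecord, Task.Delay stands in for the question's store.StoreAsync call, and BoundedCapacity = 10 is set on the source buffer and the store block (the related post's exact settings aren't shown here). Every record makes it through because SendAsync() waits for room.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Assumption: ints stand in for DataRecord; BoundedCapacity = 10 as in the follow-up.
var sourceBuffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });
var batchBlock = new BatchBlock<int>(10);
int recordsStored = 0;
var storeBlock = new ActionBlock<int[]>(async recs =>
{
    await Task.Delay(10); // hypothetical stand-in for store.StoreAsync(...)
    Interlocked.Add(ref recordsStored, recs.Length);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 10 });

sourceBuffer.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(storeBlock, linkOptions);

int recordsRead = 0;
for (int i = 0; i < 95; i++)
{
    // SendAsync waits asynchronously for free space instead of dropping the item.
    bool accepted = await sourceBuffer.SendAsync(i);
    if (!accepted)
        throw new InvalidOperationException("sourceBuffer stopped accepting items");
    recordsRead++;
}

sourceBuffer.Complete();
await storeBlock.Completion;
Console.WriteLine($"read {recordsRead}, stored {recordsStored}"); // read 95, stored 95
```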
    • Marked as answer by Brad Serbu Friday, April 20, 2012 22:46