none
Correct use of Complete()

    Pergunta

  • Is below the correct method to complete all linked tasks when the source is done posting?

                var recordsRead = 0;
                var recordsStored = 0;

                var sourceBuffer = new BufferBlock<DataRecord>();
                var batchBlock = new BatchBlock<DataRecord>(100); // Save records 10 at a time
                var storeBlock = new ActionBlock<DataRecord[]>(recs =>
                {
                    store.StoreAsync(null, recs, r => r["TrackId"]).Wait();

                    //recs.ForEach(r => recordsStored++);
                    
                    recs.ForEach(rec =>
                        "stored record {0} - {1}".format(++recordsStored, rec.ToJSON(false, true)).writeline());
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
                sourceBuffer.Completion.ContinueWith(task =>
                {
                    "Source completed... read {0} records".format(recordsRead).writeline();
                    batchBlock.Complete();
                });
                batchBlock.Completion.ContinueWith(task =>
                {
                    batchBlock.TriggerBatch();
                    storeBlock.Complete();
                }); // Flush any remaining records in batch
                storeBlock.Completion.ContinueWith(task =>
                    "Processing completed... stored {0} records".format(recordsStored).writeline());

                sourceBuffer.LinkTo(batchBlock);
                batchBlock.LinkTo(storeBlock);

                // Post items to source
                var records = sourceProvider.OpenReader(new Dictionary<string, object> { { "@entity", 2 } });
                records.Cast<DataRecord>().ForEach(rec =>
                {
                    recordsRead++;
                    sourceBuffer.Post(rec);
                    //Task.Delay(100);
                });

                //"Press enter to complete dataflow".writeline();
                //Console.ReadLine();
                sourceBuffer.Complete();
                //Console.ReadLine();

                await storeBlock.Completion;

    Brad Serbu

    sexta-feira, 20 de abril de 2012 20:00

Respostas

  • You don't actually need all those ContinueWith(). To propagate completion, you can set DataflowLinkOPtions.PropagateCompletion:

    sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });

    And you also don't need to call TriggerBatch(). First, it doesn't make any sense to do that after the Completion task of that block is completed. Second, new batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).

    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:51
    • Não Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:56
    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:58
    sexta-feira, 20 de abril de 2012 21:16
  • If you're adding them using Post() without checking the returned value, then that is to be expected. Instead of using Post(), you should probably use SendAsync() from an async method.
    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 22:46
    sexta-feira, 20 de abril de 2012 22:36

Todas as Respostas

  • You don't actually need all those ContinueWith(). To propagate completion, you can set DataflowLinkOPtions.PropagateCompletion:

    sourceBuffer.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    batchBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });

    And you also don't need to call TriggerBatch(). First, it doesn't make any sense to do that after the Completion task of that block is completed. Second, new batch is created automatically from the remaining items when you complete the block (either directly by calling Complete(), or indirectly through PropagateCompletion).

    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:51
    • Não Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:56
    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 21:58
    sexta-feira, 20 de abril de 2012 21:16
  • Your right about PropigateCompletion... That has the desired effect.

    However, if I modify the code above, then the boundedcapacity = 10 (from related post), not all the records are processed.  



    Brad Serbu

    sexta-feira, 20 de abril de 2012 22:00
  • If you're adding them using Post() without checking the returned value, then that is to be expected. Instead of using Post(), you should probably use SendAsync() from an async method.
    • Marcado como Resposta Brad Serbu sexta-feira, 20 de abril de 2012 22:46
    sexta-feira, 20 de abril de 2012 22:36