How to handle completion with DataflowBlock.Encapsulate?


  • Monday, July 25, 2011 10:28 AM
     
    I'm not sure whether I've found a bug, or I've simply misunderstood how this is supposed to work. I'm trying to wait until all the information I've pushed into my Dataflow graph has been fully processed. Generally speaking, I do this by first ensuring that all the data has made it into the graph (e.g., if using SendAsync, you must Wait for all tasks returned by SendAsync to complete), then walking through the graph calling Complete and then waiting on Completion for each node in turn (doing so in the order in which data flows through the graph).
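The shutdown order described above can be sketched as follows. This is only an illustration under assumptions: the two-block pipeline (`doubler` feeding `sink`) and the `OrderedShutdownSketch` class are hypothetical stand-ins for a real graph, and the code targets the current System.Threading.Tasks.Dataflow package rather than the 2011 preview.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class OrderedShutdownSketch
{
    // Pushes the inputs through a hypothetical two-block pipeline, then
    // shuts the pipeline down in the order in which data flows through it.
    public static List<int> Run(IEnumerable<int> inputs)
    {
        var results = new List<int>();
        var doubler = new TransformBlock<int, int>(x => x * 2);
        var sink = new ActionBlock<int>(x => results.Add(x));
        doubler.LinkTo(sink);

        // Step 1: make sure every item has actually been accepted by the graph.
        Task<bool>[] sends = inputs.Select(i => doubler.SendAsync(i)).ToArray();
        Task.WaitAll(sends);

        // Step 2: complete the first block and wait until it has handed
        // all of its output to the next block.
        doubler.Complete();
        doubler.Completion.Wait();

        // Step 3: only now complete the downstream block and wait for it.
        sink.Complete();
        sink.Completion.Wait();

        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Run(new[] { 1, 2, 3 })));
    }
}
```

Each block is completed only after the block before it has finished, so no item can arrive at a block that has already been told to stop accepting input.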
     
    This usually works, but it seems to fail if I use any blocks created by DataflowBlock.Encapsulate. I'm using this code, which comes from one of the examples in TPLDataflow.docx:
    private static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
    {
      var queue = new Queue<T>();
      var source = new BufferBlock<T[]>();
      var target = new ActionBlock<T>(item =>
      {
        queue.Enqueue(item);
        if (queue.Count > windowSize) queue.Dequeue();
        if (queue.Count == windowSize) source.Post(queue.ToArray());
      });
      return DataflowBlock.Encapsulate(target, source);
    }
    
     
    (I'm using this because a sliding window is precisely what I need in my application.)
     
    While data is flowing, this block works as advertised. But the problem is that when I come to try to flush data through this block, calling propagatorBlock.Completion.Wait() hangs and never returns.
     
    As far as I can tell, the problem is that in the block returned by Encapsulate, the Completion property simply returns the 'source' block's Completion property. So propagatorBlock.Completion ends up referring to the source.Completion property.
     
    I suppose there's a kind of sense in that, except the encapsulating propagator's Complete method appears only to call Complete on the target. I don't see any way to cause the source block's Complete to be called.
     
    So if I have a propagator block produced by Encapsulate, calling Complete() on that and then waiting for its Completion is effectively equivalent to calling Complete() on one block (the target), and then waiting on Completion on a different block (the source) which never has its Complete() method called.
     
    Since the 'source' block (which is actually the output of this propagator, because 'source' and 'target' are from the point of view of the consumer of the block) is never completed, it seems clear that any attempt to wait on its Completion is doomed. So it looks like you can never use the Completion property of a block returned by DataflowBlock.Encapsulate.
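The mismatch described in the last few paragraphs can be observed directly with a small probe. This is a sketch, not part of the original example: the `EncapsulateCompletionDemo` class, the block names, and the timeout values are all assumptions chosen for illustration. A timed wait is used so that the probe returns instead of hanging.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

static class EncapsulateCompletionDemo
{
    public static (bool TargetDone, bool WrapperDoneBefore, bool WrapperDoneAfter) Probe()
    {
        var target = new ActionBlock<int>(_ => { });
        var source = new BufferBlock<int>();
        IPropagatorBlock<int, int> wrapper = DataflowBlock.Encapsulate(target, source);

        // Complete() on the wrapper is forwarded to the target only.
        wrapper.Complete();
        bool targetDone = target.Completion.Wait(TimeSpan.FromSeconds(5));

        // The wrapper's Completion is the source's Completion, and nobody
        // has completed the source yet, so this timed wait gives up.
        bool wrapperDoneBefore = wrapper.Completion.Wait(TimeSpan.FromMilliseconds(200));

        // Completing the source by hand lets the wrapper's Completion finish.
        source.Complete();
        bool wrapperDoneAfter = wrapper.Completion.Wait(TimeSpan.FromSeconds(5));

        return (targetDone, wrapperDoneBefore, wrapperDoneAfter);
    }

    static void Main()
    {
        Console.WriteLine(Probe());
    }
}
```

With nothing linking the two inner blocks, the target finishes, the wrapper does not, and the wrapper finishes only once the source is completed explicitly.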
     
    So is this a bug? Or is "Encapsulate" not really meant to encapsulate the two blocks fully? Should I be modifying this example code, so that I can hang onto the 'source' block that it creates, and call Complete() on that prior to waiting on the encapsulating propagator's Completion?

All Replies

  • Monday, July 25, 2011 3:00 PM
    Owner
     
     Answered

    Hi Ian-

    The Encapsulate method merely serves to create an IPropagatorBlock implementation whose interface methods are mapped appropriately onto the two provided blocks, saving you the trouble of writing that boilerplate yourself. As such, it encapsulates the blocks "fully," but it doesn't assume any specific relationship between the two blocks. While in your example they do represent the only two blocks in the network, in a more complicated example they could be the beginning and ending blocks of a long chain, and it would be incorrect for Encapsulate to assume that the last block should be allowed to complete when the first block does. As such, it's up to you to create the network in whatever manner is appropriate to your needs (e.g. propagating completion from the action block to the buffer block), and then use Encapsulate to create a single block to represent that network. To do this for the sliding window example, you could add the following statement before calling Encapsulate:

    target.Completion.ContinueWith(t => source.Complete());

    Note that the target action block's Completion task won't complete until the block has processed all of its input, which means it has already posted every window to the buffer block. Only then does the continuation run and call the buffer block's Complete method.
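Putting the suggested statement into the sliding window example gives something like the sketch below. The fault branch, the `printer` consumer, and the `SlidingWindowSketch` class are assumptions added for illustration and are not part of the reply above; the code targets the current System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

static class SlidingWindowSketch
{
    public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        var target = new ActionBlock<T>(item =>
        {
            queue.Enqueue(item);
            if (queue.Count > windowSize) queue.Dequeue();
            if (queue.Count == windowSize) source.Post(queue.ToArray());
        });

        // Wire completion from the input side to the output side.
        // Assumption: a fault in the target is forwarded as a fault;
        // the one-line version in the reply always calls Complete().
        target.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)source).Fault(t.Exception);
            else source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }

    static void Main()
    {
        var window = CreateSlidingWindow<int>(3);
        var printer = new ActionBlock<int[]>(w => Console.WriteLine(string.Join(",", w)));
        window.LinkTo(printer);

        for (int i = 1; i <= 5; i++) window.Post(i);

        window.Complete();
        window.Completion.Wait();   // returns once every window has left the buffer
        printer.Complete();
        printer.Completion.Wait();
        // Prints:
        // 1,2,3
        // 2,3,4
        // 3,4,5
    }
}
```

One thing to keep in mind: a BufferBlock's Completion task finishes only after Complete has been called and its buffered items have been consumed. If nothing is linked to the window block and nothing receives from it, waiting on its Completion can still block even with the completion wiring in place.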

    I hope that helps.

  • Monday, July 25, 2011 3:21 PM
     
     

    Thanks!

    With hindsight, your solution of hooking the two together via the first block's Completion task seems obvious... But I think part of the reason it hadn't occurred to me is that I tended to think of completion as being something intrinsic to how blocks operate, so the idea of hotwiring the two component tasks of an EncapsulatingPropagator block seemed like it could be asking for trouble.

    I see now that while it's true that knowing when completion has completed (so to speak) is something that only the block itself can know, it's up to us as users of blocks to decide when a block should start completing. (I.e., the Completion Task is under the control of the block itself, but we get to decide when to call Complete.)

    I've found the completion stuff has taken me a while to get my head around. (The IDataflowBlock.Completion documentation doesn't actually mention how a block will normally enter the state where it will not process any more messages, for example.) I'd vote to include that extra line of code you've supplied that makes the completion work as expected in the sliding window example in TPLDataflow.docx, because it would be a useful practical illustration of the point that "Oh, so I have to wire that up in that situation!"

  • Monday, July 25, 2011 3:29 PM
    Owner
     
     
    Thanks, Ian.  Very good feedback.  We'll try to make sure this is reflected in future incarnations of the documentation.