Dataflow source - custom block or producer?

Answered Dataflow source - custom block or producer?

  • Sunday, July 22, 2012 7:59 PM
     
      Has Code

    OK, after a long hiatus I am once again looking at Dataflow.

    This is a simple "best practice" question.

    Let's say that I have an input stream that I want to use as a source for my dataflow mesh. The entry point into the mesh is always going to be a BufferBlock<ArraySegment<byte>>.

    Question: should I create my own custom source block, or use more of a "producer" concept?

    e.g., using the custom source block:

    internal sealed class StreamBlock : ISourceBlock<ArraySegment<byte>>
    {
      private readonly Stream stream;
      private readonly BufferBlock<ArraySegment<byte>> buffer;
      private readonly Task completion;
    
      public StreamBlock(Stream stream, DataflowBlockOptions options, int bufferSize)
      {
        this.stream = stream;
        this.buffer = new BufferBlock<ArraySegment<byte>>(options);
        var readTask = Read(bufferSize);
        this.completion = TaskEx.WhenAll(readTask, buffer.Completion);
      }
    
      private async Task Read(int bufferSize)
      {
        var stream = this.stream;
        var buffer = this.buffer;
    
        while (true)
        {
          var array = new byte[bufferSize];
          var read = await stream.ReadAsync(array, 0, bufferSize);
          if (read == 0)
          {
            buffer.Complete();
            return;
          }
    
          var bufferDone = await buffer.SendAsync(new ArraySegment<byte>(array, 0, read));
          if (!bufferDone)
            return;
        }
      }
    
      public Task Completion
      {
        get { return completion; }
      }
    
      // Other ISourceBlock members just forward to this.buffer
    }
    


    Alternatively, using a "producer":

    public static async Task CopyTo(this Stream stream, ITargetBlock<ArraySegment<byte>> target, int bufferSize)
    {
      try
      {
        while (true)
        {
          var array = new byte[bufferSize];
          var read = await stream.ReadAsync(array, 0, bufferSize);
          if (read == 0)
          {
            target.Complete();
            return;
          }
    
          var bufferDone = await target.SendAsync(new ArraySegment<byte>(array, 0, read));
          if (!bufferDone)
            return;
        }
      }
      catch (Exception ex)
      {
        target.Fault(ex);
      }
    }

    The "producer" does manual exception forwarding because its task won't be awaited in practice - it's just used to kick off the mesh.

    Which one is preferred, if either?

    Thanks,
         -Steve

    P.S. All code is thoroughly untested. ;)


    Programming blog: http://nitoprograms.blogspot.com/
      Including my TCP/IP .NET Sockets FAQ
      and How to Implement IDisposable and Finalizers: 3 Easy Rules
    Microsoft Certified Professional Developer

    How to get to Heaven according to the Bible

All Replies