Fluent Dataflow

    General discussion

  • Have you guys thought about including a fluent interface to Dataflow? For simple linked chains, it would mean that instead of writing this:

        var inputBuffer = new BufferBlock<int>();   
       
        var transformBlock = new TransformBlock <int, int> (
            i => i * 10,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 20, MaxDegreeOfParallelism = 20 });
           
        var middleBuffer = new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 50 });
       
        var toStringTransform = new TransformBlock<int, string> (
            i => i.ToString(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
           
        var writeLineBlock = new ActionBlock <string> (i => Console.WriteLine(i));
       
        inputBuffer.LinkTo (transformBlock);
        transformBlock.LinkTo (middleBuffer);
        middleBuffer.LinkTo (toStringTransform);
        toStringTransform.LinkTo (writeLineBlock);
       
        inputBuffer.Completion.ContinueWith (_ => transformBlock.Complete());
        transformBlock.Completion.ContinueWith (_ => middleBuffer.Complete());
        middleBuffer.Completion.ContinueWith (_ => toStringTransform.Complete());
        toStringTransform.Completion.ContinueWith (_ => writeLineBlock.Complete());
    

    you could just go:

        var inputBuffer = new BufferBlock<int>();   
       
        var chain = inputBuffer
            .AddTransform (i => i * 10, 20, 20)
            .AddBuffer (50)
            .AddTransform (i => i.ToString(), 10)
            .AddAction (Console.WriteLine);
    

    It would make Dataflow more friendly to mainstream scenarios. I also think it makes the intent clearer.

    Here's how fluent extension methods might be defined:

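    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class Extensions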
    {
        public static TransformBlock<TInput,TOutput> AddTransform<TInput, TOutput> (
            this ISourceBlock<TInput> source,
            Func<TInput,TOutput> transform,
            ExecutionDataflowBlockOptions options = null)
        {
            var transformBlock = new TransformBlock<TInput,TOutput> (transform, options ?? new ExecutionDataflowBlockOptions());
            source.LinkTo (transformBlock);
            source.Completion.ContinueWith (_ => transformBlock.Complete());
            return transformBlock;
        }
       
        public static TransformBlock<TInput,TOutput> AddTransform<TInput, TOutput> (
            this ISourceBlock<TInput> source,
            Func<TInput,TOutput> transform,
            int maxParallelism,
            int boundedCapacity = -1,
            CancellationToken cancelToken = default (CancellationToken),
            TaskScheduler scheduler = null)
        {
            return AddTransform (source, transform, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
        }
       
        public static TransformBlock<TInput,TOutput> AddTransform<TInput, TOutput> (
            this ISourceBlock<TInput> source,
            Func<TInput,Task<TOutput>> transform,
            ExecutionDataflowBlockOptions options = null)
        {
            var transformBlock = new TransformBlock<TInput,TOutput> (transform, options ?? new ExecutionDataflowBlockOptions());
            source.LinkTo (transformBlock);
            source.Completion.ContinueWith (_ => transformBlock.Complete());
            return transformBlock;
        }
       
        public static TransformBlock<TInput,TOutput> AddTransform<TInput, TOutput> (
            this ISourceBlock<TInput> source,
            Func<TInput,Task<TOutput>> transform,
            int maxParallelism,
            int boundedCapacity = -1,
            CancellationToken cancelToken = default (CancellationToken),
            TaskScheduler scheduler = null)
        {
            return AddTransform (source, transform, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
        }
       
        public static ActionBlock<TInput> AddAction<TInput> (
            this ISourceBlock<TInput> source,
            Action<TInput> action,
            ExecutionDataflowBlockOptions options = null)
        {
            var actionBlock = new ActionBlock<TInput>(action, options ?? new ExecutionDataflowBlockOptions());
            source.LinkTo (actionBlock);
            source.Completion.ContinueWith (_ => actionBlock.Complete());
            return actionBlock;
        }
       
        public static ActionBlock<TInput> AddAction<TInput> (
            this ISourceBlock<TInput> source,
            Action<TInput> action,
            int maxParallelism,
            int boundedCapacity = -1,
            CancellationToken cancelToken = default (CancellationToken),
            TaskScheduler scheduler = null)
        {
            return AddAction (source, action, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
        }
       
        public static BufferBlock<T> AddBuffer<T> (this ISourceBlock<T> source, DataflowBlockOptions options = null)
        {
            // A plain BufferBlock only needs DataflowBlockOptions, not the execution options.
            var bufferBlock = new BufferBlock<T> (options ?? new DataflowBlockOptions());
            source.LinkTo (bufferBlock);
            source.Completion.ContinueWith (_ => bufferBlock.Complete());
            return bufferBlock;
        }
       
        // boundedCapacity has no default value: if both AddBuffer overloads had only
        // optional parameters, a plain AddBuffer() call would be ambiguous.
        public static BufferBlock<T> AddBuffer<T> (this ISourceBlock<T> source, int boundedCapacity)
        {
            return AddBuffer (source, new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        }
       
        public static ExecutionDataflowBlockOptions GetExecutionOptions (
            int maxParallelism = 1,
            int boundedCapacity = -1,
            CancellationToken cancelToken = default (CancellationToken),
            TaskScheduler scheduler = null)
        {   
            var options = new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = boundedCapacity,
                MaxDegreeOfParallelism = maxParallelism,
                CancellationToken = cancelToken
            };
            if (scheduler != null) options.TaskScheduler = scheduler;
            return options;
        }
    }
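    One caveat with linking completion through `ContinueWith (_ => block.Complete())`: the next block is completed normally even when the previous one faulted, so an exception in an early stage never shows up in the tail's Completion. A minimal sketch of a fault-aware alternative, assuming the released TPL Dataflow library (System.Threading.Tasks.Dataflow); `ForwardCompletion` is a hypothetical helper name, written as a local function so the example is one top-level program:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: forward a source block's outcome to a target.
// A fault is passed on as a fault; success or cancellation completes the target normally.
static void ForwardCompletion(IDataflowBlock source, IDataflowBlock target) =>
    source.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) target.Fault(t.Exception!);
        else target.Complete();
    }, TaskScheduler.Default);

var parse = new TransformBlock<string, int>(s => int.Parse(s)); // throws on bad input
var print = new ActionBlock<int>(i => Console.WriteLine(i));
parse.LinkTo(print);
ForwardCompletion(parse, print);

parse.Post("1");
parse.Post("oops"); // int.Parse throws FormatException, which faults 'parse'
parse.Complete();

try
{
    await print.Completion;
    Console.WriteLine("print completed normally");
}
catch (Exception ex)
{
    Console.WriteLine($"print faulted: {ex.Message}");
}
```

    With the `ContinueWith (_ => ...Complete())` pattern, `print` would instead complete successfully here. The released library's `DataflowLinkOptions.PropagateCompletion` forwards faults in the same way.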
    


    Joe

    Sunday, October 16, 2011 05:21

All replies

  • Hi Joe,

     

    Thanks for the feedback. This isn’t a bad idea, because in many cases the dataflow network is static, i.e. the construction code doesn’t need to keep the unlinker returned from LinkTo().
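    For contrast, a sketch of where the unlinker does matter: a network that is rewired while it runs. `LinkTo` returns an `IDisposable`, and disposing it removes that one link. This assumes the released TPL Dataflow library rather than the 2011 CTP:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var target = new BufferBlock<int>();

// LinkTo returns an IDisposable "unlinker" for this particular link.
IDisposable link = source.LinkTo(target);

source.Post(1);
int first = target.Receive();   // blocks until 1 has flowed through the link

link.Dispose();                 // remove the link: source no longer offers to target

source.Post(2);
bool stayed = source.TryReceive(out int second); // 2 is still buffered in source

Console.WriteLine($"first={first}, second={second}, stayed={stayed}, target.Count={target.Count}");
```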

     

    However, your suggestion requires way too many extension methods. How about an overload of LinkTo() that returns the target/propagator instead of an unlinker:

            public static IPropagatorBlock<TInput, TOutput> LinkTo<TInput, TOutput>(
                this ISourceBlock<TInput> source,
                IPropagatorBlock<TInput, TOutput> propagator)
            {
                source.LinkTo(propagator, DataflowLinkOptions.Default);
                return propagator;
            }

     

    Then you can do:

                var head = new TransformBlock<string, int>(s => s.Length);
                head.LinkTo(new TransformBlock<int, long>(x => (long)x * (long)x)) // Uses the NEW overload
                    .LinkTo(new ActionBlock<long>(x => Console.WriteLine(x)));     // Uses the OLD overload

                head.Post("foo");
                head.Post("bar");

     

    What do you think?

     

    Zlatko Michailov

    Software Development Engineer, Parallel Computing Platform

    Microsoft Corp.


    This posting is provided "AS IS" with no warranties, and confers no rights.
    Tuesday, October 18, 2011 00:36
  • That would be a good start. In the simple cases for which this is applicable, it might also make sense to automatically propagate completion:

            public static IPropagatorBlock<TInput, TOutput> LinkTo<TInput, TOutput>(
                this ISourceBlock<TInput> source,
                IPropagatorBlock<TInput, TOutput> propagator)
            {
                source.LinkTo(propagator, DataflowLinkOptions.Default);
                source.Completion.ContinueWith (_ => propagator.Complete());
                return propagator;
            }

    In which case LinkTo would be a misleading name for the method. Maybe something like Chain instead?

    I would also prefer to call static methods to create blocks, so I can use type inference to eliminate type parameter clutter and allow anonymous types. In particular, I'd like to be able to do the following, so I can "carry" a variable, rather like the 'let' clause in query expressions:

    .Chain (Block.Transform (x => new { x, Foo = x.Foo() }))
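    One wrinkle with the standalone factory form: C# cannot infer the type of `x` in `Block.Transform (x => ...)` on its own, because the lambda parameter is untyped and nothing else in that call mentions the input type. Inference does work when the source block is an argument of the same call, as with the AddTransform extension methods at the top of the thread, and the output can then be an anonymous type. A minimal sketch, assuming the released TPL Dataflow library; `AddTransform` here is a hypothetical local function mirroring the proposal, so the example fits in one top-level program:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// TInput is inferred from the source argument; TOutput (possibly an anonymous
// type) is then inferred from the lambda's return type.
static TransformBlock<TInput, TOutput> AddTransform<TInput, TOutput>(
    ISourceBlock<TInput> source, Func<TInput, TOutput> transform)
{
    var block = new TransformBlock<TInput, TOutput>(transform);
    source.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
    return block;
}

var words = new BufferBlock<string>();

// "Carry" the original word alongside a computed value, like a query 'let' clause.
var withLength = AddTransform(words, w => new { Word = w, Length = w.Length });
var described = AddTransform(withLength, p => $"{p.Word} has {p.Length} letters");

var output = new ConcurrentQueue<string>();
var sink = new ActionBlock<string>(s => output.Enqueue(s));
described.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

words.Post("dataflow");
words.Post("fluent");
words.Complete();
await sink.Completion;

foreach (var line in output) Console.WriteLine(line);
```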

    I guess that takes us back to square one, because in the Block class you'd need a static method for every constructor on every block. There's also something ugly about having two ways to construct blocks, unless it's somehow clearly labelled as a fluent interface on top of Dataflow. The other option would be to make the block constructors internal+protected (so the only way to create blocks is via the static methods on the Block class), but might that violate the design guidelines?

    That brings me to my final point: it would be nice to have overloads that specify the most common options when creating blocks (degree of parallelism and bounded capacity) without having to instantiate a DataflowBlockOptions class.

    None of this matters if you're using Dataflow in a "serious" way, I guess.

    The scenario that brought this up: the other day I wanted to write a script to download blobs from Azure and bulk copy the data into SQL Server. I wanted to parallelize the Azure container queries and blob downloads to avoid multiplying the round-trip overhead, but then the results had to be queued for bulk copy into SQL Server (ideally in a bounded queue to avoid unconstrained memory consumption). You can see how Dataflow was an ideal fit for this, and it worked brilliantly. But there was an awful lot of plumbing to set up that simple chain. I can totally understand why you designed the API as you have, but for such scenarios it would be awesome if the .NET Framework could somehow offer Actor concurrency with the same plumbing-free interface we get with data parallelism.
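    The Azure-to-SQL scenario maps onto a short pipeline: parallel downloads, bounded stages so memory stays capped, batching, and a single bulk-copy consumer. A sketch under stated assumptions: `DownloadBlobAsync` is a hypothetical stand-in that simulates a download with `Task.Delay`, the bulk copy is simulated too (a real version would use the Azure Storage client and `SqlBulkCopy`), and the parallelism, capacities and batch size are illustrative, not tuned. It assumes the released TPL Dataflow library:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a blob download: simulates latency and returns 3 "rows".
static async Task<string[]> DownloadBlobAsync(string blobName)
{
    await Task.Delay(50);
    return Enumerable.Range(1, 3).Select(i => $"{blobName}:row{i}").ToArray();
}

var copied = new ConcurrentQueue<string>();
var link = new DataflowLinkOptions { PropagateCompletion = true };

// Stage 1: up to 8 downloads in flight at once to overlap round trips.
var download = new TransformBlock<string, string[]>(DownloadBlobAsync,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 16 });

// Stage 2: flatten each blob into rows; the bound caps how many rows wait in memory.
var rows = new TransformManyBlock<string[], string>(r => r,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 200 });

// Stage 3: group rows into batches of 25 (a bounded BatchBlock needs BoundedCapacity >= batch size).
var batches = new BatchBlock<string>(25, new GroupingDataflowBlockOptions { BoundedCapacity = 50 });

// Stage 4: a single consumer performs the (simulated) bulk copy, one batch at a time.
var bulkCopy = new ActionBlock<string[]>(async batch =>
{
    await Task.Delay(10); // stands in for a SqlBulkCopy write
    foreach (var row in batch) copied.Enqueue(row);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

download.LinkTo(rows, link);
rows.LinkTo(batches, link);
batches.LinkTo(bulkCopy, link);

// SendAsync waits while the bounded first stage is full; Post would decline the item instead.
foreach (var name in Enumerable.Range(1, 20).Select(i => $"blob{i:D2}"))
    await download.SendAsync(name);

download.Complete();       // on completion, the BatchBlock also emits its final partial batch
await bulkCopy.Completion;

Console.WriteLine($"Copied {copied.Count} rows");
```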

     

    P.S. The whole thing is tantalizingly close to fitting the query expression pattern :)
    Tuesday, October 18, 2011 08:07
  • Hi Joe,

     

    Re: Static factory methods

    In addition to the many static methods needed, the .NET design guidelines are shifting towards having a single “entry” point per type, i.e. having a single way to create instances. Since we already have constructors, that will be the only entry point.

     

    Re: Linking with propagating completion

    If you look at our latest CTP (http://www.microsoft.com/download/en/details.aspx?id=14610), you’ll notice that we introduced DataflowLinkOptions that you can pass into LinkTo(). One of the link options is… PropagateCompletion. If we implement the LinkTo() overload that I proposed, we’ll definitely have a variation that takes DataflowLinkOptions. I omitted it for simplicity.
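    A sketch of that variation, assuming the released TPL Dataflow library, where the interface method is `ISourceBlock<T>.LinkTo(ITargetBlock<T>, DataflowLinkOptions)`. `LinkAndReturn` is a hypothetical name, written as a local function so the example is one self-contained program; its `TTarget : ITargetBlock<T>` constraint lets one helper return either a propagator or a final ActionBlock. Declared as an extension method in a static class, it would chain fluently like the `head.LinkTo(...).LinkTo(...)` example earlier in the thread:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: link source to target with the given options and return
// the target, so the next stage can be linked from it.
static TTarget LinkAndReturn<T, TTarget>(
    ISourceBlock<T> source, TTarget target, DataflowLinkOptions options)
    where TTarget : ITargetBlock<T>
{
    source.LinkTo(target, options);
    return target;
}

// PropagateCompletion makes each link forward completion (and faults) downstream.
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
var results = new ConcurrentQueue<long>();

var head = new TransformBlock<string, int>(s => s.Length);
var square = LinkAndReturn(head, new TransformBlock<int, long>(x => (long)x * x), propagate);
var sink = LinkAndReturn(square, new ActionBlock<long>(x => results.Enqueue(x)), propagate);

head.Post("foo");
head.Post("quux");
head.Complete();       // completion flows head -> square -> sink
await sink.Completion; // returns once every item has gone through all three blocks

Console.WriteLine(string.Join(", ", results)); // 9, 16
```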

     

    Your feedback is very welcome. Keep it coming.

     

    Zlatko Michailov

    Software Development Engineer, Parallel Computing Platform

    Microsoft Corp.


    Tuesday, October 18, 2011 15:57