Multi-TransformBlock implementation

  • Wednesday, May 09, 2012 4:37 PM
     

    What is the right way to implement a block similar to a TransformBlock, but where the transformation can yield data to two different streams?

    For example, here is a simple function that consumes integers and produces integers and strings:

    static void Example(int n, Action<int> yieldNum, Action<string> yieldString)
    {
      var sb = new StringBuilder();
      for (int i = 0; i < n; i++)
      {
        yieldNum(i);
        sb.Append(i);
      }
      yieldString(sb.ToString());
    }

    It should be possible to treat this as an ITargetBlock<int> and as two source blocks: an ISourceBlock<int> and an ISourceBlock<string>.  My question is how best to implement a class (probably similar to TransformBlock) that provides the above functionality.

    In general terms, two approaches come to mind:

    • Directly implement the interfaces (implement ITargetBlock and expose two properties of type ISourceBlock), maintaining an input BufferBlock and two output BufferBlocks under the hood.
    • Rewrite the above method to yield a sum type (Either<int,string>), wrap it in a TransformBlock, and finally link it to two target blocks via predicates that accept only the left or only the right case of Either.

    Neither of the above seems satisfactory to me.  What is the right way to do it?

All Replies

  • Wednesday, May 09, 2012 10:15 PM
     

    I thought it might be useful to elaborate on the second suggestion (using a sum type).  The code below implements this idea.  It also makes clear that it is not a real solution since it relies on a new method (MultiLinkTo), while the "right" solution should be along the lines of what I mentioned in my first post, namely a class with the following properties:

    • Contains a constructor which accepts an Action<TInput,Action<TOutput1>,Action<TOutput2>>
    • Implements ITargetBlock<TInput>
    • Contains two properties with types ISourceBlock<TOutput1> and ISourceBlock<TOutput2>

    Here is the code. First the sum type:

    abstract class Either<TLeft, TRight>
    {
        public TLeft AsLeft { get { return Match(left => left, _ => { throw new Exception(); }); } }
        public TRight AsRight { get { return Match(_ => { throw new Exception(); }, right => right); } }
        public bool IsLeft { get { return Match(left => true, right => false); } }
        public bool IsRight { get { return !IsLeft; } }
        public abstract T Match<T>(Func<TLeft, T> leftSelect, Func<TRight, T> rightSelect);
    
        public sealed class Left : Either<TLeft, TRight>
        {
            readonly TLeft _left;
    
            public Left(TLeft left)
            {
                _left = left;
            }
    
            public override T Match<T>(Func<TLeft, T> leftSelect, Func<TRight, T> _)
            {
                return leftSelect(_left);
            }
        }
    
        public sealed class Right : Either<TLeft, TRight>
        {
            readonly TRight _right;
    
            public Right(TRight right)
            {
                _right = right;
            }
    
            public override T Match<T>(Func<TLeft, T> _, Func<TRight, T> rightSelect)
            {
                return rightSelect(_right);
            }
        }
    }

    Next some helper methods:

    static Action<Action<Either<TOutput1, TOutput2>>>
        MergeYields<TOutput1, TOutput2>(this Action<Action<TOutput1>, Action<TOutput2>> action)
    {
        return eitherAction => action(
            output1 => eitherAction(new Either<TOutput1, TOutput2>.Left(output1)),
            output2 => eitherAction(new Either<TOutput1, TOutput2>.Right(output2)));
    }
    
    static IEnumerable<T> AsEnumerable<T>(this Action<Action<T>> source)
    {
        var bc = new BlockingCollection<T>(1);
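        Task.Factory.StartNew(() =>
            {
                // Always complete the collection, even if source throws,
                // so the consuming enumerable cannot block forever.
                try { source(bc.Add); }
                finally { bc.CompleteAdding(); }
            });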
        return bc.GetConsumingEnumerable();
    }
    

    Finally, here is the method that links all this together:

    static ITargetBlock<int> MultiLinkTo(Action<int, Action<int>, Action<string>> examp, 
        ITargetBlock<int> intSink, 
        ITargetBlock<string> stringSink)
    {
        var left = new TransformBlock<Either<int, string>, int>(either => either.AsLeft);
        left.LinkTo(intSink);
    
        var right = new TransformBlock<Either<int, string>, string>(either => either.AsRight);
        right.LinkTo(stringSink);
    
        var tb = new TransformManyBlock<int, Either<int, string>>(n =>
            MergeYields<int, string>((a1, a2) => examp(n, a1, a2))
            .AsEnumerable());
        tb.LinkTo(left, either => either.IsLeft);
        tb.LinkTo(right, either => either.IsRight);
        return tb;
    }
    

    Here is an example of using it with the method from my previous post:

    var targetBlock = MultiLinkTo(Example,
        new ActionBlock<int>(n => Console.WriteLine("int: {0}", n)),
        new ActionBlock<string>(s => Console.WriteLine("string: {0}", s)));
    targetBlock.Post(5);
    targetBlock.Post(3);
    

    Note that I have done nothing about propagating completions, or allowing the blocks to be configured.
  • Wednesday, May 09, 2012 11:46 PM
     
     
    Why doesn't the first solution seem satisfactory to you? JoinBlock does something very similar in exactly this way, except in the other direction: it acts as two ITargetBlocks and one ISourceBlock.
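
    To make the "first solution" concrete, here is a minimal sketch of a block shaped like a mirror image of JoinBlock: one target property and two source properties. The name DirectSplitBlock and its Target/Source1/Source2 members are hypothetical, the output buffers are left unbounded so that Post always succeeds, and configuration options are ignored; treat it as an illustration under those assumptions rather than a finished block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical name: one input, two outputs, built from existing blocks.
public sealed class DirectSplitBlock<TInput, TOutput1, TOutput2>
{
    readonly ActionBlock<TInput> _in;
    // Assumption: unbounded buffers, so Post never declines an item.
    readonly BufferBlock<TOutput1> _out1 = new BufferBlock<TOutput1>();
    readonly BufferBlock<TOutput2> _out2 = new BufferBlock<TOutput2>();

    public DirectSplitBlock(Action<TInput, Action<TOutput1>, Action<TOutput2>> action)
    {
        // Each yield callback simply posts into the matching output buffer.
        _in = new ActionBlock<TInput>(x =>
            action(x, o => _out1.Post(o), o => _out2.Post(o)));

        // Propagate completion (or a fault) from the input to both outputs.
        _in.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                ((IDataflowBlock)_out1).Fault(t.Exception);
                ((IDataflowBlock)_out2).Fault(t.Exception);
            }
            else
            {
                _out1.Complete();
                _out2.Complete();
            }
        });
    }

    public ITargetBlock<TInput> Target { get { return _in; } }
    public ISourceBlock<TOutput1> Source1 { get { return _out1; } }
    public ISourceBlock<TOutput2> Source2 { get { return _out2; } }
}

class DirectSplitBlockDemo
{
    static void Main()
    {
        var split = new DirectSplitBlock<int, int, string>((n, yieldNum, yieldString) =>
        {
            for (int i = 0; i < n; i++) yieldNum(i);
            yieldString("count: " + n);
        });

        var intPrinter = new ActionBlock<int>(n => Console.WriteLine("int: {0}", n));
        var stringPrinter = new ActionBlock<string>(s => Console.WriteLine("string: {0}", s));
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        split.Source1.LinkTo(intPrinter, link);
        split.Source2.LinkTo(stringPrinter, link);

        split.Target.Post(3);
        split.Target.Complete();
        Task.WaitAll(intPrinter.Completion, stringPrinter.Completion);
    }
}
```

    Exposing a Target property instead of implementing ITargetBlock<TInput> on the class itself mirrors how JoinBlock exposes Target1 and Target2; implementing the interface by delegating to _in would work as well.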
  • Thursday, May 10, 2012 1:42 AM
     
     

    The problem I have (or had) with my first solution is that it seemed excessive to use three BufferBlocks for something so seemingly simple; in addition, when I tried to do it, it wasn't clear what the right way to do it was.  Actually, looking over my code above (where I define MultiLinkTo), I could create the class I want by creating two BufferBlocks, using them as intSink and stringSink (above), and then exposing them as ISourceBlock properties. This solution does raise some further questions: for example, what is the right BoundedCapacity for these internal BufferBlocks?  Should it be 1?
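
    For reference when weighing that question, here is a small sketch of what BoundedCapacity = 1 does to a BufferBlock on its own: once the single slot is taken, Post declines immediately, while SendAsync waits for room. The class name is made up for the example; the calls are the standard TPL Dataflow ones.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BoundedCapacityDemo
{
    static void Main()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // The single slot is free, so the first Post is accepted.
        Console.WriteLine("first Post accepted: {0}", buffer.Post(1));

        // The block is now full; Post does not wait, it just returns false.
        Console.WriteLine("second Post accepted: {0}", buffer.Post(2));

        // SendAsync is postponed instead of declined: its task stays
        // incomplete until the block has room again.
        Task<bool> pending = buffer.SendAsync(3);
        Console.WriteLine("SendAsync completed yet: {0}", pending.IsCompleted);

        // Taking the buffered item frees the slot for the postponed message.
        Console.WriteLine("received: {0}", buffer.Receive());
        Console.WriteLine("SendAsync accepted: {0}", pending.Result);
        Console.WriteLine("received: {0}", buffer.Receive());
    }
}
```

    So with a capacity of 1, a producer that uses Post silently loses items when the consumer is slow, whereas one that uses SendAsync (or a link from another block) gets back-pressure instead.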

  • Thursday, May 10, 2012 2:37 PM
     

    Below is an implementation of the "sum type" approach, together with a small example.  Please comment!  Is this the best way to do it?  Are there alternative approaches?

    Thanks.

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    public abstract class Either<TLeft, TRight>
    {
        public TLeft AsLeft { get { return Match(left => left, _ => { throw new Exception(); }); } }
        public TRight AsRight { get { return Match(_ => { throw new Exception(); }, right => right); } }
        public bool IsLeft { get { return Match(left => true, right => false); } }
        public bool IsRight { get { return !IsLeft; } }
        public abstract T Match<T>(Func<TLeft, T> leftSelect, Func<TRight, T> rightSelect);
    
        public sealed class Left : Either<TLeft, TRight>
        {
            readonly TLeft _left;
    
            public Left(TLeft left)
            {
                _left = left;
            }
    
            public override T Match<T>(Func<TLeft, T> leftSelect, Func<TRight, T> _)
            {
                return leftSelect(_left);
            }
        }
    
        public sealed class Right : Either<TLeft, TRight>
        {
            readonly TRight _right;
    
            public Right(TRight right)
            {
                _right = right;
            }
    
            public override T Match<T>(Func<TLeft, T> _, Func<TRight, T> rightSelect)
            {
                return rightSelect(_right);
            }
        }
    }
    
    public class SplitBlock<TInput, TOutput1, TOutput2> : ITargetBlock<TInput>
    {
        static readonly DataflowBlockOptions BufferBlockOptions = new DataflowBlockOptions { BoundedCapacity = 1 };
        static readonly DataflowLinkOptions PropagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };
        readonly ITargetBlock<TInput> _in;
        readonly BufferBlock<TOutput1> _out1 = new BufferBlock<TOutput1>(BufferBlockOptions);
        readonly BufferBlock<TOutput2> _out2 = new BufferBlock<TOutput2>(BufferBlockOptions);
    
        public SplitBlock(Action<TInput, Action<TOutput1>, Action<TOutput2>> action)
        {
            var left = new TransformBlock<Either<TOutput1, TOutput2>, TOutput1>(either => either.AsLeft);
            left.LinkTo(_out1, PropagateCompletion);
    
            var right = new TransformBlock<Either<TOutput1, TOutput2>, TOutput2>(either => either.AsRight);
            right.LinkTo(_out2, PropagateCompletion);
    
            var tb = new TransformManyBlock<TInput, Either<TOutput1, TOutput2>>(n =>
                AsEnumerable(MergeYields((a1, a2) => action(n, a1, a2))));
            tb.LinkTo(left, PropagateCompletion, either => either.IsLeft);
            tb.LinkTo(right, PropagateCompletion, either => either.IsRight);
    
            _in = (ITargetBlock<TInput>)tb;
        }
    
        public ISourceBlock<TOutput1> Source1 { get { return _out1; } }
        public ISourceBlock<TOutput2> Source2 { get { return _out2; } }
    
        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
        {
            return _in.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    
        public void Complete()
        {
            _in.Complete();
        }
    
        public Task Completion
        {
            get { return _in.Completion; }
        }
    
        public void Fault(Exception exception)
        {
            _in.Fault(exception);
        }
    
        static Action<Action<Either<TOutput1, TOutput2>>> MergeYields(
            Action<Action<TOutput1>, Action<TOutput2>> action)
        {
            return eitherAction => action(
                output1 => eitherAction(new Either<TOutput1, TOutput2>.Left(output1)),
                output2 => eitherAction(new Either<TOutput1, TOutput2>.Right(output2)));
        }
    
        static IEnumerable<T> AsEnumerable<T>(Action<Action<T>> source)
        {
            var bc = new BlockingCollection<T>(1);
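            Task.Factory.StartNew(() =>
            {
                // Always complete the collection, even if source throws,
                // so the consuming enumerable cannot block forever.
                try { source(bc.Add); }
                finally { bc.CompleteAdding(); }
            });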
            return bc.GetConsumingEnumerable();
        }
    }
    
    class Program
    {
        static void Example(int n, Action<int> yieldNum, Action<string> yieldString)
        {
            var sb = new StringBuilder();
            for (int i = 0; i < n; i++)
            {
                yieldNum(i);
                sb.Append(i).Append(' ');
            }
            yieldString(sb.ToString());
        }
    
        static void Main()
        {
            var sb = new SplitBlock<int, int, string>(Example);
            var intPrinter = new ActionBlock<int>(n => Console.WriteLine("int: {0}", n));
            var stringPrinter = new ActionBlock<string>(s => Console.WriteLine("string: {0}", s));
            sb.Source1.LinkTo(intPrinter, new DataflowLinkOptions { PropagateCompletion = true });
            sb.Source2.LinkTo(stringPrinter, new DataflowLinkOptions { PropagateCompletion = true });
            for (int i = 0; i < 30; i++)
            {
                sb.Post(i);
            }
            sb.Complete();
            Task.WaitAll(intPrinter.Completion, stringPrinter.Completion);
        }
    }

  • Thursday, May 10, 2012 3:53 PM
     

    I think my implementation of SplitBlock can't be correct.  For example, the following code (which additionally needs using System.Linq):

    static void Main()
    {
        var sb = new SplitBlock<int, int, string>(Example);
        var intToString = new TransformBlock<int, string>(n =>
        {
            Console.WriteLine("intToString: {0}", n);
            return string.Format("int: {0}", n);
        });
        var stringToString = new TransformBlock<string, string>(s => string.Format("string: {0}", s));
        sb.Source1.LinkTo(intToString, new DataflowLinkOptions { PropagateCompletion = true });
        sb.Source2.LinkTo(stringToString, new DataflowLinkOptions { PropagateCompletion = true });
        var t = DataflowBlock.Choose(intToString, s => Console.WriteLine(s), stringToString, s => Console.WriteLine(s));
        Console.WriteLine("All posted: {0}", Enumerable.Range(2, 3).Select(i => sb.Post(i)).All(b => b));
        //t.Wait();
        Console.ReadLine();
    }

    produces this output:

    All posted: True
    string: 0 1
    intToString: 0
    intToString: 1
    intToString: 0
    intToString: 1
    intToString: 2
    intToString: 0
    intToString: 1
    intToString: 2
    intToString: 3

    In particular, Choose never seems to process the ints.
  • Thursday, May 10, 2012 5:52 PM
     
     
    I believe I misunderstood the semantics of Choose.  It accepts only one message in total, from whichever source makes one available first, and then it is done.  Thus the above behavior is not incorrect.
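
    A minimal sketch of that behavior, using two plain BufferBlocks instead of SplitBlock (the class name is made up for the example): Choose runs one handler for one message and leaves everything else in the sources, while linking a source to an ActionBlock handles every message.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ChooseDemo
{
    static void Main()
    {
        var ints = new BufferBlock<int>();
        var strings = new BufferBlock<string>();
        ints.Post(1);
        ints.Post(2);

        // Choose invokes exactly one of the two handlers, for one message.
        Task<int> chosen = DataflowBlock.Choose(
            ints, n => Console.WriteLine("chosen int: {0}", n),
            strings, s => Console.WriteLine("chosen string: {0}", s));
        chosen.Wait();

        // The second int was not consumed by Choose; it is still buffered.
        Console.WriteLine("left in ints: {0}", ints.Count);

        // To process every message, link the source to its own ActionBlock.
        var printer = new ActionBlock<int>(n => Console.WriteLine("linked int: {0}", n));
        ints.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });
        ints.Complete();
        printer.Completion.Wait();
    }
}
```

    The task returned by Choose yields the index of the source whose handler ran (0 for the first, 1 for the second), which is another hint that only a single message is ever taken.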