Pipes / Filters - Ending Execution

  • Friday, August 31, 2012 8:00 PM
     

    Firstly -- this is totally awesome! I just used it today for the first time and it's excellent; it almost even makes sense. Reading "Introduction to TPL Dataflow" was a great start.

    In a lot of data flows, I'm sure filtering is something we would do. Is there a good example of how to achieve this?

    var actor = new ActionBlock<decimal>(i =>
    {
        DoSomethingInteresting(i);
    });
    var dollarFilter = new TransformBlock<decimal, decimal>(i =>
    {
        if (i > 5M)
        {
            return i;
        }
        // END_EXECUTION: how do I drop the item here instead of returning a value?
    });
    var pipe = new TransformBlock<decimal, decimal>(i =>
    {
        // Pass-through stage: logs the item and the thread that handled it.
        Console.WriteLine("Transform of {0} to {1} Done - thread {2}", i, i, System.Threading.Thread.CurrentThread.ManagedThreadId);
        return i;
    });
    pipe.LinkTo(dollarFilter);
    dollarFilter.LinkTo(actor);
    // This should get filtered (2 is not greater than 5)
    pipe.Post(2M);
    // This should get to DoSomethingInteresting()
    pipe.Post(400M);

All Replies

  • Friday, August 31, 2012 8:34 PM
     
     

    I would just post to the actor from inside the dollarFilter block. I'm not sure this is a best practice, though, so I would like to hear another opinion.

    If you took my approach, you could develop a FilterAction block that combines your top two blocks into a single block.
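
    A rough sketch of what such a combined block might look like. FilterActionSketch, Create, and Run are hypothetical names made up for illustration, not part of TPL Dataflow; the sketch assumes the System.Threading.Tasks.Dataflow library is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class FilterActionSketch
{
    // Hypothetical helper (not a TPL Dataflow API): a single ActionBlock that
    // applies the filter and then the action, replacing dollarFilter + actor.
    public static ActionBlock<T> Create<T>(Predicate<T> predicate, Action<T> action)
    {
        return new ActionBlock<T>(item =>
        {
            if (predicate(item))
            {
                action(item);
            }
            // Items that fail the predicate are simply dropped here.
        });
    }

    // Usage sketch: posts the values and returns those that reached the action.
    public static List<decimal> Run(params decimal[] values)
    {
        var handled = new List<decimal>();
        // ActionBlock processes one item at a time by default,
        // so adding to the list from the action is safe here.
        var filterAction = Create<decimal>(i => i > 5m, i => handled.Add(i));

        foreach (var value in values)
        {
            filterAction.Post(value);
        }

        filterAction.Complete();          // no more input
        filterAction.Completion.Wait();   // wait until all posted items are processed
        return handled;
    }
}
```

    The trade-off is that the filter and the action are no longer separate blocks, so neither can be reused or relinked on its own.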

  • Saturday, September 01, 2012 12:36 AM
     
     Answered

    The best way to do filtering in TDF is to use the version of LinkTo() that takes a predicate.

    But if an item doesn't match the predicate of any link, it stays in the source block's output buffer. The problem is that the source offers its output in order, so that one item blocks every item behind it until something consumes it. To fix that, add a second link to a NullTarget. Each item is first offered to the actual target; if it doesn't match the predicate, it's offered to the NullTarget, which accepts it and immediately throws it away.

    Your code would then look like this:

    pipe.LinkTo(actor, i => i > 5m);
    pipe.LinkTo(DataflowBlock.NullTarget<decimal>());
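
    For completeness, here is a minimal self-contained sketch of the same idea that can be run end to end. PredicateFilterSketch and RunAsync are hypothetical names used only for illustration, and the PropagateCompletion option is added just so the sketch can wait for the pipeline to finish; it assumes the System.Threading.Tasks.Dataflow library is referenced.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PredicateFilterSketch
{
    // Hypothetical helper: pushes the values through the filtered pipeline
    // and returns the ones that reached the actor.
    public static async Task<List<decimal>> RunAsync(IEnumerable<decimal> values)
    {
        var received = new List<decimal>();

        // Stand-in for DoSomethingInteresting: records what arrives.
        var actor = new ActionBlock<decimal>(i => received.Add(i));
        var pipe = new TransformBlock<decimal, decimal>(i => i);

        // First link: only items greater than 5 are offered to the actor.
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        pipe.LinkTo(actor, options, i => i > 5m);

        // Second link: everything the first link declined is discarded,
        // so rejected items never block the ones behind them.
        pipe.LinkTo(DataflowBlock.NullTarget<decimal>());

        foreach (var value in values)
        {
            pipe.Post(value);
        }

        pipe.Complete();          // no more input
        await actor.Completion;   // completion propagates from pipe to actor
        return received;
    }
}
```

    The order of the two LinkTo calls matters: links are tried in the order they were added, so the NullTarget link has to come last.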

    There are also ways to do filtering using TransformManyBlock or TransformBlock, but they are more or less hacks.
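
    As an illustration of the TransformManyBlock variant, one possible sketch is below: the block returns an empty sequence to drop an item and a one-element sequence to pass it on. TransformManyFilterSketch and RunAsync are hypothetical names, and this is only one way the hack could be written, assuming the System.Threading.Tasks.Dataflow library is referenced.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformManyFilterSketch
{
    // Hypothetical helper: returns the values that made it past the filter block.
    public static async Task<List<decimal>> RunAsync(IEnumerable<decimal> values)
    {
        var received = new List<decimal>();
        var actor = new ActionBlock<decimal>(i => received.Add(i));

        // Zero outputs drops the item; exactly one output passes it on.
        var dollarFilter = new TransformManyBlock<decimal, decimal>(
            i => i > 5m ? new[] { i } : Enumerable.Empty<decimal>());

        dollarFilter.LinkTo(actor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in values)
        {
            dollarFilter.Post(value);
        }

        dollarFilter.Complete();  // no more input
        await actor.Completion;   // wait for the actor to drain
        return received;
    }
}
```

    Compared with the predicate link, this allocates a sequence for every item and hides the filter inside a block whose name suggests a one-to-many transform, which is part of why it reads as a hack.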