TPL DataFlow Query Operators?

    Question

  • Are there any plans to implement query operators (Select, Where, etc.) for ISourceBlock<T> or any of the other TPL Dataflow interfaces?  It seems like there might be an opportunity to implement monadic operations.  Additionally, if that were implemented, an implementation based on expression trees could also be built to make the queries portable.

    Just curious if there are any plans along those lines...

    Saturday, July 02, 2011 5:55 PM

Answers

  • Hi Jordan-

    Rather than implement LINQ operators for TPL Dataflow, we took the approach of adding ToObservable and ToObserver APIs for ISourceBlock<T> and ITargetBlock<T>.  With those, you can then leverage the full set of LINQ operators from Reactive Extensions.  That said, it would be possible to implement many of the LINQ operators over ISourceBlock<T> if you wanted to.  For example, here are some simple Select and Where implementations:

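    using System;
    using System.Threading.Tasks.Dataflow;

    public static class Extensions
    {
        // Projects each message through func by routing it into a TransformBlock.
        public static ISourceBlock<U> Select<T, U>(this ISourceBlock<T> source, Func<T, U> func)
        {
            var transform = new TransformBlock<T, U>(func);
            source.LinkTo(transform);
            // Complete the transform once the source completes. Note that this
            // completes it normally even if the source faulted.
            source.Completion.ContinueWith(delegate { transform.Complete(); });
            return transform;
        }

        // Forwards only the messages that satisfy predicate into a BufferBlock.
        public static ISourceBlock<T> Where<T>(this ISourceBlock<T> source, Predicate<T> predicate)
        {
            var buffer = new BufferBlock<T>();
            // discardsMessages: true (a 2011 CTP overload of LinkTo) drops messages
            // that fail the predicate rather than leaving them in the source.
            source.LinkTo(buffer, predicate, discardsMessages: true);
            source.Completion.ContinueWith(delegate { buffer.Complete(); });
            return buffer;
        }
    }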

    Note that, based on customer feedback, we are also planning a few API tweaks that will make such implementations easier (e.g. opting into automatic flowing of completion in LinkTo rather than also needing to specify the continuation).
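For comparison, here is a minimal sketch of the same two operators against the API as it later shipped in System.Threading.Tasks.Dataflow (assumed to be referenced, e.g. via the System.Threading.Tasks.Dataflow NuGet package). There, LinkTo takes a DataflowLinkOptions whose PropagateCompletion flag forwards both completion and faults to the target, and the predicate overload has no discardsMessages parameter, so rejected items are routed to DataflowBlock.NullTarget<T>() instead. The class names Query and Demo are illustrative. Because the methods are called Select and Where, C# query syntax binds to them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Query
{
    // Map: every message is projected through the selector by a TransformBlock.
    public static ISourceBlock<U> Select<T, U>(this ISourceBlock<T> source, Func<T, U> selector)
    {
        var transform = new TransformBlock<T, U>(selector);
        // PropagateCompletion flows completion and faults downstream automatically.
        source.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
        return transform;
    }

    // Filter: matching messages go to the buffer; everything else falls through to
    // a NullTarget so a rejected item cannot sit at the head of the source and stall it.
    public static ISourceBlock<T> Where<T>(this ISourceBlock<T> source, Predicate<T> predicate)
    {
        var buffer = new BufferBlock<T>();
        source.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true }, predicate);
        source.LinkTo(DataflowBlock.NullTarget<T>());
        return buffer;
    }
}

public static class Demo
{
    public static async Task<List<int>> RunAsync()
    {
        var numbers = new BufferBlock<int>();

        // The query expression compiles to numbers.Where(...).Select(...) using Query.
        ISourceBlock<int> query = from n in numbers
                                  where n % 2 == 0
                                  select n * 10;

        for (int i = 1; i <= 6; i++) numbers.Post(i);
        numbers.Complete();

        // Drain the query until it completes.
        var results = new List<int>();
        while (await query.OutputAvailableAsync())
            results.Add(query.Receive());
        await query.Completion;
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync())); // 20,40,60
    }
}
```

Because the NullTarget link accepts everything the predicate rejects, this Where claims every message of its source, so that source should not also feed other consumers.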

    Best,

    Stephen

    Tuesday, July 05, 2011 4:35 PM
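In the library as it eventually shipped, the ToObservable/ToObserver bridge corresponds to the extension methods DataflowBlock.AsObservable and DataflowBlock.AsObserver. A minimal sketch, assuming that released API and no Rx reference: a BufferBlock's output is exposed as an IObservable<int> and subscribed by an ActionBlock's IObserver<int> view, so both the messages and the completion signal cross the bridge. With Rx referenced, its query operators could be chained between AsObservable() and Subscribe(). The class name ObservableBridgeDemo is illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ObservableBridgeDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var source = new BufferBlock<int>();
        var results = new List<int>();
        // ActionBlock processes one item at a time by default, so List<int> is safe here.
        var sink = new ActionBlock<int>(x => results.Add(x));

        // Push bridge: the observable calls OnNext/OnCompleted on the observer, which
        // sends each item into the ActionBlock and completes it at the end.
        using (source.AsObservable().Subscribe(sink.AsObserver()))
        {
            for (int i = 1; i <= 3; i++) source.Post(i);
            source.Complete();
            await sink.Completion;
        }
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync())); // 1,2,3
    }
}
```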

All replies

  • Stephen - Thanks for getting back to me.  I was aware that you could use ToObservable to gain access to standard query operators. 

    However, I was thinking that using IObservable<T> on top of TDF would not utilize the ReserveMessage -> ConsumeMessage|ReleaseReservation protocol supported by ISourceBlock<TOutput>.  I'm wondering if ISourceBlock<TOutput> could be treated just like IObservable<T> (composable, monadic) by building a "SelectSourceBlock" (along with SelectMany, the monadic Bind operator) and the other standard query operators, but with the obvious focus on asynchronous data processing with support for transactions (ReserveMessage|ConsumeMessage).

    Thoughts?

    I might whip together a quick proof-of-concept in the next few days to see if the current TDF interfaces could be made monadic, or if they would require changes to support it.  I'll post a link to it here when I do.


    Thanks - Jordan


    Wednesday, July 06, 2011 3:25 AM
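The ReserveMessage -> ConsumeMessage|ReleaseReservation hand-off described above is already used by the built-in blocks in non-greedy mode. As a sketch of an operator composed directly from blocks (assuming the released System.Threading.Tasks.Dataflow API; Zip, ZipQuery and ZipDemo are hypothetical names, not library members), a LINQ-style Zip can sit on a JoinBlock with Greedy = false, which postpones the offers it receives, reserves one message from each source, and consumes both only when both reservations succeed, releasing them otherwise.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ZipQuery
{
    // Hypothetical Zip operator: pairs the i-th message of each source.
    public static ISourceBlock<TResult> Zip<T1, T2, TResult>(
        this ISourceBlock<T1> first, ISourceBlock<T2> second, Func<T1, T2, TResult> resultSelector)
    {
        // Greedy = false: offers are postponed; a pair is consumed only after a message
        // has been reserved on both sides, so neither source loses an unmatched item.
        var join = new JoinBlock<T1, T2>(new GroupingDataflowBlockOptions { Greedy = false });
        var project = new TransformBlock<Tuple<T1, T2>, TResult>(
            pair => resultSelector(pair.Item1, pair.Item2));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        first.LinkTo(join.Target1, propagate);
        second.LinkTo(join.Target2, propagate);
        join.LinkTo(project, propagate);
        return project;
    }
}

public static class ZipDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var numbers = new BufferBlock<int>();
        var letters = new BufferBlock<string>();
        ISourceBlock<string> zipped = numbers.Zip(letters, (n, s) => n + s);

        foreach (var n in new[] { 1, 2, 3 }) numbers.Post(n);
        foreach (var s in new[] { "a", "b", "c" }) letters.Post(s);
        numbers.Complete();
        letters.Complete();

        var results = new List<string>();
        while (await zipped.OutputAvailableAsync())
            results.Add(zipped.Receive());
        await zipped.Completion;
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync())); // 1a,2b,3c
    }
}
```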
  • Thanks; looking forward to seeing what you come up with.

    Regarding ReserveMessage/ConsumeMessage/ReleaseReservation, you're correct that ToObservable won't use those.  Since observables/observers represent a push mechanism with no built-in path for feedback/bounding/etc., there's nothing that would cause the registered observer to postpone a message back to the source; rather, the target that connects the source to the observer will blindly accept all messages and forward them on to the subscribed observer.  If you wanted, for example, to have bounding support flow through the entire query, then yes, you would likely want to build your own query operators directly with dataflow blocks rather than using observables for the composition.

    Wednesday, July 06, 2011 2:52 PM
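A minimal sketch of bounding flowing through a query built from blocks, assuming the released API (BoundedSelect, BoundedQuery and BackPressureDemo are hypothetical names, and the capacities are arbitrary). Each block has BoundedCapacity set, so a full block postpones offers instead of accepting them; the upstream block keeps the message until the target has room and comes back to consume it. The backlog therefore reaches the producer, whose SendAsync call does not complete until the head of the pipeline can accept the item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedQuery
{
    // Hypothetical bounded Select: the TransformBlock holds at most `capacity` items
    // in total and postpones further offers until it has room again.
    public static ISourceBlock<U> BoundedSelect<T, U>(
        this ISourceBlock<T> source, Func<T, U> selector, int capacity)
    {
        var transform = new TransformBlock<T, U>(
            selector, new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });
        source.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
        return transform;
    }
}

public static class BackPressureDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var head = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        var results = new List<int>();
        // Slow, bounded consumer: the backlog builds upstream instead of in the sink.
        var sink = new ActionBlock<int>(
            async x => { await Task.Delay(10); results.Add(x); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        head.BoundedSelect(x => x * x, capacity: 1)
            .LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 5; i++)
            await head.SendAsync(i); // does not complete while the head is full
        head.Complete();
        await sink.Completion;
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync())); // 1,4,9,16,25
    }
}
```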