Source Executor Block


  • Have you guys thought about adding a Source Executor Block? It would be nice to be able to kick off a data flow with a block that only produces messages.

    Let me know if I am missing something. 


    November 19, 2011, 4:54 AM


  • Hi Dave-

    Thanks for the suggestion. We have, but that wouldn't be much different than just using a BufferBlock<T>.  For example, let's say you wanted to implement a block which produced a message every N milliseconds... you could do that with code like:

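    // Requires: using System.Threading; using System.Threading.Tasks.Dataflow;
    public static ISourceBlock<T> CreateTimerBlock<T>(int interval, Func<T> generator)
    {
        var bb = new BufferBlock<T>();
        // Post a generated message every `interval` milliseconds.
        var timer = new Timer(delegate { bb.Post(generator()); }, null, interval, interval);
        // Stop the timer once the block completes.
        bb.Completion.ContinueWith(delegate { timer.Dispose(); });
        return bb;
    }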

    More generally, here's a function that would supply you with delegates to post messages, signal completion, or signal errors:

    public static ISourceBlock<T> CreateSourceBlock<T>(
        Action<Action<T>,Action,Action<Exception>> executor)
    {
        // Typed as the interface so that Fault, which BufferBlock<T> implements explicitly, is callable.
        IPropagatorBlock<T,T> bb = new BufferBlock<T>();
        executor(t => bb.Post(t), () => bb.Complete(), e => bb.Fault(e));
        return bb;
    }

    With that function, I could do things like:

    ISourceBlock<int> source = CreateSourceBlock<int>(async (post, complete, fault) =>
    {
        try {
            for (int i = 0; i < 10; i++) { await Task.Delay(1000); post(i); }
            complete();
        }
        catch (Exception e) { fault(e); }
    });
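
    To consume a source built this way, one option is to link it to an ActionBlock<T> and wait for that block to complete. The following is only a sketch, not part of the original suggestion: the SourceBlockDemo class and RunAsync helper are hypothetical names, the executor is synchronous to keep the output deterministic, and it assumes the released System.Threading.Tasks.Dataflow package, where LinkTo accepts DataflowLinkOptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SourceBlockDemo
{
    // Same helper as in the reply, repeated so the sketch compiles on its own.
    public static ISourceBlock<T> CreateSourceBlock<T>(
        Action<Action<T>, Action, Action<Exception>> executor)
    {
        IPropagatorBlock<T, T> bb = new BufferBlock<T>();
        executor(t => bb.Post(t), () => bb.Complete(), e => bb.Fault(e));
        return bb;
    }

    // Hypothetical helper: produces 0..count-1 and returns what the consumer received.
    public static async Task<List<int>> RunAsync(int count)
    {
        var received = new List<int>();

        ISourceBlock<int> source = CreateSourceBlock<int>((post, complete, fault) =>
        {
            try
            {
                for (int i = 0; i < count; i++) post(i);
                complete();
            }
            catch (Exception e) { fault(e); }
        });

        // ActionBlock handles one message at a time by default, so List.Add is safe here.
        var sink = new ActionBlock<int>(i => received.Add(i));

        // PropagateCompletion forwards completion (or a fault) from the source to the sink.
        source.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        await sink.Completion;
        return received;
    }

    public static async Task Main()
    {
        List<int> items = await RunAsync(5);
        Console.WriteLine(string.Join(",", items)); // prints 0,1,2,3,4
    }
}
```

    Note that when an async lambda is passed as the executor, it compiles to an async void method, so any exception has to be caught inside the lambda and routed to fault, as the try/catch does.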

    I hope that helps.

    November 19, 2011, 8:49 PM