TPL Dataflow request/response callback process

    Question

  • We recently developed a data-driven flow-control solution (a process-based workflow) that uses the TPL Task Factory to dynamically and asynchronously “route” messages through a series of “steps” (actor/agent processes) such as message Parsing, Translating, Logging, Searching, etc. The solution seems to work well and has proven very efficient. However, as we considered adding administrative controls and monitoring, we came across TPL Dataflow and thought that moving the TPL Tasks to Dataflow Blocks would not only provide the same high degree of async performance but also make it easier to monitor the steps (queues).

    Unfortunately, we have not been able to efficiently implement a Dataflow process that handles the following requirement: our flow-control solution must be able to broker synchronous front-end requests to asynchronous back-end processes and then link each response back to its request. The last part of the process (getting the asynchronous response back to the requesting thread) has proven challenging with TPL Dataflow.

    Please let us know whether we have misunderstood the intended TPL Dataflow usage scenario or are simply overlooking a missing piece of the system.

    Thank you.

    Note:  The following code snippet provides a simplified example of the issue:

    using System.Threading.Tasks.Dataflow;

    class DataFlowTest
    {
        private TransformBlock<string, string> processParse = null;
        private TransformBlock<string, string> processSendToWebservice = null;
        private TransformBlock<string, string> processTranslate = null;
        private TransformBlock<string, string> processLog = null;

        public DataFlowTest()
        {
            processParse = new TransformBlock<string, string>(message => Parse(message));
            processSendToWebservice = new TransformBlock<string, string>(message => SendToWebservice(message));
            processTranslate = new TransformBlock<string, string>(message => Translate(message));
            processLog = new TransformBlock<string, string>(message => Log(message));

            processParse.LinkTo<string>(processSendToWebservice);
            processSendToWebservice.LinkTo<string>(processTranslate);
            processTranslate.LinkTo<string>(processLog);
        }

        string Process(string inputMessage)
        {
            string outputMessage = null;

            processParse.Post(inputMessage);

            // ISSUE: Need output message data to return to caller
            // SOLUTION: ??

            return outputMessage;
        }

        string Parse(string inputMessage)
        {
            string outputMessage = inputMessage; // placeholder so the sample compiles
            // ...
            // Parse logic removed for brevity
            // ...
            return outputMessage;
        }

        string SendToWebservice(string inputMessage)
        {
            string outputMessage = inputMessage; // placeholder so the sample compiles
            // ...
            // Call to external Webservice logic removed for brevity
            // ...
            return outputMessage;
        }

        string Translate(string inputMessage)
        {
            string outputMessage = inputMessage; // placeholder so the sample compiles
            // ...
            // Translate logic removed for brevity
            // ...
            return outputMessage;
        }

        string Log(string inputMessage)
        {
            string outputMessage = inputMessage; // placeholder so the sample compiles
            // ...
            // Logging logic removed for brevity
            // ...
            return outputMessage;
        }
    }

     
     
    Wednesday, October 03, 2012 4:28 PM

Answers

  • What you need is some sort of token that can be used both to set the final result of the dataflow and to retrieve it when it's ready. It would be sent alongside the processed data and used only at the end of the dataflow. I think TaskCompletionSource is ideal for this, because its Task gives you access to the result, which means you can easily wait until it's available (synchronously or asynchronously). To do this, you could use something like Tuple<TData, TaskCompletionSource<TResult>>, but I think using a custom class would be better here:

    class DataWithResult<TData, TResult>
    {
    	private readonly TaskCompletionSource<TResult> m_completionSource;
    
    	public TData Data { get; private set; }
    
    	public DataWithResult(TData data, TaskCompletionSource<TResult> completionSource)
    	{
    		m_completionSource = completionSource;
    		Data = data;
    	}
    
    	public void SetResult(TResult result)
    	{
    		m_completionSource.SetResult(result);
    	}
    
    	public DataWithResult<TNewData, TResult> WithNewData<TNewData>(TNewData newData)
    	{
    		return new DataWithResult<TNewData, TResult>(newData, m_completionSource);
    	}
    }
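    For comparison, here is a rough sketch of the Tuple<TData, TaskCompletionSource<TResult>> alternative mentioned above. The class name TuplePipelineSketch and its two steps (upper-casing, then appending "!") are hypothetical stand-ins, not part of the original code; the point is only that Item2 carries the caller's completion source from block to block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch of the Tuple-based alternative:
// Item1 is the data being processed, Item2 is where the final result goes.
static class TuplePipelineSketch
{
    // Hypothetical first step: transforms Item1 and passes Item2 along unchanged.
    private static readonly TransformBlock<Tuple<string, TaskCompletionSource<string>>,
                                           Tuple<string, TaskCompletionSource<string>>> Upper =
        new TransformBlock<Tuple<string, TaskCompletionSource<string>>,
                           Tuple<string, TaskCompletionSource<string>>>(
            t => Tuple.Create(t.Item1.ToUpperInvariant(), t.Item2));

    // Last step: completes the caller's task (with a hypothetical final tweak).
    private static readonly ActionBlock<Tuple<string, TaskCompletionSource<string>>> Finish =
        new ActionBlock<Tuple<string, TaskCompletionSource<string>>>(
            t => t.Item2.SetResult(t.Item1 + "!"));

    static TuplePipelineSketch()
    {
        Upper.LinkTo(Finish);
    }

    public static Task<string> ProcessAsync(string inputMessage)
    {
        var tcs = new TaskCompletionSource<string>();
        Upper.Post(Tuple.Create(inputMessage, tcs));
        return tcs.Task;
    }
}
```

    Every block's type has to spell out the full tuple, and Item1/Item2 say nothing about intent, which is the main argument for the custom class.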

    Because you don't want to deal with passing the completion source along in each block, you could create a helper method that does it for you, plus another one that creates the ActionBlock that sets the result of the dataflow:

    private IPropagatorBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>
    	CreateTransformBlockWithResult<TInput, TOutput, TResult>(Func<TInput, TOutput> transform)
    {
    	return new TransformBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>(
    		dwr => dwr.WithNewData(transform(dwr.Data)));
    }
    
    private ITargetBlock<DataWithResult<TResult, TResult>> CreateSetResultBlock<TResult>()
    {
    	return new ActionBlock<DataWithResult<TResult, TResult>>(dwr => dwr.SetResult(dwr.Data));
    }
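    One caveat with these helpers: if a transform throws, its TransformBlock faults and stops accepting messages, so neither that message nor any later one reaches the set-result block, and a caller blocked on tcs.Task.Result waits forever. Below is a minimal sketch of a more forgiving variant. It assumes a pared-down DataWithResult that exposes its completion source as a Completion property (an assumption of this sketch, not the class above), and it uses a TransformManyBlock so that a failed message can be dropped after its exception is handed to the caller.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Pared-down envelope for this sketch: the data plus the caller's completion source.
sealed class DataWithResult<TData, TResult>
{
    public DataWithResult(TData data, TaskCompletionSource<TResult> completion)
    {
        Data = data;
        Completion = completion;
    }

    public TData Data { get; private set; }
    public TaskCompletionSource<TResult> Completion { get; private set; }
}

static class FaultTolerantSteps
{
    // Variant of CreateTransformBlockWithResult: an exception thrown for one
    // message is passed to that message's caller, and the message is dropped,
    // instead of faulting the block for every later message.
    public static IPropagatorBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>
        CreateTransformBlockWithResult<TInput, TOutput, TResult>(Func<TInput, TOutput> transform)
    {
        return new TransformManyBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>(dwr =>
        {
            try
            {
                // Success: exactly one output message, same completion source.
                return new[] { new DataWithResult<TOutput, TResult>(transform(dwr.Data), dwr.Completion) };
            }
            catch (Exception ex)
            {
                // Failure: fault only this caller's task and emit nothing downstream.
                dwr.Completion.TrySetException(ex);
                return new DataWithResult<TOutput, TResult>[0];
            }
        });
    }
}
```

    With a helper like this, a synchronous Process() would see the step's exception rethrown from tcs.Task.Result (wrapped in an AggregateException), while awaiting ProcessAsync() rethrows the original exception.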

    Your code would then look something like:

    private readonly IPropagatorBlock<DataWithResult<string, string>, DataWithResult<string, string>> processParse;
    // etc.
    
    public DataFlowTest()
    {
    	processParse = CreateTransformBlockWithResult<string, string, string>(message => Parse(message));
    	// etc.
    	processParse.LinkTo(processSendToWebservice);
    	processSendToWebservice.LinkTo(processTranslate);
    	processTranslate.LinkTo(processLog);
    	processLog.LinkTo(CreateSetResultBlock<string>());
    }
    
    public string Process(string inputMessage)
    {
    	var tcs = new TaskCompletionSource<string>();
    	processParse.Post(new DataWithResult<string, string>(inputMessage, tcs));
    	return tcs.Task.Result;
    }
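    Putting the pieces together, a complete version of DataFlowTest might look like the sketch below. The bodies of Parse, SendToWebservice, Translate and Log are hypothetical placeholders (the originals were elided), and the helpers are made static only because they don't use any instance state.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataWithResult<TData, TResult>
{
    private readonly TaskCompletionSource<TResult> m_completionSource;
    public TData Data { get; private set; }

    public DataWithResult(TData data, TaskCompletionSource<TResult> completionSource)
    {
        m_completionSource = completionSource;
        Data = data;
    }

    public void SetResult(TResult result) { m_completionSource.SetResult(result); }

    // Same completion source, new payload: this is how the token rides along.
    public DataWithResult<TNewData, TResult> WithNewData<TNewData>(TNewData newData)
    {
        return new DataWithResult<TNewData, TResult>(newData, m_completionSource);
    }
}

class DataFlowTest
{
    private readonly IPropagatorBlock<DataWithResult<string, string>, DataWithResult<string, string>>
        processParse, processSendToWebservice, processTranslate, processLog;

    public DataFlowTest()
    {
        processParse = CreateTransformBlockWithResult<string, string, string>(Parse);
        processSendToWebservice = CreateTransformBlockWithResult<string, string, string>(SendToWebservice);
        processTranslate = CreateTransformBlockWithResult<string, string, string>(Translate);
        processLog = CreateTransformBlockWithResult<string, string, string>(Log);

        processParse.LinkTo(processSendToWebservice);
        processSendToWebservice.LinkTo(processTranslate);
        processTranslate.LinkTo(processLog);
        processLog.LinkTo(CreateSetResultBlock<string>());
    }

    public string Process(string inputMessage)
    {
        var tcs = new TaskCompletionSource<string>();
        processParse.Post(new DataWithResult<string, string>(inputMessage, tcs));
        return tcs.Task.Result; // blocks until the final block sets the result
    }

    private static IPropagatorBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>
        CreateTransformBlockWithResult<TInput, TOutput, TResult>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<DataWithResult<TInput, TResult>, DataWithResult<TOutput, TResult>>(
            dwr => dwr.WithNewData(transform(dwr.Data)));
    }

    private static ITargetBlock<DataWithResult<TResult, TResult>> CreateSetResultBlock<TResult>()
    {
        return new ActionBlock<DataWithResult<TResult, TResult>>(dwr => dwr.SetResult(dwr.Data));
    }

    // Hypothetical stand-ins for the elided step logic.
    private static string Parse(string m) { return "parsed(" + m + ")"; }
    private static string SendToWebservice(string m) { return "sent(" + m + ")"; }
    private static string Translate(string m) { return "translated(" + m + ")"; }
    private static string Log(string m) { Console.WriteLine(m); return m; }
}
```

    With these placeholders, new DataFlowTest().Process("msg") returns "translated(sent(parsed(msg)))" after the Log step writes that string to the console.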

    The type IPropagatorBlock<DataWithResult<string, string>, DataWithResult<string, string>> is way too long, but I don't see a good way to make it shorter.

    Also, with this approach all of the processing is asynchronous and uses threads only when needed, but Process() itself still blocks, which means you would have one blocked thread for each message currently being processed. Because of that, I think it would be better to make Process() asynchronous too:

    public Task<string> ProcessAsync(string inputMessage)
    {
    	var tcs = new TaskCompletionSource<string>();
    	processParse.Post(new DataWithResult<string, string>(inputMessage, tcs));
    	return tcs.Task;
    }

    This method doesn't use async/await itself because there is no need to: it simply returns the Task, so it works well when called (and awaited) from async methods.
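    To illustrate calling ProcessAsync() from async code, the sketch below sends 20 concurrent requests through a one-step pipeline and awaits them all. Because each request's TaskCompletionSource travels with its message, every response is routed back to the request that produced it. The Envelope class, the upper-casing step and the 10 ms delay are hypothetical. TaskCreationOptions.RunContinuationsAsynchronously requires .NET Framework 4.6 or later; it keeps the awaiting code from running inline on the dataflow block's thread when SetResult is called.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AsyncPipelineSketch
{
    // Minimal message envelope (same idea as DataWithResult).
    private sealed class Envelope
    {
        public string Data;
        public TaskCompletionSource<string> Completion;
    }

    private readonly TransformBlock<Envelope, Envelope> step;
    private readonly ActionBlock<Envelope> setResult;

    public AsyncPipelineSketch()
    {
        // Hypothetical asynchronous step standing in for, e.g., a web service call.
        step = new TransformBlock<Envelope, Envelope>(async e =>
        {
            await Task.Delay(10);
            return new Envelope { Data = e.Data.ToUpperInvariant(), Completion = e.Completion };
        });

        setResult = new ActionBlock<Envelope>(e => e.Completion.SetResult(e.Data));
        step.LinkTo(setResult);
    }

    public Task<string> ProcessAsync(string inputMessage)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        step.Post(new Envelope { Data = inputMessage, Completion = tcs });
        return tcs.Task;
    }

    public static async Task<string[]> DemoAsync()
    {
        var pipeline = new AsyncPipelineSketch();
        var requests = Enumerable.Range(0, 20).Select(i => pipeline.ProcessAsync("msg" + i));
        // Task.WhenAll returns the results in the same order as the requests.
        return await Task.WhenAll(requests);
    }
}
```

    No thread is blocked while the 20 requests are in flight; each caller simply resumes when its own result is set.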

    • Marked as answer by t15 Thursday, October 04, 2012 7:19 PM
    Thursday, October 04, 2012 11:37 AM

All replies

  • Thank you for following up on our question and providing such a complete and meaningful response.

    Indeed, the TaskCompletionSource solution you proposed allowed us to implement our target workflow with TPL Dataflow and still achieve the required request/response synchronization.

    Note: In our admittedly simplistic test scenario, our TPL Task Factory based workflow substantially outperformed the TPL Dataflow version. We have tinkered with various settings and configurations but have not been able to resolve the performance discrepancy between TPL Tasks and TPL Dataflow. If we are not able to speed up the Dataflow solution, we plan to proceed with the TPL Task based solution.
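    For reference, the per-block settings that govern concurrency live in ExecutionDataflowBlockOptions. By default each block has a MaxDegreeOfParallelism of 1 (it processes one message at a time) and an unbounded BoundedCapacity, whereas tasks started independently through the Task Factory can run concurrently. The sketch below only shows where these settings go; the values and the stand-in steps are hypothetical, and nothing here measures or explains the discrepancy described above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowOptionsSketch
{
    public static async Task<int> RunAsync(int messageCount)
    {
        // Illustrative values only; suitable settings depend on the workload and must be measured.
        var stepOptions = new ExecutionDataflowBlockOptions
        {
            // Default is 1, i.e. each block handles one message at a time.
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            // Default is DataflowBlockOptions.Unbounded; a bound makes fast producers wait.
            BoundedCapacity = 1000
        };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        int processed = 0;
        // Hypothetical stand-ins for the Parse/Translate/Log steps.
        var parse = new TransformBlock<string, string>(m => m.Trim(), stepOptions);
        var translate = new TransformBlock<string, string>(m => m.ToUpperInvariant(), stepOptions);
        var log = new ActionBlock<string>(m => Interlocked.Increment(ref processed), stepOptions);

        parse.LinkTo(translate, link);
        translate.LinkTo(log, link);

        for (int i = 0; i < messageCount; i++)
        {
            // SendAsync (unlike Post) waits for room when the bounded block is full.
            await parse.SendAsync(" message " + i + " ");
        }

        // Completion flows parse -> translate -> log because of PropagateCompletion.
        parse.Complete();
        await log.Completion;
        return processed;
    }
}
```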

    Thank you.
    Thursday, October 04, 2012 7:20 PM