Creating a Service that uses a Dataflow and enables Throttling

  • Question

  • I'm trying to understand throttling and flow in a Service that uses Dataflow.
    I didn't see a service-oriented Dataflow example.  If one exists, please let me know.

    I would like to keep answers related only to Dataflow Service Architecture and not to SOLID principles, alternative frameworks, etc.

    I'm looking at creating a service.  The service will accept a request and queue it for processing. This will be important as it will process a large data set and the server will have limited memory/cpu.

    For context let's turn this Pipeline example into a service
    https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline

    First, I would take the bulk of the code in the Main() function and drop it into a singleton class.

    I would like to accept all requests through my service so I do not want to set DataflowBlockOptions.BoundedCapacity property.
    I only want to process a few jobs at a given time so I would define DataflowBlockOptions.MaxMessagesPerTask on each of the pipelines.

    Then create a job class

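    // Carries one request through the pipeline; each stage fills in the next field.
    class MyJob
    {
        public string url;              // page to download when text is not supplied
        public string text;             // downloaded or caller-supplied text
        public string[] words;          // words extracted from text
        public List<string> reversed;   // result list (requires System.Collections.Generic)
    }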

    Because I would like the service to allow text to be readily available instead of downloaded, I will add an accept job method and inject into the pipeline at the appropriate place

    async Task<MyJob> AcceptAsync (MyJob job)
    {
      if (job.text != null)   // text already supplied, so skip the download stage
      {
        await createWordList.SendAsync(job);
        return await createWordList.ReceiveAsync();
      }
      else
      {
        await downloadString.SendAsync(job);
        return await downloadString.ReceiveAsync();
      }
    }


    =========
    Service
    =========

    public sealed class MyService
    {
      public async Task<List<string>> FindReverseWordsByUrl(string url)
      {
          var job = new MyJob { url = url };
          var task = await singleton.AcceptAsync(job);
          return task.reversed;
      }

      public async Task<List<string>> FindReverseWords(string text)
      {
          var job = new MyJob { text = text };
          var task = await singleton.AcceptAsync(job);
          return task.reversed;
      }

    }


    So
    1) Is using MaxMessagesPerTask the appropriate way to handle this kind of throttling?
    2) Is injecting into the middle of the pipeline okay as in my AcceptAsync method?
    3) The original example uses .Complete and .Completion.Wait() since they will continuously run... Correct?
    4) Is there a better way to handle this?

    Thursday, January 25, 2018 4:02 PM

All replies

  • I would like to accept all requests through my service so I do not want to set DataflowBlockOptions.BoundedCapacity property.

    Using BoundedCapacity won't really change that. The only thing it will change is whether your code is going to wait on SendAsync or on ReceiveAsync.
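
    A minimal sketch of that effect, assuming a single ActionBlock with BoundedCapacity = 1. The class and method names (BoundedCapacityDemo, SecondSendWaitsAsync) are made up for illustration. Once the block is full, SendAsync does not reject the message; it returns a task that completes only when the block has room again.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the original pipeline.
public static class BoundedCapacityDemo
{
    // Returns true if the second SendAsync had to wait for free capacity.
    public static async Task<bool> SecondSendWaitsAsync()
    {
        // The gate keeps the first message "in progress" until we release it.
        var gate = new TaskCompletionSource<bool>();

        var block = new ActionBlock<int>(
            async _ => await gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Accepted at once; it now occupies the block's only slot.
        await block.SendAsync(1);

        // The block is full, so this message is postponed, not dropped.
        Task<bool> second = block.SendAsync(2);
        bool waited = !second.IsCompleted;

        // Let the first message finish; the block then takes the second one.
        gate.SetResult(true);
        bool accepted = await second;

        block.Complete();
        await block.Completion;
        return waited && accepted;
    }
}
```

    So a bounded block still accepts every request eventually; the caller just spends its waiting time in SendAsync instead of waiting for the result.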

    I only want to process a few jobs at a given time so I would define DataflowBlockOptions.MaxMessagesPerTask on each of the pipelines.

    That's not what MaxMessagesPerTask does. That property lets you choose how many messages are going to be processed in a batch using the same Task, which lets you decide between fairness (each message has its own Task that's scheduled on its own) and low overhead (many messages are being processed using a single Task).

    To control how many messages are processed at the same time, you need to use ExecutionDataflowBlockOptions.MaxDegreeOfParallelism.
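
    As a rough illustration, the sketch below posts several messages to one block and records how many delegates run at once. ThrottleDemo, PeakConcurrencyAsync and the 20 ms delay are placeholders standing in for real work, not anything from the walkthrough.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class used only to observe the concurrency limit.
public static class ThrottleDemo
{
    // Posts `jobs` messages and returns the highest number that ran at the same time.
    public static async Task<int> PeakConcurrencyAsync(int jobs, int limit)
    {
        var sync = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            lock (sync) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(20);               // stand-in for real work
            lock (sync) { running--; }
        },
        new ExecutionDataflowBlockOptions
        {
            // At most `limit` messages are processed concurrently by this block.
            MaxDegreeOfParallelism = limit
        });

        // The input queue is unbounded, so every Post is accepted immediately.
        for (int i = 0; i < jobs; i++) block.Post(i);

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

    The limit applies per block, so a pipeline with several stages needs the option set on each block that should be throttled.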

    await createWordList.SendAsync(job);
    return await createWordList.ReceiveAsync();

    This will only work if you have a guarantee that AcceptAsync is not going to be called more than once at the same time (at which point, using Dataflow would be almost pointless). Otherwise, the calls to SendAsync and ReceiveAsync could be executed in the wrong order, which will result in returning the wrong result.

    Instead, you will need to provide some way of tracking completion of MyJob. Probably the best way to do that is to use TaskCompletionSource.
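
    One possible shape for this, as a sketch only: each job carries its own TaskCompletionSource, the last block of the pipeline completes it, and AcceptAsync awaits that task instead of calling ReceiveAsync. TrackedJob, WordPipeline and the Done field are hypothetical names, and the two stages are simplified stand-ins for the walkthrough's blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of MyJob that carries its own completion source.
public sealed class TrackedJob
{
    public string Text;
    public string[] Words;
    public List<string> Reversed;
    public readonly TaskCompletionSource<TrackedJob> Done =
        new TaskCompletionSource<TrackedJob>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public sealed class WordPipeline
{
    private readonly TransformBlock<TrackedJob, TrackedJob> createWordList;
    private readonly ActionBlock<TrackedJob> findReversedWords;

    public WordPipeline()
    {
        createWordList = new TransformBlock<TrackedJob, TrackedJob>(job =>
        {
            job.Words = (job.Text ?? "").Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
            return job;
        });

        // Final stage: compute the result, then signal the caller that owns this job.
        findReversedWords = new ActionBlock<TrackedJob>(job =>
        {
            try
            {
                var all = new HashSet<string>(job.Words);
                job.Reversed = all.Where(w =>
                {
                    char[] chars = w.ToCharArray();
                    Array.Reverse(chars);
                    var r = new string(chars);
                    return r != w && all.Contains(r);
                }).ToList();
                job.Done.TrySetResult(job);
            }
            catch (Exception ex)
            {
                job.Done.TrySetException(ex);   // surface failures to the waiting caller
            }
        });

        createWordList.LinkTo(findReversedWords,
            new DataflowLinkOptions { PropagateCompletion = true });
    }

    // Safe to call concurrently: each caller awaits only its own job's task.
    public async Task<TrackedJob> AcceptAsync(TrackedJob job)
    {
        await createWordList.SendAsync(job);
        return await job.Done.Task;
    }
}
```

    Because the result travels with the job, the order in which concurrent callers send and await no longer matters. An exception thrown in an earlier stage would still fault that block without completing the job, so a real service would need the same try/catch there.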

    Is injecting into the middle of the pipeline okay as in my AcceptAsync method?

    Yes, injecting into the middle of the pipeline is okay. But besides the concurrency issue I mentioned above, your AcceptAsync does not really use the pipeline, since it always sends to and receives from the same block. I'm not sure if that was your intention.
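
    To make the difference concrete, here is a small sketch in which the blocks are linked, so a message sent to either entry point flows on to the end of the pipeline. EntryPointDemo is a made-up name, the download stage is a stand-in that does no network access, and the final block just collects output instead of finding reversed words.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: two entry points into one linked pipeline.
public static class EntryPointDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var results = new List<string>();

        // Stand-in for the download stage (no real HTTP request is made).
        var downloadString = new TransformBlock<string, string>(url => "text from " + url);
        var createWordList = new TransformBlock<string, string[]>(text => text.Split(' '));
        // Default MaxDegreeOfParallelism is 1, so the list is never written concurrently.
        var collect = new ActionBlock<string[]>(words => results.Add(string.Join("|", words)));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        downloadString.LinkTo(createWordList, link);
        createWordList.LinkTo(collect, link);

        await downloadString.SendAsync("http://example.com");   // enters at the head
        await createWordList.SendAsync("already have text");    // enters mid-pipeline

        // Completing the head propagates down the links once each block drains.
        downloadString.Complete();
        await collect.Completion;
        return results;
    }
}
```

    Both messages reach the last block, although their relative order is not guaranteed. With PropagateCompletion set, completing the head block also completes the middle one, so messages sent directly to it after that point are declined.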

    Monday, January 29, 2018 12:48 PM