TransformBlock and Cancellation behavior


  • Wednesday, June 27, 2012 6:18 PM
     

    I have put together the following class:

    using System;
    using System.Collections.Concurrent;
    using System.Globalization;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace DataflowTest
    {
        public class Job
        {
            public Job()
            {
                Exceptions = new ConcurrentDictionary<string, int>();
            }
    
            private readonly CancellationTokenSource m_cancelSource = new CancellationTokenSource();
    
            public JobStatus Status { get; private set; }
            public ConcurrentDictionary<string, int> Exceptions { get; private set; }
    
            public void Cancel()
            {
                m_cancelSource.Cancel();
            }
    
            public async Task Run()
            {
                Status = JobStatus.Running;
                var edo = new ExecutionDataflowBlockOptions {BoundedCapacity = 100, CancellationToken = m_cancelSource.Token, MaxDegreeOfParallelism = 5};
                var tb = new TransformBlock<int, string>(async i => await ProcessItem(i), edo);
    
                var rt = Task.Run(
                    async () =>
                          {
                              try
                              {
                                  for (var i = 0; i < 100; i++)
                                      await tb.SendAsync(i);
                              }
                              catch (Exception)
                              {
                                  m_cancelSource.Cancel();
                                  throw;
                              }
                              finally
                              {
                                  tb.Complete();
                              }
                          }, m_cancelSource.Token);
    
                var wt = Task.Run(
                    async () =>
                          {
                              try
                              {
                                  while (await tb.OutputAvailableAsync(m_cancelSource.Token))
                                      await SaveItem(await tb.ReceiveAsync(m_cancelSource.Token));
                              }
                              catch (Exception)
                              {
                                  m_cancelSource.Cancel();
                                  // Why do we have to manually clean up the output buffer?
                                  // Uncommenting the next line drains the buffer and lets Completion finish:
                                  // tb.LinkTo(DataflowBlock.NullTarget<string>());
                                  throw;
                              }
                          }, m_cancelSource.Token);
    
                try
                {
                    await Task.WhenAll(rt, wt, tb.Completion);
                    Status = JobStatus.Completed;
                }
                catch (AggregateException aex)
                {
                    aex.Handle(HandleException);
                }
                catch (Exception ex)
                {
                    if (!HandleException(ex)) throw;
                }
                finally
                {
                    if (tb.Completion.Status == TaskStatus.Canceled && Status != JobStatus.Failed)
                        Status = JobStatus.Cancelled;
                }
            }
    
            private bool HandleException(Exception ex)
            {
                if (ex is OperationCanceledException)
                    return true;
                Status = JobStatus.Failed;
                int count;
                Exceptions[ex.Message] = 1 + (Exceptions.TryGetValue(ex.Message, out count) ? count : 0);
                return true;
            }
    
            private static async Task<string> ProcessItem(int i)
            {
                //await Task.Delay(100);
                return await Task.Run(() => "Result - " + i.ToString(CultureInfo.InvariantCulture));
            }
    
            private static async Task SaveItem(string data)
            {
                await Task.Delay(100);
                //if (new Random().Next(100) % 5 == 0)
                //    throw new InvalidOperationException("Error saving item");
                await Task.Run(() => Console.WriteLine(data));
            }
        }
    }

    And the testing method:

    using System;
    using System.Threading;
    
    namespace DataflowTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                var job = new Job();
                var task = job.Run();
    
                // Simulate cancellation after all items have been posted and processed, but not saved
                Thread.Sleep(2000);
                job.Cancel();
    
                task.Wait();
                Console.WriteLine(job.Status);
                foreach (var exception in job.Exceptions)
                    Console.Error.WriteLine(exception.Key);
                Console.WriteLine("Press Enter to Exit...");
                Console.ReadLine();
            }
        }
    }

    As you can see, if cancellation is triggered after all items have been posted and processed but before they have all been saved, waiting on the TransformBlock's Completion never returns unless the output buffer is cleared manually.

    Is this behavior intentional? In my opinion it is neither expected nor straightforward.


    • Edited by yuramag Wednesday, June 27, 2012 6:20 PM

All Replies

  • Wednesday, June 27, 2012 9:11 PM
     
     
    This is quite confusing. It seems to me this is caused by calling Complete() (if you comment that out, the block gets canceled correctly). But I haven't been able to reproduce this in my own (simpler) code, which indicates it may indeed be a bug.
    • Edited by svick Wednesday, June 27, 2012 9:12 PM
  • Thursday, June 28, 2012 1:07 PM
     
     
    Interestingly, if you remove the m_cancelSource.Token argument from the calls to tb.OutputAvailableAsync and tb.ReceiveAsync, the problem goes away. It seems that when cancellation has already been requested on the token, OutputAvailableAsync and ReceiveAsync simply throw TaskCanceledException instead of reacting to the cancellation appropriately. I believe TaskCanceledException should not be thrown in this case and OutputAvailableAsync should just return false. Does this make sense?
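The observation above can be checked in isolation. The following is a minimal sketch, assuming the System.Threading.Tasks.Dataflow library is referenced; the class and method names are hypothetical and not part of the original code. It passes an already-cancelled token to OutputAvailableAsync on a block that has output waiting, and reports whether the await ends in an OperationCanceledException (the base class of TaskCanceledException) rather than returning a value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original Job code.
public static class CanceledTokenDemo
{
    // Returns true if awaiting OutputAvailableAsync with an already-cancelled
    // token throws OperationCanceledException instead of returning a bool.
    public static async Task<bool> ThrowsOnCanceledTokenAsync()
    {
        var block = new TransformBlock<int, string>(i => "Result - " + i);
        block.Post(1); // one item will be processed and buffered as output

        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel(); // the token is cancelled before the call is made
            try
            {
                await block.OutputAvailableAsync(cts.Token);
                return false; // the call returned normally
            }
            catch (OperationCanceledException)
            {
                return true; // the call surfaced the cancellation as an exception
            }
        }
    }

    public static async Task Main()
    {
        Console.WriteLine("Throws on cancelled token: " + await ThrowsOnCanceledTokenAsync());
    }
}
```

Note that the token here is passed only to OutputAvailableAsync, not to the block's options, so the block itself is never cancelled in this sketch.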
  • Friday, July 06, 2012 7:28 PM
    Owner
     
     Answered

    Hi yuramag,

    Thank you for reporting this scenario. We will evaluate how to make the experience better in these cases.

    The behavior is explained below.

    TransformBlock has two internal blocks: a target block and a source block. The TransformBlock moves to the completed state when its source part does.

    If transformBlock.Complete is called, the internal target part is completed and notifies the source part that it should move to the completed state as soon as it is ready, that is, once all the data has been processed. If the source still has messages in its output queue, it is not ready to complete. If the token is cancelled some time after Complete was invoked, the source is notified at the first opportunity; it then clears all messages from the output queue and begins the completion phase.

    In the case above, the source is not ready to complete when Complete is invoked, and ReceiveAsync is not invoked again after the token is cancelled.
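The part of this explanation that does not involve cancellation can be illustrated with a small sketch, again assuming the System.Threading.Tasks.Dataflow library is referenced; the class and method names are hypothetical. It calls Complete on a block whose output is never consumed, waits briefly to see whether Completion finishes anyway, and then discards the buffered output by linking to a NullTarget, which is the same workaround as the commented-out line in the question.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original Job code.
public static class CompletionDemo
{
    // Returns true if Completion finished within the timeout even though
    // the block's output was never consumed.
    public static async Task<bool> CompletesWithBufferedOutputAsync(TransformBlock<int, string> block)
    {
        block.Post(1);
        block.Complete(); // completes the target side; the source still holds output

        // The 500 ms timeout is an arbitrary choice for this sketch.
        Task first = await Task.WhenAny(block.Completion, Task.Delay(500));
        return first == block.Completion;
    }

    public static async Task Main()
    {
        var block = new TransformBlock<int, string>(i => "Result - " + i);
        bool early = await CompletesWithBufferedOutputAsync(block);
        Console.WriteLine("Completed with buffered output: " + early);

        // Discard the remaining output so the source side can complete.
        block.LinkTo(DataflowBlock.NullTarget<string>());
        await block.Completion;
        Console.WriteLine("Completed after draining: " + block.Completion.IsCompleted);
    }
}
```

The sketch deliberately leaves the cancellation token out, because how the token interacts with a pending Complete is exactly what this thread is questioning.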

    The sample code above should be updated to invoke ReceiveAsync and OutputAvailableAsync without the cancellation token. The tasks they return will complete when the block completes.
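A consumer loop written along these lines might look like the sketch below. It is not the original Job class: the class name, the ConsumeAsync helper and the item count are hypothetical, and it assumes the System.Threading.Tasks.Dataflow library is referenced. The loop relies only on the block's own completion to end: OutputAvailableAsync returns false once the block has completed and no output remains.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original Job code.
public static class TokenlessConsumerDemo
{
    // Drains the source without passing a cancellation token to either call.
    // Returns the number of items received. Assumes a single consumer.
    public static async Task<int> ConsumeAsync(ISourceBlock<string> source)
    {
        int count = 0;
        while (await source.OutputAvailableAsync())
        {
            string item = await source.ReceiveAsync();
            Console.WriteLine(item);
            count++;
        }
        return count;
    }

    public static async Task Main()
    {
        var block = new TransformBlock<int, string>(i => "Result - " + i);
        for (var i = 0; i < 5; i++)
            block.Post(i);
        block.Complete();

        int received = await ConsumeAsync(block);
        Console.WriteLine("Received " + received + " items");
    }
}
```

In the original code the token would still be supplied through ExecutionDataflowBlockOptions; this sketch omits it and only shows the shape of the loop.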

    Hope this helps.

  • Monday, July 09, 2012 1:33 PM
     
     

    Hi Cristina,

    Thanks for explaining this. Although removing the cancellation token from ReceiveAsync and OutputAvailableAsync would solve this problem, it could cause a different side effect: I would not be able to cancel the data flow until at least one more event arrives after the token is cancelled. Suppose my input comes from an external source, e.g. folder change notifications or user input. It could take hours (or forever) until the next event arrives. Such behavior is not acceptable in general.