none
Why does BufferBlock<T>.ReceiveAsync() hang when data is available? RRS feed

  • Question

  • I am trying to build a throttled async update to a fairly fast flowing input stream. BufferBlock seemed a nice match for this with the idea that I could call the ReceiveAll() to grab everything off the buffer and on the occasions there is nothing there I can await on a ReceiveAsync() to pick up the next element as it arrives.
    But it seems to sometimes hang on the ReceiveAsync() call; and the conditions of failure are odd.
    I posted a question to StackOverflow here. The comments note some interesting behaviour when making other changes to the code.

    This question also seems to have occurred before on an old thread, but there was no solution there. However there was a request for how to reproduce. so here goes:

    Set up a console app with a ref to System.Threading.Tasks.Dataflow.dll

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace DataFlowExample
    {
        class Program
        {
            static void Main(string[] args)
            {
                var context = new CancellationTokenSource();
                var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
                var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);
        
                // shove 10 things onto the buffer
                for(int i = 0; i < 10; i++)
                {
                    // shove something on the buffer every second
                    buffer.Post(i);
                    Thread.Sleep(1000);
                }
                context.Cancel();
            }
        
            // 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll
            // 2. Then we should see nothing for a second
            // 3. Then we immediately process the next element from the await ReceiveAsync.
            // 4. We should then repeat 2 & 3 'till i == 10 as there will be nothign for TryReceiveAll.
            static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
            {
                try
                {
                    while (!token.IsCancellationRequested)
                    {
                        Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
                        IList<int> elements;
                        if (!buffer.TryReceiveAll(out elements))
                        {
                            try
                            {
                                var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                                elements = new List<int> { element };
                            }
                            catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                        }
                        if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
                    }
                }
                catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
            }
        }
    }
    

    output is:

    11:27: This Breaks it...
    Elements: 0
    11:27: This Breaks it...
    Failed to get anything on await...
    11:27: This Breaks it...
    Elements: 1, 2, 3, 4, 5
    ...And so on
    

    But if I change the log line

    Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
    

    to

    Console.WriteLine("This Works...");
    

    The the output comes out as expected:

    This Works...
    Elements: 0
    This Works...
    Elements: 1
    This Works...
    Elements: 2
    This Works...
    
    etc. But even the act of copying text from the console will switch it back to the failing output, as does switching which part is in background thread, whether we format the date or not, etc.

    This appears to be a bug. The behaviour is, at the very least, far from described in the documentation.




    Tuesday, October 17, 2017 9:12 AM