Apparent BufferBlock.Post/Receive/ReceiveAsync race/bug

  • Question

  • I know... I'm not really using TDF to its maximum potential. At the moment I'm simply using BufferBlock as a thread-safe queue for message passing, where the producer and consumer run at different rates. I'm seeing some strange behaviour that leaves me stumped as to how to proceed.


            private BufferBlock<object> messageQueue = new BufferBlock<object>();
    
            public void Send(object message)
            {
                var accepted = messageQueue.Post(message);
                logger.Info("Send message was called qlen = {0} accepted={1}", messageQueue.Count, accepted);
            }
    
            public async Task<object> GetMessageAsync()
            {
                try
                {
                    var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
                    //despite messageQueue.Count>0 next line occasionally does not execute
                    logger.Info("message received");
                    //.......
                }
                catch(TimeoutException)
                {
                    //do something
                }
            }

    In the code above (which is part of a 2000-line distributed solution), Send is called periodically, every 100ms or so. This means an item is posted to messageQueue about 10 times a second. This is verified. However, occasionally ReceiveAsync does not complete within the timeout (i.e. the Post does not cause ReceiveAsync to complete) and a TimeoutException is raised after 30s. At this point, messageQueue.Count is in the hundreds. This is unexpected. The problem has also been observed at slower posting rates (1 post/second) and usually happens before 1000 items have passed through the BufferBlock.

    So, to work around this issue, I am using the following code, which works, but occasionally adds 1s of latency when receiving (whenever the bug above occurs):


            public async Task<object> GetMessageAsync()
            {
                try
                {
                    object m;
                    var attempts = 0;
                    for (; ; )
                    {
                        try
                        {
                            m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                        }
                        catch (TimeoutException)
                        {
                            attempts++;
                            if (attempts >= 30) throw;
                            continue;
                        }
                        break;
    
                    }
                   
                    logger.Info("message received");
                    //.......
                }
                catch(TimeoutException)
                {
                    //do something
                }
            }


    This looks like a race condition in TDF to me, but I can't get to the bottom of why this doesn't occur in the other places where I use BufferBlock in a similar fashion. Experimentally changing from ReceiveAsync to Receive doesn't help. I haven't checked, but I imagine that in isolation the code above works perfectly. It's a pattern I've seen documented in "Introduction to TPL Dataflow" (tpldataflow.docx).

    What can I do to get to the bottom of this? Are there any metrics that might help infer what's happening? If I can't create a reliable test case, what more information can I offer?

    Help!




    Monday, April 9, 2012 2:37 AM

Answers

  • Hi biggy_spender,

    I do not see anything wrong with your code. In your real application could you try to use:

    var m = await messageQueue.ReceiveAsync();

    instead of:

    var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

    and observe how the application behaves?
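
    A minimal sketch of that experiment, assuming you still want to know when the wait is taking too long. The helper name, the stall threshold, and the onStall callback are made up for illustration; only ReceiveAsync(), Count, Task.WhenAny and Task.Delay are real APIs here:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReceiveDiagnostics
{
    // Hypothetical helper: receives with no built-in timeout, and calls
    // onStall with the current queue length each time stallThreshold
    // elapses without the receive completing.
    public static async Task<T> ReceiveWithStallReportAsync<T>(
        BufferBlock<T> queue, TimeSpan stallThreshold, Action<int> onStall)
    {
        // The parameterless overload: no timer is involved in the receive itself.
        Task<T> receive = queue.ReceiveAsync();

        // Keep waiting on the same receive task; only the watchdog delay restarts.
        while (await Task.WhenAny(receive, Task.Delay(stallThreshold)) != receive)
        {
            onStall(queue.Count);
        }

        return await receive;
    }
}
```

    If onStall reports a non-zero queue length while the receive is still pending, that would suggest the block is holding items it has not handed to the receiver, rather than the timeout firing too early.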

    Wednesday, April 11, 2012 12:42 AM

All replies

  • Hi biggy_spender-

    Thanks for the info.  Are you using the latest version of the DLL that's available for .NET 4 from http://msdn.microsoft.com/en-us/devlabs/gg585582 or that's in .NET 4.5 Beta? We're not aware of any bugs here, so providing a small, viable repro would be very helpful if you're able to do so. 

    Does this repro for you if you remove the timeout entirely and just use ReceiveAsync() instead of ReceiveAsync(TimeSpan)?

    Also, do you have a sense for what the load on the ThreadPool looks like when this occurs?  I'm wondering if this timeout is occurring because the relevant work can't be scheduled in time (though that'd have to be quite some load to take 30 seconds).  Is your application messing with the ThreadPool's max number of threads?
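
    One rough way to get that sense is to log a ThreadPool snapshot at the moment the TimeoutException is caught. A minimal sketch; the class and method names are made up for illustration, and the numbers are only a point-in-time approximation:

```csharp
using System;
using System.Threading;

public static class ThreadPoolSnapshot
{
    // Hypothetical helper: formats how many pool threads are in use right now.
    public static string Describe()
    {
        int maxWorkers, maxIo, freeWorkers, freeIo, minWorkers, minIo;
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);
        ThreadPool.GetAvailableThreads(out freeWorkers, out freeIo);
        ThreadPool.GetMinThreads(out minWorkers, out minIo);

        // busy = max - available, for worker and I/O completion threads.
        return string.Format(
            "workers busy={0}/{1} (min {2}), io busy={3}/{4} (min {5})",
            maxWorkers - freeWorkers, maxWorkers, minWorkers,
            maxIo - freeIo, maxIo, minIo);
    }
}
```

    For example, logger.Info(ThreadPoolSnapshot.Describe()) inside the catch block would show whether the worker threads were close to exhausted when the timeout fired.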



    Monday, April 9, 2012 4:18 AM
  • Hi Stephen,

    Try as I might, I can't reproduce this issue in isolation. The problem occurs with all variants of Receive/ReceiveAsync. Load on the ThreadPool is low. I haven't messed with any ThreadPool settings, nor am I explicitly setting any TaskScheduler. Currently, I am using VS2012 Beta with .NET 4.5 in a virtual machine, but in an attempt to isolate the problem, I ported to the Async CTP under .NET 4 in VS2010 on a non-virtual machine. The problem still persisted.

    Not much to go on. Sorry.


    Monday, April 9, 2012 5:25 PM
  • Hi biggy_spender-

    Is there any chance that the buffer had other blocks linked to it before ReceiveAsync was invoked?


    Monday, April 9, 2012 11:28 PM
  • Hi Cristina,

    The BufferBlock is not exposed anywhere else, nor is it linked to any other blocks. Only Post and Receive parts of the API are used.

    There is perhaps another issue that might throw light on things. The code around the ReceiveAsync looks something like this:

                    var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
                    
                    logger.Info(c + ":message received");
                    IList<object> remaining;// = new List<object>() { message };
                    var mEnum = Enumerable.Repeat(m, 1);
                    messages = messageQueue.TryReceiveAll(out remaining) ? mEnum.Concat(remaining) : mEnum; 

    I wonder if all is well in TryReceiveAll. Perhaps it is this call that is causing the bug to occur? It's the only place I use this pattern in my code. I'm aware that there are other ways that one might try to batch the output of a BufferBlock. Is there anything fundamentally wrong with what I am doing here?
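
    For comparison, one of those other ways might look like the sketch below, which waits with OutputAvailableAsync and then drains with TryReceiveAll, so no single item is pulled out separately first. The class and method names are hypothetical, and this is not a claim that it avoids the problem described above:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchReader
{
    // Hypothetical helper: returns everything currently buffered, or an
    // empty list once the block has completed and been drained.
    public static async Task<IList<T>> ReceiveBatchAsync<T>(BufferBlock<T> queue)
    {
        // OutputAvailableAsync yields false only when the block has
        // completed and no more output will ever become available.
        while (await queue.OutputAvailableAsync())
        {
            IList<T> items;
            if (queue.TryReceiveAll(out items))
            {
                return items;
            }
            // Nothing was there after all (e.g. another consumer took it); wait again.
        }

        return new List<T>();
    }
}
```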

    It's probably worth mentioning that this code is not re-entrant. Only one thread passes through this code at a time. No other Receive-type calls are made anywhere else.

    My best attempt at creating a test case does not appear to re-create the problem:

        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Threading;
        using System.Threading.Tasks;
        using System.Threading.Tasks.Dataflow;

        class Program
        {
            static void Main(string[] args)
            {
                var p = new Program();
                Console.ReadKey();
            }

            private readonly BufferBlock<object> mq = new BufferBlock<object>();

            public Program()
            {
                // Producer: posts as fast as it can from a dedicated thread.
                new Thread(_ => {
                    for (; ; )
                    {
                        mq.Post(new object());
                    }
                }).Start();
                StartReading();
            }

            // Consumer: waits for one message, drains the rest, pauses, then repeats.
            private async void StartReading()
            {
                var m = await mq.ReceiveAsync(TimeSpan.FromSeconds(30));

                Console.WriteLine("message received");
                IList<object> remaining;// = new List<object>() { message };
                var mEnum = Enumerable.Repeat(m, 1);
                IEnumerable<object> messages = mq.TryReceiveAll(out remaining) ? mEnum.Concat(remaining) : mEnum;
                Console.WriteLine(messages.Count() + " items read");
                await Task.Delay(500);
                StartReading();
            }
        }

    Tuesday, April 10, 2012 2:47 AM
  • I may have more information about this issue. I had a class with a BackgroundWorker as the producer (producing with the synchronous Post method) and an async task as the consumer. I instantiated it over 100 times and had the instances running in parallel. The producer produced and the data got into the BufferBlock, but the consumer blocked indefinitely most of the time. FWIW the producer was receiving from a network socket, but I don't suspect that had anything to do with it...

    My solution was to turn the BackgroundWorker into a thread; then the problem went away.

    Saturday, September 26, 2020 3:00 PM