none
ConcurrentQueue<T> adding while evaluating count. RRS feed

  • Question

  • I want to know whether is it thread safe to use the ConcurrentQueue<T> in a scenario where one thread is adding (enquing) elements to the queue while another thread is evaluating the Count and performing some actions based on the Count value.

    In this scenario do I have to use explicit locks or as the name indicates is it safe to use the ConcurrentQueue without explicit locks.

    I ask this here because when I searched for this problem I got complex answers and saying Concurrent collections are meant to be used in parallel programming.

    I know that Concurrent collections have some overhead but I want to know whether the above problem is solved in the collection or not. And can any body give me some reference for the Concurrent Collections tutorials.


    My blog : http://thuruinhttp.wordpress.com
    • Edited by Thuru VMVP Thursday, December 15, 2011 7:36 PM
    Thursday, December 15, 2011 7:35 PM

Answers

  • You can do this safely - ConcurrentQueue<T> is completely thread safe.

     

    That being said, this is typically not the way you'd handle it - especially if multiple threads can Dequeue.  Instead, you'd call TryDequeue to get the first object in the queue, and if it succeeds, process that object.

     

    The problem with relying on Count is that it may change between the time you check the count and the time you use the collection.


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".
    • Proposed as answer by servy42 Thursday, December 15, 2011 7:55 PM
    • Marked as answer by Thuru VMVP Friday, December 16, 2011 9:15 AM
    Thursday, December 15, 2011 7:45 PM
    Moderator
  • The first worked perfectly fine for me, and printed (until I hit a key to exit):

    Producer has started ...
    Counter has started..
    Operation Dequeue has started
    0
    New string added
    1
    1
    Got the the string
    New string added
    1
    Got the the string
    New string added
    1
    
    
    
    


    These versions should work effectively the same - the only difference, as you noticed, is that Task will use ThreadPool threads.

     

    In this case, however, you'd typically want to include the LongRunning hint on task creation, as they are very long living tasks...

     

    BTW - If you're making a producer/consumer type of operation, you may want to consider using BlockingCollection<T> instead.  It allows you to write much cleaner code, on both ends, ie:

     using System;
     using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
    
    internal class Test
    {
        private static BlockingCollection<string> q = new BlockingCollection<string>();
    
        private static void Main(string[] args)
        {
            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Producer has started ...");
                    Random random = new Random();
                    while (true)
                    {
                        Thread.Sleep(random.Next(2500));
                        for (int i = random.Next(5); i >= 0; --i)
                            q.Add(string.Format("thuru {0}", i)); // Add multiple items in batches
                        Console.WriteLine("New strings added");
                    }
                }, TaskCreationOptions.LongRunning);
    
            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Consumer has started");
    
                    foreach (var str in q.GetConsumingEnumerable())
                    {
                        Console.WriteLine("Got the string [{0}]", str);
                        Thread.Sleep(250); // Put small delay in processing to simulate work
                    }
                   
                }, TaskCreationOptions.LongRunning);
    
            Console.ReadKey();
        }
    }
    

    This is typically a better option than relying on count.  It also prevents the need for Thread.Sleep (on either end - though I have it in there to simulate work atm...)


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".
    • Marked as answer by Thuru VMVP Saturday, December 17, 2011 6:43 AM
    Friday, December 16, 2011 5:41 PM
    Moderator

All replies

  • You can do this safely - ConcurrentQueue<T> is completely thread safe.

     

    That being said, this is typically not the way you'd handle it - especially if multiple threads can Dequeue.  Instead, you'd call TryDequeue to get the first object in the queue, and if it succeeds, process that object.

     

    The problem with relying on Count is that it may change between the time you check the count and the time you use the collection.


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".
    • Proposed as answer by servy42 Thursday, December 15, 2011 7:55 PM
    • Marked as answer by Thuru VMVP Friday, December 16, 2011 9:15 AM
    Thursday, December 15, 2011 7:45 PM
    Moderator
  • Yes ,

    But your second point is exactly what I'm asking. Can I rely on Count on thread while another thread is adding elements to it and another one is Dequeue that. (using TryDequeue()).

    Becasue the scenario is on 3 threads one is adding, another one is counting and another one is dequeue the elements.


    My blog : http://thuruinhttp.wordpress.com
    Friday, December 16, 2011 8:10 AM
  • I tested it with small piece of code, it works fine.
    But when I use the TPL to run those 3 threads I don't get anything in the screen, but when I run them as normal threads they run perfectly.
    What the exact different between new Thread ( () > {}) and TPL's Task.Factory.StartNew ( () => {})
    One obviouse different is that TPL enters in the threadpool where new Thread creates a thread for us.
     
    Here's the code. I get nothing on the screen for this...
     static ConcurrentQueue<string> q = new ConcurrentQueue<string>();
    
    
    static void Main(string[] args)
            {
                Task.Factory.StartNew(() =>
                    {
                        Console.WriteLine("Producer has started ...");
                        while (true)
                        {
                            Thread.Sleep(2500);
                            q.Enqueue("thuru");
                            Console.WriteLine("New string added");
                        }
                    });
                
    
    
                Task.Factory.StartNew(() =>
                    {
                        Console.WriteLine("Counter has started..");
                        while (true)
                        {
                            Thread.Sleep(1500);
                            Console.WriteLine(q.Count);
                        }
                    });
    
    
                Task.Factory.StartNew(() =>
                    {
                        Console.WriteLine("Operation Dequeue has started");
    
                        Random r = new Random();
                        string st;
    
                        while (true)
                        {
                            Thread.Sleep(r.Next(1500, 5000));
                            if (q.TryDequeue(out st))
                            {
                                Console.WriteLine("Got the the string");
                            }
                        }
    
                    });
    
                Console.ReadKey();
            }
    

    This code works fine with the expected output of the Concurrent Queue
     static ConcurrentQueue<string> q = new ConcurrentQueue<string>();
    
            static void Main(string[] args)
            {
                new Thread(() =>
                {
                    Console.WriteLine("Producer has started ...");
                    while (true)
                    {
                        Thread.Sleep(2500);
                        q.Enqueue("thuru");
                        Console.WriteLine("New string added");
                    }
                }).Start();
    
    
                new Thread(() =>
                {
                    Console.WriteLine("Counter has started..");
                    while (true)
                    {
                        Thread.Sleep(1500);
                        Console.WriteLine(q.Count);
                    }
                }).Start();
    
    
                new Thread(() =>
                {
                    Console.WriteLine("Operation Dequeue has started");
    
                    Random r = new Random();
                    string st;
    
                    while (true)
                    {
                        Thread.Sleep(r.Next(1500, 5000));
                        if (q.TryDequeue(out st))
                        {
                            Console.WriteLine("Got the the string");
                        }
                    }
                }).Start();
                
    
    
                Console.ReadKey();
            }
    

    My blog : http://thuruinhttp.wordpress.com
    Friday, December 16, 2011 9:13 AM
  • The first worked perfectly fine for me, and printed (until I hit a key to exit):

    Producer has started ...
    Counter has started..
    Operation Dequeue has started
    0
    New string added
    1
    1
    Got the the string
    New string added
    1
    Got the the string
    New string added
    1
    
    
    
    


    These versions should work effectively the same - the only difference, as you noticed, is that Task will use ThreadPool threads.

     

    In this case, however, you'd typically want to include the LongRunning hint on task creation, as they are very long living tasks...

     

    BTW - If you're making a producer/consumer type of operation, you may want to consider using BlockingCollection<T> instead.  It allows you to write much cleaner code, on both ends, ie:

     using System;
     using System.Collections.Concurrent;
     using System.Threading;
     using System.Threading.Tasks;
    
    internal class Test
    {
        private static BlockingCollection<string> q = new BlockingCollection<string>();
    
        private static void Main(string[] args)
        {
            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Producer has started ...");
                    Random random = new Random();
                    while (true)
                    {
                        Thread.Sleep(random.Next(2500));
                        for (int i = random.Next(5); i >= 0; --i)
                            q.Add(string.Format("thuru {0}", i)); // Add multiple items in batches
                        Console.WriteLine("New strings added");
                    }
                }, TaskCreationOptions.LongRunning);
    
            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Consumer has started");
    
                    foreach (var str in q.GetConsumingEnumerable())
                    {
                        Console.WriteLine("Got the string [{0}]", str);
                        Thread.Sleep(250); // Put small delay in processing to simulate work
                    }
                   
                }, TaskCreationOptions.LongRunning);
    
            Console.ReadKey();
        }
    }
    

    This is typically a better option than relying on count.  It also prevents the need for Thread.Sleep (on either end - though I have it in there to simulate work atm...)


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".
    • Marked as answer by Thuru VMVP Saturday, December 17, 2011 6:43 AM
    Friday, December 16, 2011 5:41 PM
    Moderator
  • Cool, thanks.

    Then what might be the different between the Concurrent Collections and the Blocking Collections ? I think as per the documentation the concurrent collections are supposed to be used in the parallel programming paradigm.


    My blog : http://thuruinhttp.wordpress.com
    Saturday, December 17, 2011 6:43 AM
  • Cool, thanks.

    Then what might be the different between the Concurrent Collections and the Blocking Collections ? I think as per the documentation the concurrent collections are supposed to be used in the parallel programming paradigm.


    My blog : http://thuruinhttp.wordpress.com

    BlockingCollection<T> is one of the new concurrent collections (in System.Collections.Concurrent).  By default, it uses ConcurrentQueue<T> to implement it's internal functionality.  It just provides a cleaner API specifically geared towards producer/consumer scenarios.  Basically, it's just a utility for classes that implement IProducerConsumerCollection<T> to make them easier to work with in this type of scenario.

     


    Reed Copsey, Jr. - http://reedcopsey.com
    If a post answers your question, please click "Mark As Answer" on that post and "Mark as Helpful".
    Saturday, December 17, 2011 9:16 PM
    Moderator