Atomically add multiple items to a concurrent queue

  • Question

  • Hello again :)

    While doing some stuff with a ConcurrentStack, I found that it would be more suitable to use a queue instead. But unlike ConcurrentStack, there doesn't seem to be a way to add multiple items to a queue in a single operation (or to a BlockingCollection, for that matter).

    Is there a reason for this? It's kind of inconvenient to have to resort to locks, especially since atomic adding of a range is available on the stack. If someone knows a better way to do atomic adding of multiple items to a BlockingCollection or ConcurrentQueue, feel free to post :)
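    In the meantime, one workaround is to wrap a plain Queue&lt;T&gt; with a lock so that a whole batch becomes visible in a single operation. This is just a sketch of that idea; LockedBatchQueue and its members are hypothetical names, not framework types:

    ```csharp
    using System.Collections.Generic;

    // Hypothetical wrapper: all access goes through one lock, so an entire
    // batch is published to consumers atomically.
    public class LockedBatchQueue<T>
    {
        private readonly Queue<T> _queue = new Queue<T>();
        private readonly object _sync = new object();

        public void EnqueueRange(IEnumerable<T> items)
        {
            lock (_sync) // one lock acquisition per batch, not per item
            {
                foreach (var item in items)
                    _queue.Enqueue(item);
            }
        }

        public bool TryDequeue(out T item)
        {
            lock (_sync)
            {
                if (_queue.Count > 0)
                {
                    item = _queue.Dequeue();
                    return true;
                }
                item = default(T);
                return false;
            }
        }
    }
    ```

    A single coarse lock per batch amortizes well for large batches, but it serializes all access, so it's worth measuring against ConcurrentQueue&lt;T&gt; for your workload.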

    Tuesday, July 20, 2010 10:53 PM

Answers

  • Hi aL3891-

    Thanks for the additional information.  We added the push/pop range functionality to ConcurrentStack and not to ConcurrentQueue not because it was any less useful on ConcurrentQueue, but because we could do so relatively easily in a lock-free manner with ConcurrentStack and it was significantly more difficult to do so with the algorithms we're employing in ConcurrentQueue; that doesn't mean we couldn't or won't do so, just that the trade-offs at the time didn't make doing so worthwhile.  Hence why I wanted to understand your use cases, to help us determine the importance of adding this feature/functionality in the future (and likely at the expense of adding something else).

    The reason I asked about removal and atomicity was wondering whether instead of having a ConcurrentQueue<T> you could have a ConcurrentQueue<T[]>, such that you'd be adding and removing batches from the collection rather than individual items.  This of course has consequences and may not be appropriate given the usage patterns in your system, but in some cases it could be a fine answer.  Note, too, that this functionality was deemed useful for ConcurrentStack<T> not so much because of the atomicity guarantees, but because it helped improve performance by reducing the number of interlocked operations that would otherwise have been required to add each item individually.
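    The batch-of-batches idea can be sketched as follows (a minimal illustration; the variable names are made up for the example):

    ```csharp
    using System;
    using System.Collections.Concurrent;

    class Program
    {
        static void Main()
        {
            // Each element of the queue is a whole batch, so a batch is
            // enqueued and dequeued with a single atomic operation.
            var batches = new ConcurrentQueue<int[]>();

            batches.Enqueue(new[] { 1, 2, 3 }); // producer adds one batch
            batches.Enqueue(new[] { 4, 5 });

            int[] batch;
            while (batches.TryDequeue(out batch)) // consumer removes one batch
            {
                foreach (var item in batch)
                    Console.WriteLine(item);
            }
        }
    }
    ```

    Items within a batch stay contiguous, and batches come out in FIFO order, but of course nothing interleaves items across batches the way a flat ConcurrentQueue&lt;T&gt; would.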

    Thanks.

     

    Saturday, July 24, 2010 12:37 AM
    Moderator
  • Hi IndranilBanerjee-

    Thanks for the suggestion.

    Regarding your code, what you're doing is ok, though you probably want to set an upper-limit on the number of items you'll put into each batch.  Otherwise, if the incoming rate is fast and long enough, you might end up always finding more data available in the TryTake, in which case you'd create one batch that would potentially require more memory than you have.  As such, you might instead want to do something like:

    int numTaken = 0;
    while(_changes.TryTake(out change))
    {
        batch.Add(change);
        if (++numTaken >= MAX_BATCH_SIZE) break;
    }
    HandleBatch(batch);

    Monday, August 16, 2010 6:58 PM
    Moderator

All replies

  • Hi aL3891-

    Can you describe more about your use case that requires this? e.g. if the items all need to be added atomically, do they also need to be removed atomically?

    Thanks.

    Wednesday, July 21, 2010 3:10 PM
    Moderator
    Sorry for not responding in a more timely manner, but I've been away :)

    In my particular case it's part of a logging infrastructure where log entries traverse an AppDomain boundary in batches, so in the receiving AppDomain I'd like to add all the entries to another queue at once.

    Being able to add multiple items atomically is not crucial in this particular scenario, but it would make for a nice symmetry with ConcurrentStack if it were possible :)

    For me personally, removing multiple items atomically is far less important than being able to add them atomically.

    Friday, July 23, 2010 5:51 PM
  • Hi aL3891-

    Thanks for the additional information.  We added the push/pop range functionality to ConcurrentStack and not to ConcurrentQueue not because it was any less useful on ConcurrentQueue, but because we could do so relatively easily in a lock-free manner with ConcurrentStack and it was significantly more difficult to do so with the algorithms we're employing in ConcurrentQueue; that doesn't mean we couldn't or won't do so, just that the trade-offs at the time didn't make doing so worthwhile.  Hence why I wanted to understand your use cases, to help us determine the importance of adding this feature/functionality in the future (and likely at the expense of adding something else).

    The reason I asked about removal and atomicity was wondering whether instead of having a ConcurrentQueue<T> you could have a ConcurrentQueue<T[]>, such that you'd be adding and removing batches from the collection rather than individual items.  This of course has consequences and may not be appropriate given the usage patterns in your system, but in some cases it could be a fine answer.  Note, too, that this functionality was deemed useful for ConcurrentStack<T> not so much because of the atomicity guarantees, but because it helped improve performance by reducing the number of interlocked operations that would otherwise have been required to add each item individually.

    Thanks.

     

    Saturday, July 24, 2010 12:37 AM
    Moderator
    Thank you for clearing that up :) I was mostly curious about the reasons for the asymmetry between queue and stack in this case, and while it would be nice to have EnqueueRange, I'll do fine without it as well. I would not want it at the expense of something like a ConcurrentList<T>, for example :)

    Actually, a ConcurrentQueue<T[]> or some other collection of collections of T would work pretty well in my case. This time, though, I used a plain old ConcurrentQueue<T> and built some other buffering around it, since it turned out that I only needed that kind of batch processing in a few cases, and there were plenty of cases where it didn't matter.

     

    Sunday, July 25, 2010 9:41 PM
  • Hi aL3891-

    Thanks for the ConcurrentList<T> suggestion.  What kind of functionality would you want/need exposed from such a type?  For example, in Visual C++ 2010 we have a concurrent_vector<T> type that supports concurrent additions/modifications but not concurrent removals or iteration.

    Also, can you describe more about what you're building?  Feel free to email me offline at stoub at microsoft dot com if you'd prefer not to share broadly on the forum.

    Thanks!

    Sunday, July 25, 2010 10:32 PM
    Moderator
  • Hi

    It would be great to be able to enqueue and dequeue batches from a ConcurrentQueue. What would be really great for me is to be able to do the same with BlockingCollection.

    I have a producer which produces large bursts of data to process. I'm doing individual Adds for each item at the producer end. I've come up with the following code at the consumer to get all available updates. Is it as correct/efficient as it could be?

    It would be very helpful if the framework could assist with this type of batch processing. It doesn't have to be perfectly atomic for me.

     

    while (true)
    {
        var batch = new List<Response>();

        // Block until at least one item arrives (or cancellation is requested).
        var change = _changes.Take(_cancelSource.Token);
        batch.Add(change);

        // Then drain whatever else is already available, without blocking.
        while (_changes.TryTake(out change))
        {
            batch.Add(change);
        }

        HandleBatch(batch);
    }

    Monday, August 16, 2010 3:35 PM
  • Hi IndranilBanerjee-

    Thanks for the suggestion.

    Regarding your code, what you're doing is ok, though you probably want to set an upper-limit on the number of items you'll put into each batch.  Otherwise, if the incoming rate is fast and long enough, you might end up always finding more data available in the TryTake, in which case you'd create one batch that would potentially require more memory than you have.  As such, you might instead want to do something like:

    int numTaken = 0;
    while(_changes.TryTake(out change))
    {
        batch.Add(change);
        if (++numTaken >= MAX_BATCH_SIZE) break;
    }
    HandleBatch(batch);

    Monday, August 16, 2010 6:58 PM
    Moderator
  • Thanks. In fact, I've made the BlockingCollection bounded, so that should prevent the producer from overdoing it.
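    For anyone following along, bounding is just a constructor argument. This is a minimal sketch with an arbitrary capacity and a placeholder Response type:

    ```csharp
    using System;
    using System.Collections.Concurrent;

    class Response { public int Id; }

    class Program
    {
        static void Main()
        {
            // Bounded to 1000 pending items: Add blocks when the collection
            // is full, applying back-pressure to a fast producer.
            var changes = new BlockingCollection<Response>(boundedCapacity: 1000);

            changes.Add(new Response { Id = 1 });
            Console.WriteLine(changes.Count); // one item currently pending
        }
    }
    ```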
    Monday, August 16, 2010 8:39 PM
  • Hello, sorry for not replying sooner.

    Our application does measurements on optical components (frequency-tunable lasers) and often works by taking some measurements on a physical device, doing calculations on the data received, then setting new currents on the device and taking new measurements. Also, a lot of the time the instruments are very slow and several measurements can be done at once.

    Mainly what I would like to do with a ConcurrentList is index into things and also add items safely. The thing I was building, the logging infrastructure where multiple producers log messages and the collection can't get corrupted, only really requires a ConcurrentQueue.

    However, there are many instances in our application where results are produced as lists and then processed. We could use a ConcurrentQueue and then call ToArray on it, but it feels like we should use a ConcurrentList instead, to avoid allocating new arrays all the time.

    Again, having a ConcurrentList is not critical for us, but it would be convenient, and it does feel like an indexable concurrent collection is 'missing' from the toolbox :)
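    For reference, the ToArray approach mentioned above gives a consistent snapshot at the cost of a fresh array per call (a minimal sketch):

    ```csharp
    using System;
    using System.Collections.Concurrent;

    class Program
    {
        static void Main()
        {
            var results = new ConcurrentQueue<double>();
            results.Enqueue(1.5);
            results.Enqueue(2.5);

            // ToArray takes a consistent snapshot of the queue's contents,
            // but each call allocates a new array.
            double[] snapshot = results.ToArray();
            Console.WriteLine(snapshot.Length);
        }
    }
    ```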

    Thursday, August 19, 2010 8:07 AM
  • Thanks for the information, aL3891!
    Thursday, August 19, 2010 2:45 PM
    Moderator