Azure Queue messages appear to be going missing

  • Question

  • [Posted to wrong forum before - apologies]

    Hi,

    I have been stumped by this for the last week now. I have a worker role that generates test data and sends it to Azure tables, and also to an Azure queue, which is listened to by a second worker role (and only by this role). There is just one instance of each of these roles in the current config.

    I have put logging and try/catch blocks around all the queue-related calls, but I am observing that while all the Add calls seem to be processed correctly (i.e. they do not throw errors), some of the messages are never retrieved on the other side - either because they were never there, or because the GetMessage calls are failing silently?

    Complicating things is the fact that this only seems to happen after a certain period of time and/or level of CPU load has been passed. I can't easily halt the queue arbitrarily and inspect its entire contents to see whether the count matches the supposed additions.
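
    One cross-check that doesn't require halting anything is the approximate message count that the service itself exposes, which I can compare against my own add counter. A minimal sketch of the idea, assuming the v1.x StorageClient library (the queue name and connection string here are placeholders):

        // Sketch: snapshot the service-side backlog for reconciliation against my add counter.
        // Note the count is approximate and includes in-flight (currently invisible) messages.
        using System;
        using Microsoft.WindowsAzure;
        using Microsoft.WindowsAzure.StorageClient;

        class QueueCountCheck
        {
            static void Main()
            {
                CloudStorageAccount account =
                    CloudStorageAccount.Parse("UseDevelopmentStorage=true"); // placeholder connection string
                CloudQueue queue = account.CreateCloudQueueClient().GetQueueReference("mdqueue"); // placeholder name

                int backlog = queue.RetrieveApproximateMessageCount();
                Console.WriteLine("Approximate messages on queue: {0}", backlog);
            }
        }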

    The worker that inserts to the queue is multi-threaded, and is therefore running multiple copies of the following code to do the inserts:

        try
        {
            lock (this)
                mOutstandingQueueWrites++;

            String s = String.Format("{0}\t{1}\t{2}\t{3}\t{4}",
                btte.BondId, (isUpdate ? "Remove" : "Add"), (btte.IsBid ? "BID" : "OFFER"), btte.TickID, DateTime.UtcNow);

            CloudQueueClient cqClient = Utility.StorageAccount.CreateCloudQueueClient();
            CloudQueue queue = cqClient.GetQueueReference(client.MDQueueName);

            AsyncCallback pAsync = new AsyncCallback(queueMessageAdded);
            queue.BeginAddMessage(new CloudQueueMessage(tickSerial), pAsync, new KeyValuePair<string, CloudQueue>(s, queue));
        }
        catch (Exception e)
        {
            Trace.TraceError("Error adding to queue: {0}", e.ToString());
        }

    When the async call returns, the following code is run:

        private void queueMessageAdded(IAsyncResult result)
        {
            KeyValuePair<string, CloudQueue> state = (KeyValuePair<string, CloudQueue>)result.AsyncState;

            try
            {
                state.Value.EndAddMessage(result);

                lock (this)
                {
                    mOutstandingQueueWrites--;
                    mQueueCount++;
                }

                Trace.TraceInformation("{0}\tComplete\t{1}", mQueueCount, state.Key);
            }
            catch (Exception e)
            {
                Trace.TraceError("Error adding to queue: {0}\r\n{1}", state.Key, e.ToString());
            }
        }

    So given the error handling, any errors in inserting to the queue should be logged - right?
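
    (The one gap I can think of: with the Begin/End pattern, a service-side failure only surfaces when EndAddMessage is called, and anything that escapes the callback itself - such as the AsyncState cast, which sits outside the try above - would take the role down rather than be logged. A defensive sketch of the same callback, for illustration only:)

        // Sketch: keep every statement of the completion callback inside the try, since an
        // unhandled exception escaping a thread-pool callback crashes the process unlogged.
        private void queueMessageAddedDefensive(IAsyncResult result)
        {
            try
            {
                KeyValuePair<string, CloudQueue> state =
                    (KeyValuePair<string, CloudQueue>)result.AsyncState;

                state.Value.EndAddMessage(result); // errors from BeginAddMessage surface here

                Trace.TraceInformation("Complete\t{0}", state.Key);
            }
            catch (Exception e)
            {
                Trace.TraceError("Error adding to queue: {0}", e.ToString());
            }
        }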

     

    On the consumer side, a thread is running, which takes things off the queue asynchronously, thus:

        private void getPricesFromQueue()
        {
            mDequeueCount = 0;
            mCallbacksInOperation = 0;
            int maxCallbacksAllowed = 12;

            while (true)
            {
                if (insideMktsInitialised)
                {
                    if (mCallbacksInOperation <= maxCallbacksAllowed)
                    {
                        lock (this)
                        {
                            try
                            {
                                CloudQueueClient cqClient = Utility.StorageAccount.CreateCloudQueueClient();
                                CloudQueue queue = cqClient.GetQueueReference(mQueueName);

                                if (queue.PeekMessage() != null)
                                {
                                    AsyncCallback asyncCallBack = new AsyncCallback(BeginGetMessageComplete);
                                    queue.BeginGetMessage(new TimeSpan(2, 0, 0), asyncCallBack, queue);
                                    mCallbacksInOperation++;
                                }
                            }
                            catch (Exception e)
                            {
                                Trace.TraceError("Error in BeginGetMessages: {0}", e.ToString());
                            }
                        }
                    }
                    else
                    {
                        Thread.Sleep(10);
                    }

                    if (mQueueThrottle > 0)
                        Thread.Sleep(mQueueThrottle);
                }
                else
                {
                    Thread.Sleep(100);
                }
            }
        }

    Finally, once this async call returns, I deserialize the message and process it (all within a try/catch):

        private void BeginGetMessageComplete(IAsyncResult result)
        {
            lock (this)
            {
                try
                {
                    CloudQueue queue = (CloudQueue)result.AsyncState;
                    CloudQueueMessage msg = queue.EndGetMessage(result);
                    BondTick btick = null;

                    if (msg != null)
                    {
                        BinaryFormatter bf = new BinaryFormatter();
                        try
                        {
                            //deserialise the bondtick object
                            btick = (BondTick)bf.Deserialize(new MemoryStream(msg.AsBytes));

                            //Find the InsideMarket object to which it relates
                            InsideMarket iMkt = insideMarkets[btick.BondId];
                            iMkt.ProcessPrice(btick);

        .......

    The call to iMkt.ProcessPrice(btick) results in a Trace to the log.

    Each of my queue message objects has a unique GUID, and from looking at the print statements I can see GUIDs that are added to the queue but never appear again on the other side, yet I do not see any errors being thrown either. Has anyone observed similar behaviour, maybe relating to concurrency of the insert statements or something? I really am at my wit's end, so would appreciate any help that anyone can offer!
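
    To make the bookkeeping concrete, the reconciliation I'm describing has roughly this shape (a sketch of the idea rather than my production code; the helper names are made up):

        // Sketch: record each GUID at enqueue time and tick it off at dequeue time;
        // anything still in the ledger after a grace period is a "missing" message.
        private static readonly ConcurrentDictionary<Guid, DateTime> mSentLedger =
            new ConcurrentDictionary<Guid, DateTime>();

        public static void RecordSent(Guid id)
        {
            mSentLedger[id] = DateTime.UtcNow;
        }

        public static void RecordReceived(Guid id)
        {
            DateTime sentAt;
            if (!mSentLedger.TryRemove(id, out sentAt))
                Trace.TraceWarning("Received GUID {0} that was never recorded as sent", id);
        }

        public static void DumpMissing(TimeSpan grace)
        {
            foreach (KeyValuePair<Guid, DateTime> kvp in mSentLedger)
                if (DateTime.UtcNow - kvp.Value > grace)
                    Trace.TraceWarning("GUID {0} sent at {1:o} but never received", kvp.Key, kvp.Value);
        }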

    Other info:

    Azure SDK v1.4, VSWD Express 2010. Running against the compute emulator, but pointing at a real storage account.

    I am really, really sure that no other consumers are present - I even created a brand new storage account and ran it against that, with the same result.

    The percentage of dropped messages is very small indeed, probably less than 1% (qualitatively). Again, this suggests it's not a rogue consumer, as you'd expect the loss rate to be much higher if it were.

    It seems to work OK for a while, but (anecdotally) appears more prone to breaking once the load on the consumer worker instance increases (in the form of another thread within that role).

    I have tried using synchronous calls for both Adds and Removes, but to no avail (it broke in the same way, just took longer).

    I have also tried removing multiple messages from the queue at once, using BeginGetMessages(), but again, this did not solve it.
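
    (For reference, the batch-removal approach amounts to something like this sketch: stop the producers, then drain and count synchronously via GetMessages, which returns up to 32 messages per call:)

        // Sketch: drain the queue synchronously in batches and count exactly what was on it.
        private int DrainAndCount(CloudQueue queue)
        {
            int total = 0;
            while (true)
            {
                int batch = 0;
                foreach (CloudQueueMessage msg in queue.GetMessages(32)) // 32 = per-call maximum
                {
                    total++;
                    batch++;
                    queue.DeleteMessage(msg); // delete so it can't reappear and be double-counted
                }
                if (batch == 0)
                    return total; // nothing visible left
            }
        }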

     

    Many thanks in advance for any advice.

    (It seems like I might have to post a working example project, but I wasn't sure if that is possible - let me know if that would help.)

    Oliver Hodson

     




    • Edited by oh8777 Tuesday, September 6, 2011 1:38 PM
    Thursday, September 1, 2011 5:01 PM

Answers

  • From the behavior you describe, it sounds like the messages are getting enqueued properly (since things work if you just dequeue in a loop), but some of them are not getting processed by the consumer once other threads are spun up.

    If I were you, I'd start simplifying the logic as much as possible (while still reproducing the problem). You've already isolated that turning off the other threads makes the problem go away. I wonder now if we can simplify what those threads are doing and see if there's something more specific that's causing the behavior you're seeing.

    If you can boil it down into a small application you can share, I'd be happy to take a look. My email address is Steve.Marx@microsoft.com.

    • Marked as answer by Brad Calder Monday, March 12, 2012 9:18 AM
    Wednesday, September 7, 2011 12:31 AM

All replies

  • I've definitely never heard of something like this. (If the call to enqueue the message succeeds, the message should definitely be there. By default, messages stay on the queue for 7 days, so assuming you're trying to dequeue sooner than that, you should receive the message at the other end.)
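
    (For completeness: the time-to-live can also be set explicitly when the message is added. If I remember the v1.x signature correctly, there's an AddMessage overload that takes a TimeSpan:)

        // Sketch: add a message with an explicit TTL; 7 days is also the default and the maximum.
        CloudQueueMessage message = new CloudQueueMessage("payload");
        queue.AddMessage(message, TimeSpan.FromDays(7));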

    It would indeed help if you could post a complete sample that reproduces the issue. I know that may be tough to do, but I'm not sure how else to investigate.

    Thursday, September 1, 2011 10:24 PM
  • I had a similar issue where items disappeared and there were no errors. I was also generating test data and mass-adding it to the queue; mine was for indexing purposes. One thing I did notice, though: it only failed when I was adding data rapidly.

    Unfortunately I didn't figure out exactly what was going wrong. I actually moved to using a SQL Azure table for the queue, as I needed the items in order anyway, which the queue doesn't support (that contradicts a queue in my eyes - it should be called a FIRO instead).

     

    Friday, September 2, 2011 1:33 PM
  • What's the R in FIRO?

    Windows Azure queues are pretty much as FIFO as can be achieved in a distributed environment. No strict guarantees on ordering, mostly because of visibility timeouts and messages reappearing after failures in processing, but it's basically FIFO. (We usually say "best-effort FIFO.")
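
    To make the practical consequence concrete: because a message can reappear after its visibility timeout expires, consumers should be written to tolerate redelivery. A sketch of the usual pattern (Process is a stand-in for your handler):

        // Sketch: idempotent consume; DequeueCount > 1 flags a message we've seen before.
        CloudQueueMessage msg = queue.GetMessage(TimeSpan.FromMinutes(5)); // visibility timeout
        if (msg != null)
        {
            if (msg.DequeueCount > 1)
                Trace.TraceWarning("Message {0} redelivered, attempt {1}", msg.Id, msg.DequeueCount);

            Process(msg);             // must be safe to run twice for the same message
            queue.DeleteMessage(msg); // delete only after processing succeeds
        }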

    Friday, September 2, 2011 6:01 PM
  • What's the R in FIRO?

    Windows Azure queues are pretty much as FIFO as can be achieved in a distributed environment. No strict guarantees on ordering, mostly because of visibility timeouts and messages reappearing after failures in processing, but it's basically FIFO. (We usually say "best-effort FIFO.")

    First In Random Out. As you say, it is best-effort, but since it's not guaranteed I have to treat it as random while coding. But like the OP said, data does seem to go missing when it's under load. I never followed this up, as mentioned, due to my FIFO requirement. I actually only used SQL as I needed the request speed, but a Table would probably work for a true FIFO queue; you would need to use the new caching system to keep track of the next ID and the ID of the last popped.
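
    (The strict-ordering idea, independent of where the queue lives, is roughly this sketch: stamp each item with a producer-side sequence number, and have the consumer buffer out-of-order arrivals until the next expected number turns up. Deliver is a made-up downstream handler:)

        // Sketch: reassemble strict FIFO order from an out-of-order stream.
        private readonly SortedDictionary<long, string> mPending = new SortedDictionary<long, string>();
        private long mNextExpected = 0;

        public void Accept(long sequence, string payload)
        {
            mPending[sequence] = payload;

            string next;
            while (mPending.TryGetValue(mNextExpected, out next))
            {
                Deliver(next); // hypothetical downstream handler
                mPending.Remove(mNextExpected);
                mNextExpected++;
            }
        }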

    Monday, September 5, 2011 11:28 AM
  • Hi Steve,

    Thanks for your response. I had a hunch that the issue related somehow to my multithreading, so I rewrote the producer/consumer modules from scratch using a different concurrency approach.

    In the limited testing that I have done so far, this appears to have corrected the issue. Will post a follow up if that turns out not to be the case.

    Thanks again

    Monday, September 5, 2011 4:45 PM
  • Hi Steve,

    Thanks for your response. I had a hunch that the issue related somehow to my multithreading, so I rewrote the producer/consumer modules from scratch using a different concurrency approach.

    In the limited testing that I have done so far, this appears to have corrected the issue. Will post a follow up if that turns out not to be the case.

    Thanks again

     

    One thing I should have mentioned: using multiple operations on the same data context doesn't seem to work correctly - I had to create one per thread. But I still got the problem with items disappearing.

     

    Tuesday, September 6, 2011 10:32 AM
  • Hi,

    Actually the problem has reoccurred - despite having entirely rewritten the code which enqueues/dequeues the items.

    I am now convinced that it is a problem with the role that is doing the dequeuing. I believe this because this code runs in a worker role which does 2 things:

    1) Dequeues and processes queue items

    2) Services client requests from a web role, providing them with data that it has taken from the queue.

    From startup, the worker does job 1), and it does this without dropping any queue messages. I can leave it running for 30 minutes+ and this remains the case. However within seconds of a client request being added, messages start going 'missing'. So it's not until another thread runs within the consumer worker role, that things start to go wrong - hence I believe that it's not an issue putting things onto the queue, but in removing them in a heavy-load situation when other threads are running too.

    The consumer queue-polling code now looks as follows:

            //The main price gathering method (new version w own threading)
            private void getPricesFromQueue()
            {
                mDequeueCount = 0;
                mCallbacksInOperation = 0;
                int pMaxCallbacksAllowed = 12;
    
                while (true)
                {
                    if (insideMktsInitialised)
                    {
                        if (mCallbacksInOperation <= pMaxCallbacksAllowed)
                        {
                            TickItemProcessor tiProc = new TickItemProcessor(mQueueName, insideMarkets, new AsyncCallback(QueueUpdateCompleted));
                            Thread tiProcThread = new Thread(new ThreadStart(tiProc.ProcessTick));
    
                            lock (this)
                                mCallbacksInOperation++;
    
                            tiProcThread.Start();
                        }
                    }
                }
            }
    
            //Delegate function called by our TickItemProcessor thread once it has completed 
            private void QueueUpdateCompleted(IAsyncResult result)
            {
                lock (this)
                {
                    mCallbacksInOperation--;
                    mDequeueCount++;
                }
            }
    


    As you can see, it uses the following separate class, TickItemProcessor, to do the work of retrieving, processing and deleting the queue messages:

        internal class TickItemProcessor
        {
            CloudQueue mQueue;
            ConcurrentDictionary<int, InsideMarket> mInsideMarkets;
            AsyncCallback mCallBackDelegate;
    
            public TickItemProcessor(string pQueueName, ConcurrentDictionary<int, InsideMarket> pInsideMarkets, AsyncCallback pCallBackDelegate)
            {
                CloudQueueClient cqc = Utility.StorageAccount.CreateCloudQueueClient();
                mQueue = cqc.GetQueueReference(pQueueName);
                mInsideMarkets = pInsideMarkets;
                mCallBackDelegate = pCallBackDelegate;
            }
    
            public void ProcessTick()
            {
                BondTick btick = null;
    
                try
                {
                    CloudQueueMessage msg = mQueue.GetMessage();
    
                    if (msg != null)
                    {
                        BinaryFormatter bf = new BinaryFormatter();
                        btick = (BondTick)bf.Deserialize(new MemoryStream(msg.AsBytes));
    
                        //Find the InsideMarket object to which it relates
                        InsideMarket iMkt = mInsideMarkets[btick.BondId];
                        Trace.TraceInformation(DateTime.UtcNow.ToString() + "\t" + btick.ToString() + "\t" + iMkt.BidCount + "\t" + iMkt.OfferCount);
    
                        iMkt.ProcessPrice(btick);
    
                        List<InsideMarket> dupes = mInsideMarkets.Values.Where(e => e.BidCount > 2 || e.OfferCount > 2).ToList();
                        if (dupes.Count > 0)
                        {
                            InsideMarket worstOffender = dupes.OrderByDescending(e => Math.Max(e.BidCount, e.OfferCount)).FirstOrDefault();
                            Trace.TraceWarning("Warning: {0} bondIds have more than 2 bid or 2 offer. Max={1} (ID {2})",
                                dupes.Count, Math.Max(worstOffender.BidCount, worstOffender.OfferCount), worstOffender.BondID);
                        }
    
                        mQueue.DeleteMessage(msg);
                    }
                }
                catch (Exception ex)
                {
                    Trace.TraceError("Error in ProcessTick(): {0} | {1}", ex.ToString(), btick.ToString());
                }
                finally
                {
                    //Call back to the caller to tell it we're done
                    mCallBackDelegate.Invoke(null);
                }
            }
        }
    
    

     

    In the meantime, there are other methods that access the "InsideMarkets" object to return data to clients via a web service. These are called via an internal endpoint, and I have added lock statements around all of them to eliminate the possibility that it's some race condition.
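
    (As an aside, one simplification that would rule out the hand-rolled throttling is to replace the mCallbacksInOperation counter with a Semaphore, so the loop blocks instead of spinning while all worker slots are busy. A sketch only, not the code deployed above:)

        // Sketch: bound in-flight workers with a Semaphore instead of an unsynchronised counter.
        private readonly Semaphore mWorkerSlots = new Semaphore(12, 12); // 12 concurrent workers

        private void getPricesFromQueueThrottled()
        {
            while (true)
            {
                mWorkerSlots.WaitOne(); // blocks (no busy spin) until a slot frees up

                TickItemProcessor tiProc = new TickItemProcessor(
                    mQueueName, insideMarkets,
                    ar => mWorkerSlots.Release()); // free the slot when the worker signals done

                new Thread(tiProc.ProcessTick).Start();
            }
        }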

    I have also tried it using the built-in asynchronous functions (BeginGetMessage(s) etc.) and seen the exact same behaviour. I'd be grateful for any further pointers, or for an explanation of how I can get a working sample project over to you MS guys.

    Many thanks again

     

    Tuesday, September 6, 2011 12:41 PM
  • From startup, the worker does job 1), and it does this without dropping any queue messages. I can leave it running for 30 minutes+ and this remains the case. However within seconds of a client request being added, messages start going 'missing'. So it's not until another thread runs within the consumer worker role, that things start to go wrong - hence I believe that it's not an issue putting things onto the queue, but in removing them in a heavy-load situation when other threads are running too.


    This is similar to the scenario I had. I was pushing data from 100+ threads simulating "users", and I had the queue dequeuing 32 items at once, processing the messages and then deleting them; my deletions were happening in a separate thread from the dequeuing as well.

    But I had a fixed number of rows that should have been in my results after processing the queue, and the number of items actually processed was random. I am wondering if you are also getting a random number of processed results.

    One thing I think is also important to mention: nothing else was dequeuing, and there were no errors in my process. When examining the queue the day after, it would be empty.

    I moved on from this as it didn't suit my scenario, but I do think there is something going wrong somewhere.

    Tuesday, September 6, 2011 2:34 PM
  • How can you tell some messages aren't getting processed? Is it because the trace messages don't show up, or something else?

    (I wonder if the trace message is simply getting lost, so I wonder if there's some other evidence that queue messages aren't being processed.)

    Tuesday, September 6, 2011 3:27 PM
  • Hi Steve/Dream Walker,

    My symptoms are very similar to Dream Walker's description - I am getting a random number of results coming through. The majority of them do come through and are processed correctly, but a random percentage do not.

    To answer your question Steve: I know that the messages are not processed because the messages represent price updates to be either added to or deleted from another structure. Prior to updating a price, the producer worker sends a deletion message for the previous price that it had created.

    Therefore, at any point in time there should only be one price for a given product. However, whilst I see this when nothing is subscribing to the prices, as soon as something is, the number of prices stored for a given product starts to increase. (I can't just disregard previous prices once I go to PRD, for other business-related reasons.)

    Besides that, even a print statement immediately after dequeuing reveals that certain messages never make it to the subscriber.
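
    (Concretely, that check traces the raw message identity the instant it arrives, before any deserialisation, along the lines of this sketch:)

        // Sketch: log the message identity immediately after GetMessage, so a missing GUID
        // cannot be blamed on anything downstream of the dequeue itself.
        CloudQueueMessage msg = mQueue.GetMessage();
        if (msg != null)
        {
            Trace.TraceInformation("Dequeued id={0} dequeueCount={1} at {2:o}",
                msg.Id, msg.DequeueCount, DateTime.UtcNow);
            // ...deserialise and process afterwards...
        }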

     

    Tuesday, September 6, 2011 4:59 PM
  • From the behavior you describe, it sounds like the messages are getting enqueued properly (since things work if you just dequeue in a loop), but some of them are not getting processed by the consumer once other threads are spun up.

    If I were you, I'd start simplifying the logic as much as possible (while still reproducing the problem). You've already isolated that turning off the other threads makes the problem go away. I wonder now if we can simplify what those threads are doing and see if there's something more specific that's causing the behavior you're seeing.

    If you can boil it down into a small application you can share, I'd be happy to take a look. My email address is Steve.Marx@microsoft.com.

    • Marked as answer by Brad Calder Monday, March 12, 2012 9:18 AM
    Wednesday, September 7, 2011 12:31 AM