[Posted to wrong forum before - apologies]
Hi,
I have been stumped by this for the last week. I have a worker role that generates test data and sends it to Azure tables and also to an Azure queue, which is listened to by a second worker role (and only by that role). There is just one instance of each of these roles in the current configuration.
I have put logging and try/catch blocks around all the queue-related calls, and I observe that while all the Add calls seem to be processed correctly (i.e. they do not throw errors), some messages are never removed on the other side. Is that because they never arrive, or because the GetMessage calls are silently failing?
To complicate things, this only seems to happen after a certain amount of time has passed and/or a certain CPU load has been reached. I can't easily halt the queue at an arbitrary point and inspect its entire contents to see whether the count matches the number of additions.
The worker that inserts into the queue is multi-threaded, so it runs multiple copies of the following code to do the inserts:
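try
{
    lock (this)
        mOutstandingQueueWrites++;
    // Trace key: BondId, Add/Remove, BID/OFFER, TickID, timestamp
    // ({4} added: DateTime.UtcNow was passed but had no placeholder, so it was silently dropped)
    String s = String.Format("{0}\t{1}\t{2}\t{3}\t{4}",
        btte.BondId, (isUpdate ? "Remove" : "Add"), (btte.IsBid ? "BID" : "OFFER"), btte.TickID, DateTime.UtcNow);
    CloudQueueClient cqClient = Utility.StorageAccount.CreateCloudQueueClient();
    CloudQueue queue = cqClient.GetQueueReference(client.MDQueueName);
    AsyncCallback pAsync = new AsyncCallback(queueMessageAdded);
    // tickSerial (defined elsewhere) presumably holds the serialised BondTick bytes;
    // the trace key and the queue reference travel to the callback as async state
    queue.BeginAddMessage(new CloudQueueMessage(tickSerial), pAsync, new KeyValuePair<string, CloudQueue>(s, queue));
}
catch (Exception e)
{
    Trace.TraceError("Error adding to queue: {0}", e.ToString());
}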
When the async call returns, the following code is run:
private void queueMessageAdded(IAsyncResult result)
{
    KeyValuePair<string, CloudQueue> state = (KeyValuePair<string, CloudQueue>)result.AsyncState;
    try
    {
        // EndAddMessage rethrows any error raised by the asynchronous add
        state.Value.EndAddMessage(result);
        int count;
        lock (this)
        {
            mOutstandingQueueWrites--;
            count = ++mQueueCount;   // captured inside the lock so the traced count is consistent
        }
        Trace.TraceInformation("{0}\tComplete\t{1}", count, state.Key);
    }
    catch (Exception e)
    {
        Trace.TraceError("Error adding to queue: {0}\r\n{1}", state.Key, e.ToString());
    }
}
So, given this error handling, any errors in inserting into the queue should be logged, right?
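Both the insert code and its completion callback update shared counters under lock (this). As an aside, a lighter-weight option for plain int counters is System.Threading.Interlocked. The sketch below assumes the counters are simple int fields; outstandingWrites and queueCount are hypothetical stand-ins for mOutstandingQueueWrites and mQueueCount, and Parallel.For stands in for the multi-threaded producer. It uses modern C# top-level statements for brevity, so it is not meant to drop straight into the .NET 4 / SDK 1.4 project.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for mOutstandingQueueWrites and mQueueCount.
int outstandingWrites = 0;
int queueCount = 0;

// Would be called just before BeginAddMessage.
void OnWriteStarted() => Interlocked.Increment(ref outstandingWrites);

// Would be called from the completion callback once EndAddMessage succeeds.
// Returns this write's sequence number, so it can be traced without re-reading the field.
int OnWriteCompleted()
{
    Interlocked.Decrement(ref outstandingWrites);
    return Interlocked.Increment(ref queueCount);
}

// Simulate many producer threads starting and completing writes concurrently.
Parallel.For(0, 10_000, _ =>
{
    OnWriteStarted();
    OnWriteCompleted();
});

Console.WriteLine($"outstanding={outstandingWrites}, completed={queueCount}");
```

This prints outstanding=0, completed=10000. Using the value returned by Interlocked.Increment means each trace line reports the count that this particular callback produced, rather than a later re-read of a field that other threads may have changed in the meantime.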
On the consumer side, a dedicated thread takes messages off the queue asynchronously, like this:
private void getPricesFromQueue()
{
    mDequeueCount = 0;
    mCallbacksInOperation = 0;
    int maxCallbacksAllowed = 12;
    while (true)
    {
        if (insideMktsInitialised)
        {
            // '<' rather than '<=', so no more than maxCallbacksAllowed gets are outstanding
            // (assumes mCallbacksInOperation is decremented in the completion callback)
            if (mCallbacksInOperation < maxCallbacksAllowed)
            {
                lock (this)
                {
                    try
                    {
                        CloudQueueClient cqClient = Utility.StorageAccount.CreateCloudQueueClient();
                        CloudQueue queue = cqClient.GetQueueReference(mQueueName);
                        // Synchronous peek, made while holding the lock that BeginGetMessageComplete also takes
                        if (queue.PeekMessage() != null)
                        {
                            AsyncCallback asyncCallBack = new AsyncCallback(BeginGetMessageComplete);
                            // 2-hour visibility timeout: a retrieved message that is never deleted
                            // stays invisible for 2 hours before it reappears on the queue
                            queue.BeginGetMessage(new TimeSpan(2, 0, 0), asyncCallBack, queue);
                            mCallbacksInOperation++;
                        }
                    }
                    catch (Exception e)
                    {
                        Trace.TraceError("Error in BeginGetMessages: {0}", e.ToString());
                    }
                }
            }
            else
            {
                Thread.Sleep(10);
            }
            if (mQueueThrottle > 0)
                Thread.Sleep(mQueueThrottle);
        }
        else
            Thread.Sleep(100);
    }
}
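The throttle in getPricesFromQueue depends on mCallbacksInOperation being incremented and decremented consistently, and the decrement lives in the part of the callback that isn't shown. A minimal sketch of an alternative, assuming a SemaphoreSlim is acceptable in place of the shared counter: the loop waits for a free slot before starting a get, and the completion path releases the slot in a finally block so that a failing callback cannot leak a slot. SimulatedGetAsync is hypothetical; it stands in for BeginGetMessage plus BeginGetMessageComplete, with Task.Delay faking the network round trip. SemaphoreSlim.WaitAsync needs .NET 4.5 or later, and the top-level statements need C# 9.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

const int maxCallbacksAllowed = 12;            // same limit as in the post
var slots = new SemaphoreSlim(maxCallbacksAllowed, maxCallbacksAllowed);
var gate = new object();
int inFlight = 0, maxObserved = 0, completed = 0;

// Hypothetical stand-in for BeginGetMessage plus its completion callback.
async Task SimulatedGetAsync()
{
    try
    {
        lock (gate)
        {
            inFlight++;
            maxObserved = Math.Max(maxObserved, inFlight);
        }
        await Task.Delay(5);                   // pretend network round trip
        lock (gate)
        {
            inFlight--;
            completed++;
        }
    }
    finally
    {
        slots.Release();                       // free the slot even if the "get" threw
    }
}

var pending = new List<Task>();
for (int n = 0; n < 100; n++)
{
    await slots.WaitAsync();                   // waits while 12 gets are already outstanding
    pending.Add(SimulatedGetAsync());
}
await Task.WhenAll(pending);

Console.WriteLine($"completed={completed}, max in flight={maxObserved}");
```

Because the slot is acquired before each get starts and released only after it finishes, the number of gets in flight can never exceed maxCallbacksAllowed, with no need to read a shared counter outside a lock.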
Finally, once this async call returns, I deserialize the message and process it (all within a try/catch):
private void BeginGetMessageComplete(IAsyncResult result)
{
    lock (this)
    {
        try
        {
            CloudQueue queue = (CloudQueue)result.AsyncState;
            CloudQueueMessage msg = queue.EndGetMessage(result);
            BondTick btick = null;
            if (msg != null)
            {
                BinaryFormatter bf = new BinaryFormatter();
                try
                {
                    //deserialise the bondtick object
                    btick = (BondTick)bf.Deserialize(new MemoryStream(msg.AsBytes));
                    //Find the InsideMarket object to which it relates
                    InsideMarket iMkt = insideMarkets[btick.BondId];
                    iMkt.ProcessPrice(btick);
                    .......
The call to iMkt.ProcessPrice(btick) results in a Trace to the log.
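The consumer's BeginGetMessage call passes a two-hour visibility timeout, and a retrieved message is only removed for good by an explicit delete (not visible in the excerpt above, which is cut off at "......."). The toy model below is a hypothetical in-memory sketch of those semantics, not the StorageClient API: it shows that a message that is retrieved but never deleted (for example, because ProcessPrice or the code after it throws) disappears from view for the whole timeout, which during a test run could look just like a dropped message.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of queue visibility semantics; this is NOT the StorageClient API.
var messages = new List<(string Id, DateTime VisibleAt, int DequeueCount)>();

void Add(string id, DateTime now) => messages.Add((id, now, 0));

// Like a "get": returns the first visible message and hides it until now + visibilityTimeout.
string? Get(DateTime now, TimeSpan visibilityTimeout)
{
    int i = messages.FindIndex(msg => msg.VisibleAt <= now);
    if (i < 0) return null;
    var m = messages[i];
    messages[i] = (m.Id, now + visibilityTimeout, m.DequeueCount + 1);
    return m.Id;
}

// Like a "delete": only an explicit delete removes a message permanently.
void Delete(string id) => messages.RemoveAll(msg => msg.Id == id);

var t0 = new DateTime(2011, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var timeout = TimeSpan.FromHours(2);             // matches new TimeSpan(2, 0, 0) in the post

Add("tick-A", t0);
Add("tick-B", t0);

string? a = Get(t0, timeout);                    // processed successfully...
Delete(a!);                                      // ...and then deleted
string? b = Get(t0, timeout);                    // processing "fails", so never deleted

string? soon = Get(t0.AddMinutes(30), timeout);  // tick-B is still hidden
string? later = Get(t0.AddHours(2), timeout);    // tick-B is visible again

string shown = soon ?? "(none)";
Console.WriteLine($"{a} {b} {shown} {later}");
```

This prints tick-A tick-B (none) tick-B: the undeleted message is invisible for the full two hours and then comes back with a dequeue count of 2, rather than raising any error.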
Each of my queue message objects has a unique GUID, and from the trace output I can see GUIDs that are added to the queue but never appear on the other side, yet I do not see any errors being thrown either! Has anyone observed similar behaviour, perhaps relating to concurrency of the inserts? I really am at my wit's end, so I would appreciate any help anyone can offer!
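To check the "added but never seen" GUIDs more systematically than by eye, the two logs can be reconciled with a set difference. A minimal sketch, assuming the GUIDs have already been parsed out of the producer's "Complete" trace lines and the consumer's ProcessPrice trace lines (the sample data below is made up): it lists GUIDs that were added but never processed, and also GUIDs processed more than once, since a message that reappears after its visibility timeout would show up as a duplicate rather than as a loss.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample data standing in for GUIDs parsed out of the two trace logs.
var added = Enumerable.Range(0, 6).Select(_ => Guid.NewGuid()).ToList();
var processed = new List<Guid> { added[0], added[1], added[2], added[4], added[5], added[1] };

// Added to the queue (EndAddMessage succeeded) but never seen by the consumer.
var processedSet = new HashSet<Guid>(processed);
var missing = added.Where(g => !processedSet.Contains(g)).ToList();

// Seen by the consumer more than once, e.g. a message that reappeared
// after its visibility timeout expired without being deleted.
var duplicates = processed.GroupBy(g => g)
                          .Where(grp => grp.Count() > 1)
                          .Select(grp => grp.Key)
                          .ToList();

Console.WriteLine($"added={added.Count} processed={processed.Count} missing={missing.Count} duplicates={duplicates.Count}");
```

With this sample data it reports one missing GUID (added[3]) and one duplicate (added[1]); the missing list preserves the order in which the GUIDs were added, which makes it easier to line them up with timestamps in the producer's log.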
Other info:
Azure v1.4, VSWD Express 2010. Running against the compute emulator, but pointing at a real storage account.
I am really, really sure that no other consumers are present; I even created a brand-new storage account and ran against that, with the same result.
The percentage of dropped messages is very small indeed, probably less than 1% (judging qualitatively). Again, this suggests it's not a rogue consumer, as the loss would then be higher, you'd think?
It seems to work OK for a while, but (anecdotally) it seems more prone to breaking once the load on the consumer worker instance increases (in the form of another thread within that role).
Have tried using synchronous calls for both Adds and Removes, but to no avail (it broke in the same way, just took longer).
Have also tried removing multiple messages from the queue at once, using BeginGetMessages(), but again, did not solve it.
Many thanks in advance for any advice.
(It seems I might have to post a working example project, but I wasn't sure whether that's possible; let me know if it would help.)
Oliver Hodson