Proper way to Layer my Queueing Infrastructure.

  • Question

  • User-1204637165 posted

    Dear All,

    I have two microservices. One of them (Application A) has an API endpoint that receives messages from a partner payment service. My design is that once Application A receives a message from the payment partner, it publishes that message to a queue. My second microservice (Application B) is the consumer: it gets the message from the queue and processes it for some other background functions.

    I decided to go with this design because I want a 100% guarantee that messages are delivered.

    I am using the RabbitMQ queuing system, in which you have to create a producer and a consumer.

    Based on my design, Application A is going to be the producer and Application B is going to be the consumer.

    My question is this: is it right to create the queue producer code inside my endpoint in Application A, or is the queueing system supposed to run as a background service? Here is what my controller endpoint looks like for Application A.

    public class Controller : ApiController
    {
        public JsonResponse Recieve(Request model)
        {
            // Process the request sent from the payment partner and save it to the DB.
            // Then create a producer for this request and publish the message to the queue.
            QueueProducer queue = new QueueProducer(model);
            queue.ProcessQueue(); // This call publishes the message to the queue.
            // .........
        }
    }

    The main code snippet from QueueProducer is below. This is what sends the producer's message to the queue.
    public void ProcessQueue()
    {
        using (var connection = CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Enable publisher confirms on this channel; the confirm calls below require it.
            channel.ConfirmSelect();

            bool isopen = connection.IsOpen;
            Console.WriteLine("Rabbit Queue connection is open " + isopen);
            logger.Log(LogLevel.Information, "Rabbit Queue connection is open " + isopen);
            QueueName = channel.QueueDeclare().QueueName;
            logger.Log(LogLevel.Information, "Rabbit Queue Name " + QueueName);
            Console.WriteLine("Rabbit Queue Name " + QueueName);
            //channel.QueueDeclare(queue: QueueName);
            /*channel.QueueDeclare(queue: QueueName,
                            durable: false,
                            exclusive: true,
                            autoDelete: true,
                            arguments: null);*/

            // Track each published message by its sequence number until the broker confirms it.
            var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
            outstandingConfirms.TryAdd(channel.NextPublishSeqNo, settlementRequest.ToString());

            // Removes confirmed entries; "multiple" means every sequence number up to and including this one.
            void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
            {
                logger.Log(LogLevel.Information, "This callback is called when messages are delivered and confirmed for clean up");
                logger.Log(LogLevel.Information, "QueueProducer \n Cleans out outstanding confirmations in the queue based on delivery");
                if (multiple)
                {
                    var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                    foreach (var entry in confirmed)
                    {
                        outstandingConfirms.TryRemove(entry.Key, out _);
                    }
                }
                else
                {
                    outstandingConfirms.TryRemove(sequenceNumber, out _);
                }
            }

            channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
            channel.BasicNacks += (sender, ea) =>
            {
                outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
            };

            // Convert the message to bytes
            var body1 = ByteUtility.ObjectToByteArray(settlementRequest);
            // StartNew() so the elapsed time logged below is actually measured.
            var timer = Stopwatch.StartNew();
            channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: null, body: body1);
            channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
            if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
            {
                logger.Log(LogLevel.Information, "QueueProducer \n  All messages could not be confirmed in 60 seconds");
                throw new Exception("All messages could not be confirmed in 60 seconds");
            }
            logger.Log(LogLevel.Information, $"QueueProducer \n  Published messages and handled confirm asynchronously {timer.ElapsedMilliseconds:N0} ms");
            //Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
        }
    }

    I ask this question because, while reading the RabbitMQ documentation, I read that this block of code

        channel.ConfirmSelect();

    is meant to be called once; it is meant for enabling publisher confirms for guaranteed delivery.

    With the approach I am taking, my producer code runs every time a request hits my endpoint. That means I would be opening a new connection to the RabbitMQ server and calling the channel.ConfirmSelect() method on every request.

    I don't have any other design in mind, so I am just confused here. Is there a better way to implement this?
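
    To make the question concrete, the "call it once" arrangement that the documentation seems to describe might look roughly like the sketch below: one connection and channel created at application start-up, ConfirmSelect() called once on that channel, and each request only publishing. This is a minimal sketch and not a tested implementation. It assumes the synchronous RabbitMQ.Client 6.x API (IModel). The class name ConfirmingPublisher, its constructor parameters, and the durable queue settings are hypothetical choices made for illustration, not something taken from the code above.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical long-lived publisher. The idea is to register ONE instance for the
// whole application (for example as a singleton), so the connection and channel
// are created once instead of once per HTTP request.
public sealed class ConfirmingPublisher : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _queueName;
    private readonly object _sync = new object();

    // "factory" and "queueName" are assumptions: supply whatever the application already uses.
    public ConfirmingPublisher(IConnectionFactory factory, string queueName)
    {
        _queueName = queueName;
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // Assumption: a named, durable, non-exclusive queue, so that it survives this
        // connection closing and a separate consumer process can read from it.
        _channel.QueueDeclare(queue: _queueName,
                              durable: true,
                              exclusive: false,
                              autoDelete: false,
                              arguments: null);

        // Publisher confirms are enabled once for the lifetime of the channel.
        _channel.ConfirmSelect();
    }

    // Called from the endpoint for each incoming message.
    public void Publish(byte[] body)
    {
        // A channel should not be used by several threads at the same time,
        // so concurrent requests are serialized here.
        lock (_sync)
        {
            var properties = _channel.CreateBasicProperties();
            properties.Persistent = true; // ask the broker to persist the message

            _channel.BasicPublish(exchange: "",
                                  routingKey: _queueName,
                                  basicProperties: properties,
                                  body: body);

            // Blocks until the broker confirms the message; throws if it is
            // nack-ed or the timeout elapses.
            _channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        }
    }

    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}
```

    With something like this registered as a single shared instance, the controller action would only call Publish(...), and Application B would consume from the same named queue. Whether blocking on the confirm inside the request is acceptable depends on the throughput that is needed.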

    Sunday, May 31, 2020 2:26 PM


All replies