---
title: 'Durable Queues and Persistent Messages in RabbitMQ'
date: '2019-05-26'
published_at: '2019-05-26T13:58:00.001+10:00'
tags: []
author: 'Gavin Jackson'
excerpt: 'I''ve been playing around with Open Source ESB and Message Queue technologies lately and I thought it might be worth sharing a post on an implementation that I''m reasonably happy with. The problem that...'
updated_at: '2019-05-26T13:58:46.454+10:00'
legacy_url: 'http://www.gavinj.net/2019/05/durable-exchange-with-durable-queues.html'
---

I've been playing around with Open Source ESB and Message Queue technologies lately and I thought it might be worth sharing a post on an implementation that I'm reasonably happy with.

The problem that I'm trying to solve is having an underlying bus that propagates data changes across distributed apps within our organisation.

For example, an update to a customer record in the CRM needs to propagate to several other apps in the system, such as an online store (Magento).

I started the journey by looking at a few Enterprise Service Bus (ESB) implementations, namely Zato and Camel (written in Python and Java respectively).

The more I looked at these solutions, the more I realised that they don't actually reduce the complexity in the architecture (they merely shuffle it around a little bit); e.g. you still need to write code that talks to third-party APIs.

One benefit an ESB does provide is a pattern that helps engineers know where their code should run within a system, along with a queuing system for building a loosely coupled system that can survive components going up and down, and that preserves the state of messages if the underlying bus goes down.

Although I wasn't in love with the complexity of the ESB system per se, I loved the idea of using a queuing system, so I started looking into Open Source implementations such as ActiveMQ and RabbitMQ.

To cut a long story short, they are both very similar, but I ended up settling on RabbitMQ and wanted to share a configuration that I'm happy with.

The RabbitMQ topic pattern, as described on the RabbitMQ website, allows a producer to publish to an exchange using a specific routing key. Consumers then bind to the exchange based on a pattern, which lets them selectively consume messages being published to the exchange.

Both ActiveMQ and RabbitMQ support a number of messaging protocols. The two I looked at were STOMP and AMQP (the latter being the default protocol used by RabbitMQ).

The choice of messaging technology determines the client libraries you need to install to talk to the messaging server. Fortunately RabbitMQ has an excellent tutorial with sample code for all of the major programming languages (including Python, PHP, Java and Go).

[https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)

Out of the box, not all messaging systems guarantee that the active state of the system will come back after a reboot; that is, the configuration of the queues, topics, exchanges etc. won't come back up unless you mark them as durable.

### PHP Producer

The following example shows how to write a hook that executes within our SuiteCRM instance when an operator updates a contact.

The excellent SuiteCRM developer documentation describes how to define a logic hook: [https://docs.suitecrm.com/developer/logic-hooks/](https://docs.suitecrm.com/developer/logic-hooks/).

The PHP code transforms the SugarBean into a JSON payload that gets published to an AMQP *topic* exchange.

```
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

use SuiteCRM\Utility\BeanJsonSerializer;

if (!defined('sugarEntry') || !sugarEntry) {
    die('Not A Valid Entry Point');
}

class PushMessageHook
{
    public function __construct()
    {
    }

    public function pushMessageHook()
    {
        $deprecatedMessage = 'PHP4 Style Constructors are deprecated and will be removed in 7.8, please update your code';
        if (isset($GLOBALS['log'])) {
            $GLOBALS['log']->deprecated($deprecatedMessage);
        } else {
            trigger_error($deprecatedMessage, E_USER_DEPRECATED);
        }
        self::__construct();
    }

    public function pushMessage(&$bean, $event, $arguments)
    {
        $mySerializer = BeanJsonSerializer::make();
        $myJson = $mySerializer->serialize($bean);

        $connection = new AMQPStreamConnection('amqp_server', 5672, 'username', 'password');
        $channel = $connection->channel();

        $channel->exchange_declare('lmap_exchange', 'topic', false, true, false); // args: exchange, type, passive, durable, auto_delete

        $routing_key = 'lmap.customer_update';

        $msg = new AMQPMessage($myJson,
            array(
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ));

        $channel->basic_publish($msg, 'lmap_exchange', $routing_key);

        $channel->close();
        $connection->close();
    }

}
```

Important things about the code snippet above:

**line 37:** The *true* passed as the fourth argument to **exchange_declare** (the durable flag) makes it a durable exchange, meaning it doesn't disappear on server restart. The *false* values on either side of it are the passive and auto-delete flags. In this case, the name of the exchange is *lmap_exchange*.

**line 39:** The routing key *lmap.customer_update* is used to route messages to subscribers.

**line 43:** The AMQP message is constructed with the delivery mode set to persistent, which means that the message will survive a server restart (note: this must be used in conjunction with *durable queues*).

### Python consumer

Below is the consumer that takes this message asynchronously and prints it to the terminal. In this case, I used the Pika Python client library and modified the sample code from the RabbitMQ tutorial ([https://www.rabbitmq.com/tutorials/tutorial-five-python.html](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)).

```
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='lmap_exchange', exchange_type='topic', durable=True)

result = channel.queue_declare('lmap_custUpdate', durable=True)  # named, durable queue

channel.queue_bind(exchange='lmap_exchange', queue='lmap_custUpdate', routing_key='lmap.customer_update')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue='lmap_custUpdate', on_message_callback=callback, auto_ack=True)

channel.start_consuming()
```

Important things to note about the code snippet above:

**line 8:** This declares the durable exchange *lmap_exchange* of type *topic*.

**line 10:** This declares the *durable queue* *lmap_custUpdate*.

**line 12:** This binds the queue to *lmap_exchange*, so that it receives messages sent to the exchange with the routing key *lmap.customer_update*.

With this setup, I first tested firing up the producer on its own. It worked, but there were no subscribers for the routing key (which makes sense).

I then started up the consumer and could see in the RabbitMQ management interface that the queue was now bound to the exchange.

*In the management interface, lmap_exchange is listed as a durable (D) exchange. If it isn't durable, it doesn't survive a reboot.*

*The bindings view shows the lmap_custUpdate queue bound to the lmap.customer_update routing key.*

*The queues view shows the lmap_custUpdate queue. Note that it is durable; if this isn't set then it doesn't survive a server restart. Also note that this one has three messages in the queue (awaiting a consumer to start up).*

Posting messages to the topic yielded the expected output on the consumer.

Shutting down the consumer, I could see that the queue wasn't removed: it is a named queue that is neither exclusive nor auto-delete, and because it is durable it also survives a server restart. If the queue does get deleted (as happens with the tutorial's exclusive queue when the consumer disconnects), no messages get routed while the consumer is down; the consumer will start up as normal, but it won't process any of the backlog.

Sending more messages from the producer started filling the queue.

Starting the consumer up processed all of the backed-up messages in the queue (which is what I wanted).

Finally, I shut the consumer down, sent messages into the queue, restarted the RabbitMQ server and then started the consumer again. It processed all of the messages that had been sent to the queue (note that if you do not explicitly mark the messages as persistent, THIS WILL DROP MESSAGES).

### **Conclusion**

In conclusion, RabbitMQ topics are a great way of routing messages to subscribers based on a routing key. You will need to set a few important options to make sure you don't accidentally lose exchanges, queues and messages (namely setting exchanges and queues to *durable*, and making sure the messages are marked as *persistent*).

### **Todo**

1. Look into configuring durable exchanges and queues in the RabbitMQ config, as opposed to dynamically generating them from client code (should be pretty simple).
2. Implement some security around writing to and reading from queues.
3. Add some error checking in SuiteCRM in case the AMQP server is not running.
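
For the first Todo item, one possible starting point is a RabbitMQ definitions file that declares the exchange, queue and binding up front. The sketch below has not been tested as part of this post: it only builds the JSON, and the field names are my understanding of the layout used by definitions exported from the management plugin, so compare it against an export from your own broker before relying on it. The helper name `build_definitions` is made up for illustration; the exchange, queue and routing key names are the ones used above.

```python
import json


def build_definitions(exchange, queue, routing_key, vhost="/"):
    """Build a dict describing one durable topic exchange, one durable queue
    and the binding between them (hypothetical helper, not a RabbitMQ API)."""
    return {
        "exchanges": [{
            "name": exchange,
            "vhost": vhost,
            "type": "topic",
            "durable": True,       # exchange survives a broker restart
            "auto_delete": False,
            "internal": False,
            "arguments": {},
        }],
        "queues": [{
            "name": queue,
            "vhost": vhost,
            "durable": True,       # queue survives a broker restart
            "auto_delete": False,
            "arguments": {},
        }],
        "bindings": [{
            "source": exchange,
            "vhost": vhost,
            "destination": queue,
            "destination_type": "queue",
            "routing_key": routing_key,
            "arguments": {},
        }],
    }


definitions = build_definitions("lmap_exchange", "lmap_custUpdate", "lmap.customer_update")
definitions_json = json.dumps(definitions, indent=2)
print(definitions_json)
```

Saved to a file, output like this could be imported with `rabbitmqctl import_definitions`, or loaded at boot through the `load_definitions` setting in `rabbitmq.conf`. Both exist in recent RabbitMQ releases, but check the documentation for the version you run. Note that message persistence is still set per message by the producer, so a definitions file does not replace the persistent delivery mode.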