Durable Queues and Persistent Messages in RabbitMQ Source

1---
2title: 'Durable Queues and Persistent Messages in RabbitMQ'
3date: '2019-05-26'
4published_at: '2019-05-26T13:58:00.001+10:00'
5tags: []
6author: 'Gavin Jackson'
7excerpt: 'I''ve been playing around with Open Source ESB and Message Queue technologies lately and I thought it might be worth sharing a post on an implementation that I''m reasonably happy with. The problem that...'
8updated_at: '2019-05-26T13:58:46.454+10:00'
9legacy_url: 'http://www.gavinj.net/2019/05/durable-exchange-with-durable-queues.html'
10---
11
12I've been playing around with Open Source ESB and Message Queue technologies lately and I thought it might be worth sharing a post on an implementation that I'm reasonably happy with.
13
14The problem that I'm trying to solve is having an underlying bus that propagates data changes across distributed apps within our organsiation.
15
16An example of this may be an update to a customer record in a CRM needs to update several other apps in the system - an Online Store (Magento).
17
18I started the journey by looking at a few Enterprise Service Bus (ESB) implementations - namely Zato and Camel (written in Python and Java respectively).
19
20The more I looked at these solutions, the more I realised that it doesn't actually reduce the complexity in the architecture (it was merely shuffling it around a little bit) - eg. you still need to write code that talks to third party API's.
21
22One benefit it does provide is a pattern that helps engineers know where their code should run within a system, and a queuing system for building a loosely coupled system that can survive components going up and down - also preserving the state of messages if the underlying bus goes down.
23
24Although I wasn't in love with the complexity of the ESB system per se, I loved the idea of using a queuing system, so started looking into Open Source implementations such as ActiveMQ and RabbitMQ.
25
26To cut a long story short, they are both very similar - but I ended up settling on RabbitMQ and wanted to share a configuration that I'm happy with.
27
28![](https://www.rabbitmq.com/img/tutorials/python-five.png)
29
30Above is the RabbitMQ topic pattern described on the rabbitmq website, it allows a producer to post to an exchange using a specific routing key, consumers then bind to the exchange based on a pattern - allowing them to selectively consume messages being posted to the exchange.
31
32Both ActiveMq and RabbitMq support a number of messaging protocols, the two I looked at included STOMP and AMQP (the latter being the default protocol used by RabbitMQ).
33
34The choice of messaging technology will determine the client libraries you need to install to talk to the messaging server. Fortunately RabbitMQ has an excellent tutorial with sample code for all of the major programming languages (including Python, PHP, Java and Go).
35
36[https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)
37
38Out of the box, not all messaging systems guarantee that the active state of the system will come back after a reboot - that is the configuration of the queues, topics, exchanges etc. won't come up unless you mark them as durable.
39
40###  PHP Producer
41
42The following example shows how to write a hook that executes within our SuiteCRM CRM when an operator updates a contact.
43
44The excellent suitecrm developer documentation describes how to define a logic hook [https://docs.suitecrm.com/developer/logic-hooks/](https://docs.suitecrm.com/developer/logic-hooks/).
45
46The PHP code will transform the sugarbean into a JSON payload that gets posted to an AMQP *topic*.
47
48```
49 1
50 2
51 3
52 4
53 5
54 6
55 7
56 8
57 9
5810
5911
6012
6113
6214
6315
6416
6517
6618
6719
6820
6921
7022
7123
7224
7325
7426
7527
7628
7729
7830
7931
8032
8133
8234
8335
8436
8537
8638
8739
8840
8941
9042
9143
9244
9345
9446
9547
9648
9749
9850
9951
10052
101```
102
103```
104use PhpAmqpLib\Connection\AMQPStreamConnection;
105use PhpAmqpLib\Message\AMQPMessage;
106
107use SuiteCRM\Utility\BeanJsonSerializer;
108
109if (!defined('sugarEntry') || !sugarEntry) {
110    die('Not A Valid Entry Point');
111}
112
113class PushMessageHook
114{
115    public function __construct()
116    {
117    }
118
119    public function pushMessageHook()
120    {
121        $deprecatedMessage = 'PHP4 Style Constructors are deprecated and will be remove in 7.8, please update your code';
122        if (isset($GLOBALS['log'])) {
123            $GLOBALS['log']->deprecated($deprecatedMessage);
124        } else {
125            trigger_error($deprecatedMessage, E_USER_DEPRECATED);
126        }
127        self::__construct();
128    }
129
130    public function pushMessage(&$bean, $event, $arguments)
131    {
132        $mySerializer = BeanJsonSerializer::make();
133        $myJson = $mySerializer->serialize($bean);
134
135        $connection = new AMQPStreamConnection('amqp_server', 5672, 'username', 'password');
136        $channel = $connection->channel();
137
138        $channel->exchange_declare('lmap_exchange', 'topic', false, true, false);
139
140        $routing_key = 'lmap.customer_update';
141
142        $msg = new AMQPMessage($myJson,
143            array(
144                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
145            ));
146
147        $channel->basic_publish($msg, 'lmap_exchange', $routing_key);;
148
149        $channel->close();
150        $connection->close();
151    }
152
153}
154```
155
156Important things about the code snippet above:
157
158**line 37:** The second *false* passed into **exchange_declare** makes it a durable exchange (meaning it doesn't disappear on server restart). In this case, the name of the exchange is *lmap_exchange*.
159
160**line 39:** The routing key *lmap.customer_update* is used to route messages to subscribers.
161
162**line 43:** The AMQP message is constructed, note that delivery mode needs to be set to persistent, which means that the message will survive a server restart (note: this must be used in conjunction with *durable queues*).
163
164###  Python consumer
165
166Below is the consumer that takes this message asynchronously and prints out the message to the terminal, in this case, I used the Pika python client library (I modified the sample code from the rabbitmq tutorial ([https://www.rabbitmq.com/tutorials/tutorial-five-python.html](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)).
167
168```
169 1
170 2
171 3
172 4
173 5
174 6
175 7
176 8
177 9
17810
17911
18012
18113
18214
18315
18416
18517
18618
18719
18820
18921
190```
191
192```
193#!/usr/bin/env python
194import pika
195import sys
196
197connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
198channel = connection.channel()
199
200channel.exchange_declare(exchange='lmap_exchange', exchange_type='topic', durable=True)
201
202result = channel.queue_declare('lmap_custUpdate', durable=True)
203
204channel.queue_bind(exchange='lmap_exchange', queue='lmap_custUpdate', routing_key='lmap.customer_update')
205
206print(' [*] Waiting for logs. To exit press CTRL+C')
207
208def callback(ch, method, properties, body):
209    print(" [x] %r:%r" % (method.routing_key, body))
210
211channel.basic_consume(queue='lmap_custUpdate', on_message_callback=callback, auto_ack=True)
212
213channel.start_consuming()
214```
215
216Important things to note about the code snippet above:
217
218**line 9: **This declares the durable exchange *lmap_exchange* of type *topic*
219
220**line 12:** This line declares the *durable queue* that will receive messages sent to the *lmap_exchange* with the routing key *lmap.customer_update* *
221
222* So, with this setup I tested firing up the producer - it worked but there were no subscribers for the routing key (which makes sense).
223
224I then started up the consumer, I could then see in the RabbitMq administrator interface that the queue was then associated with the exchange.
225
226![](/assets/imported/2019-05-26-durable-exchange-with-durable-queues/image-1.png)
227
228*Here we can see that lmap_exchange is defined as a durable (D) exchange. If it isn't durable, it doesn't survive a reboot.* *
229
230* ![](/assets/imported/2019-05-26-durable-exchange-with-durable-queues/image-2.png)
231
232*Here we can see the binding of the lmap_custUpdate Queue to the lmap.customer_update routing key.* *
233
234* ![](/assets/imported/2019-05-26-durable-exchange-with-durable-queues/image-3.png)
235
236*Here we see the lmap_custUpdate queue - note that it is durable, if this isn't set then it doesn't survive a server restart. Also note that this one has three messages in the queue (awaiting a consumer to start up).* *
237
238* Posting messages to the topic yielded the expected output on the consumer.
239
240Shutting down the consumer I could see that the queue wasn't removed (as it's durable). If it wasn't marked as durable the queue gets deleted and no messages will get routed, the consumer will start up as normal, but it won't process any of the backlog.
241
242Sending more messages from the producer started filling the queue.
243
244Starting the consumer up processed all of the backed up messages in the queue (which is what I wanted).
245
246Shutting the consumer down, sending messages into the queue and restarting the rabbitmq server and then starting the consumer process processed all of the messages that were sent in to the queue (note that if you do not explicitly mark the messages as persistent, THIS WILL DROP MESSSAGES).
247
248###  **Conclusion**
249
250In conclusion, RabbitMQ topics are a great way of routing messages to subscribers based on a routing key. You will need to set a few important options to make sure you don't accidentally lose exchanges, queues and messages (namely setting exchanges and queues to *durable*, and making sure the messages are marked as *persistent*).
251
252###  **Todo**
253
2541. Look into configuring durable exchanges and queues in rabbitmq config (as opposed to dynamically generating them from client code (should be pretty simple).
2552. Implement some security around writing and reading from queues
2563. Add some error checking in suitecrm in case the AMQP server is not running.
257
258
259