Improve the performance of your Symfony application using RabbitMQ

A real-world example

As the global demand for data increases, many businesses expose their data through APIs. Many other businesses exist simply to make use of those APIs.

My experience is with a company which falls in the latter category, and is a major consumer of social media advertising APIs.

Clients use the company’s tool to manage their advertising campaigns and reach a wider audience than is possible using each social network’s own advertising interface. Clients build adverts, adjust their budgets, expect insights into their campaigns’ performance, and choose who each advert should be shown to. What’s more, they want their changes reflected in real time (and who wouldn’t?).

Therefore every day involves making many thousands of API calls.

In the company’s early days we could get away with running a cron job every minute which selected batches of updates from the database to send to the APIs. However, as the company expanded and the client base grew, the lack of scalability of this approach began to show itself as a serious problem, and unhappy clients are non-paying clients. We needed a way to guarantee that any particular client’s updates wouldn’t be stuck in a queue, only being processed at a rate of “a bunch every minute” whenever the cron job next fired. For this we decided to give RabbitMQ a try.

For a more in-depth look at this specific use-case, watch the video of my presentation: The Evolution Of A Rabbit.

What does RabbitMQ do?

RabbitMQ allows you to defer the processing of tasks to some other process which you create yourself. For example, if you had a Symfony action which needed to create and send hundreds of emails to a subscriber list, then instead of sending the emails from the controller you would simply inform RabbitMQ that you have several hundred emails to send and let the RabbitMQ broker take over.

How did it help improve performance?

  • When queueing API calls we no longer needed to write to a database queue table
  • We passed access tokens and everything else required inside the message, so we no longer needed to select anything from the database to make API calls
  • We removed the huge data read/processing spikes we had been seeing each minute when the cron tasks fired
  • We were able to move all of the queue processing and API calls off the client-facing servers, so the web servers can concentrate on being web servers
  • The frontend application can concentrate on being a frontend application
  • We could run API calls continuously rather than trying to batch-process as many as we could select from the database each minute
  • Any issues with the queue are no longer also issues with the web servers. If anything goes awry we are totally disconnected from the clients while we debug.

The RabbitMQ lifecycle

RabbitMQ is a message broker: you publish messages to an exchange, and the broker then routes them internally into message queues.

Imagine a very simple post office scenario where people who want to send a letter go directly to the post office (in a world without post boxes on the street corner) and you must go directly to the post office to collect all your mail. In this world there are no postmen either, but the post office is happy to organise and hold all your letters for you as long as you want.

  • To send you a letter, your friend walks to the post office and drops the letter into a big container. Hundreds of other people are all dropping letters off at the same time.
  • Someone at the post office takes each letter as it is dropped into the container (oldest letter first) and looks at the envelope. This worker then places each letter into an individual box according to the address written on the envelope. The letters stay in these boxes until they are collected by you.
  • When you walk to the post office to collect your mail, the post office worker gives you the box containing all the letters addressed to you. You open each letter, starting with the oldest, and you perform some action depending on the contents of the letter.

For example you might throw the letter away if it’s junk mail, you might immediately book a flight if the letter was from a long-lost relative in Australia, or you might decide to leave it until later to take action.

The same three stages exist in the RabbitMQ lifecycle:

  • Producer: This is your friend who decided to send a message to you. In your application this is most likely going to be an action in a Symfony controller, although it could also be a console command or any other place you want to send a message from.
  • Exchange & Queue: The post office is the exchange and the box holding your sorted letters is the queue. The RabbitMQ server takes care of this stage for you.
  • Consumer: This is the part you played in the analogy. You collected the letters from the queue and took some action based upon what was inside the envelope.

In a live situation the consumer is most likely going to be a service running on a server somewhere. In our example we’ll create a PHP service to listen to Rabbit and process the queue.

A simple view of the lifecycle:

Using the post office analogy, here’s a simple overview of the basic RabbitMQ process. The parts in blue are those which you create (the producer and the consumer).

Setting up your own Symfony & RabbitMQ application

Here we’re going to build a simple example application which defers processing of API calls to RabbitMQ. We’re also going to allow some messages to be processed after a delay, which is useful for handling error responses and re-trying API calls later on.

Getting started:

**Using the pre-configured Vagrant VM (recommended for dev environments)**

I’ve created a sample application to get you completely up-and-running with a development server (including a working & configured RabbitMQ implementation) with just a few keystrokes. You can find it here, along with instructions.

Once you have this downloaded and running you can skip ahead to the next section *Configuring your Symfony application to talk to RabbitMQ*.

**Doing it yourself (the purist way)**

If you choose not to use the Vagrant box I’ve provided, you can install RabbitMQ yourself using the installation guides.

The default RabbitMQ username:password is guest:guest.

In my example I’ve created an administrative user with the username:password combination symfony:rabbit and removed the guest account. The benefit of this is that you can log in to the RabbitMQ admin panel with the new account, which the guest user is not allowed to do by default.

You can achieve the same by running these commands in the terminal:
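(The exact commands can vary slightly between RabbitMQ versions; this assumes `rabbitmqctl` is available on the server.)

```bash
# Create the new user and make it an administrator (username:password symfony:rabbit)
sudo rabbitmqctl add_user symfony rabbit
sudo rabbitmqctl set_user_tags symfony administrator

# Give the new user full configure/write/read permissions on the default vhost
sudo rabbitmqctl set_permissions -p / symfony ".*" ".*" ".*"

# Remove the default guest account
sudo rabbitmqctl delete_user guest
```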

Configuring your Symfony application to talk to RabbitMQ

The first thing to do is include some dependencies. We’re going to use the excellent OldSound RabbitMQ bundle to do the hard work.

So first add the bundle to the project via composer:
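(The package name below is the bundle’s published name on Packagist; pin a version constraint to suit your Symfony release.)

```bash
composer require oldsound/rabbitmq-bundle
```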

Then register the bundle in AppKernel.php:
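A sketch of the relevant part of `registerBundles()` (your existing bundle list stays as it is):

```php
// app/AppKernel.php
public function registerBundles()
{
    $bundles = [
        // ...the existing Symfony and application bundles...
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    ];

    return $bundles;
}
```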

Next we need to add some config options so that Symfony knows about the RabbitMQ connection:
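(Something along these lines; the `%rabbitmq_*%` parameter names are placeholders I’ve chosen, not required by the bundle.)

```yaml
# app/config/config.yml
old_sound_rabbit_mq:
    connections:
        default:
            host:     '%rabbitmq_host%'
            port:     '%rabbitmq_port%'
            user:     '%rabbitmq_user%'
            password: '%rabbitmq_password%'
            vhost:    '/'
            lazy:     false
```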

And add the associated parameters we’ve just referenced in config.yml.
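By convention these live in `parameters.yml`; the values below simply match the symfony:rabbit account created earlier:

```yaml
# app/config/parameters.yml
parameters:
    rabbitmq_host:     127.0.0.1
    rabbitmq_port:     5672
    rabbitmq_user:     symfony
    rabbitmq_password: rabbit
```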

Sending messages to RabbitMQ

In order to send a message to RabbitMQ we need to create a producer class which will serve as our model’s method of communicating with the RabbitMQ server. In this example I’ve just created a simple class inside AppBundle. The constructor takes a single argument – the Oldsound RabbitMQ producer – which we’ll configure in services.yml.
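A minimal sketch of such a class (the class name, namespace and `add()` method are my own choices, not dictated by the bundle):

```php
<?php
// src/AppBundle/Service/ApiCallProducer.php (illustrative name and location)

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class ApiCallProducer
{
    /** @var ProducerInterface The Oldsound RabbitMQ producer injected via services.yml */
    private $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Serialise the message and hand it to the RabbitMQ broker,
     * using the 'statuses/update' routing key.
     */
    public function add($message)
    {
        $this->producer->publish(serialize($message), 'statuses/update');
    }
}
```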

Update services.yml to configure the producer we just created as an application-wide service, remembering to pass the Oldsound RabbitMQ producer as an argument.
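Assuming the OldSound producer is named `api_call` (which gives us an `old_sound_rabbit_mq.api_call_producer` service) and our own service id is `app.api_call_producer`, the wiring might look like this:

```yaml
# app/config/config.yml
old_sound_rabbit_mq:
    # ...connections as above...
    producers:
        api_call:
            connection:       default
            exchange_options: { name: 'api_call', type: direct }
            queue_options:
                name:         'statuses/update'
                routing_keys: ['statuses/update']
```

```yaml
# app/config/services.yml
services:
    app.api_call_producer:
        class:     AppBundle\Service\ApiCallProducer
        arguments: ['@old_sound_rabbit_mq.api_call_producer']
```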

Notice the ‘type’ parameter on the exchange_options element. There are a few different types of exchange available to you in RabbitMQ. Most use-cases are likely to require a ‘direct’ type, where the exchange routes each message to one queue only (like the post office worker putting each letter directly into one box).

Another common exchange type is ‘fanout’, where the exchange puts a copy of each message into every queue that is bound to it (i.e. every queue that it knows about). This is like the post office worker opening every letter as it comes to him, photocopying it and then putting a single copy into every box at the post office.

There are also ‘topic’ exchanges and ‘headers’ exchanges. Read the manual for a more in-depth discussion of exchanges.

You can see that I also added the routing key `statuses/update` to the queue config. This isn’t strictly necessary for a simple queue application, but it’s very useful if we want to handle API-call errors (like rate limits) later on (see [Re-queueing messages to be consumed later](#requeuing)).

Now we have everything in place for the producer, we can create a controller to send messages to RabbitMQ. In this simple example we’ll be sending a bunch of JSON-encoded objects to our producer, which in turn serialises each string and passes it on to the RabbitMQ broker:
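(A sketch of such a controller, using the `app.api_call_producer` service from above; the controller name, route and payload are illustrative.)

```php
<?php
// src/AppBundle/Controller/ProducerController.php

namespace AppBundle\Controller;

use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Response;

class ProducerController extends Controller
{
    /**
     * @Route("/produce")
     */
    public function produceAction()
    {
        $producer = $this->get('app.api_call_producer');

        // Queue up a batch of pretend API calls as JSON strings
        for ($i = 1; $i <= 20; $i++) {
            $producer->add(json_encode([
                'status_id' => $i,
                'message'   => sprintf('Update status %d', $i),
            ]));
        }

        return new Response('20 messages sent to the statuses/update queue');
    }
}
```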

Navigating to /produce in your browser will create a load of messages in the `statuses/update` queue, which you can check in the RabbitMQ admin panel (at [http://dev.symfony-rabbit:15672/#/queues](http://dev.symfony-rabbit:15672/#/queues) if you’re using the Vagrant machine provided, or at [http://localhost:15672/#/queues](http://localhost:15672/#/queues) if you manually configured RabbitMQ yourself).

Consuming messages

So now we’re halfway there! We’ve got our messages in the queue waiting to be dealt with. Our frontend application no longer cares about processing repetitive or mundane tasks and our clients won’t have to experience pages slowing down as controllers deal with making API calls or sending emails.

Our work is waiting to be done on another server. We can move that queue to another machine, scale it, and build clusters, all completely independently of our web application.

The next thing to do is build a consumer which will deal with performing the donkey work of processing each message. The consumer won’t run inside the client-facing part of the application. In this case we’ll create a Symfony console application, but the popularity and flexibility of AMQP (the protocol implemented by RabbitMQ) means there are a [whole bunch of languages](https://www.rabbitmq.com/devtools.html) you can use. In our real-world example we chose to use Node.js.
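Here is a minimal sketch of a PHP consumer using the bundle’s `ConsumerInterface` (the class name and the echo stand in for whatever real API work you need to do):

```php
<?php
// src/AppBundle/Consumer/ApiCallConsumer.php

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ApiCallConsumer implements ConsumerInterface
{
    /**
     * Called once for every message RabbitMQ delivers to this consumer.
     */
    public function execute(AMQPMessage $msg)
    {
        // Reverse what the producer did: unserialise, then decode the JSON payload
        $payload = json_decode(unserialize($msg->getBody()), true);

        // The real API call would happen here; we just print what we would do
        echo sprintf("Making API call for status %d\n", $payload['status_id']);

        // Returning true acknowledges the message; returning false rejects and requeues it
        return true;
    }
}
```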

As before, we need to configure an application-wide service, this time pointing to the consumer:
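(Using `make_api_call` as the service id, so the bundle’s `callback` option can reference it in a moment.)

```yaml
# app/config/services.yml
services:
    make_api_call:
        class: AppBundle\Consumer\ApiCallConsumer
```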

And finally, OldSoundRabbitMqBundle needs to know about the consumer class too:
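(A sketch of the consumer definition; note that the exchange and queue options mirror the producer’s.)

```yaml
# app/config/config.yml
old_sound_rabbit_mq:
    # ...connections and producers as before...
    consumers:
        make_api_call:
            connection:       default
            exchange_options: { name: 'api_call', type: direct }
            queue_options:
                name:         'statuses/update'
                routing_keys: ['statuses/update']
            callback:         make_api_call
```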

The `callback` option on the `make_api_call` consumer we just configured points to the `make_api_call` service we added to services.yml. OldsoundRabbitMQBundle will call whichever service you name here as the handler for any messages it consumes. It will pass the message as the argument for the consumer’s execute() method.

Also pay attention to the exchange and queue options configured on the consumer. They must match the options the queue is created with by the producer, otherwise RabbitMQ will begin reporting error messages.

We can now begin consuming messages from a console window by running:
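(On Symfony 2 the console lives at `app/console`; on Symfony 3 use `bin/console` instead.)

```bash
php app/console rabbitmq:consumer make_api_call
```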

The `make_api_call` option refers to the name of the consumer we just configured in config.yml.

So now we’re able to create and consume messages. The heavy lifting is removed from anything client-facing, and your boss gives you a pay rise. Time to rest on your laurels, safe in the knowledge that you’re instantly more appealing to everyone in the office. Your work here is done, right?

Well, actually, no. Now that you’re consuming messages so quickly that you can make hundreds of API calls per minute, your company grows and you take on more clients, all of whom have even more API updates to process.

It’s only a matter of time until you start hitting rate limits, and when you do you need to handle them gracefully.

There is no built-in way to tell RabbitMQ to delay delivering a message (we exhausted Google’s entire archive of articles finding this out for ourselves), but luckily RabbitMQ does offer a ‘dead letter exchange’ option which we can take advantage of.

The dead letter exchange (DLX) option tells RabbitMQ that when a message expires or is rejected it should be republished to whichever exchange (and routing key) is named in the dead letter settings. This means that if we set a TTL (time to live) and a dead letter exchange on a queue which has no consumers, we can delay when our messages will be processed by placing them in that queue first.

The workflow is therefore:

  • Put messages with a specific TTL into a queue which has no consumers.
  • Specify the dead letter exchange and routing key to point to a queue which does have a consumer.
  • When RabbitMQ detects the TTL has expired on each message, it will move it into the queue specified by the dead letter option.
  • Your consumer will process the newly moved messages as normal.

Using the post office analogy, this would be the same as the post office worker putting your letters into his own box for a certain amount of time, without opening them or reading their contents. A little while later he takes them out of his box and puts them into your box instead.

So to implement this ourselves let’s first add a `delay_queue` service to our Symfony application:
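(Here I simply reuse the producer class from earlier and point it at a new OldSound producer, which we’ll call `delay` below; both names are my own choices.)

```yaml
# app/config/services.yml
services:
    delay_queue:
        class:     AppBundle\Service\ApiCallProducer
        arguments: ['@old_sound_rabbit_mq.delay_producer']
```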

Remember that the messages in this queue will never be consumed directly; we’re just relying on RabbitMQ to move each message into a queue which does have a consumer once the message expires. So all we need now is to tell OldSound RabbitMQ about it:
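A sketch of the extra producer definition (the exchange name `api_call_delay` is illustrative; the `['I', …]` and `['S', …]` pairs are the typed queue-argument syntax php-amqplib expects):

```yaml
# app/config/config.yml
old_sound_rabbit_mq:
    # ...connections, producers and consumers as before...
    producers:
        delay:
            connection:       default
            exchange_options: { name: 'api_call_delay', type: direct }
            queue_options:
                name:         'statuses/update/delay'
                routing_keys: ['statuses/update']
                arguments:
                    x-message-ttl:             ['I', 20000]             # 20 seconds, in milliseconds
                    x-dead-letter-exchange:    ['S', 'api_call']        # where expired messages are sent
                    x-dead-letter-routing-key: ['S', 'statuses/update']
```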

Notice that we’ve created a new direct exchange and we’ve named this new queue `statuses/update/delay`. We’ve also got some extra arguments set on this queue that we haven’t set on the previous producer’s queue. The `x-message-ttl` is the TTL for each message that is put into the queue, measured in milliseconds. The example value here equates to 20 seconds. In a real-world example you are likely to want to set longer TTLs, especially if you need to requeue to wait for a rate limit error to expire.

The `x-dead-letter-exchange` option tells RabbitMQ that messages which expire in the delay queue should be sent to the `api_call` exchange, and the `x-dead-letter-routing-key` corresponds to the `routing_keys` option we set on the first producer.

With all this set we now know that any messages we place in the `statuses/update/delay` queue will not be consumed directly, but after 20 seconds will be put into the `statuses/update` queue and consumed from there instead.

Let’s add an action to test this:
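(An extra action in the controller from earlier, this time sending to the `delay_queue` service we just defined.)

```php
// src/AppBundle/Controller/ProducerController.php (same controller as before)

/**
 * @Route("/delay")
 */
public function delayAction()
{
    $producer = $this->get('delay_queue');

    // These messages sit in statuses/update/delay until their 20 second TTL expires
    for ($i = 1; $i <= 5; $i++) {
        $producer->add(json_encode([
            'status_id' => $i,
            'message'   => sprintf('Delayed update for status %d', $i),
        ]));
    }

    return new Response('5 messages sent to the statuses/update/delay queue');
}
```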

Now when running the consumer task in the console and navigating to /delay in the browser you should notice that instead of being consumed immediately there is a delay of 20 seconds before the consumer displays the messages.

Other messages added to the `statuses/update` queue in the meantime will still be consumed immediately. It’s important to note that every message in a given delay queue must have an identical TTL, otherwise RabbitMQ starts throwing errors when you try to publish messages. If you need to delay messages by different amounts you should create separate queues for them (i.e. create a 5-minute queue and a 10-minute queue, etc.).

In our real-world scenario we found that we needed to set different TTLs on messages depending on the reason for re-queuing.

For example, in the case of an API rate limit error we put the message into a queue with a 15-minute TTL; for some 5xx errors we assume there is a temporary glitch on whichever API server we are calling and re-queue after a delay of 30 seconds; for all other error responses we retry after 5 minutes. For each of these scenarios we have a different delay queue configured, which means at any point we can see how many messages have been re-queued for consumption after a 30s, 5m or 15m delay. We can also add an attempt counter to the message object if we like, so we can change the re-queue behaviour (or flag an issue) after a certain number of attempts.
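A sketch of how that multi-queue setup might look, with the producer names, queue names and routing keys purely illustrative (whichever service publishes to these queues must use the matching routing key):

```yaml
# app/config/config.yml (one delay queue per re-queue reason)
old_sound_rabbit_mq:
    producers:
        delay_30s:
            connection:       default
            exchange_options: { name: 'api_call_delay', type: direct }
            queue_options:
                name:         'statuses/update/delay/30s'
                routing_keys: ['statuses/update/delay/30s']
                arguments:
                    x-message-ttl:             ['I', 30000]     # 30 seconds: suspected temporary 5xx glitches
                    x-dead-letter-exchange:    ['S', 'api_call']
                    x-dead-letter-routing-key: ['S', 'statuses/update']
        delay_15m:
            connection:       default
            exchange_options: { name: 'api_call_delay', type: direct }
            queue_options:
                name:         'statuses/update/delay/15m'
                routing_keys: ['statuses/update/delay/15m']
                arguments:
                    x-message-ttl:             ['I', 900000]    # 15 minutes: API rate limit errors
                    x-dead-letter-exchange:    ['S', 'api_call']
                    x-dead-letter-routing-key: ['S', 'statuses/update']
        # ...a 5 minute queue for all other errors follows the same pattern
```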

Please have a play with the example application to see Symfony and RabbitMQ in action. Pull requests are welcome, so feel free to modify and improve the code. Also, please let us know in the comments if RabbitMQ has helped you improve your Symfony application’s performance or user experience in any other way :).

Further reading & references: