Fighting deadlocks with RabbitMQ

Matt Chad
6 min read · Sep 12, 2019

An unrelated intro

It was a beautiful day. The sun was shining directly on my screen through the window, so I had to shut the curtains. Sometimes, when I feel stuck, I watch poor drivers fighting for every square inch of the trapezium-shaped parking lot. Other times there is an ambulance locked behind the parking barrier, desperately roaring its horn like a wild wolf caught in a trap. This reminds me of my e-commerce project based on the Magento platform, where my great team and I encountered many performance-related challenges. One of these challenges was…

Deadlocks.

You can ignore them, sweep them under the carpet, unless you are expecting high traffic. What are the most common results of bad decisions in the e-commerce world? Lost orders, lost clients, lost money. None of these are acceptable, and all of them are as deadly as cancer. In our case, deadlocks occurred when the application increased its request rate to MySQL and database tables were locked waiting for previous transactions to complete.

Locking is a very clever protection against data being overwritten by concurrent queries executed at nearly the same time. Unfortunately, like everything civilization has invented, it works only within certain boundaries, and here you can't ask for an infinite number of queries to be executed before the previous ones have completed. InnoDB, the MySQL engine, enqueues every query that tries to claim a lock, which ultimately leads to sequential execution. One query after another. Like in a traffic jam with just one lane, each vehicle crawls behind the slowest truck, and more cars mean more time until the last car makes it through. The only difference is that every query that waits too long in the database queue is dropped. Of course, there is a configuration variable for this, innodb_lock_wait_timeout, but increasing it too much raises the risk of locking up the whole database, and then only sudo /etc/init.d/mysql restart can bring it back from that fatal state.
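To make this concrete, here is a minimal sketch of tightening that timeout per session with plain PDO. The DSN, credentials and table are made up for illustration:

// Minimal sketch; the DSN, credentials and table name are hypothetical.
$pdo = new \PDO(
    'mysql:host=localhost;dbname=shop',
    'user',
    'secret',
    [\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION]
);

// Per-session lock wait timeout in seconds (the server-wide default is 50).
$pdo->exec('SET SESSION innodb_lock_wait_timeout = 5');

// A query that waits longer than 5 seconds for a row lock now fails fast
// with MySQL error 1205 (lock wait timeout) instead of piling up in the
// queue; a deadlock is reported as error 1213.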

I used to worry that I can't get no sleep, can't find a real solution. Insomnia, please release me. I was in that state for two weeks. Then the solution descended on me from heaven.

Ingredients

PHP 7.2
RabbitMQ
MySQL
and your favorite IDE

Feel free to go with any language and SQL database. An alternative to RabbitMQ is Apache Kafka, although, unlike RabbitMQ, it is not based on the AMQP protocol.

Implementation

I'm using the php-amqplib client for PHP. You can obtain it via Composer, as described in its GitHub repository. Similarly named libraries exist for other languages and platforms, such as Python and Node.js.
You will need one queue for messages with incoming data, one dead-letter queue (fed through a dead-letter exchange, DLX) and, optionally, an error queue, which could be replaced with a plain file logger. I prefer an error queue because it gives me much more flexibility when dealing with exceptions. There could also be another worker suited just for processing failures, e.g. sending notifications to the administrator's mailbox, and yet another for transferring exceptions to external log storage. A sketch of this topology is shown below.
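Here is how those queues could be declared with php-amqplib; the queue names and the five-second TTL are assumptions made for this article:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Work queue: rejected messages are dead-lettered to the retry queue.
$channel->queue_declare('orders', false, true, false, false, false, new AMQPTable([
    'x-dead-letter-exchange'    => '',
    'x-dead-letter-routing-key' => 'orders.retry',
]));

// Dead-letter (retry) queue: messages wait out the TTL, then are
// routed back to the work queue for another attempt.
$channel->queue_declare('orders.retry', false, true, false, false, false, new AMQPTable([
    'x-message-ttl'             => 5000, // milliseconds
    'x-dead-letter-exchange'    => '',
    'x-dead-letter-routing-key' => 'orders',
]));

// Error queue for messages that failed in an unrecoverable way.
$channel->queue_declare('error_queue', false, true, false, false, false);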

It's like baking a cake: holding it in the oven for too long won't give you better results. Likewise with database configuration. You cannot extend lock-wait timeouts forever, because the engine will get jammed by incoming traffic.

Example code

The NDA forbids me to post the real code that I implemented in the platform, so the lines below were prepared just for this article. The whole idea is left untouched and will work under any circumstances.

In the listing below, I am using a wrapper for all operations related to saving and deleting. There is just one method, wrap, which receives a callback function as a parameter, begins a database transaction, executes the passed callback and commits the transaction. The callback could be an INSERT statement that fails when the database engine has another transaction going on the same table and the time for executing the SQL query runs out.
As you can see in the second catch block, deadlock and lock wait timeout exceptions are converted to a RecoverableException. You can guess from its name that we get a chance to handle the situation in a gentle manner and retry the transaction when things go bad.
If, in the end, we or the database messed something up, an UnrecoverableException is thrown. Such a message will be published onto an error queue, like a rotten egg thrown into the trash.
The first catch block handles function nesting, i.e. a callback that was itself already wrapped by TransactionWrapper::wrap. It simply filters out already-converted exceptions, rolls back the transaction and re-throws the caught exception.

class TransactionWrapper
{
    private $connection;

    // The connection is assumed to expose beginTransaction/commit/rollback
    // and to throw the typed lock exceptions (see the sketch below).
    public function __construct($connection)
    {
        $this->connection = $connection;
    }

    public function wrap(\Closure $callback)
    {
        try {
            $this->connection->beginTransaction();
            $callback();
            $this->connection->commit();
        } catch (UnrecoverableException | RecoverableException $e) {
            // Nested wrap() call: the exception is already converted,
            // so just roll back and re-throw it.
            $this->connection->rollback();
            throw $e;
        } catch (DeadLockException | LockWaitTimeoutException $e) {
            // Transient lock problems: worth retrying later.
            $this->connection->rollback();
            throw new RecoverableException($e);
        } catch (\Exception $e) {
            // Anything else is treated as a permanent failure.
            $this->connection->rollback();
            throw new UnrecoverableException($e);
        }
    }
}
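DeadLockException and LockWaitTimeoutException are not built-in PDO classes; some layer has to translate driver errors into them. A minimal sketch of such a translation, assuming custom exception classes whose constructor accepts the underlying exception (mirroring the RecoverableException usage above) and relying on MySQL's error codes 1213 (deadlock) and 1205 (lock wait timeout):

class ExceptionTranslatingConnection
{
    private $pdo;

    public function __construct(\PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function beginTransaction() { $this->pdo->beginTransaction(); }
    public function commit()           { $this->pdo->commit(); }
    public function rollback()         { $this->pdo->rollBack(); }

    public function execute(string $sql, array $params = [])
    {
        try {
            $statement = $this->pdo->prepare($sql);
            $statement->execute($params);
            return $statement;
        } catch (\PDOException $e) {
            // errorInfo[1] carries the MySQL driver error code.
            switch ($e->errorInfo[1] ?? null) {
                case 1213: throw new DeadLockException($e);
                case 1205: throw new LockWaitTimeoutException($e);
                default:   throw $e;
            }
        }
    }
}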

All the magic happens in the upper layer, where messages are processed. The message processor executes SQL queries wrapped in the aforementioned TransactionWrapper. When a RecoverableException is caught, it means there is too much traffic and the database is unable to process requests at this rate. So we delay message processing by nacking the message, which returns it to the AMQP broker and routes it onto the dead-letter queue. There the message waits for the period configured as the dead-letter queue's TTL.

public function process(AMQPMessage $message)
{
    $channel = $message->delivery_info['channel'];
    $deliveryTag = $message->delivery_info['delivery_tag'];

    try {
        $this->messageProcessor->process($message);
        $channel->basic_ack($deliveryTag);
    } catch (RecoverableException $e) {
        // Nack without requeue, so the broker dead-letters the message
        // onto the dead-letter queue where it waits out the TTL.
        $channel->basic_nack($deliveryTag, false, false);
    } catch (UnrecoverableException $e) {
        // Log the failure, ack the original message and move it to the error queue.
        $this->logger->error($e);
        $channel->basic_ack($deliveryTag);
        // messagePublisher is a sugar-service used to publish messages to a queue.
        $this->messagePublisher->publish($message, 'error_queue');
    }
}
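The messagePublisher above is a made-up helper. With php-amqplib, its publish method could be as thin as this, publishing through the default exchange, where the routing key is simply the queue name:

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class MessagePublisher
{
    private $channel;

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }

    public function publish(AMQPMessage $message, string $queue): void
    {
        // Re-publish the body via the default exchange; headers and
        // properties could be copied over as well if needed.
        $this->channel->basic_publish(new AMQPMessage($message->getBody()), '', $queue);
    }
}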

And usage is fairly straightforward (inside MessageProcessor):

$transactionWrapper = new TransactionWrapper($connection);
$transactionWrapper->wrap(function () use ($saveProcessor) {
    $saveProcessor->save();
});

Here there is just one saveProcessor, but your message processor might implement a chain-of-responsibility pattern, where several instances process a complex message.
In a real app you would probably use some kind of factory and/or dependency injection to provide the TransactionWrapper instance. The saveProcessor is also some kind of service (or command, in a CQRS approach) responsible for persisting data into SQL storage. A sketch of such a processor follows.
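For example, a MessageProcessor built this way could run a whole chain of save processors inside a single transaction. Every name below, including the assumption of a JSON payload, is illustrative:

use PhpAmqpLib\Message\AMQPMessage;

class MessageProcessor
{
    private $transactionWrapper;
    private $saveProcessors;

    public function __construct(TransactionWrapper $transactionWrapper, array $saveProcessors)
    {
        $this->transactionWrapper = $transactionWrapper;
        $this->saveProcessors = $saveProcessors;
    }

    public function process(AMQPMessage $message): void
    {
        $data = json_decode($message->getBody(), true);

        // All save processors run inside one transaction: either the whole
        // message is persisted, or the transaction is rolled back and the
        // message is retried via the dead-letter queue.
        $this->transactionWrapper->wrap(function () use ($data) {
            foreach ($this->saveProcessors as $saveProcessor) {
                $saveProcessor->save($data);
            }
        });
    }
}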

Sampling the final effects

Delegating persistence tasks to an external service will cure the headaches caused by locks on database transactions, and it scales pretty well (up to the limits of the database's endurance). There is also a bonus feature: asynchronous request processing, especially valuable in the world of languages with poor threading support (PHP). But it's not a silver bullet: when the data must be persisted in a specific order, things get more complicated. Use this tool wisely: for processing responses from external services, updating customer statuses or importing a product catalog in the background. Begin with a little POC app and then incrementally integrate this solution into other parts of your system. Today I might say that this approach gave me one more piece of evidence for what engineering is all about: smartly overcoming challenges within specific boundaries.
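For that POC, the worker itself can be a plain long-running script. A minimal consumer loop with php-amqplib, reusing the assumed queue name from the earlier sketch ($handler stands for the consumer with the ack/nack logic shown above):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Hand the worker one message at a time, so slow transactions don't pile up.
$channel->basic_qos(null, 1, null);

$channel->basic_consume('orders', '', false, false, false, false, function (AMQPMessage $message) use ($handler) {
    // $handler is the consumer shown earlier: process() with ack/nack logic.
    $handler->process($message);
});

while (count($channel->callbacks)) {
    $channel->wait();
}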

Was it delicious?

I will be very proud if this article gave you a new point of view on database exception handling. Feel free to leave me feedback in the comments.

Useful links:
https://sourcemaking.com/design_patterns
https://www.rabbitmq.com/
https://symfony.com/blog/new-in-symfony-4-1-messenger-component
