When a message is not in expected format and cannot be processed by a consumer, we can use RabbitMQ's Dead Letter Exchange feature to store the message in a relevant queue. We then can investigate why the message is not processable and do something about it such as write a consumer to process them or just delete them.


In this example, we are going to create a Dead Letter Exchange and Dead Letter Queue to store corrupt messages. We will then create a consumer to process them.


The logic


  1. User sends a message to the application.

  2. Producer MessageCreateProducer directs message to create_message_ex exchange and then the exchange puts it into message_create_qu queue.

  3. Consumer MessageCreateConsumer runs and picks up the message from message_create_qu queue.
    • If the message processed successfully.
      • Message deleted. END

    • If the message cannot be processed successfully.
      • Behind the scene, the message automatically gets directed to message_create_dead_letter_ex exchange and then the exchange puts it into message_create_dead_letter_qu queue where it waits for us to investigate. END

Flowchart



Configurations


Install bundle


Install oldsound/rabbitmq-bundle with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() in AppKernel.php file.


Config.yml


old_sound_rabbit_mq:
connections:
default:
host: 127.0.0.1
port: 5672
user: guest
password: guest
vhost: /
lazy: true
producers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
consumers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
queue_options:
name: message_create_qu
arguments:
x-dead-letter-exchange: ['S', 'message_create_dead_letter_ex']
x-dead-letter-routing-key: ['S', 'message_create_dead_letter_qu']
callback: app.consumer.message_create
message_create_dead_letter:
connection: default
exchange_options: { name: message_create_dead_letter_ex, type: fanout }
queue_options: { name: message_create_dead_letter_qu }
callback: app.consumer.message_create_dead_letter

MessageController


You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.


namespace AppBundle\Controller;

use AppBundle\Service\MessageService;
use Exception;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;

/**
* @Route("/messages", service="app.controller.message")
*/
class MessageController
{
private $messageService;

public function __construct(
MessageService $messageService
) {
$this->messageService = $messageService;
}

/**
* @param Request $request
*
* @Method({"POST"})
* @Route("")
*
* @throws BadRequestHttpException
* @return Response
*/
public function indexAction(Request $request)
{
try {
$this->messageService->create($request->getContent());
} catch (Exception $e) {
throw new BadRequestHttpException(
sprintf('Failed to created message because of [%s].', $e->getMessage())
);
}

return new Response('Message created');
}
}

services:
app.controller.message:
class: AppBundle\Controller\MessageController
arguments:
- "@app.service.message"

MessageService


namespace AppBundle\Service;

use AppBundle\Event\MessageEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class MessageService
{
private $eventDispatcher;

public function __construct(
EventDispatcherInterface $eventDispatcher
) {
$this->eventDispatcher = $eventDispatcher;
}

public function create($message)
{
$this->eventDispatcher->dispatch(MessageEvent::CREATE, new MessageEvent($message));
}
}

services:
app.service.message:
class: AppBundle\Service\MessageService
arguments:
- "@event_dispatcher"

MessageEvent


namespace AppBundle\Event;

use Symfony\Component\EventDispatcher\Event;

class MessageEvent extends Event
{
const CREATE = 'app.event.message_create';

private $message;

public function __construct($message)
{
$this->message = $message;
}

public function getMessage()
{
return $this->message;
}
}

MessageListener


namespace AppBundle\EventListener;

use AppBundle\Event\MessageEvent;
use AppBundle\Producer\MessageCreateProducer;

class MessageListener
{
private $messageCreateProducer;

public function __construct(
MessageCreateProducer $messageCreateProducer
) {
$this->messageCreateProducer = $messageCreateProducer;
}

public function onCreate(MessageEvent $messageEvent)
{
$this->messageCreateProducer->publish($messageEvent->getMessage());
}
}

services:
app.event_listener.message:
class: AppBundle\EventListener\MessageListener
tags:
- { name: kernel.event_listener, event: app.event.message_create, method: onCreate }
arguments:
- "@app.producer.message_create"

MessageCreateProducer


namespace AppBundle\Producer;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class MessageCreateProducer
{
private $producer;

public function __construct(
ProducerInterface $producer
) {
$this->producer = $producer;
}

public function publish($message)
{
$this->producer->publish(json_encode($message));
}
}

services:
app.producer.message_create:
class: AppBundle\Producer\MessageCreateProducer
arguments:
- "@old_sound_rabbit_mq.message_create_producer"

MessageCreateConsumer


Worker for this consumer should always run in terminal. You can run it with $ bin/console rabbitmq:consumer message_create --env=test.


namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class MessageCreateConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);

if (strpos($message, 'Message') !== 0) {
$this->logger->error('Corrupt message goes into Dead Letter Exchange.');

return ConsumerInterface::MSG_REJECT;
}

$this->logger->info('Message consumed.');
}
}

MessageCreateDeadLetterConsumer


Worker for this consumer doesn't have to run in terminal. I am just showing you how you can create a consumer for Deal Letter Exchange jobs. You can run it with $ bin/console rabbitmq:consumer message_create_dead_letter --env=test


namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class MessageCreateDeadLetterConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);

$this->logger->info('Do something about "dead-lettered" message.');
}
}

services:
app.consumer.message_create:
class: AppBundle\Consumer\MessageCreateConsumer
arguments:
- "@logger"

app.consumer.message_create_dead_letter:
class: AppBundle\Consumer\MessageCreateDeadLetterConsumer
arguments:
- "@logger"

Exchange and queue synchronisation


You must run command below so that all exchanges and queues from your symfony config are synchronised with RabbitMQ server.


$ bin/console rabbitmq:setup-fabric

After running command above, your RabbitMQ server configuration should look like below.


Exchanges





Queues





Mapping




Test


Request


# POST /app_dev.php/messages
Message 1

# POST /app_dev.php/messages
Message 2

# POST /app_dev.php/messages
ABC-Message 3

Console


$ bin/console rabbitmq:consumer message_create --env=dev
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange.

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
message_create_dead_letter_qu 1 0
message_create_qu 0 0
...done.

dev.log


[2017-09-16 08:09:28] app.INFO: Message consumed. [] []
[2017-09-16 08:09:32] app.INFO: Message consumed. [] []
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange. [] []

As you can see above, two messages consumed and one placed into DLX.