16/09/2017 - RABBITMQ, SYMFONY
When a message is not in the expected format and cannot be processed by a consumer, we can use RabbitMQ's Dead Letter Exchange feature to store the message in a relevant queue. We can then investigate why the message is not processable and do something about it, such as writing a consumer to process such messages or just deleting them.
In this example, we are going to create a Dead Letter Exchange and Dead Letter Queue to store corrupt messages. We will then create a consumer to process them.
START
1. MessageCreateProducer directs the message to the message_create_ex exchange and then the exchange puts it into the message_create_qu queue.
2. MessageCreateConsumer runs and picks up the message from the message_create_qu queue.
3. If the message is corrupt, the consumer rejects it; RabbitMQ then directs it to the message_create_dead_letter_ex exchange and then the exchange puts it into the message_create_dead_letter_qu queue, where it waits for us to investigate.
END
Install oldsound/rabbitmq-bundle with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() in the AppKernel.php file.
# RabbitMQ bundle configuration: one connection, one producer and two consumers
# (the regular consumer plus one that drains the dead letter queue).
old_sound_rabbit_mq:
    connections:
        default:
            host: 127.0.0.1
            port: 5672
            user: guest
            password: guest
            vhost: /
            lazy: true
    producers:
        message_create:
            connection: default
            exchange_options: { name: message_create_ex, type: fanout }
    consumers:
        message_create:
            connection: default
            exchange_options: { name: message_create_ex, type: fanout }
            queue_options:
                name: message_create_qu
                arguments:
                    # Rejected (non-requeued) messages from this queue are re-published
                    # to the exchange named here. 'S' marks the value as an AMQP string.
                    x-dead-letter-exchange: ['S', 'message_create_dead_letter_ex']
                    # NOTE(review): the dead letter exchange below is declared as fanout;
                    # per the RabbitMQ docs fanout exchanges ignore routing keys, so this
                    # key only matters if the exchange type is changed - confirm.
                    x-dead-letter-routing-key: ['S', 'message_create_dead_letter_qu']
            callback: app.consumer.message_create
        message_create_dead_letter:
            connection: default
            exchange_options: { name: message_create_dead_letter_ex, type: fanout }
            queue_options: { name: message_create_dead_letter_qu }
            callback: app.consumer.message_create_dead_letter
You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.
namespace AppBundle\Controller;
use AppBundle\Service\MessageService;
use Exception;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
/**
 * HTTP entry point for creating messages.
 *
 * Passes the raw request body to MessageService and replies with a plain-text
 * confirmation.
 *
 * @Route("/messages", service="app.controller.message")
 */
class MessageController
{
    /**
     * @var MessageService
     */
    private $messageService;

    /**
     * @param MessageService $messageService Service that handles message creation.
     */
    public function __construct(
        MessageService $messageService
    ) {
        $this->messageService = $messageService;
    }

    /**
     * Creates a message from the raw body of a POST request.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("")
     *
     * @throws BadRequestHttpException When the service throws any exception.
     * @return Response
     */
    public function indexAction(Request $request)
    {
        try {
            $this->messageService->create($request->getContent());
        } catch (Exception $e) {
            // NOTE(review): every failure is reported as a 400, including ones
            // that are not the client's fault (e.g. broker problems) - confirm
            // this is intended.
            // The caught exception is chained as "previous" so its type and
            // stack trace are not lost when the error is logged.
            throw new BadRequestHttpException(
                sprintf('Failed to create message because of [%s].', $e->getMessage()),
                $e
            );
        }

        return new Response('Message created');
    }
}
services:
    # Service id referenced by the controller's @Route(service="app.controller.message") annotation.
    app.controller.message:
        class: AppBundle\Controller\MessageController
        arguments:
            - "@app.service.message"
namespace AppBundle\Service;
use AppBundle\Event\MessageEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
/**
 * Announces newly created messages through the event dispatcher.
 */
class MessageService
{
    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @param EventDispatcherInterface $eventDispatcher Dispatcher used to publish MessageEvent::CREATE.
     */
    public function __construct(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Wraps the given message in a MessageEvent and dispatches it under
     * the MessageEvent::CREATE event name.
     *
     * @param mixed $message Message payload; passed to the event untouched.
     */
    public function create($message)
    {
        $event = new MessageEvent($message);

        $this->eventDispatcher->dispatch(MessageEvent::CREATE, $event);
    }
}
services:
    # MessageService receives the framework's event dispatcher.
    app.service.message:
        class: AppBundle\Service\MessageService
        arguments:
            - "@event_dispatcher"
namespace AppBundle\Event;
use Symfony\Component\EventDispatcher\Event;
/**
 * Event carrying the payload of a message that is being created.
 */
class MessageEvent extends Event
{
    /**
     * Event name under which this event is dispatched and listened for.
     */
    const CREATE = 'app.event.message_create';

    /**
     * @var mixed Payload handed over at construction time.
     */
    private $message;

    /**
     * @param mixed $message Payload to carry; stored as given.
     */
    public function __construct($message)
    {
        $this->message = $message;
    }

    /**
     * @return mixed The payload exactly as it was passed to the constructor.
     */
    public function getMessage()
    {
        return $this->message;
    }
}
namespace AppBundle\EventListener;
use AppBundle\Event\MessageEvent;
use AppBundle\Producer\MessageCreateProducer;
/**
 * Forwards the payload of message events to the message-create producer.
 */
class MessageListener
{
    /**
     * @var MessageCreateProducer
     */
    private $messageCreateProducer;

    /**
     * @param MessageCreateProducer $messageCreateProducer Producer that receives each event's payload.
     */
    public function __construct(MessageCreateProducer $messageCreateProducer)
    {
        $this->messageCreateProducer = $messageCreateProducer;
    }

    /**
     * Publishes the payload carried by the event.
     *
     * @param MessageEvent $messageEvent
     */
    public function onCreate(MessageEvent $messageEvent)
    {
        $payload = $messageEvent->getMessage();

        $this->messageCreateProducer->publish($payload);
    }
}
services:
    app.event_listener.message:
        class: AppBundle\EventListener\MessageListener
        tags:
            # The event name must stay equal to MessageEvent::CREATE.
            - { name: kernel.event_listener, event: app.event.message_create, method: onCreate }
        arguments:
            - "@app.producer.message_create"
namespace AppBundle\Producer;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
/**
 * JSON-encodes messages and publishes them through the wrapped bundle producer.
 */
class MessageCreateProducer
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer Underlying producer the encoded body is handed to.
     */
    public function __construct(
        ProducerInterface $producer
    ) {
        $this->producer = $producer;
    }

    /**
     * Encodes the message as JSON and publishes the resulting string.
     *
     * @param mixed $message Value to encode and publish.
     *
     * @throws \RuntimeException When the message cannot be JSON encoded.
     */
    public function publish($message)
    {
        $body = json_encode($message);

        // json_encode() returns false on failure (e.g. malformed UTF-8). Publishing
        // that value would silently put an empty body on the queue, so fail loudly.
        if ($body === false) {
            throw new \RuntimeException(
                sprintf('Message could not be JSON encoded: %s.', json_last_error_msg())
            );
        }

        $this->producer->publish($body);
    }
}
services:
    app.producer.message_create:
        class: AppBundle\Producer\MessageCreateProducer
        arguments:
            # Presumably the service the RabbitMQ bundle generates for the
            # "message_create" producer defined under old_sound_rabbit_mq - confirm.
            - "@old_sound_rabbit_mq.message_create_producer"
The worker for this consumer should always be running in a terminal. You can start it with $ bin/console rabbitmq:consumer message_create --env=test.
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
/**
 * Consumes messages and rejects those whose decoded body does not start
 * with "Message".
 */
class MessageCreateConsumer implements ConsumerInterface
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Decodes the JSON body and decides whether the message is acceptable.
     *
     * @param AMQPMessage $message
     *
     * @return int ConsumerInterface::MSG_REJECT for a corrupt message,
     *             ConsumerInterface::MSG_ACK otherwise.
     */
    public function execute(AMQPMessage $message)
    {
        $payload = json_decode($message->body);

        // json_decode() yields null, an array, an object or a number when the body
        // is not a JSON string. Such bodies are corrupt by definition and must not
        // reach strpos(), which expects a string haystack.
        if (!is_string($payload) || strpos($payload, 'Message') !== 0) {
            $this->logger->error('Corrupt message goes into Dead Letter Exchange.');

            return ConsumerInterface::MSG_REJECT;
        }

        $this->logger->info('Message consumed.');

        // Explicit acknowledgement instead of relying on an implicit null return.
        return ConsumerInterface::MSG_ACK;
    }
}
The worker for this consumer doesn't have to be running in a terminal. I am just showing you how you can create a consumer for Dead Letter Exchange jobs. You can run it with $ bin/console rabbitmq:consumer message_create_dead_letter --env=test
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
/**
 * Example consumer for dead-lettered messages; it only decodes the body
 * and writes a log entry.
 */
class MessageCreateDeadLetterConsumer implements ConsumerInterface
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Decodes the message body and logs a placeholder notice.
     *
     * @param AMQPMessage $message
     */
    public function execute(AMQPMessage $message)
    {
        // The decoded payload is not used yet; real handling of the
        // dead-lettered message would start from this value.
        $payload = json_decode($message->body);

        $this->logger->info('Do something about "dead-lettered" message.');
    }
}
services:
    # Ids below are the "callback" values of the consumers configured under old_sound_rabbit_mq.
    app.consumer.message_create:
        class: AppBundle\Consumer\MessageCreateConsumer
        arguments:
            - "@logger"
    app.consumer.message_create_dead_letter:
        class: AppBundle\Consumer\MessageCreateDeadLetterConsumer
        arguments:
            - "@logger"
You must run command below so that all exchanges and queues from your symfony config are synchronised with RabbitMQ server.
$ bin/console rabbitmq:setup-fabric
After running command above, your RabbitMQ server configuration should look like below.
# POST /app_dev.php/messages
Message 1
# POST /app_dev.php/messages
Message 2
# POST /app_dev.php/messages
ABC-Message 3
$ bin/console rabbitmq:consumer message_create --env=dev
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange.
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
message_create_dead_letter_qu 1 0
message_create_qu 0 0
...done.
[2017-09-16 08:09:28] app.INFO: Message consumed. [] []
[2017-09-16 08:09:32] app.INFO: Message consumed. [] []
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange. [] []
As you can see above, two messages were consumed and one was placed into the dead letter queue via the DLX.