Mesaj beklenen formatta olmadığında veya işleyici tarafından işlenemiyorsa, RabbitMQ'nun Dead Letter Exchange özelliğini kullanarak mesajı ilgili sırada tutabiliriz. Daha sonra mesajın neden işlenemediğini kontrol edip, konuyla ilgili birşeyler yapabiliriz. Örneğin: bir kereye mahsus olmak üzere bir işleyici yazıp mesajı işleyebiliriz veya mesajı silebiliriz.


Bu örneğimizde, işlenemeyen mesajları tutmak için bir tane Dead Letter Exchange ve bir tanede Dead Letter Queue yaratacağız.


Mantık


  1. Kullanıcı uygulamaya bir mesaj gönderir.

  2. İletici MessageCreateProducer mesajı create_message_ex exchange'e yönlendirir ve daha sonra exchange mesajı message_create_qu sırasına yerleştirir.

  3. İşleyici MessageCreateConsumer terminalde çalışır ve mesajı message_create_qu sırasından alır.
    • Eğer mesaj başarıyla işlenmiş ise:
      • Message silinir. SON.

    • Eğer mesaj başarıyla işlenmemiş ise.
      • Arka planda mesaj otomatik olarak message_create_dead_letter_ex exchange'e yönlendirilir ve exchange mesajı message_create_dead_letter_qu sırasına yerleştirir. Burada da biz gerekeni yaparız. SON

Akış diyagramı



Konfigürasyon


Bundle yükleme


Composer ile oldsound/rabbitmq-bundle 'i yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() satırını ekleyin.


Config.yml


old_sound_rabbit_mq:
connections:
default:
host: 127.0.0.1
port: 5672
user: guest
password: guest
vhost: /
lazy: true
producers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
consumers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
queue_options:
name: message_create_qu
arguments:
x-dead-letter-exchange: ['S', 'message_create_dead_letter_ex']
x-dead-letter-routing-key: ['S', 'message_create_dead_letter_qu']
callback: app.consumer.message_create
message_create_dead_letter:
connection: default
exchange_options: { name: message_create_dead_letter_ex, type: fanout }
queue_options: { name: message_create_dead_letter_qu }
callback: app.consumer.message_create_dead_letter

MessageController


Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.


namespace AppBundle\Controller;

use AppBundle\Service\MessageService;
use Exception;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;

/**
* @Route("/messages", service="app.controller.message")
*/
class MessageController
{
private $messageService;

public function __construct(
MessageService $messageService
) {
$this->messageService = $messageService;
}

/**
* @param Request $request
*
* @Method({"POST"})
* @Route("")
*
* @throws BadRequestHttpException
* @return Response
*/
public function indexAction(Request $request)
{
try {
$this->messageService->create($request->getContent());
} catch (Exception $e) {
throw new BadRequestHttpException(
sprintf('Failed to created message because of [%s].', $e->getMessage())
);
}

return new Response('Message created');
}
}

services:
app.controller.message:
class: AppBundle\Controller\MessageController
arguments:
- "@app.service.message"

MessageService


namespace AppBundle\Service;

use AppBundle\Event\MessageEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class MessageService
{
private $eventDispatcher;

public function __construct(
EventDispatcherInterface $eventDispatcher
) {
$this->eventDispatcher = $eventDispatcher;
}

public function create($message)
{
$this->eventDispatcher->dispatch(MessageEvent::CREATE, new MessageEvent($message));
}
}

services:
app.service.message:
class: AppBundle\Service\MessageService
arguments:
- "@event_dispatcher"

MessageEvent


namespace AppBundle\Event;

use Symfony\Component\EventDispatcher\Event;

class MessageEvent extends Event
{
const CREATE = 'app.event.message_create';

private $message;

public function __construct($message)
{
$this->message = $message;
}

public function getMessage()
{
return $this->message;
}
}

MessageListener


namespace AppBundle\EventListener;

use AppBundle\Event\MessageEvent;
use AppBundle\Producer\MessageCreateProducer;

class MessageListener
{
private $messageCreateProducer;

public function __construct(
MessageCreateProducer $messageCreateProducer
) {
$this->messageCreateProducer = $messageCreateProducer;
}

public function onCreate(MessageEvent $messageEvent)
{
$this->messageCreateProducer->publish($messageEvent->getMessage());
}
}

services:
app.event_listener.message:
class: AppBundle\EventListener\MessageListener
tags:
- { name: kernel.event_listener, event: app.event.message_create, method: onCreate }
arguments:
- "@app.producer.message_create"

MessageCreateProducer


namespace AppBundle\Producer;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class MessageCreateProducer
{
private $producer;

public function __construct(
ProducerInterface $producer
) {
$this->producer = $producer;
}

public function publish($message)
{
$this->producer->publish(json_encode($message));
}
}

services:
app.producer.message_create:
class: AppBundle\Producer\MessageCreateProducer
arguments:
- "@old_sound_rabbit_mq.message_create_producer"

MessageCreateConsumer


Bu işleyicinin işçileri terminalde sürekli çalışıyor vaziyette olmalıdır. İşçileri terminalde çalıştırmak için $ bin/console rabbitmq:consumer message_create --env=test komutunu kullanabilirsiniz.


namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class MessageCreateConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);

if (strpos($message, 'Message') !== 0) {
$this->logger->error('Corrupt message goes into Dead Letter Exchange.');

return ConsumerInterface::MSG_REJECT;
}

$this->logger->info('Message consumed.');
}
}

MessageCreateDeadLetterConsumer


Bu işleyicinin işçileri terminalde çalışıyor vaziyette olmak zorunda değildir. Burada size sadece Deal Letter Exchange'in işleyicisinin nasıl yazılabileceğini gösteriyorum. İşçileri terminalde çalıştırmak için $ bin/console rabbitmq:consumer message_create_dead_letter --env=test komutunu kullanabilirsiniz.


namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class MessageCreateDeadLetterConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);

$this->logger->info('Do something about "dead-lettered" message.');
}
}

services:
app.consumer.message_create:
class: AppBundle\Consumer\MessageCreateConsumer
arguments:
- "@logger"

app.consumer.message_create_dead_letter:
class: AppBundle\Consumer\MessageCreateDeadLetterConsumer
arguments:
- "@logger"

Exchange ve queue senkronizasyonu


Symfony konfig dosyasında belirttiğiniz exchange ve queue seçeneklerinin RabbitMQ sunucunda hazır bulunması için, aşağıdaki komutu çalıştırmanız mecburidir.


$ bin/console rabbitmq:setup-fabric

Yukarıdaki komutu çalıştırdıktan sonra, RabbitMQ sunucusunun konfigürasyonu aşağıdaki gibi olmalıdır.


Exchange





Queue





Haritalama




Test


İstek


# POST /app_dev.php/messages
Message 1

# POST /app_dev.php/messages
Message 2

# POST /app_dev.php/messages
ABC-Message 3

Terminal


$ bin/console rabbitmq:consumer message_create --env=dev
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange.

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
message_create_dead_letter_qu 1 0
message_create_qu 0 0
...done.

dev.log


[2017-09-16 08:09:28] app.INFO: Message consumed. [] []
[2017-09-16 08:09:32] app.INFO: Message consumed. [] []
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange. [] []

Yukarıda gördüğümüz gibi, iki mesaj işlenmiş ve bir mesaj ise DLX içine koyulmuş durumdadır.