16/09/2017 - RABBITMQ, SYMFONY
Mesaj beklenen formatta olmadığında veya işleyici tarafından işlenemiyorsa, RabbitMQ'nun Dead Letter Exchange özelliğini kullanarak mesajı ilgili sırada tutabiliriz. Daha sonra mesajın neden işlenemediğini kontrol edip, konuyla ilgili birşeyler yapabiliriz. Örneğin: bir kereye mahsus olmak üzere bir işleyici yazıp mesajı işleyebiliriz veya mesajı silebiliriz.
Bu örneğimizde, işlenemeyen mesajları tutmak için bir tane Dead Letter Exchange ve bir tanede Dead Letter Queue yaratacağız.
MessageCreateProducer
mesajı create_message_ex
exchange'e yönlendirir ve daha sonra exchange mesajı message_create_qu
sırasına yerleştirir.MessageCreateConsumer
terminalde çalışır ve mesajı message_create_qu
sırasından alır.message_create_dead_letter_ex
exchange'e yönlendirilir ve exchange mesajı message_create_dead_letter_qu
sırasına yerleştirir. Burada da biz gerekeni yaparız. SONComposer ile oldsound/rabbitmq-bundle
'i yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
satırını ekleyin.
old_sound_rabbit_mq:
connections:
default:
host: 127.0.0.1
port: 5672
user: guest
password: guest
vhost: /
lazy: true
producers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
consumers:
message_create:
connection: default
exchange_options: { name: message_create_ex, type: fanout }
queue_options:
name: message_create_qu
arguments:
x-dead-letter-exchange: ['S', 'message_create_dead_letter_ex']
x-dead-letter-routing-key: ['S', 'message_create_dead_letter_qu']
callback: app.consumer.message_create
message_create_dead_letter:
connection: default
exchange_options: { name: message_create_dead_letter_ex, type: fanout }
queue_options: { name: message_create_dead_letter_qu }
callback: app.consumer.message_create_dead_letter
Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.
namespace AppBundle\Controller;
use AppBundle\Service\MessageService;
use Exception;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
/**
* @Route("/messages", service="app.controller.message")
*/
class MessageController
{
private $messageService;
public function __construct(
MessageService $messageService
) {
$this->messageService = $messageService;
}
/**
* @param Request $request
*
* @Method({"POST"})
* @Route("")
*
* @throws BadRequestHttpException
* @return Response
*/
public function indexAction(Request $request)
{
try {
$this->messageService->create($request->getContent());
} catch (Exception $e) {
throw new BadRequestHttpException(
sprintf('Failed to created message because of [%s].', $e->getMessage())
);
}
return new Response('Message created');
}
}
services:
app.controller.message:
class: AppBundle\Controller\MessageController
arguments:
- "@app.service.message"
namespace AppBundle\Service;
use AppBundle\Event\MessageEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class MessageService
{
private $eventDispatcher;
public function __construct(
EventDispatcherInterface $eventDispatcher
) {
$this->eventDispatcher = $eventDispatcher;
}
public function create($message)
{
$this->eventDispatcher->dispatch(MessageEvent::CREATE, new MessageEvent($message));
}
}
services:
app.service.message:
class: AppBundle\Service\MessageService
arguments:
- "@event_dispatcher"
namespace AppBundle\Event;
use Symfony\Component\EventDispatcher\Event;
class MessageEvent extends Event
{
const CREATE = 'app.event.message_create';
private $message;
public function __construct($message)
{
$this->message = $message;
}
public function getMessage()
{
return $this->message;
}
}
namespace AppBundle\EventListener;
use AppBundle\Event\MessageEvent;
use AppBundle\Producer\MessageCreateProducer;
class MessageListener
{
private $messageCreateProducer;
public function __construct(
MessageCreateProducer $messageCreateProducer
) {
$this->messageCreateProducer = $messageCreateProducer;
}
public function onCreate(MessageEvent $messageEvent)
{
$this->messageCreateProducer->publish($messageEvent->getMessage());
}
}
services:
app.event_listener.message:
class: AppBundle\EventListener\MessageListener
tags:
- { name: kernel.event_listener, event: app.event.message_create, method: onCreate }
arguments:
- "@app.producer.message_create"
namespace AppBundle\Producer;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class MessageCreateProducer
{
private $producer;
public function __construct(
ProducerInterface $producer
) {
$this->producer = $producer;
}
public function publish($message)
{
$this->producer->publish(json_encode($message));
}
}
services:
app.producer.message_create:
class: AppBundle\Producer\MessageCreateProducer
arguments:
- "@old_sound_rabbit_mq.message_create_producer"
Bu işleyicinin işçileri terminalde sürekli çalışıyor vaziyette olmalıdır. İşçileri terminalde çalıştırmak için $ bin/console rabbitmq:consumer message_create --env=test
komutunu kullanabilirsiniz.
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
class MessageCreateConsumer implements ConsumerInterface
{
private $logger;
public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);
if (strpos($message, 'Message') !== 0) {
$this->logger->error('Corrupt message goes into Dead Letter Exchange.');
return ConsumerInterface::MSG_REJECT;
}
$this->logger->info('Message consumed.');
}
}
Bu işleyicinin işçileri terminalde çalışıyor vaziyette olmak zorunda değildir. Burada size sadece Deal Letter Exchange'in işleyicisinin nasıl yazılabileceğini gösteriyorum. İşçileri terminalde çalıştırmak için $ bin/console rabbitmq:consumer message_create_dead_letter --env=test
komutunu kullanabilirsiniz.
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
class MessageCreateDeadLetterConsumer implements ConsumerInterface
{
private $logger;
public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$message = json_decode($message->body);
$this->logger->info('Do something about "dead-lettered" message.');
}
}
services:
app.consumer.message_create:
class: AppBundle\Consumer\MessageCreateConsumer
arguments:
- "@logger"
app.consumer.message_create_dead_letter:
class: AppBundle\Consumer\MessageCreateDeadLetterConsumer
arguments:
- "@logger"
Symfony konfig dosyasında belirttiğiniz exchange ve queue seçeneklerinin RabbitMQ sunucunda hazır bulunması için, aşağıdaki komutu çalıştırmanız mecburidir.
$ bin/console rabbitmq:setup-fabric
Yukarıdaki komutu çalıştırdıktan sonra, RabbitMQ sunucusunun konfigürasyonu aşağıdaki gibi olmalıdır.
# POST /app_dev.php/messages
Message 1
# POST /app_dev.php/messages
Message 2
# POST /app_dev.php/messages
ABC-Message 3
$ bin/console rabbitmq:consumer message_create --env=dev
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange.
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
message_create_dead_letter_qu 1 0
message_create_qu 0 0
...done.
[2017-09-16 08:09:28] app.INFO: Message consumed. [] []
[2017-09-16 08:09:32] app.INFO: Message consumed. [] []
[2017-09-16 08:11:10] app.ERROR: Corrupt message goes into Dead Letter Exchange. [] []
Yukarıda gördüğümüz gibi, iki mesaj işlenmiş ve bir mesaj ise DLX içine koyulmuş durumdadır.