31/01/2016 - RABBITMQ, SYMFONY
Bu örneğimizde araba satın almak için bir istek oluşturacağız, mevcut olanı değiştireceğiz ve her iki işlemin kaydını alacağız. Örnek 2 Producer & 2 Exchange & 2 Queue & N Worker & 2 Consumer bileşenlerinden oluşuyor. Eğer sadece bir tane ortak exchange kullanmış olsaydınız, order "create" ve "update" mesajları her iki sıraya da giderdi ki bu da sistemi bozardı. Örneğimiz RabbitMqBundle paketini kullanıyor.
Composer ile "oldsound/rabbitmq-bundle":"1.8.0" paketini yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() satırını ekleyin.
# Broker connection settings, referenced by the bundle configuration below.
parameters:
    rabbit_mq_host: 127.0.0.1
    rabbit_mq_port: 5672
    rabbit_mq_user: guest
    rabbit_mq_pswd: guest

old_sound_rabbit_mq:
    connections:
        default:
            # Parameter references are quoted: unquoted scalars starting with "%"
            # are deprecated by the Symfony YAML parser.
            host: "%rabbit_mq_host%"
            port: "%rabbit_mq_port%"
            user: "%rabbit_mq_user%"
            password: "%rabbit_mq_pswd%"
            vhost: /
            lazy: true
    # One producer per operation, each publishing to its own fanout exchange.
    producers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
        order_update:
            connection: default
            exchange_options: { name: 'order_update_ex', type: fanout }
    # One consumer per operation: exchange -> dedicated queue -> callback service.
    consumers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
            queue_options: { name: 'order_create_qu' }
            callback: application_frontend.consumer.order_create
        order_update:
            connection: default
            exchange_options: { name: 'order_update_ex', type: fanout }
            queue_options: { name: 'order_update_qu' }
            callback: application_frontend.consumer.order_update
Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
 * HTTP entry point for creating and updating car orders.
 *
 * Each action decodes the JSON request body and hands the resulting array to OrderService.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    private $orderService;

    /**
     * @param OrderService $orderService Service that persists orders and dispatches the order events.
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Creates a new order from the JSON request body.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create")
     *
     * @return Response The value returned by OrderService::create(), or HTTP 400 for an undecodable body.
     */
    public function createAction(Request $request)
    {
        $payload = $this->decodePayload($request);
        if (null === $payload) {
            return $this->createBadRequestResponse();
        }

        $result = $this->orderService->create($payload);

        return new Response($result);
    }

    /**
     * Updates an existing order from the JSON request body.
     *
     * @param Request $request
     *
     * @Method({"PATCH"})
     * @Route("/update")
     *
     * @return Response The value returned by OrderService::update(), or HTTP 400 for an undecodable body.
     */
    public function updateAction(Request $request)
    {
        $payload = $this->decodePayload($request);
        if (null === $payload) {
            return $this->createBadRequestResponse();
        }

        $result = $this->orderService->update($payload);

        return new Response($result);
    }

    /**
     * Decodes the request body as JSON.
     *
     * json_decode() yields null for malformed input and a scalar for JSON scalars;
     * neither may be passed to the array-typed service methods.
     *
     * @param Request $request
     *
     * @return array|null Decoded payload, or null when the body is not a JSON object/array.
     */
    private function decodePayload(Request $request)
    {
        $payload = json_decode($request->getContent(), true);

        return is_array($payload) ? $payload : null;
    }

    /**
     * @return Response Plain 400 response used when the request body cannot be decoded.
     */
    private function createBadRequestResponse()
    {
        return new Response('Request body must be a valid JSON object.', 400);
    }
}
services:
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Quoted: an unquoted scalar starting with "@" is deprecated in the Symfony YAML parser.
            - "@application_frontend.service.order"
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\Common\Persistence\ObjectRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
/**
 * Persists orders through Doctrine and announces each change as an OrderEvent.
 */
class OrderService
{
    private $entityManager;
    private $eventDispatcher;

    /**
     * @param EntityManagerInterface   $entityManager   Used to persist/flush orders and to obtain the repository.
     * @param EventDispatcherInterface $eventDispatcher Used to dispatch OrderEvent::CREATE / OrderEvent::UPDATE.
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Stores a new order and dispatches OrderEvent::CREATE once it has been flushed.
     *
     * @param array $newOrder Reads the keys "customer_name", "car_make" and "car_model".
     *
     * @return int Identifier of the stored order (value of Order::getId()).
     */
    public function create(array $newOrder)
    {
        $order = new Order();
        $this->applyOrderData($order, $newOrder);

        $this->entityManager->persist($order);
        $this->entityManager->flush();

        $this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));

        return $order->getId();
    }

    /**
     * Updates an existing order and dispatches OrderEvent::UPDATE once it has been flushed.
     *
     * @param array $newOrder Reads the keys "order_id", "customer_name", "car_make" and "car_model".
     *
     * @return int Identifier of the updated order (value of Order::getId()).
     *
     * @throws \InvalidArgumentException When no order exists for the given "order_id".
     */
    public function update(array $newOrder)
    {
        /** @var Order|null $order */
        $order = $this->getOrderRepository()->findOneBy(['id' => $newOrder['order_id']]);

        // findOneBy() returns null for an unknown id; fail with a catchable exception
        // instead of a fatal "call to a member function on null".
        if (null === $order) {
            throw new \InvalidArgumentException(
                sprintf('Order "%s" was not found.', $newOrder['order_id'])
            );
        }

        $this->applyOrderData($order, $newOrder);

        $this->entityManager->flush();

        $this->eventDispatcher->dispatch(OrderEvent::UPDATE, new OrderEvent($order));

        return $order->getId();
    }

    /**
     * Copies the customer and car fields from the request data onto the entity.
     *
     * @param Order $order
     * @param array $data  Reads the keys "customer_name", "car_make" and "car_model".
     */
    private function applyOrderData(Order $order, array $data)
    {
        $order->setCustomerName($data['customer_name']);
        $order->setCarMake($data['car_make']);
        $order->setCarModel($data['car_model']);
    }

    /**
     * @return ObjectRepository Repository for the Order entity.
     */
    private function getOrderRepository()
    {
        return $this->entityManager->getRepository('Application\FrontendBundle\Entity\Order');
    }
}
services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Quoted: an unquoted scalar starting with "@" is deprecated in the Symfony YAML parser.
            - "@doctrine.orm.entity_manager"
            - "@event_dispatcher"
namespace Application\FrontendBundle\Event;
use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;
/**
 * Event payload wrapping the Order that has just been created or updated.
 */
class OrderEvent extends Event
{
    /** Event name dispatched by OrderService::create(). */
    const CREATE = 'application_frontend.event.order_create';

    /** Event name dispatched by OrderService::update(). */
    const UPDATE = 'application_frontend.event.order_update';

    /** @var Order */
    private $order;

    /**
     * @param Order $order The order this event is about.
     */
    public function __construct(Order $order)
    {
        $this->order = $order;
    }

    /**
     * @return Order The order passed to the constructor.
     */
    public function getOrder()
    {
        return $this->order;
    }
}
namespace Application\FrontendBundle\EventListener;
use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Application\FrontendBundle\Producer\OrderUpdateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
/**
 * Forwards order events to the matching RabbitMQ producer.
 */
class OrderListener
{
    private $orderCreateProducer;
    private $orderUpdateProducer;

    /**
     * @param OrderCreateProducer $orderCreateProducer Receives orders from the create event.
     * @param OrderUpdateProducer $orderUpdateProducer Receives orders from the update event.
     */
    public function __construct(
        OrderCreateProducer $orderCreateProducer,
        OrderUpdateProducer $orderUpdateProducer
    ) {
        $this->orderCreateProducer = $orderCreateProducer;
        $this->orderUpdateProducer = $orderUpdateProducer;
    }

    /**
     * Hands the created order to the create producer, then stops event propagation.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderCreate(OrderEvent $orderEvent)
    {
        $createdOrder = $orderEvent->getOrder();
        $this->orderCreateProducer->add($createdOrder);

        $orderEvent->stopPropagation();
    }

    /**
     * Hands the updated order to the update producer, then stops event propagation.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderUpdate(OrderEvent $orderEvent)
    {
        $updatedOrder = $orderEvent->getOrder();
        $this->orderUpdateProducer->add($updatedOrder);

        $orderEvent->stopPropagation();
    }
}
services:
    application_frontend.event_listener.order:
        class: Application\FrontendBundle\EventListener\OrderListener
        tags:
            - { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
            - { name: kernel.event_listener, event: application_frontend.event.order_update, method: onOrderUpdate }
        arguments:
            # Quoted: an unquoted scalar starting with "@" is deprecated in the Symfony YAML parser.
            - "@application_frontend.producer.order_create"
            - "@application_frontend.producer.order_update"
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
/**
 * Publishes a JSON snapshot of a newly created order through the injected producer.
 */
class OrderCreateProducer
{
    private $producer;

    /**
     * @param ProducerInterface $producer Producer the encoded message is published with.
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Serializes the order fields plus the current timestamp to JSON and publishes the result.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $payload = json_encode([
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            'timestamp' => date('Y-m-d H:i:s')
        ]);

        $this->producer->publish($payload);
    }
}
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
/**
 * Publishes a JSON snapshot of an updated order through the injected producer.
 */
class OrderUpdateProducer
{
    private $producer;

    /**
     * @param ProducerInterface $producer Producer the encoded message is published with.
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Serializes the order fields plus the current timestamp to JSON and publishes the result.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $payload = json_encode([
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            'timestamp' => date('Y-m-d H:i:s')
        ]);

        $this->producer->publish($payload);
    }
}
services:
    application_frontend.producer.order_create:
        class: Application\FrontendBundle\Producer\OrderCreateProducer
        arguments:
            # Quoted: an unquoted scalar starting with "@" is deprecated in the Symfony YAML parser.
            - "@old_sound_rabbit_mq.order_create_producer"
    application_frontend.producer.order_update:
        class: Application\FrontendBundle\Producer\OrderUpdateProducer
        arguments:
            - "@old_sound_rabbit_mq.order_update_producer"
İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
/**
 * Consumer callback for "order create" messages: stores every message as an OrderLog row.
 *
 * Failures are only written to the logger; no exception leaves execute().
 */
class OrderCreateConsumer implements ConsumerInterface
{
    private $entityManager;
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager Used to persist the OrderLog rows.
     * @param LoggerInterface        $logger        Receives a JSON-encoded record for every failed message.
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        LoggerInterface $logger
    ) {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Decodes the message body, logs it to the database and prints a progress line.
     *
     * @param AMQPMessage $message Message whose body is expected to be a JSON object.
     */
    public function execute(AMQPMessage $message)
    {
        $body = json_decode($message->body, true);

        // json_decode() yields null for a malformed body. Record that here rather than
        // letting a null payload reach persist()/flush() and the array access below.
        if (!is_array($body)) {
            $this->logError($message, 'Message body could not be decoded into an array.');

            return;
        }

        try {
            $this->log($body);
            echo sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists and flushes one OrderLog row with the CREATE action.
     *
     * @param array $message Decoded message body.
     */
    private function log($message)
    {
        $log = new OrderLog();
        $log->setAction(OrderLog::CREATE);
        $log->setMessage($message);

        $this->entityManager->persist($log);
        $this->entityManager->flush();
    }

    /**
     * Writes a JSON-encoded error record (error text, this class name, the message) to the logger.
     *
     * @param AMQPMessage $message The message that could not be processed; encoded as-is.
     * @param string      $error   Description of the failure.
     */
    private function logError($message, $error)
    {
        $data = [
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message
        ];

        $this->logger->error(json_encode($data));
    }
}
İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
/**
 * Consumer callback for "order update" messages: stores every message as an OrderLog row.
 *
 * Failures are only written to the logger; no exception leaves execute().
 */
class OrderUpdateConsumer implements ConsumerInterface
{
    private $entityManager;
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager Used to persist the OrderLog rows.
     * @param LoggerInterface        $logger        Receives a JSON-encoded record for every failed message.
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        LoggerInterface $logger
    ) {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Decodes the message body, logs it to the database and prints a progress line.
     *
     * @param AMQPMessage $message Message whose body is expected to be a JSON object.
     */
    public function execute(AMQPMessage $message)
    {
        $body = json_decode($message->body, true);

        // json_decode() yields null for a malformed body. Record that here rather than
        // letting a null payload reach persist()/flush() and the array access below.
        if (!is_array($body)) {
            $this->logError($message, 'Message body could not be decoded into an array.');

            return;
        }

        try {
            $this->log($body);
            echo sprintf('Order update - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists and flushes one OrderLog row with the UPDATE action.
     *
     * @param array $message Decoded message body.
     */
    private function log($message)
    {
        $log = new OrderLog();
        $log->setAction(OrderLog::UPDATE);
        $log->setMessage($message);

        $this->entityManager->persist($log);
        $this->entityManager->flush();
    }

    /**
     * Writes a JSON-encoded error record (error text, this class name, the message) to the logger.
     *
     * @param AMQPMessage $message The message that could not be processed; encoded as-is.
     * @param string      $error   Description of the failure.
     */
    private function logError($message, $error)
    {
        $data = [
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message
        ];

        $this->logger->error(json_encode($data));
    }
}
Eğer bir mesajın işlenmesi sırasında herhangi bir hata oluşur ise, aşağıdaki gibi bir kayıt oluşturulur.
# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}} [] []
# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}}
services:
    application_frontend.consumer.order_create:
        class: Application\FrontendBundle\Consumer\OrderCreateConsumer
        arguments:
            # Quoted: an unquoted scalar starting with "@" is deprecated in the Symfony YAML parser.
            - "@doctrine.orm.entity_manager"
            - "@logger"
    application_frontend.consumer.order_update:
        class: Application\FrontendBundle\Consumer\OrderUpdateConsumer
        arguments:
            - "@doctrine.orm.entity_manager"
            - "@logger"
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
 * A customer's car order, mapped to the "orders" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * Generated primary key; null until the entity has been flushed.
     *
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    // The accessors below are required by OrderService and the producers,
    // which call them on this entity.

    /**
     * @return int|null
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $customerName
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;
    }

    /**
     * @return string|null
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $carMake
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;
    }

    /**
     * @return string|null
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carModel
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;
    }

    /**
     * @return string|null
     */
    public function getCarModel()
    {
        return $this->carModel;
    }
}
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
 * Audit record of a consumed order message, mapped to the "order_log" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="order_log")
 */
class OrderLog
{
    /** Action value written by the create consumer. */
    const CREATE = 'Create';

    /** Action value written by the update consumer. */
    const UPDATE = 'Update';

    /**
     * Generated primary key; null until the entity has been flushed.
     *
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="action", type="string", length=10)
     */
    private $action;

    /**
     * Decoded message body, stored through the json_array column type.
     *
     * @ORM\Column(name="message", type="json_array")
     */
    private $message;

    // The consumers call setAction() and setMessage() on this entity;
    // the getters are provided for symmetry.

    /**
     * @return int|null
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $action One of the CREATE / UPDATE constants.
     */
    public function setAction($action)
    {
        $this->action = $action;
    }

    /**
     * @return string|null
     */
    public function getAction()
    {
        return $this->action;
    }

    /**
     * @param array $message Decoded message body.
     */
    public function setMessage($message)
    {
        $this->message = $message;
    }

    /**
     * @return array|null
     */
    public function getMessage()
    {
        return $this->message;
    }
}
Yeni araba siparişi verirken veya mevcut olanını yenilemek için, aşağıdakine benzer istekler göndeririz.
# POST http://rabbitmq.dev/app_dev.php/order/create
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
# PATCH http://rabbitmq.dev/app_dev.php/order/update
{
"order_id": "1",
"customer_name": "inanzzz",
"car_make": "mercedes",
"car_model": "500sel"
}
Terminalde bir tane consumer çalıştırıp durdurarak, RabbitMQ için gerekli olan tüm bileşenler (exchange, queue vs.) yaratılıp, birbirlerine bağlanmış olurlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.
Sadece 2 tane yeni sipariş ve 1 tane de yenileme isteği gönderdim.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | mercedes | 500sel |
| 2 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
2 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
Empty set (0.00 sec)
Yukarıda da gördüğümüz gibi, sırada 2 tane yeni sipariş, 1 tane de yenileme mesajı bekliyor. Bir tane consumer çalıştırılması durumunda ne olacağını göreceğiz. RabbitMQ içindeki kırık olan bağlantıları silmezseniz, mesajlar işlenmeyebilir.
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 15:32:13 ...
Order create - ID:2 @ 2016-02-07 15:32:33 ...
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:1 @ 2016-02-07 15:33:18 ...
mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:32:13"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:32:33"} |
| 3 | Update | {"order_id":1,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 15:33:18"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.00 sec)
Yukarıda da gördüğümüz gibi, tüm mesajlar işlendi.
Sadece 5 tane yeni sipariş, 2 tane de yenileme isteği gönderdim.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | mercedes | 500sel |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | mercedes | 500sel |
| 4 | inanzzz | bmw | 318 |
| 5 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:47"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:48"} |
| 3 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:49"} |
| 4 | Create | {"order_id":4,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:50"} |
| 5 | Create | {"order_id":5,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:51"} |
| 6 | Update | {"order_id":1,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 16:11:57"} |
| 7 | Update | {"order_id":3,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 16:12:02"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
7 rows in set (0.00 sec)
# Create Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 16:11:47 ...
Order create - ID:3 @ 2016-02-07 16:11:49 ...
Order create - ID:5 @ 2016-02-07 16:11:51 ...
# Create Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:2 @ 2016-02-07 16:11:48 ...
Order create - ID:4 @ 2016-02-07 16:11:50 ...
# Update Consumer 1
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:1 @ 2016-02-07 16:11:57 ...
# Update Consumer 2
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:3 @ 2016-02-07 16:12:02 ...
Yukarıda da gördüğümüz gibi, mesajların hepsi hiç bekletilmeden dört consumer tarafından round-robin dispatching yöntemi ile işlendiler.