20/02/2016 - RABBITMQ, SYMFONY
Bu örneğimizde araba satın almak için bir istek oluşturup, ayrıca isteği kayıt altına alacağız. Örnek 1 Producer & 1 Exchange & 1 Queue & N Worker & 1 Consumer bileşenlerinden oluşuyor ve RabbitMqBundle kullanıyor.
Composer ile "oldsound/rabbitmq-bundle":"1.8.0"
'i yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
satırını ekleyin.
parameters:
rabbit_mq_host: 127.0.0.1
rabbit_mq_port: 5672
rabbit_mq_user: guest
rabbit_mq_pswd: guest
old_sound_rabbit_mq:
connections:
default:
host: %rabbit_mq_host%
port: %rabbit_mq_port%
user: %rabbit_mq_user%
password: %rabbit_mq_pswd%
vhost: /
lazy: true
producers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: topic }
consumers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: topic }
queue_options: { name: 'order_create_qu' }
callback: application_frontend.consumer.order_create
Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
* @Route("order", service="application_frontend.controller.order")
*/
class OrderController
{
private $orderService;
public function __construct(OrderService $orderService)
{
$this->orderService = $orderService;
}
/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create")
*
* @return Response
*/
public function createAction(Request $request)
{
$result = $this->orderService->create(json_decode($request->getContent(), true));
return new Response($result);
}
}
services:
application_frontend.controller.order:
class: Application\FrontendBundle\Controller\OrderController
arguments:
- @application_frontend.service.order
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class OrderService
{
private $entityManager;
private $eventDispatcher;
public function __construct(
EntityManagerInterface $entityManager,
EventDispatcherInterface $eventDispatcher
) {
$this->entityManager = $entityManager;
$this->eventDispatcher = $eventDispatcher;
}
/**
* @param array $newOrder
*
* @return Order
*/
public function create(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);
$this->entityManager->persist($order);
$this->entityManager->flush();
$this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));
return $order->getId();
}
}
services:
application_frontend.service.order:
class: Application\FrontendBundle\Service\OrderService
arguments:
- @doctrine.orm.entity_manager
- @event_dispatcher
namespace Application\FrontendBundle\Event;
use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;
class OrderEvent extends Event
{
const CREATE = 'application_frontend.event.order_create';
private $order;
public function __construct(Order $order)
{
$this->order = $order;
}
public function getOrder()
{
return $this->order;
}
}
namespace Application\FrontendBundle\EventListener;
use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderListener
{
private $orderCreateProducer;
public function __construct(
OrderCreateProducer $orderCreateProducer
) {
$this->orderCreateProducer = $orderCreateProducer;
}
public function onOrderCreate(OrderEvent $orderEvent)
{
$this->orderCreateProducer->add($orderEvent->getOrder());
$orderEvent->stopPropagation();
}
}
services:
application_frontend.event_listener.order:
class: Application\FrontendBundle\EventListener\OrderListener
tags:
- { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
arguments:
- @application_frontend.producer.order_create
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class OrderCreateProducer
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function add(Order $order)
{
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')
];
$this->producer->publish(json_encode($message));
}
}
services:
application_frontend.producer.order_create:
class: Application\FrontendBundle\Producer\OrderCreateProducer
arguments:
- @old_sound_rabbit_mq.order_create_producer
İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderCreateConsumer implements ConsumerInterface
{
private $entityManager;
private $logger;
public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
$this->log($body);
echo sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());
}
}
private function log($message)
{
$log = new OrderLog();
$log->setAction(OrderLog::CREATE);
$log->setMessage($message);
$this->entityManager->persist($log);
$this->entityManager->flush();
}
private function logError($message, $error)
{
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message
];
$this->logger->error(json_encode($data));
}
}
Eğer bir mesajın işlenmesi sırasında herhangi bir hata oluşur ise, aşağıdaki gibi bir kayıt oluşturulur.
# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}} [] []
# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}}
services:
application_frontend.consumer.order_create:
class: Application\FrontendBundle\Consumer\OrderCreateConsumer
arguments:
- @doctrine.orm.entity_manager
- @logger
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="orders")
*/
class Order
{
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="customer_name", type="string", length=50)
*/
private $customerName;
/**
* @ORM\Column(name="car_make", type="string", length=50)
*/
private $carMake;
/**
* @ORM\Column(name="car_model", type="string", length=50)
*/
private $carModel;
}
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="order_log")
*/
class OrderLog
{
const CREATE = 'Create';
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="action", type="string", length=10)
*/
private $action;
/**
* @ORM\Column(name="message", type="json_array")
*/
private $message;
}
Araba siparişi verirken, aşağıdakine benzer bir istek göndeririz.
# POST http://rabbitmq.dev/app_dev.php/order/create
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
Terminalde bir tane consumer çalıştırıp durdurarak, RabbitMQ için gerekli olan tüm bileşenler (exchange, queue vs.) yaratılıp, birbirlerine bağlanmış olurlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.
Sadece 2 tane istek gönderdim.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 2 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
2 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
Empty set (0.00 sec)
Yukarıda da gördüğümüz gibi, sırada 2 tane mesaj bekliyor. Bir tane consumer çalıştırılması durumunda ne olacağını göreceğiz. RabbitMQ içindeki kırık olan bağlantıları silmezseniz, mesajlar işlenmeyebilirler.
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 15:32:13 ...
Order create - ID:2 @ 2016-02-07 15:32:33 ...
mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:33"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:34"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.00 sec)
Yukarıda da gördüğümüz gibi, tüm mesajlar işlendi.
Sadece 5 tane istek gönderdim.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | bmw | 318 |
| 4 | inanzzz | bmw | 318 |
| 5 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
+----+--------+---------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+---------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:07"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:08"} |
| 3 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:09"} |
| 4 | Create | {"order_id":4,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:10"} |
| 5 | Create | {"order_id":5,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:11"} |
+----+--------+---------------------------------------------------------------------------------------------------------------+
5 rows in set (0.00 sec)
# Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-01-31 12:20:07 ...
Order create - ID:3 @ 2016-01-31 12:20:09 ...
Order create - ID:5 @ 2016-01-31 12:20:11 ...
# Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:2 @ 2016-01-31 12:20:08 ...
Order create - ID:4 @ 2016-01-31 12:20:10 ...
Yukarıda da gördüğümüz gibi, mesajların hepsi hiç bekletilmeden round-robin dispatching yöntemi ile işlendiler.