31/01/2016 - RABBITMQ, SYMFONY
In this example we're going to create a new purchase order, update existing one and take a log of both processes. It consists of 2 Producer & 2 Exchange & 2 Queue & N Worker & 2 Consumer. If you had used only one shared exchange here such example, order "create" and "update" messages would have gone to both queues which would break the system. Example uses RabbitMqBundle.
Install "oldsound/rabbitmq-bundle":"1.8.0"
with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
in AppKernel.php file.
parameters:
rabbit_mq_host: 127.0.0.1
rabbit_mq_port: 5672
rabbit_mq_user: guest
rabbit_mq_pswd: guest
old_sound_rabbit_mq:
connections:
default:
host: %rabbit_mq_host%
port: %rabbit_mq_port%
user: %rabbit_mq_user%
password: %rabbit_mq_pswd%
vhost: /
lazy: true
producers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
order_update:
connection: default
exchange_options: { name: 'order_update_ex', type: fanout }
consumers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
queue_options: { name: 'order_create_qu' }
callback: application_frontend.consumer.order_create
order_update:
connection: default
exchange_options: { name: 'order_update_ex', type: fanout }
queue_options: { name: 'order_update_qu' }
callback: application_frontend.consumer.order_update
You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
* @Route("order", service="application_frontend.controller.order")
*/
class OrderController
{
private $orderService;
public function __construct(OrderService $orderService)
{
$this->orderService = $orderService;
}
/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create")
*
* @return Response
*/
public function createAction(Request $request)
{
$result = $this->orderService->create(json_decode($request->getContent(), true));
return new Response($result);
}
/**
* @param Request $request
*
* @Method({"PATCH"})
* @Route("/update")
*
* @return Response
*/
public function updateAction(Request $request)
{
$result = $this->orderService->update(json_decode($request->getContent(), true));
return new Response($result);
}
}
services:
application_frontend.controller.order:
class: Application\FrontendBundle\Controller\OrderController
arguments:
- @application_frontend.service.order
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\Common\Persistence\ObjectRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class OrderService
{
private $entityManager;
private $eventDispatcher;
public function __construct(
EntityManagerInterface $entityManager,
EventDispatcherInterface $eventDispatcher
) {
$this->entityManager = $entityManager;
$this->eventDispatcher = $eventDispatcher;
}
/**
* @param array $newOrder
*
* @return Order
*/
public function create(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);
$this->entityManager->persist($order);
$this->entityManager->flush();
$this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));
return $order->getId();
}
/**
* @param array $newOrder
*
* @return Order
*/
public function update(array $newOrder)
{
/** @var Order $order */
$order = $this->getOrderRepository()->findOneBy(['id' => $newOrder['order_id']]);
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);
$this->entityManager->flush();
$this->eventDispatcher->dispatch(OrderEvent::UPDATE, new OrderEvent($order));
return $order->getId();
}
/**
* @return ObjectRepository
*/
private function getOrderRepository()
{
return $this->entityManager->getRepository('Application\FrontendBundle\Entity\Order');
}
}
services:
application_frontend.service.order:
class: Application\FrontendBundle\Service\OrderService
arguments:
- @doctrine.orm.entity_manager
- @event_dispatcher
namespace Application\FrontendBundle\Event;
use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;
class OrderEvent extends Event
{
const CREATE = 'application_frontend.event.order_create';
const UPDATE = 'application_frontend.event.order_update';
private $order;
public function __construct(Order $order)
{
$this->order = $order;
}
public function getOrder()
{
return $this->order;
}
}
namespace Application\FrontendBundle\EventListener;
use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Application\FrontendBundle\Producer\OrderUpdateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderListener
{
private $orderCreateProducer;
private $orderUpdateProducer;
public function __construct(
OrderCreateProducer $orderCreateProducer,
OrderUpdateProducer $orderUpdateProducer
) {
$this->orderCreateProducer = $orderCreateProducer;
$this->orderUpdateProducer = $orderUpdateProducer;
}
public function onOrderCreate(OrderEvent $orderEvent)
{
$this->orderCreateProducer->add($orderEvent->getOrder());
$orderEvent->stopPropagation();
}
public function onOrderUpdate(OrderEvent $orderEvent)
{
$this->orderUpdateProducer->add($orderEvent->getOrder());
$orderEvent->stopPropagation();
}
}
services:
application_frontend.event_listener.order:
class: Application\FrontendBundle\EventListener\OrderListener
tags:
- { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
- { name: kernel.event_listener, event: application_frontend.event.order_update, method: onOrderUpdate }
arguments:
- @application_frontend.producer.order_create
- @application_frontend.producer.order_update
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class OrderCreateProducer
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function add(Order $order)
{
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')
];
$this->producer->publish(json_encode($message));
}
}
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class OrderUpdateProducer
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function add(Order $order)
{
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')
];
$this->producer->publish(json_encode($message));
}
}
services:
application_frontend.producer.order_create:
class: Application\FrontendBundle\Producer\OrderCreateProducer
arguments:
- @old_sound_rabbit_mq.order_create_producer
application_frontend.producer.order_update:
class: Application\FrontendBundle\Producer\OrderUpdateProducer
arguments:
- @old_sound_rabbit_mq.order_update_producer
There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderCreateConsumer implements ConsumerInterface
{
private $entityManager;
private $logger;
public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
$this->log($body);
echo sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());
}
}
private function log($message)
{
$log = new OrderLog();
$log->setAction(OrderLog::CREATE);
$log->setMessage($message);
$this->entityManager->persist($log);
$this->entityManager->flush();
}
private function logError($message, $error)
{
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message
];
$this->logger->error(json_encode($data));
}
}
There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderUpdateConsumer implements ConsumerInterface
{
private $entityManager;
private $logger;
public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
$this->log($body);
echo sprintf('Order update - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());
}
}
private function log($message)
{
$log = new OrderLog();
$log->setAction(OrderLog::UPDATE);
$log->setMessage($message);
$this->entityManager->persist($log);
$this->entityManager->flush();
}
private function logError($message, $error)
{
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message
];
$this->logger->error(json_encode($data));
}
}
If the message processing fails for some reason, log would have record below.
# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}} [] []
# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}}
services:
application_frontend.consumer.order_create:
class: Application\FrontendBundle\Consumer\OrderCreateConsumer
arguments:
- @doctrine.orm.entity_manager
- @logger
application_frontend.consumer.order_update:
class: Application\FrontendBundle\Consumer\OrderUpdateConsumer
arguments:
- @doctrine.orm.entity_manager
- @logger
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="orders")
*/
class Order
{
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="customer_name", type="string", length=50)
*/
private $customerName;
/**
* @ORM\Column(name="car_make", type="string", length=50)
*/
private $carMake;
/**
* @ORM\Column(name="car_model", type="string", length=50)
*/
private $carModel;
}
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="order_log")
*/
class OrderLog
{
const CREATE = 'Create';
const UPDATE = 'Update';
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="action", type="string", length=10)
*/
private $action;
/**
* @ORM\Column(name="message", type="json_array")
*/
private $message;
}
These are the kind of payloads we're going to use for testing purposes when creating new orders or updating existing ones.
# POST http://rabbitmq.dev/app_dev.php/order/create
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
# PATCH http://rabbitmq.dev/app_dev.php/order/update
{
"order_id": "1",
"customer_name": "inanzzz",
"car_make": "mercedes",
"car_model": "500sel"
}
Run a consumers and stop them so that you have the RabbitMQ components (exchange, queue etc.) are ready and bound together otherwise messages will be lost. You should have a visualised diagram below.
I created 2 new orders and send 1 update request.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | mercedes | 500sel |
| 2 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
2 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
Empty set (0.00 sec)
As you can see above, there are 2 queued messages waiting for create and 1 for update. I'm now going to run a consumer to show what happens. You might need to close dead connections otherwise messages won't be redelivered.
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 15:32:13 ...
Order create - ID:2 @ 2016-02-07 15:32:33 ...
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:1 @ 2016-02-07 15:33:18 ...
mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:32:13"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:32:33"} |
| 3 | Update | {"order_id":1,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 15:33:18"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.00 sec)
As you can see above, all pending messages have been consumed.
I created 5 new orders and send 2 update requests.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | mercedes | 500sel |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | mercedes | 500sel |
| 4 | inanzzz | bmw | 318 |
| 5 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:47"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:48"} |
| 3 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:49"} |
| 4 | Create | {"order_id":4,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:50"} |
| 5 | Create | {"order_id":5,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 16:11:51"} |
| 6 | Update | {"order_id":1,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 16:11:57"} |
| 7 | Update | {"order_id":3,"customer_name":"inanzzz","car_make":"mercedes","car_model":"500sel","timestamp":"2016-02-07 16:12:02"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
7 rows in set (0.00 sec)
# Create Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 16:11:47 ...
Order create - ID:3 @ 2016-02-07 16:11:49 ...
Order create - ID:5 @ 2016-02-07 16:11:51 ...
# Create Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:2 @ 2016-02-07 16:11:48 ...
Order create - ID:4 @ 2016-02-07 16:11:50 ...
# Update Consumer 1
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:1 @ 2016-02-07 16:11:57 ...
# Update Consumer 2
$ app/console rabbitmq:consumer -m 100 order_update
Order update - ID:3 @ 2016-02-07 16:12:02 ...
As you can see above, all 5 create and 2 update messages have been consumed right away by two each consumers in round-robin dispatching way.