13/02/2016 - RABBITMQ, SYMFONY
In this example we're going to create a new purchase order for "bmw", "audi", "mercedes" and take logs of it. Orders will be coming from two different producers. It consists of 2 Producer & 1 Exchange & 2 Queue & N Worker & 2 Consumer. In such scenario, "bmw" orders get treated differently compared to "audi" and "mercedes" orders which get treated in same way. Example uses RabbitMqBundle.
Install "oldsound/rabbitmq-bundle":"1.8.0"
with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
in AppKernel.php file.
parameters:
rabbit_mq_host: 127.0.0.1
rabbit_mq_port: 5672
rabbit_mq_user: guest
rabbit_mq_pswd: guest
old_sound_rabbit_mq:
connections:
default:
host: %rabbitmq.host%
port: %rabbitmq.port%
user: %rabbitmq.user%
password: %rabbitmq.pswd%
vhost: /
lazy: true
producers:
order_create_bmw:
connection: default
exchange_options: { name: order_create_ex, type: direct }
queue_options:
name: order_create_bmw_qu
routing_keys:
- bmw
order_create_audi_mercedes:
connection: default
exchange_options: { name: order_create_ex, type: direct }
queue_options:
name: order_create_audi_mercedes_qu
routing_keys:
- audi
- mercedes
consumers:
order_create_bmw:
connection: default
exchange_options: { name: order_create_ex, type: direct }
queue_options:
name: order_create_bmw_qu
routing_keys:
- bmw
callback: application_frontend.consumer.order_create_bmw
order_create_audi_mercedes:
connection: default
exchange_options: { name: order_create_ex, type: direct }
queue_options:
name: order_create_audi_mercedes_qu
routing_keys:
- audi
- mercedes
callback: application_frontend.consumer.order_create_audi_mercedes
You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
* @Route("order", service="application_frontend.controller.order")
*/
class OrderController
{
private $orderService;
public function __construct(OrderService $orderService)
{
$this->orderService = $orderService;
}
/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create/bmw")
*
* @return Response
*/
public function createBmwAction(Request $request)
{
$result = $this->orderService->createBmw(json_decode($request->getContent(), true));
return new Response($result);
}
/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create/audi_mercedes")
*
* @return Response
*/
public function createAudiMercedesAction(Request $request)
{
$result = $this->orderService->createAudiMercedes(json_decode($request->getContent(), true));
return new Response($result);
}
}
services:
application_frontend.controller.order:
class: Application\FrontendBundle\Controller\OrderController
arguments:
- @application_frontend.service.order
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class OrderService
{
private $entityManager;
private $eventDispatcher;
public function __construct(
EntityManagerInterface $entityManager,
EventDispatcherInterface $eventDispatcher
) {
$this->entityManager = $entityManager;
$this->eventDispatcher = $eventDispatcher;
}
/**
* @param array $newOrder
*
* @return Order
*/
public function createBmw(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);
$this->entityManager->persist($order);
$this->entityManager->flush();
$this->eventDispatcher->dispatch(OrderEvent::CREATE_BMW, new OrderEvent($order));
return $order->getId();
}
/**
* @param array $newOrder
*
* @return Order
*/
public function createAudiMercedes(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);
$this->entityManager->persist($order);
$this->entityManager->flush();
$this->eventDispatcher->dispatch(OrderEvent::CREATE_AUDI_MERCEDES, new OrderEvent($order));
return $order->getId();
}
}
services:
application_frontend.service.order:
class: Application\FrontendBundle\Service\OrderService
arguments:
- @doctrine.orm.entity_manager
- @event_dispatcher
namespace Application\FrontendBundle\Event;
use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;
class OrderEvent extends Event
{
const CREATE_BMW = 'application_frontend.event.order_create_bmw';
const CREATE_AUDI_MERCEDES = 'application_frontend.event.order_create_audi_mercedes';
private $order;
public function __construct(Order $order)
{
$this->order = $order;
}
public function getOrder()
{
return $this->order;
}
}
namespace Application\FrontendBundle\EventListener;
use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateAudiMercedesProducer;
use Application\FrontendBundle\Producer\OrderCreateBmwProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderListener
{
private $orderCreateBmwProducer;
private $orderCreateAudiMercedesProducer;
public function __construct(
OrderCreateBmwProducer $orderCreateBmwProducer,
OrderCreateAudiMercedesProducer $orderCreateAudiMercedesProducer
) {
$this->orderCreateBmwProducer = $orderCreateBmwProducer;
$this->orderCreateAudiMercedesProducer = $orderCreateAudiMercedesProducer;
}
public function onOrderCreateBmw(OrderEvent $orderEvent)
{
$this->orderCreateBmwProducer->add($orderEvent->getOrder());
$orderEvent->stopPropagation();
}
public function onOrderCreateAudiMercedes(OrderEvent $orderEvent)
{
$this->orderCreateAudiMercedesProducer->add($orderEvent->getOrder());
$orderEvent->stopPropagation();
}
}
services:
application_frontend.event_listener.order:
class: Application\FrontendBundle\EventListener\OrderListener
tags:
- { name: kernel.event_listener, event: application_frontend.event.order_create_bmw, method: onOrderCreateBmw }
- { name: kernel.event_listener, event: application_frontend.event.order_create_audi_mercedes, method: onOrderCreateAudiMercedes }
arguments:
- @application_frontend.producer.order_create_bmw
- @application_frontend.producer.order_create_audi_mercedes
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class OrderCreateBmwProducer
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function add(Order $order)
{
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')
];
$this->producer->publish(json_encode($message), $message['car_make']);
}
}
namespace Application\FrontendBundle\Producer;
use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
class OrderCreateAudiMercedesProducer
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function add(Order $order)
{
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')
];
$this->producer->publish(json_encode($message), $message['car_make']);
}
}
services:
application_frontend.producer.order_create_bmw:
class: Application\FrontendBundle\Producer\OrderCreateBmwProducer
arguments:
- @old_sound_rabbit_mq.order_create_bmw_producer
application_frontend.producer.order_create_audi_mercedes:
class: Application\FrontendBundle\Producer\OrderCreateAudiMercedesProducer
arguments:
- @old_sound_rabbit_mq.order_create_audi_mercedes_producer
There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderCreateBmwConsumer implements ConsumerInterface
{
private $entityManager;
private $logger;
public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
$this->log($body);
echo sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
echo json_encode($message).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());
}
}
private function log($message)
{
$log = new OrderLog();
$log->setAction(OrderLog::CREATE.' '.$message['car_make']);
$log->setMessage($message);
$this->entityManager->persist($log);
$this->entityManager->flush();
}
private function logError($message, $error)
{
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message
];
$this->logger->error(json_encode($data));
}
}
There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.
namespace Application\FrontendBundle\Consumer;
use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
class OrderCreateAudiMercedesConsumer implements ConsumerInterface
{
private $entityManager;
private $logger;
public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;
}
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
$this->log($body);
echo sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
echo json_encode($message).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());
}
}
private function log($message)
{
$log = new OrderLog();
$log->setAction(OrderLog::CREATE.' '.$message['car_make']);
$log->setMessage($message);
$this->entityManager->persist($log);
$this->entityManager->flush();
}
private function logError($message, $error)
{
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message
];
$this->logger->error(json_encode($data));
}
}
If the message processing fails for some reason, log would have record below.
# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateBmwConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"bmw"}}} [] []
# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateBmwConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"bmw"}}}
services:
application_frontend.consumer.order_create_bmw:
class: Application\FrontendBundle\Consumer\OrderCreateBmwConsumer
arguments:
- @doctrine.orm.entity_manager
- @logger
application_frontend.consumer.order_create_audi_mercedes:
class: Application\FrontendBundle\Consumer\OrderCreateAudiMercedesConsumer
arguments:
- @doctrine.orm.entity_manager
- @logger
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="orders")
*/
class Order
{
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="customer_name", type="string", length=50)
*/
private $customerName;
/**
* @ORM\Column(name="car_make", type="string", length=50)
*/
private $carMake;
/**
* @ORM\Column(name="car_model", type="string", length=50)
*/
private $carModel;
}
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="order_log")
*/
class OrderLog
{
const CREATE = 'Create';
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;
/**
* @ORM\Column(name="action", type="string", length=20)
*/
private $action;
/**
* @ORM\Column(name="message", type="json_array")
*/
private $message;
}
These are the kind of payloads we're going to use for testing purposes when creating new orders.
# POST http://rabbitmq.dev/app_dev.php/order/create/bmw
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
# POST http://rabbitmq.dev/app_dev.php/order/create/audi_mercedes
{
"customer_name": "inanzzz",
"car_make": "audi",
"car_model": "a3"
}
{
"customer_name": "inanzzz",
"car_make": "mercedes",
"car_model": "sl500"
}
Run a consumers and stop them so that you have the RabbitMQ components (exchange, queue etc.) are ready and bound together otherwise messages will be lost. You should have a visualised diagram below.
I created 3 new orders.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 3.18 |
| 2 | inanzzz | audi | a3 |
| 3 | inanzzz | mercedes | sl500 |
+----+---------------+----------+-----------+
3 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
Empty set (0.00 sec)
As you can see above, there is 1 message waiting in "bmw" queue and 2 messages in "audi_mercedes" queue. I'm now going to run a consumer to show what happens. You might need to close dead connections otherwise messages won't be redelivered.
$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:1 @ 2016-02-13 12:19:41 ...
$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:2 @ 2016-02-13 12:19:55 ...
Order create - ID:3 @ 2016-02-13 12:19:59 ...
mysql> SELECT * FROM order_log;
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| 1 | Create bmw | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.18","timestamp":"2016-02-13 12:19:41"} |
| 2 | Create audi | {"order_id":2,"customer_name":"inanzzz","car_make":"audi","car_model":"a3","timestamp":"2016-02-13 12:19:55"} |
| 3 | Create mercedes | {"order_id":3,"customer_name":"inanzzz","car_make":"mercedes","car_model":"sl500","timestamp":"2016-02-13 12:19:59"} |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.00 sec)
As you can see above, all pending messages have been consumed.
I created 3 new "bmw" and 2 "audi/mercedes" orders.
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 3.18 |
| 2 | inanzzz | bmw | 3.20 |
| 3 | inanzzz | bmw | 5.00 |
| 4 | inanzzz | audi | a3 |
| 5 | inanzzz | mercedes | sl500 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)
mysql> SELECT * FROM order_log;
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| 1 | Create bmw | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.18","timestamp":"2016-02-13 13:01:12"} |
| 2 | Create bmw | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.20","timestamp":"2016-02-13 13:01:20"} |
| 3 | Create bmw | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"5.00","timestamp":"2016-02-13 13:01:28"} |
| 4 | Create audi | {"order_id":4,"customer_name":"inanzzz","car_make":"audi","car_model":"a3","timestamp":"2016-02-13 13:01:37"} |
| 5 | Create mercedes | {"order_id":5,"customer_name":"inanzzz","car_make":"mercedes","car_model":"sl500","timestamp":"2016-02-13 13:01:47"} |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
5 rows in set (0.00 sec)
# Create bmw Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:1 @ 2016-02-13 13:01:12 ...
Order create - ID:3 @ 2016-02-13 13:01:28 ...
# Create bmw Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:2 @ 2016-02-13 13:01:20 ...
# Create audi/mercedes Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:4 @ 2016-02-13 13:01:37 ...
# Create audi/mercedes Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:5 @ 2016-02-13 13:01:47 ...
As you can see above, all 5 create messages have been consumed right away by four consumers in round-robin dispatching way.