Hello everyone!

We have been investing plenty of personal time and energy for many years to share our knowledge with you all. However, we now need your help to keep this blog running. All you have to do is just click one of the adverts on the site, otherwise it will sadly be taken down due to hosting etc. costs. Thank you.

In this example we're going to create a car purchase order and take a log of it. It consists of 1 Producer & 1 Exchange & 1 Queue & N Worker & 1 Consumer. Example relies on RabbitMqBundle.


In the image below, RK stands for "routing_key" and BK stands for "binding_key".



How it works


  1. Request - User places an order create request.

  2. Controller - Request gets validated and passed on to Service.

  3. Service - New order gets inserted into database, an order create event is created and the response is returned to user. This is where application process is finished so no more waiting time for user.

  4. Event Dispatcher - Picks up the event and dispatches it.

  5. Event Listener - Picks up dispatched event and passes it on to Producer.

  6. Producer - Creates a message and publishes it to the "order_create_ex" exchange with the "order_create" routing key, which routes it into the "order_create_qu" queue.

  7. Consumer - If the worker is running in terminal, it picks up the message from the "order_create_qu" queue and processes it. If the worker is not running yet, the message waits in the queue and gets consumed later on when the worker is started.


Configurations


Install bundle


Install "oldsound/rabbitmq-bundle":"1.8.0" with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() in AppKernel.php file.


Parameters.yml


parameters:
    # Broker connection settings, referenced from Config.yml through %...% placeholders
    rabbitmq.host: 127.0.0.1
    rabbitmq.port: 5672
    rabbitmq.user: guest
    rabbitmq.pswd: guest
    # Routing keys used for the queue binding and injected into OrderCreateProducer
    rabbitmq.queue.order_create.routing_keys:
        - order_create

Config.yml


old_sound_rabbit_mq:
    connections:
        default:
            # Placeholders are quoted so the scalars stay valid on newer YAML parsers as well
            host: "%rabbitmq.host%"
            port: "%rabbitmq.port%"
            user: "%rabbitmq.user%"
            password: "%rabbitmq.pswd%"
            vhost: /
            lazy: true
    producers:
        order_create:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_qu
                routing_keys: "%rabbitmq.queue.order_create.routing_keys%"
    consumers:
        # Same exchange/queue/routing keys as the producer above, plus the service handling messages
        order_create:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_qu
                routing_keys: "%rabbitmq.queue.order_create.routing_keys%"
            callback: application_frontend.consumer.order_create

OrderController


You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.


namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
 * HTTP entry point for order requests; delegates the actual work to OrderService.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    private $orderService;

    /**
     * @param OrderService $orderService Service that creates the order.
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Decodes the JSON request body and passes it on to the order service.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create")
     *
     * @return Response The service result as body, or a 400 response when the body is not a JSON object/array.
     */
    public function createAction(Request $request)
    {
        $payload = json_decode($request->getContent(), true);

        // json_decode() gives null for malformed JSON and a scalar for bodies such as "1".
        // OrderService::create() is type-hinted "array", so anything else is rejected here
        // instead of ending up as a catchable fatal error.
        if (!is_array($payload)) {
            return new Response('Request body must be a valid JSON object.', 400);
        }

        $result = $this->orderService->create($payload);

        return new Response($result);
    }
}

Controllers.yml


services:
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Quoted: a plain scalar must not start with "@"
            - "@application_frontend.service.order"

OrderService


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
 * Persists new orders and announces them through the event dispatcher.
 */
class OrderService
{
    private $entityManager;
    private $eventDispatcher;

    /**
     * @param EntityManagerInterface   $entityManager
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Stores a new order and dispatches an OrderEvent::CREATE event for it.
     *
     * @param array $newOrder Reads the "customer_name", "car_make" and "car_model" keys.
     *
     * @return mixed The value of Order::getId() for the stored order (not the Order object itself).
     */
    public function create(array $newOrder)
    {
        $order = new Order();
        $order->setCustomerName($newOrder['customer_name']);
        $order->setCarMake($newOrder['car_make']);
        $order->setCarModel($newOrder['car_model']);

        // Flush before dispatching so the event carries an order that has already been written
        $this->entityManager->persist($order);
        $this->entityManager->flush();

        $this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));

        return $order->getId();
    }
}

Services.yml


services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Quoted: a plain scalar must not start with "@"
            - "@doctrine.orm.entity_manager"
            - "@event_dispatcher"

OrderEvent


namespace Application\FrontendBundle\Event;

use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;

/**
 * Event object that carries the order it was raised for.
 */
class OrderEvent extends Event
{
    /** Event name used when an order has been created. */
    const CREATE = 'application_frontend.event.order_create';

    /** @var Order */
    private $order;

    /**
     * @param Order $order Order this event is about.
     */
    public function __construct(Order $order)
    {
        $this->order = $order;
    }

    /**
     * @return Order The order handed to the constructor.
     */
    public function getOrder()
    {
        return $this->order;
    }
}

OrderListener


namespace Application\FrontendBundle\EventListener;

use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Forwards order events to the order-create producer.
 */
class OrderListener
{
    /** @var OrderCreateProducer */
    private $orderCreateProducer;

    /**
     * @param OrderCreateProducer $orderCreateProducer
     */
    public function __construct(OrderCreateProducer $orderCreateProducer)
    {
        $this->orderCreateProducer = $orderCreateProducer;
    }

    /**
     * Hands the event's order to the producer, then stops further propagation of the event.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderCreate(OrderEvent $orderEvent)
    {
        $order = $orderEvent->getOrder();
        $this->orderCreateProducer->add($order);

        // Only reached when add() did not throw
        $orderEvent->stopPropagation();
    }
}

Listeners.yml


services:
    application_frontend.event_listener.order:
        class: Application\FrontendBundle\EventListener\OrderListener
        tags:
            # The event name matches the OrderEvent::CREATE constant
            - { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
        arguments:
            - "@application_frontend.producer.order_create"

OrderCreateProducer


namespace Application\FrontendBundle\Producer;

use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

/**
 * Publishes a JSON message describing an order.
 */
class OrderCreateProducer
{
    /** @var ProducerInterface */
    private $producer;

    /** @var array */
    private $routingKeys;

    /**
     * @param ProducerInterface $producer
     * @param array             $routingKeys Only the first entry is used when publishing.
     */
    public function __construct(ProducerInterface $producer, array $routingKeys)
    {
        $this->producer = $producer;
        $this->routingKeys = $routingKeys;
    }

    /**
     * Serialises the order together with the current time and publishes it.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $payload = json_encode([
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            'timestamp' => date('Y-m-d H:i:s')
        ]);

        $routingKey = $this->routingKeys[0];

        $this->producer->publish($payload, $routingKey);
    }
}

Producers.yml


services:
    application_frontend.producer.order_create:
        class: Application\FrontendBundle\Producer\OrderCreateProducer
        arguments:
            # Quoted: plain scalars must not start with "@" or "%"
            - "@old_sound_rabbit_mq.order_create_producer"
            - "%rabbitmq.queue.order_create.routing_keys%"

OrderCreateConsumer


There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.


namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Consumes order messages and stores each one as an OrderLog row.
 */
class OrderCreateConsumer implements ConsumerInterface
{
    /** @var EntityManagerInterface */
    private $entityManager;

    /** @var LoggerInterface */
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager
     * @param LoggerInterface        $logger
     */
    public function __construct(EntityManagerInterface $entityManager, LoggerInterface $logger)
    {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Decodes the message body, stores it and prints a progress line.
     *
     * Any Exception raised while doing so is written to the logger instead of being rethrown.
     * NOTE(review): nothing is returned on either path; confirm how the bundle treats that.
     *
     * @param AMQPMessage $message
     */
    public function execute(AMQPMessage $message)
    {
        $payload = json_decode($message->body, true);

        try {
            $this->log($payload);

            $line = sprintf('Order create - ID:%s @ %s ...', $payload['order_id'], date('Y-m-d H:i:s'));
            echo $line.PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists and flushes an OrderLog entry with the CREATE action.
     *
     * @param mixed $message Decoded message body.
     */
    private function log($message)
    {
        $entry = new OrderLog();
        $entry->setAction(OrderLog::CREATE);
        $entry->setMessage($message);

        $this->entityManager->persist($entry);
        $this->entityManager->flush();
    }

    /**
     * Writes the error text, this class name and the message to the logger as one JSON string.
     *
     * @param mixed  $message
     * @param string $error
     */
    private function logError($message, $error)
    {
        $encoded = json_encode([
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message
        ]);

        $this->logger->error($encoded);
    }
}

If the message processing fails for some reason, the log will contain a record like the one below.


# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"order_create"}}} [] []

# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"order_create"}}}

Consumers.yml


services:
    # Service id referenced by the "callback" key of the order_create consumer in Config.yml
    application_frontend.consumer.order_create:
        class: Application\FrontendBundle\Consumer\OrderCreateConsumer
        arguments:
            - "@doctrine.orm.entity_manager"
            - "@logger"

Order entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * Car purchase order.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * NOTE(review): "smallint" limits how many ids can be generated - confirm this is intended.
     *
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    // The accessors below are the ones OrderService and OrderCreateProducer call on this entity.

    /**
     * @return int|null Null until an id has been assigned.
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $customerName
     *
     * @return $this
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;

        return $this;
    }

    /**
     * @return string
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $carMake
     *
     * @return $this
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;

        return $this;
    }

    /**
     * @return string
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carModel
     *
     * @return $this
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;

        return $this;
    }

    /**
     * @return string
     */
    public function getCarModel()
    {
        return $this->carModel;
    }
}

OrderLog entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * Log row written for a processed order message.
 *
 * @ORM\Entity
 * @ORM\Table(name="order_log")
 */
class OrderLog
{
    /** Action value for order creation. */
    const CREATE = 'Create';

    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="action", type="string", length=10)
     */
    private $action;

    /**
     * @ORM\Column(name="message", type="json_array")
     */
    private $message;

    // setAction() and setMessage() are the methods OrderCreateConsumer calls on this entity.

    /**
     * @return int|null Null until an id has been assigned.
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $action The column is declared with length=10.
     *
     * @return $this
     */
    public function setAction($action)
    {
        $this->action = $action;

        return $this;
    }

    /**
     * @return string
     */
    public function getAction()
    {
        return $this->action;
    }

    /**
     * @param mixed $message Value for the "json_array" column; the consumer passes the decoded message body.
     *
     * @return $this
     */
    public function setMessage($message)
    {
        $this->message = $message;

        return $this;
    }

    /**
     * @return mixed
     */
    public function getMessage()
    {
        return $this->message;
    }
}

Tests


This is the kind of payload we're going to use for testing purposes when creating new orders.


# POST http://rabbitmq.dev/app_dev.php/order/create

{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}

Run a consumer once and then stop it, so that the RabbitMQ components (exchange, queue etc.) are created and bound together; otherwise messages will be lost. You should then have the visualised diagram below.



Test 1: No consumer running


I created 2 new orders.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
2 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
Empty set (0.00 sec)


As you can see above, there are 2 queued messages waiting. I'm now going to run a consumer to show what happens. You might need to close dead connections otherwise messages won't be redelivered.


$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 15:32:13 ...
Order create - ID:2 @ 2016-02-07 15:32:33 ...

mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:33"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:34"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.00 sec)


As you can see above, all pending messages have been consumed.


Test 2: Two consumers running


I created 5 new orders.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | bmw | 318 |
| 4 | inanzzz | bmw | 318 |
| 5 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
+----+--------+---------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+---------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:07"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:08"} |
| 3 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:09"} |
| 4 | Create | {"order_id":4,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:10"} |
| 5 | Create | {"order_id":5,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:11"} |
+----+--------+---------------------------------------------------------------------------------------------------------------+
5 rows in set (0.00 sec)


# Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-01-31 12:20:07 ...
Order create - ID:3 @ 2016-01-31 12:20:09 ...
Order create - ID:5 @ 2016-01-31 12:20:11 ...

# Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:2 @ 2016-01-31 12:20:08 ...
Order create - ID:4 @ 2016-01-31 12:20:10 ...

As you can see above, all 5 messages have been consumed right away by the two consumers in a round-robin fashion.


RabbitMQ Management


Exchange




Queue




Mapping