In this example we're going to create a new purchase order and take duplicated logs of it. It consists of 1 Producer & 1 Exchange & 2 Queue & N Worker & 2 Consumer. Such scenario would be ideal if you wanted to process same message in two different ways. This example currently duplicates order creation just for demonstration purposes to show how it could be done. Example uses RabbitMqBundle.

How it works

  1. Request - User places an order create request.

  2. Controller - Request gets validated and passed on to Service.

  3. Service - A new order gets inserted into database, an order create event is created and the response is returned to user. This is where application process is finished so no more waiting time for user.

  4. Event Dispatcher - Picks up the event and dispatches it.

  5. Event Listener - Picks up dispatched event and passes it on to Producer.

  6. Producer - Creates a message and puts it into queue.

  7. Consumer - If the worker is running in terminal, it catches the message and processes it. If the worker is not running yet, it would start consuming messages later on when the worker is started.


Install bundle

Install "oldsound/rabbitmq-bundle":"1.8.0" with composer and activate it with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() in AppKernel.php file.


rabbit_mq_port: 5672
rabbit_mq_user: guest
rabbit_mq_pswd: guest


host: %rabbit_mq_host%
port: %rabbit_mq_port%
user: %rabbit_mq_user%
password: %rabbit_mq_pswd%
vhost: /
lazy: true
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
queue_options: { name: 'order_create_1_qu' }
callback: application_frontend.consumer.order_create_1
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
queue_options: { name: 'order_create_2_qu' }
callback: application_frontend.consumer.order_create_2


You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.

namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

* @Route("order", service="application_frontend.controller.order")
class OrderController
private $orderService;

public function __construct(OrderService $orderService)
$this->orderService = $orderService;

* @param Request $request
* @Method({"POST"})
* @Route("/create")
* @return Response
public function createAction(Request $request)
$result = $this->orderService->create(json_decode($request->getContent(), true));

return new Response($result);


class: Application\FrontendBundle\Controller\OrderController
- @application_frontend.service.order


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class OrderService
private $entityManager;
private $eventDispatcher;

public function __construct(
EntityManagerInterface $entityManager,
EventDispatcherInterface $eventDispatcher
) {
$this->entityManager = $entityManager;
$this->eventDispatcher = $eventDispatcher;

* @param array $newOrder
* @return Order
public function create(array $newOrder)
$order = new Order();


$this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));

return $order->getId();


class: Application\FrontendBundle\Service\OrderService
- @doctrine.orm.entity_manager
- @event_dispatcher


namespace Application\FrontendBundle\Event;

use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;

class OrderEvent extends Event
const CREATE = 'application_frontend.event.order_create';

private $order;

public function __construct(Order $order)
$this->order = $order;

public function getOrder()
return $this->order;


namespace Application\FrontendBundle\EventListener;

use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

class OrderListener
private $orderCreateProducer;

public function __construct(OrderCreateProducer $orderCreateProducer)
$this->orderCreateProducer = $orderCreateProducer;

public function onOrderCreate(OrderEvent $orderEvent)



class: Application\FrontendBundle\EventListener\OrderListener
- { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
- @application_frontend.producer.order_create


namespace Application\FrontendBundle\Producer;

use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

class OrderCreateProducer
private $producer;

public function __construct(ProducerInterface $producer)
$this->producer = $producer;

public function add(Order $order)
$message = [
'order_id' => $order->getId(),
'customer_name' => $order->getCustomerName(),
'car_make' => $order->getCarMake(),
'car_model' => $order->getCarModel(),
'timestamp' => date('Y-m-d H:i:s')



class: Application\FrontendBundle\Producer\OrderCreateProducer
- @old_sound_rabbit_mq.order_create_producer


There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.

namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

class OrderCreate1Consumer implements ConsumerInterface
private $entityManager;
private $logger;

public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;

public function execute(AMQPMessage $message)
$body = json_decode($message->body, true);

try {

echo sprintf('Order create 1 - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
echo json_encode($message).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());

private function log($message)
$log = new OrderLog();


private function logError($message, $error)
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message



There is a reason why we just log failed messages and do nothing about them. You can find the post related to the answer in this blog.

namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

class OrderCreate2Consumer implements ConsumerInterface
private $entityManager;
private $logger;

public function __construct(
EntityManagerInterface $entityManager,
LoggerInterface $logger
) {
$this->entityManager = $entityManager;
$this->logger = $logger;

public function execute(AMQPMessage $message)
$body = json_decode($message->body, true);

try {

echo sprintf('Order create 2 - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s')).PHP_EOL;
echo json_encode($message).PHP_EOL;
} catch (Exception $e) {
$this->logError($message, $e->getMessage());

private function log($message)
$log = new OrderLog();


private function logError($message, $error)
$data = [
'error' => $error,
'class' => __CLASS__,
'message' => $message


If the message processing fails for some reason, log would have record below.

# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreate1Consumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}} [] []

# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreate1Consumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}}


class: Application\FrontendBundle\Consumer\OrderCreate1Consumer
- @doctrine.orm.entity_manager
- @logger

class: Application\FrontendBundle\Consumer\OrderCreate2Consumer
- @doctrine.orm.entity_manager
- @logger

Order entity

namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

* @ORM\Entity
* @ORM\Table(name="orders")
class Order
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
private $id;

* @ORM\Column(name="customer_name", type="string", length=50)
private $customerName;

* @ORM\Column(name="car_make", type="string", length=50)
private $carMake;

* @ORM\Column(name="car_model", type="string", length=50)
private $carModel;

OrderLog entity

namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

* @ORM\Entity
* @ORM\Table(name="order_log")
class OrderLog
const CREATE = 'Create';

* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
private $id;

* @ORM\Column(name="action", type="string", length=10)
private $action;

* @ORM\Column(name="message", type="json_array")
private $message;


This is a kind of payload we're going to use for testing purposes when creating new orders.


"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"

Run a consumers and stop them so that you have the RabbitMQ components (exchange, queue etc.) are ready and bound together otherwise messages will be lost. You should have a visualised diagram below.

Test 1: No consumer running

I created 2 new orders.

mysql> SELECT * FROM orders;
| id | customer_name | car_make | car_model |
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
2 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
Empty set (0.00 sec)

As you can see above, there are 2 messages waiting for create in each queues. I'm now going to run a consumer to show what happens. You might need to close dead connections otherwise messages won't be redelivered.

$ app/console rabbitmq:consumer -m 100 order_create_1
Order create 1 - ID:1 @ 2016-02-10 21:58:44 ...
Order create 1 - ID:2 @ 2016-02-10 21:58:44 ...

$ app/console rabbitmq:consumer -m 100 order_create_2
Order create 1 - ID:1 @ 2016-02-10 21:58:51 ...
Order create 1 - ID:2 @ 2016-02-10 21:58:51 ...

mysql> SELECT * FROM order_log;
| id | action | message |
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 21:46:39"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 21:46:55"} |
| 3 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 21:46:39"} |
| 4 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 21:46:55"} |
4 rows in set (0.00 sec)

As you can see above, all pending messages have been consumed.

Test 2: Two consumer running for each create consumers

I created 3 new orders.

mysql> SELECT * FROM orders;
| id | customer_name | car_make | car_model |
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | bmw | 318 |
3 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
| id | action | message |
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:35"} |
| 2 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:35"} |
| 3 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:37"} |
| 4 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:37"} |
| 5 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:39"} |
| 6 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-10 22:06:39"} |
6 rows in set (0.00 sec)

# Create1 Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_1
Order create 1 - ID:1 @ 2016-02-10 22:06:35 ...
Order create 1 - ID:3 @ 2016-02-10 22:06:39 ...

# Create1 Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_1
Order create 1 - ID:2 @ 2016-02-10 22:06:37 ...

# Create2 Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_2
Order create 2 - ID:1 @ 2016-02-10 22:06:35 ...
Order create 2 - ID:3 @ 2016-02-10 22:06:39 ...

# Create2 Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_2
Order create 2 - ID:2 @ 2016-02-10 22:06:37 ...

As you can see above, all 3 create messages have been consumed right away by four consumers in round-robin dispatching way.

RabbitMQ Management


