If you need an answer from the server after calling it then we need to use RabbitMQ Remote Procedure Call (RPC). Example below does exactly that and uses RabbitMqBundle.



How it works


  1. Request - User places an order create request.

  2. Controller - Request gets validated and passed on to Service.

  3. Service - A new order gets inserted into database and order object model is passed to RPC Client.

  4. RPC Client - Sends order object model to RPC Server.

  5. RPC Server - Processes request and replies back to RPC Client.

  6. RPC Client - Sends the reply to Service.

  7. Service - Returns result to Controller.

  8. Controller - Responds to the user.

Configurations


Install bundles


Install "oldsound/rabbitmq-bundle":"1.8.0" & "jms/serializer-bundle":"1.1.0" with composer and activate them with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() & new JMS\SerializerBundle\JMSSerializerBundle() in AppKernel.php file.


Parameters.yml


parameters:
rabbitmq.host: 127.0.0.1
rabbitmq.port: 5672
rabbitmq.user: guest
rabbitmq.pswd: guest
rabbitmq.rpc.server: order_create_server

Config.yml


old_sound_rabbit_mq:
connections:
default:
host: %rabbitmq.host%
port: %rabbitmq.port%
user: %rabbitmq.user%
password: %rabbitmq.pswd%
vhost: /
lazy: true
rpc_clients:
order_create_client:
connection: default
expect_serialized_response: true
rpc_servers:
order_create_server:
connection: default
callback: application_frontend.server.order_create
qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
queue_options: { name: order_create_qu, durable: true, auto_delete: false }

OrderController


You should normally validate the request and map it into a model class. There are a lot of examples about how it is done in this blog.


namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
* @Route("order", service="application_frontend.controller.order")
*/
class OrderController
{
private $orderService;

public function __construct(OrderService $orderService)
{
$this->orderService = $orderService;
}

/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create")
*
* @return Response
*/
public function createAction(Request $request)
{
// Request should actually have been validated and mapped to OrderCreate model then sent to Service
$result = $this->orderService->create(json_decode($request->getContent(), true));

return new Response(json_encode($result));
}
}

Controllers.yml


services:
application_frontend.controller.order:
class: Application\FrontendBundle\Controller\OrderController
arguments:
- @application_frontend.service.order

OrderService


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Client\OrderCreateClient;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Model\OrderCreate;
use Application\FrontendBundle\Model\OrderCreateResponse;
use Doctrine\ORM\EntityManagerInterface;

class OrderService
{
private $entityManager;
private $orderCreateClient;

public function __construct(
EntityManagerInterface $entityManager,
OrderCreateClient $orderCreateClient
) {
$this->entityManager = $entityManager;
$this->orderCreateClient = $orderCreateClient;
}

/**
* @param array $newOrder
*
* @return OrderCreateResponse
*/
public function create(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);

$this->entityManager->persist($order);
$this->entityManager->flush();

// This mapping could be done in controller while running request validation
$orderCreate = new OrderCreate();
$orderCreate->id = $order->getId();
$orderCreate->customerName = $newOrder['customer_name'];
$orderCreate->carMake = $newOrder['car_make'];
$orderCreate->carModel = $newOrder['car_model'];

return $this->orderCreateClient->call($orderCreate);
}
}

Services.yml


services:
application_frontend.service.order:
class: Application\FrontendBundle\Service\OrderService
arguments:
- @doctrine.orm.entity_manager
- @application_frontend.client.order_create

OrderCreate Model


namespace Application\FrontendBundle\Model;

class OrderCreate
{
/**
* @var int
*/
public $id;

/**
* @var string
*/
public $customerName;

/**
* @var string
*/
public $carMake;

/**
* @var string
*/
public $carModel;
}

OrderCreateResponse Model


namespace Application\FrontendBundle\Model;

use JMS\Serializer\Annotation as Serializer;

class OrderCreateResponse
{
/**
* @var string
*
* @Serializer\Type("string")
*/
public $body;

/**
* @var array
*
* @Serializer\Type("array")
*/
public $errors = [];
}

OrderCreateException


namespace Application\FrontendBundle\Exception;

use RuntimeException;

class OrderCreateException extends RuntimeException
{
}

OrderCreateClient


namespace Application\FrontendBundle\Client;

use Application\FrontendBundle\Exception\OrderCreateException;
use Application\FrontendBundle\Model\OrderCreate;
use JMS\Serializer\SerializerInterface;
use OldSound\RabbitMqBundle\RabbitMq\RpcClient;
use PhpAmqpLib\Exception\AMQPTimeoutException;

class OrderCreateClient
{
private $rpcClient;
private $serializer;
private $server;
private $responseModelClass;
private $correlationId;

public function __construct(
RpcClient $rpcClient,
SerializerInterface $serializer,
$server,
$responseModelClass
) {
$this->rpcClient = $rpcClient;
$this->serializer = $serializer;
$this->server = $server;
$this->responseModelClass = $responseModelClass;
$this->correlationId = 'order_create_'.crc32(microtime());
}

public function call(OrderCreate $order)
{
$response = $this->doCall($order);

return $this->serializer->deserialize($response, $this->responseModelClass, 'json');
}

private function doCall(OrderCreate $order)
{
$this->rpcClient->addRequest(serialize($order), $this->server, $this->correlationId, null, 60);

try {
$reply = $this->rpcClient->getReplies();
} catch (AMQPTimeoutException $e) {
throw new OrderCreateException($e->getMessage());
}

if (!isset($reply[$this->correlationId])) {
throw new OrderCreateException(
sprintf('RPC call response does not contain correlation id [%].', $this->correlationId)
);
}

return $reply[$this->correlationId];
}
}

Clients.yml


services:
application_frontend.client.order_create:
class: Application\FrontendBundle\Client\OrderCreateClient
arguments:
- @old_sound_rabbit_mq.order_create_client_rpc
- @serializer
- %rabbitmq.rpc.server%
- 'Application\FrontendBundle\Model\OrderCreateResponse'

OrderCreateServer


namespace Application\FrontendBundle\Server;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
use Application\FrontendBundle\Model\OrderCreate;

class OrderCreateServer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$this->logger->info(json_encode($message));

/** @var OrderCreate $body */
$body = unserialize($message->body);
echo $body->id.PHP_EOL;
echo $body->customerName.PHP_EOL;
echo $body->carMake.PHP_EOL;
echo $body->carModel.PHP_EOL;

return json_encode([
'body' => json_encode($body),
'errors' => [
'Fake ID error',
'Fake Car Model error'
]
]);
}
}

Servers.yml


services:
application_frontend.server.order_create:
class: Application\FrontendBundle\Server\OrderCreateServer
arguments:
- @logger

Order entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
* @ORM\Entity
* @ORM\Table(name="orders")
*/
class Order
{
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;

/**
* @ORM\Column(name="customer_name", type="string", length=50)
*/
private $customerName;

/**
* @ORM\Column(name="car_make", type="string", length=50)
*/
private $carMake;

/**
* @ORM\Column(name="car_model", type="string", length=50)
*/
private $carModel;
}

Tests


Run the server so that you have the RabbitMQ components (server, queue etc.) are ready and bound together otherwise messages will be lost. You should have a visualised diagram below.



This is the kind of payload we're going to use for testing purposes when creating new orders.


# POST http://rabbitmq.dev/app_dev.php/order/create

{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}

Database output


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
1 row in set (0.00 sec)

Terminal output


$ app/console rabbitmq:rpc-server order_create_server
1
inanzzz
bmw
318

Log output


[2016-02-28 13:38:31] app.INFO: {"body":"O:44:\"Application\\FrontendBundle\\Model\\OrderCreate\":4:{s:2:\"id\";i:1;s:12:\"customerName\";s:7:\"inanzzz\";s:7:\"carMake\";s:3:\"bmw\";s:8:\"carModel\";s:3:\"318\";}","body_size":"152","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS_Inans-MBP.default_50239":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS_Inans-MBP.default_50239","delivery_tag":"1","redelivered":false,"exchange":"order_create_server","routing_key":""}} [] []
1 row in set (0.00 sec)

User response


200 OK

{
"body": "{\"id\":1,\"customerName\":\"inanzzz\",\"carMake\":\"bmw\",\"carModel\":\"318\"}",
"errors": [
"Fake ID error",
"Fake Car Model error"
]
}

RabbitMQ Management


Exchange




Queue




Mapping