28/02/2016 - RABBITMQ, SYMFONY
If you need an answer from the server after calling it, you need to use a RabbitMQ Remote Procedure Call (RPC). The example below does exactly that and uses RabbitMqBundle.
Install "oldsound/rabbitmq-bundle":"1.8.0"
& "jms/serializer-bundle":"1.1.0"
with composer and activate them with new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
& new JMS\SerializerBundle\JMSSerializerBundle()
in AppKernel.php file.
# Connection settings plus the RPC server name that is injected into OrderCreateClient.
parameters:
    rabbitmq.host: 127.0.0.1
    rabbitmq.port: 5672
    rabbitmq.user: guest
    rabbitmq.pswd: guest
    rabbitmq.rpc.server: order_create_server

old_sound_rabbit_mq:
    connections:
        default:
            # Scalars starting with "%" are quoted so the YAML stays valid on newer Symfony versions.
            host: "%rabbitmq.host%"
            port: "%rabbitmq.port%"
            user: "%rabbitmq.user%"
            password: "%rabbitmq.pswd%"
            vhost: /
            lazy: true
    rpc_clients:
        order_create_client:
            connection: default
            expect_serialized_response: true
    rpc_servers:
        order_create_server:
            connection: default
            # Service id of the class whose execute() handles each request.
            callback: application_frontend.server.order_create
            qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
            queue_options: { name: order_create_qu, durable: true, auto_delete: false }
You should normally validate the request and map it into a model class. There are a lot of examples of how that is done in this blog.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
 * HTTP entry point for creating orders; all work is delegated to OrderService.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    /**
     * @var OrderService
     */
    private $orderService;

    /**
     * @param OrderService $orderService
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Decodes the JSON request body, passes it to the order service and
     * returns the JSON-encoded service result.
     *
     * Responds with 400 when the body is not a JSON object/array, because
     * OrderService::create() only accepts an array.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create")
     *
     * @return Response
     */
    public function createAction(Request $request)
    {
        // Request should actually have been validated and mapped to OrderCreate model then sent to Service
        $payload = json_decode($request->getContent(), true);

        // json_decode() gives null for malformed JSON and a scalar for non-object
        // payloads; neither satisfies the array type hint of create().
        if (!is_array($payload)) {
            return new Response(
                json_encode(['errors' => ['Request body must be a valid JSON object.']]),
                400,
                ['Content-Type' => 'application/json']
            );
        }

        $result = $this->orderService->create($payload);

        return new Response(json_encode($result), 200, ['Content-Type' => 'application/json']);
    }
}
services:
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Service references are quoted: a plain scalar must not start with "@".
            - "@application_frontend.service.order"
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Client\OrderCreateClient;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Model\OrderCreate;
use Application\FrontendBundle\Model\OrderCreateResponse;
use Doctrine\ORM\EntityManagerInterface;
/**
 * Persists a new order and then reports it to the order-create RPC server.
 */
class OrderService
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var OrderCreateClient
     */
    private $orderCreateClient;

    /**
     * @param EntityManagerInterface $entityManager
     * @param OrderCreateClient      $orderCreateClient
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        OrderCreateClient $orderCreateClient
    ) {
        $this->entityManager = $entityManager;
        $this->orderCreateClient = $orderCreateClient;
    }

    /**
     * Stores the order, then sends it over RPC and returns the server's reply.
     *
     * @param array $newOrder Must contain "customer_name", "car_make" and "car_model"
     *
     * @return OrderCreateResponse
     *
     * @throws \InvalidArgumentException When a required field is missing
     */
    public function create(array $newOrder)
    {
        // Fail before touching the database instead of raising undefined-index
        // notices and attempting to persist an incomplete row.
        foreach (['customer_name', 'car_make', 'car_model'] as $field) {
            if (!isset($newOrder[$field])) {
                throw new \InvalidArgumentException(
                    sprintf('Missing required order field "%s".', $field)
                );
            }
        }

        $order = new Order();
        $order->setCustomerName($newOrder['customer_name']);
        $order->setCarMake($newOrder['car_make']);
        $order->setCarModel($newOrder['car_model']);

        $this->entityManager->persist($order);
        // Flush first so the generated id is available for the RPC payload.
        $this->entityManager->flush();

        // This mapping could be done in controller while running request validation
        $orderCreate = new OrderCreate();
        $orderCreate->id = $order->getId();
        $orderCreate->customerName = $newOrder['customer_name'];
        $orderCreate->carMake = $newOrder['car_make'];
        $orderCreate->carModel = $newOrder['car_model'];

        return $this->orderCreateClient->call($orderCreate);
    }
}
services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Service references are quoted: a plain scalar must not start with "@".
            - "@doctrine.orm.entity_manager"
            - "@application_frontend.client.order_create"
namespace Application\FrontendBundle\Model;
/**
 * Request payload for the order-create RPC call.
 *
 * OrderService::create() fills the public properties and OrderCreateClient
 * sends the object through serialize(); OrderCreateServer reads the same
 * properties after unserialize().
 */
class OrderCreate
{
/**
* Identifier of the persisted order (taken from Order::getId()).
*
* @var int
*/
public $id;
/**
* Customer name copied from the "customer_name" request field.
*
* @var string
*/
public $customerName;
/**
* Car make copied from the "car_make" request field.
*
* @var string
*/
public $carMake;
/**
* Car model copied from the "car_model" request field.
*
* @var string
*/
public $carModel;
}
namespace Application\FrontendBundle\Model;
use JMS\Serializer\Annotation as Serializer;
/**
 * Reply model of the order-create RPC call.
 *
 * OrderCreateClient::call() deserializes the server's JSON reply into this
 * class; the Serializer annotations below declare the expected field types.
 */
class OrderCreateResponse
{
/**
* Reply body; OrderCreateServer puts the JSON-encoded order here.
*
* @var string
*
* @Serializer\Type("string")
*/
public $body;
/**
* Error messages reported by the server; empty by default.
*
* @var array
*
* @Serializer\Type("array")
*/
public $errors = [];
}
namespace Application\FrontendBundle\Exception;
use RuntimeException;
/**
 * Raised by OrderCreateClient when the RPC call times out or when the reply
 * set does not contain the expected correlation id.
 */
class OrderCreateException extends RuntimeException
{
}
namespace Application\FrontendBundle\Client;
use Application\FrontendBundle\Exception\OrderCreateException;
use Application\FrontendBundle\Model\OrderCreate;
use JMS\Serializer\SerializerInterface;
use OldSound\RabbitMqBundle\RabbitMq\RpcClient;
use PhpAmqpLib\Exception\AMQPTimeoutException;
/**
 * Sends an OrderCreate payload to the RPC server and turns the JSON reply
 * into the configured response model.
 */
class OrderCreateClient
{
    private $rpcClient;
    private $serializer;
    private $server;
    private $responseModelClass;
    private $correlationId;

    /**
     * @param RpcClient           $rpcClient
     * @param SerializerInterface $serializer
     * @param string              $server             Name of the RPC server the request is addressed to
     * @param string              $responseModelClass Class the JSON reply is deserialized into
     */
    public function __construct(
        RpcClient $rpcClient,
        SerializerInterface $serializer,
        $server,
        $responseModelClass
    ) {
        $this->rpcClient = $rpcClient;
        $this->serializer = $serializer;
        $this->server = $server;
        $this->responseModelClass = $responseModelClass;
        // Generated once per client instance; used as the key to find the reply.
        $this->correlationId = 'order_create_'.crc32(microtime());
    }

    /**
     * Performs the RPC call and deserializes the JSON reply.
     *
     * @param OrderCreate $order
     *
     * @return object Instance of the configured response model class
     *
     * @throws OrderCreateException On timeout or when no matching reply is received
     */
    public function call(OrderCreate $order)
    {
        $response = $this->doCall($order);

        return $this->serializer->deserialize($response, $this->responseModelClass, 'json');
    }

    /**
     * Publishes the request and waits for the reply carrying our correlation id.
     *
     * @param OrderCreate $order
     *
     * @return mixed Raw reply stored under the correlation id
     *
     * @throws OrderCreateException
     */
    private function doCall(OrderCreate $order)
    {
        // NOTE(review): the final argument (60) is the request expiration -
        // presumably in seconds; confirm against RpcClient::addRequest().
        $this->rpcClient->addRequest(serialize($order), $this->server, $this->correlationId, null, 60);

        try {
            $reply = $this->rpcClient->getReplies();
        } catch (AMQPTimeoutException $e) {
            // Keep the original exception attached so the AMQP stack trace is not lost.
            throw new OrderCreateException($e->getMessage(), 0, $e);
        }

        if (!isset($reply[$this->correlationId])) {
            throw new OrderCreateException(
                // "%s" is required here; a bare "%" followed by "]" is not a valid conversion.
                sprintf('RPC call response does not contain correlation id [%s].', $this->correlationId)
            );
        }

        return $reply[$this->correlationId];
    }
}
services:
    application_frontend.client.order_create:
        class: Application\FrontendBundle\Client\OrderCreateClient
        arguments:
            # Scalars starting with "@" or "%" are quoted to keep the YAML valid.
            - "@old_sound_rabbit_mq.order_create_client_rpc"
            - "@serializer"
            - "%rabbitmq.rpc.server%"
            - 'Application\FrontendBundle\Model\OrderCreateResponse'
namespace Application\FrontendBundle\Server;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
use Application\FrontendBundle\Model\OrderCreate;
/**
 * RPC server callback: receives a serialized OrderCreate, prints its fields
 * and answers with a JSON document holding "body" and "errors".
 */
class OrderCreateServer implements ConsumerInterface
{
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Handles one RPC request.
     *
     * @param AMQPMessage $message Message whose body is a serialize()d OrderCreate
     *
     * @return string JSON with "body" (JSON-encoded order) and "errors" (list of strings)
     */
    public function execute(AMQPMessage $message)
    {
        $this->logger->info(json_encode($message));

        // NOTE(review): unserialize() on data read from a queue can instantiate
        // arbitrary classes if an untrusted party is able to publish to it.
        // Consider a JSON payload (or restricting the allowed classes) instead.
        $body = unserialize($message->body);

        // unserialize() returns false on a corrupt payload and may return any
        // other type; reply with an error rather than reading properties of a non-order.
        if (!$body instanceof OrderCreate) {
            return json_encode([
                'body' => '',
                'errors' => [
                    'Message body is not a serialized OrderCreate.'
                ]
            ]);
        }

        echo $body->id.PHP_EOL;
        echo $body->customerName.PHP_EOL;
        echo $body->carMake.PHP_EOL;
        echo $body->carModel.PHP_EOL;

        // The errors are hard-coded sample values to demonstrate the reply format.
        return json_encode([
            'body' => json_encode($body),
            'errors' => [
                'Fake ID error',
                'Fake Car Model error'
            ]
        ]);
    }
}
services:
    application_frontend.server.order_create:
        class: Application\FrontendBundle\Server\OrderCreateServer
        arguments:
            # Service references are quoted: a plain scalar must not start with "@".
            - "@logger"
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
 * Doctrine entity for a row of the "orders" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    /**
     * @return int|null Null until a value has been assigned to the id property
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $customerName
     *
     * @return $this
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $carMake
     *
     * @return $this
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carModel
     *
     * @return $this
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarModel()
    {
        return $this->carModel;
    }
}
Run the server first so that the RabbitMQ components (server, queue etc.) are ready and bound together; otherwise messages will be lost. You should then have a setup like the diagram below.
This is the kind of payload we're going to use for testing purposes when creating new orders.
# POST http://rabbitmq.dev/app_dev.php/order/create
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
1 row in set (0.00 sec)
$ app/console rabbitmq:rpc-server order_create_server
1
inanzzz
bmw
318
[2016-02-28 13:38:31] app.INFO: {"body":"O:44:\"Application\\FrontendBundle\\Model\\OrderCreate\":4:{s:2:\"id\";i:1;s:12:\"customerName\";s:7:\"inanzzz\";s:7:\"carMake\";s:3:\"bmw\";s:8:\"carModel\";s:3:\"318\";}","body_size":"152","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS_Inans-MBP.default_50239":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS_Inans-MBP.default_50239","delivery_tag":"1","redelivered":false,"exchange":"order_create_server","routing_key":""}} [] []
1 row in set (0.00 sec)
200 OK
{
"body": "{\"id\":1,\"customerName\":\"inanzzz\",\"carMake\":\"bmw\",\"carModel\":\"318\"}",
"errors": [
"Fake ID error",
"Fake Car Model error"
]
}