28/02/2016 - RABBITMQ, SYMFONY
Eğer client sunucudan mesaj işleminin sonucunun ne olduğuna dair cevap ister ise, Remote procedure call (RPC) yöntemi kullanılır. Aşağıdaki örnek işte tam bunu yapıyor ve RabbitMqBundle paketini kullanıyor.
Composer ile "oldsound/rabbitmq-bundle":"1.8.0"
ve "jms/serializer-bundle":"1.1.0"
paketlerini yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle()
ve new JMS\SerializerBundle\JMSSerializerBundle()
satırlarını ekleyin.
# Connection settings for the RabbitMQ broker plus the name of the RPC server
# that the order-create client sends its requests to.
parameters:
    rabbitmq.host: 127.0.0.1
    rabbitmq.port: 5672
    rabbitmq.user: guest
    rabbitmq.pswd: guest
    rabbitmq.rpc.server: order_create_server
# RabbitMqBundle configuration: one broker connection, one RPC client and one RPC server.
old_sound_rabbit_mq:
    connections:
        default:
            # Parameter references are quoted because a leading "%" is a reserved YAML indicator.
            host: "%rabbitmq.host%"
            port: "%rabbitmq.port%"
            user: "%rabbitmq.user%"
            password: "%rabbitmq.pswd%"
            vhost: /
            lazy: true
    rpc_clients:
        order_create_client:
            connection: default
            expect_serialized_response: true
    rpc_servers:
        order_create_server:
            connection: default
            # Service id of the consumer whose execute() handles each incoming request.
            callback: application_frontend.server.order_create
            qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
            queue_options: { name: order_create_qu, durable: true, auto_delete: false }
Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.
namespace Application\FrontendBundle\Controller;
use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
/**
 * HTTP entry point for creating orders. Declared as a service so that the
 * OrderService dependency is injected through the constructor.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    /**
     * @var OrderService
     */
    private $orderService;

    /**
     * @param OrderService $orderService Service that stores the order and performs the RPC call.
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Decodes the JSON request body, hands it to the order service and
     * returns the service result encoded as JSON.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create")
     *
     * @return Response
     */
    public function createAction(Request $request)
    {
        // Request should actually have been validated and mapped to OrderCreate model then sent to Service
        $payload = json_decode($request->getContent(), true);

        // json_decode() yields null for malformed JSON and a scalar for bare values;
        // neither satisfies the "array" type hint of OrderService::create().
        if (!is_array($payload)) {
            return $this->createJsonResponse(
                ['errors' => ['Request body must be a JSON object.']],
                400
            );
        }

        return $this->createJsonResponse($this->orderService->create($payload), 200);
    }

    /**
     * Builds a response whose body is the JSON encoding of $data.
     *
     * @param mixed $data   Value passed to json_encode().
     * @param int   $status HTTP status code.
     *
     * @return Response
     */
    private function createJsonResponse($data, $status)
    {
        return new Response(
            json_encode($data),
            $status,
            ['Content-Type' => 'application/json']
        );
    }
}
# Controller-as-a-service definition referenced by the Route annotation on OrderController.
services:
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Quoted: an unquoted leading "@" is a reserved YAML indicator.
            - "@application_frontend.service.order"
namespace Application\FrontendBundle\Service;
use Application\FrontendBundle\Client\OrderCreateClient;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Model\OrderCreate;
use Application\FrontendBundle\Model\OrderCreateResponse;
use Doctrine\ORM\EntityManagerInterface;
/**
 * Stores a new order and then forwards it to the order-create RPC client.
 */
class OrderService
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var OrderCreateClient
     */
    private $orderCreateClient;

    /**
     * @param EntityManagerInterface $entityManager
     * @param OrderCreateClient      $orderCreateClient
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        OrderCreateClient $orderCreateClient
    ) {
        $this->orderCreateClient = $orderCreateClient;
        $this->entityManager = $entityManager;
    }

    /**
     * Persists the order, then sends an OrderCreate message carrying the
     * generated id and returns whatever the RPC client returns.
     *
     * @param array $newOrder Reads the keys customer_name, car_make and car_model.
     *
     * @return OrderCreateResponse
     */
    public function create(array $newOrder)
    {
        $stored = $this->store($newOrder);

        // This mapping could be done in controller while running request validation
        $message = $this->toMessage($stored->getId(), $newOrder);

        return $this->orderCreateClient->call($message);
    }

    /**
     * Creates the Order entity from the raw data and flushes it.
     *
     * @param array $data
     *
     * @return Order
     */
    private function store(array $data)
    {
        $entity = new Order();
        $entity->setCustomerName($data['customer_name']);
        $entity->setCarMake($data['car_make']);
        $entity->setCarModel($data['car_model']);

        $this->entityManager->persist($entity);
        $this->entityManager->flush();

        return $entity;
    }

    /**
     * Maps the order id and the raw data onto the RPC request model.
     *
     * @param mixed $id   Value returned by Order::getId() after the flush.
     * @param array $data
     *
     * @return OrderCreate
     */
    private function toMessage($id, array $data)
    {
        $message = new OrderCreate();
        $message->id = $id;
        $message->customerName = $data['customer_name'];
        $message->carMake = $data['car_make'];
        $message->carModel = $data['car_model'];

        return $message;
    }
}
# Wires OrderService with the Doctrine entity manager and the RPC client wrapper.
services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Quoted: an unquoted leading "@" is a reserved YAML indicator.
            - "@doctrine.orm.entity_manager"
            - "@application_frontend.client.order_create"
namespace Application\FrontendBundle\Model;
/**
 * Request model sent over RPC when an order is created.
 *
 * A plain data holder: every field is public and unset (null) until assigned.
 */
class OrderCreate
{
    /**
     * Identifier of the order.
     *
     * @var int
     */
    public $id;

    /**
     * Name of the customer placing the order.
     *
     * @var string
     */
    public $customerName;

    /**
     * Make of the ordered car.
     *
     * @var string
     */
    public $carMake;

    /**
     * Model of the ordered car.
     *
     * @var string
     */
    public $carModel;
}
namespace Application\FrontendBundle\Model;
use JMS\Serializer\Annotation as Serializer;
/**
 * Response model for the order-create RPC call.
 *
 * The Serializer annotations declare the type of each field for JMS Serializer.
 */
class OrderCreateResponse
{
    /**
     * Response payload.
     *
     * @var string
     *
     * @Serializer\Type("string")
     */
    public $body;

    /**
     * Error messages; an empty array by default.
     *
     * @var array
     *
     * @Serializer\Type("array")
     */
    public $errors = [];
}
namespace Application\FrontendBundle\Exception;
use RuntimeException;
/**
 * Raised when the order-create RPC call fails.
 *
 * Adds no behaviour of its own; it only gives callers a dedicated type to catch.
 */
class OrderCreateException extends RuntimeException
{
}
namespace Application\FrontendBundle\Client;
use Application\FrontendBundle\Exception\OrderCreateException;
use Application\FrontendBundle\Model\OrderCreate;
use JMS\Serializer\SerializerInterface;
use OldSound\RabbitMqBundle\RabbitMq\RpcClient;
use PhpAmqpLib\Exception\AMQPTimeoutException;
/**
 * Sends an OrderCreate message through the RabbitMQ RPC client and turns the
 * reply into an instance of the configured response model class.
 */
class OrderCreateClient
{
    /**
     * @var RpcClient
     */
    private $rpcClient;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var string Name of the RPC server the request is addressed to.
     */
    private $server;

    /**
     * @var string Class name the JSON reply is deserialized into.
     */
    private $responseModelClass;

    /**
     * @var string Key under which the reply is looked up.
     */
    private $correlationId;

    /**
     * @param RpcClient           $rpcClient
     * @param SerializerInterface $serializer
     * @param string              $server
     * @param string              $responseModelClass
     */
    public function __construct(
        RpcClient $rpcClient,
        SerializerInterface $serializer,
        $server,
        $responseModelClass
    ) {
        $this->rpcClient = $rpcClient;
        $this->serializer = $serializer;
        $this->server = $server;
        $this->responseModelClass = $responseModelClass;
        // Generated once per client instance, so every call() on this instance reuses it.
        $this->correlationId = 'order_create_'.crc32(microtime());
    }

    /**
     * Performs the RPC call and deserializes the JSON reply.
     *
     * @param OrderCreate $order
     *
     * @return object Instance of the configured response model class.
     *
     * @throws OrderCreateException When the call times out or the reply is missing.
     */
    public function call(OrderCreate $order)
    {
        $response = $this->doCall($order);

        return $this->serializer->deserialize($response, $this->responseModelClass, 'json');
    }

    /**
     * Publishes the PHP-serialized order and waits for the matching reply.
     *
     * @param OrderCreate $order
     *
     * @return mixed Reply stored under this client's correlation id.
     *
     * @throws OrderCreateException
     */
    private function doCall(OrderCreate $order)
    {
        // NOTE(review): the trailing 60 is presumably the expiration/timeout in seconds;
        // confirm against RpcClient::addRequest().
        $this->rpcClient->addRequest(serialize($order), $this->server, $this->correlationId, null, 60);

        try {
            $reply = $this->rpcClient->getReplies();
        } catch (AMQPTimeoutException $e) {
            // Keep the original exception as "previous" so its stack trace is not lost.
            throw new OrderCreateException($e->getMessage(), 0, $e);
        }

        if (!isset($reply[$this->correlationId])) {
            // "%s" replaces the invalid "%" specifier, which left the id out of the message.
            throw new OrderCreateException(
                sprintf('RPC call response does not contain correlation id [%s].', $this->correlationId)
            );
        }

        return $reply[$this->correlationId];
    }
}
# Wires OrderCreateClient with the bundle's RPC client, the serializer, the target
# RPC server name and the class the reply is deserialized into.
services:
    application_frontend.client.order_create:
        class: Application\FrontendBundle\Client\OrderCreateClient
        arguments:
            # Quoted: unquoted leading "@" and "%" are reserved YAML indicators.
            - "@old_sound_rabbit_mq.order_create_client_rpc"
            - "@serializer"
            - "%rabbitmq.rpc.server%"
            - 'Application\FrontendBundle\Model\OrderCreateResponse'
namespace Application\FrontendBundle\Server;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
use Application\FrontendBundle\Model\OrderCreate;
/**
 * RPC server callback for order-create requests.
 *
 * Logs the incoming message, prints the order fields and replies with a JSON
 * document containing "body" and "errors" keys.
 */
class OrderCreateServer implements ConsumerInterface
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Handles one request message.
     *
     * @param AMQPMessage $message Message whose body is a PHP-serialized OrderCreate.
     *
     * @return string JSON document with "body" and "errors" keys.
     */
    public function execute(AMQPMessage $message)
    {
        $this->logger->info(json_encode($message));

        // NOTE(review): unserialize() can instantiate arbitrary classes from the payload.
        // Acceptable only while every publisher on this queue is trusted; otherwise move
        // the wire format to JSON on both the client and the server.
        $body = unserialize($message->body);

        // unserialize() returns false for malformed data and may return any other type;
        // reject everything that is not the expected model instead of reading its fields.
        if (!$body instanceof OrderCreate) {
            return json_encode([
                'body' => '',
                'errors' => [
                    'Invalid order payload'
                ]
            ]);
        }

        echo $body->id.PHP_EOL;
        echo $body->customerName.PHP_EOL;
        echo $body->carMake.PHP_EOL;
        echo $body->carModel.PHP_EOL;

        // The errors below are hard-coded sample values, not the result of any validation.
        return json_encode([
            'body' => json_encode($body),
            'errors' => [
                'Fake ID error',
                'Fake Car Model error'
            ]
        ]);
    }
}
# RPC server callback service referenced by rpc_servers.order_create_server.callback.
services:
    application_frontend.server.order_create:
        class: Application\FrontendBundle\Server\OrderCreateServer
        arguments:
            # Quoted: an unquoted leading "@" is a reserved YAML indicator.
            - "@logger"
namespace Application\FrontendBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
/**
 * Doctrine entity mapped to the "orders" table.
 *
 * Exposes the accessors that OrderService relies on: setters for the three
 * order fields and a getter for the generated id.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    /**
     * @return int|null Null until a value has been assigned to the id.
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @return string|null
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $customerName
     *
     * @return $this
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carMake
     *
     * @return $this
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarModel()
    {
        return $this->carModel;
    }

    /**
     * @param string $carModel
     *
     * @return $this
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;

        return $this;
    }
}
Terminalde server'ı çalıştırın ki RabbitMQ için gerekli olan tüm bileşenler (server, queue vs.) yaratılıp, birbirlerine bağlanmış olsunlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.
Yeni araba siparişi verirken, aşağıdakine benzer istek göndeririz.
# POST http://rabbitmq.dev/app_dev.php/order/create
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}
mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
1 row in set (0.00 sec)
$ app/console rabbitmq:rpc-server order_create_server
1
inanzzz
bmw
318
[2016-02-28 13:38:31] app.INFO: {"body":"O:44:\"Application\\FrontendBundle\\Model\\OrderCreate\":4:{s:2:\"id\";i:1;s:12:\"customerName\";s:7:\"inanzzz\";s:7:\"carMake\";s:3:\"bmw\";s:8:\"carModel\";s:3:\"318\";}","body_size":"152","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS_Inans-MBP.default_50239":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS_Inans-MBP.default_50239","delivery_tag":"1","redelivered":false,"exchange":"order_create_server","routing_key":""}} [] []
1 row in set (0.00 sec)
200 OK
{
"body": "{\"id\":1,\"customerName\":\"inanzzz\",\"carMake\":\"bmw\",\"carModel\":\"318\"}",
"errors": [
"Fake ID error",
"Fake Car Model error"
]
}