Eğer client sunucudan mesaj işleminin sonucunun ne olduğuna dair cevap ister ise, Remote procedure call (RPC) yöntemi kullanılır. Aşağıdaki örnek işte tam bunu yapıyor ve RabbitMqBundle paketini kullanıyor.



Çalışma mantığı


  1. Request - Kullanıcı satın alma isteği gönderiyor.

  2. Controller - İstek onaylandıktan sonra Service'e gönderilir.

  3. Service - İstek veritabanına işlenir ve order object model RPC Client'a verilir.

  4. RPC Client - Order object model RPC Server'a verilir.

  5. RPC Server - İsteği işler ve cevabı RPC Client'a verir.

  6. RPC Client - Cevabı Servis'e verir.

  7. Service - Sonucu Controller'e verir.

  8. Controller - User'a cevap verir.

Konfigürasyon


Bundle yükleme


Composer ile "oldsound/rabbitmq-bundle":"1.8.0" & "jms/serializer-bundle":"1.1.0" paketlerini yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() & & new JMS\SerializerBundle\JMSSerializerBundle() satırlarını ekleyin.


Parameters.yml


parameters:
rabbitmq.host: 127.0.0.1
rabbitmq.port: 5672
rabbitmq.user: guest
rabbitmq.pswd: guest
rabbitmq.rpc.server: order_create_server

Config.yml


old_sound_rabbit_mq:
connections:
default:
host: %rabbitmq.host%
port: %rabbitmq.port%
user: %rabbitmq.user%
password: %rabbitmq.pswd%
vhost: /
lazy: true
rpc_clients:
order_create_client:
connection: default
expect_serialized_response: true
rpc_servers:
order_create_server:
connection: default
callback: application_frontend.server.order_create
qos_options: { prefetch_size: 0, prefetch_count: 1, global: false }
queue_options: { name: order_create_qu, durable: true, auto_delete: false }

OrderController


Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.


namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
* @Route("order", service="application_frontend.controller.order")
*/
class OrderController
{
private $orderService;

public function __construct(OrderService $orderService)
{
$this->orderService = $orderService;
}

/**
* @param Request $request
*
* @Method({"POST"})
* @Route("/create")
*
* @return Response
*/
public function createAction(Request $request)
{
// Request should actually have been validated and mapped to OrderCreate model then sent to Service
$result = $this->orderService->create(json_decode($request->getContent(), true));

return new Response(json_encode($result));
}
}

Controllers.yml


services:
application_frontend.controller.order:
class: Application\FrontendBundle\Controller\OrderController
arguments:
- @application_frontend.service.order

OrderService


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Client\OrderCreateClient;
use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Model\OrderCreate;
use Application\FrontendBundle\Model\OrderCreateResponse;
use Doctrine\ORM\EntityManagerInterface;

class OrderService
{
private $entityManager;
private $orderCreateClient;

public function __construct(
EntityManagerInterface $entityManager,
OrderCreateClient $orderCreateClient
) {
$this->entityManager = $entityManager;
$this->orderCreateClient = $orderCreateClient;
}

/**
* @param array $newOrder
*
* @return OrderCreateResponse
*/
public function create(array $newOrder)
{
$order = new Order();
$order->setCustomerName($newOrder['customer_name']);
$order->setCarMake($newOrder['car_make']);
$order->setCarModel($newOrder['car_model']);

$this->entityManager->persist($order);
$this->entityManager->flush();

// This mapping could be done in controller while running request validation
$orderCreate = new OrderCreate();
$orderCreate->id = $order->getId();
$orderCreate->customerName = $newOrder['customer_name'];
$orderCreate->carMake = $newOrder['car_make'];
$orderCreate->carModel = $newOrder['car_model'];

return $this->orderCreateClient->call($orderCreate);
}
}

Services.yml


services:
application_frontend.service.order:
class: Application\FrontendBundle\Service\OrderService
arguments:
- @doctrine.orm.entity_manager
- @application_frontend.client.order_create

OrderCreate Model


namespace Application\FrontendBundle\Model;

class OrderCreate
{
/**
* @var int
*/
public $id;

/**
* @var string
*/
public $customerName;

/**
* @var string
*/
public $carMake;

/**
* @var string
*/
public $carModel;
}

OrderCreateResponse Model


namespace Application\FrontendBundle\Model;

use JMS\Serializer\Annotation as Serializer;

class OrderCreateResponse
{
/**
* @var string
*
* @Serializer\Type("string")
*/
public $body;

/**
* @var array
*
* @Serializer\Type("array")
*/
public $errors = [];
}

OrderCreateException


namespace Application\FrontendBundle\Exception;

use RuntimeException;

class OrderCreateException extends RuntimeException
{
}

OrderCreateClient


namespace Application\FrontendBundle\Client;

use Application\FrontendBundle\Exception\OrderCreateException;
use Application\FrontendBundle\Model\OrderCreate;
use JMS\Serializer\SerializerInterface;
use OldSound\RabbitMqBundle\RabbitMq\RpcClient;
use PhpAmqpLib\Exception\AMQPTimeoutException;

class OrderCreateClient
{
private $rpcClient;
private $serializer;
private $server;
private $responseModelClass;
private $correlationId;

public function __construct(
RpcClient $rpcClient,
SerializerInterface $serializer,
$server,
$responseModelClass
) {
$this->rpcClient = $rpcClient;
$this->serializer = $serializer;
$this->server = $server;
$this->responseModelClass = $responseModelClass;
$this->correlationId = 'order_create_'.crc32(microtime());
}

public function call(OrderCreate $order)
{
$response = $this->doCall($order);

return $this->serializer->deserialize($response, $this->responseModelClass, 'json');
}

private function doCall(OrderCreate $order)
{
$this->rpcClient->addRequest(serialize($order), $this->server, $this->correlationId, null, 60);

try {
$reply = $this->rpcClient->getReplies();
} catch (AMQPTimeoutException $e) {
throw new OrderCreateException($e->getMessage());
}

if (!isset($reply[$this->correlationId])) {
throw new OrderCreateException(
sprintf('RPC call response does not contain correlation id [%].', $this->correlationId)
);
}

return $reply[$this->correlationId];
}
}

Clients.yml


services:
application_frontend.client.order_create:
class: Application\FrontendBundle\Client\OrderCreateClient
arguments:
- @old_sound_rabbit_mq.order_create_client_rpc
- @serializer
- %rabbitmq.rpc.server%
- 'Application\FrontendBundle\Model\OrderCreateResponse'

OrderCreateServer


namespace Application\FrontendBundle\Server;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;
use Application\FrontendBundle\Model\OrderCreate;

class OrderCreateServer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function execute(AMQPMessage $message)
{
$this->logger->info(json_encode($message));

/** @var OrderCreate $body */
$body = unserialize($message->body);
echo $body->id.PHP_EOL;
echo $body->customerName.PHP_EOL;
echo $body->carMake.PHP_EOL;
echo $body->carModel.PHP_EOL;

return json_encode([
'body' => json_encode($body),
'errors' => [
'Fake ID error',
'Fake Car Model error'
]
]);
}
}

Servers.yml


services:
application_frontend.server.order_create:
class: Application\FrontendBundle\Server\OrderCreateServer
arguments:
- @logger

Order entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
* @ORM\Entity
* @ORM\Table(name="orders")
*/
class Order
{
/**
* @ORM\Id
* @ORM\Column(type="smallint")
* @ORM\GeneratedValue(strategy="AUTO")
*/
private $id;

/**
* @ORM\Column(name="customer_name", type="string", length=50)
*/
private $customerName;

/**
* @ORM\Column(name="car_make", type="string", length=50)
*/
private $carMake;

/**
* @ORM\Column(name="car_model", type="string", length=50)
*/
private $carModel;
}

Testler


Terminalde server'i çalıştırın ki RabbitMQ için gerekli olan tüm bileşenler (server, queue vs.) yaratılıp, birbirlerine bağlanmış olsunlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.



Yeni araba siparişi verirken, aşağıdakine benzer istek göndeririz.


# POST http://rabbitmq.dev/app_dev.php/order/create

{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}

Database output


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
1 row in set (0.00 sec)

Terminal output


$ app/console rabbitmq:rpc-server order_create_server
1
inanzzz
bmw
318

Log output


[2016-02-28 13:38:31] app.INFO: {"body":"O:44:\"Application\\FrontendBundle\\Model\\OrderCreate\":4:{s:2:\"id\";i:1;s:12:\"customerName\";s:7:\"inanzzz\";s:7:\"carMake\";s:3:\"bmw\";s:8:\"carModel\";s:3:\"318\";}","body_size":"152","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS_Inans-MBP.default_50239":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS_Inans-MBP.default_50239","delivery_tag":"1","redelivered":false,"exchange":"order_create_server","routing_key":""}} [] []
1 row in set (0.00 sec)

User response


200 OK

{
"body": "{\"id\":1,\"customerName\":\"inanzzz\",\"carMake\":\"bmw\",\"carModel\":\"318\"}",
"errors": [
"Fake ID error",
"Fake Car Model error"
]
}

RabbitMQ Management


Exchange




Queue




İlişkiler