Bu örneğimizde "bmw", "audi" ve "mercedes" satın almak için bir istek oluşturup, ayrıca isteği kayıt altına alacağız. Mesajlar iki farklı producer'dan gelir. Producer X'in "bmw" mesajları, producer Y'nin "audi" ve "mercedes" mesajlarından farklı bir şekilde işlenir. Örnek 2 Producer & 1 Exchange & 2 Queue & N Worker & 2 Consumer bileşenlerinden oluşuyor ve RabbitMqBundle paketini kullanıyor.



Çalışma mantığı


  1. Request - Kullanıcı satın alma isteği gönderiyor.

  2. Controller - İstek onaylandıktan sonra Service'e gönderilir.

  3. Service - İstek veritabanına işlenir, "bmw" veya "audi_mercedes" satın alma uyarısı oluşturulur ve onay kullanıcıya geri gönderilir. İşte tam bu anda uygulamamız üzerine düşeni yapmış olur ve kullanıcıyı daha fazla bekletmez.

  4. Event Dispatcher - "bmw" ve "audi_mercedes" uyarılarını yakalar ve sevk işlemini yapar.

  5. Event Listener - Sevk'i alır ve Producer'e iletir.

  6. Producer - Mesaj oluşturulup "bmw" veya "audi_mercedes" sırasına koyulur.

  7. Consumer - Eğer terminalde aktif olarak çalışan bir "worker" var ise, mesajı sıradan alıp yapması gerekeni yapar. Eğer yok ise, aktif hale geldiğinde gerekeni yapar.


Konfigürasyon


Bundle yükleme


Composer ile "oldsound/rabbitmq-bundle":"1.8.0"'i yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() satırını ekleyin.


Parameters.yml


parameters:
    # Key names must match the %rabbitmq.*% placeholders read by the
    # old_sound_rabbit_mq connection settings in config.yml.
    rabbitmq.host: 127.0.0.1
    rabbitmq.port: 5672
    rabbitmq.user: guest
    rabbitmq.pswd: guest

Config.yml


old_sound_rabbit_mq:
    connections:
        default:
            # Placeholders are quoted so the scalars stay valid YAML.
            host: '%rabbitmq.host%'
            port: '%rabbitmq.port%'
            user: '%rabbitmq.user%'
            password: '%rabbitmq.pswd%'
            vhost: /
            lazy: true
    # Both producers publish to the same direct exchange; the routing key
    # decides which of the two queues receives a message.
    producers:
        order_create_bmw:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_bmw_qu
                routing_keys:
                    - bmw
        order_create_audi_mercedes:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_audi_mercedes_qu
                routing_keys:
                    - audi
                    - mercedes
    # Each consumer mirrors the exchange/queue declaration of its producer and
    # names the service that handles the messages.
    consumers:
        order_create_bmw:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_bmw_qu
                routing_keys:
                    - bmw
            callback: application_frontend.consumer.order_create_bmw
        order_create_audi_mercedes:
            connection: default
            exchange_options: { name: order_create_ex, type: direct }
            queue_options:
                name: order_create_audi_mercedes_qu
                routing_keys:
                    - audi
                    - mercedes
            callback: application_frontend.consumer.order_create_audi_mercedes

OrderController


Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.


namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
 * HTTP entry points for placing car orders.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    /**
     * @var OrderService
     */
    private $orderService;

    /**
     * @param OrderService $orderService Service the order creation is delegated to.
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Passes the JSON-decoded request body to OrderService::createBmw() and
     * uses its return value as the response content.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create/bmw")
     *
     * @return Response
     */
    public function createBmwAction(Request $request)
    {
        $payload = $this->decodeBody($request);
        $outcome = $this->orderService->createBmw($payload);

        return new Response($outcome);
    }

    /**
     * Passes the JSON-decoded request body to
     * OrderService::createAudiMercedes() and uses its return value as the
     * response content.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create/audi_mercedes")
     *
     * @return Response
     */
    public function createAudiMercedesAction(Request $request)
    {
        $payload = $this->decodeBody($request);
        $outcome = $this->orderService->createAudiMercedes($payload);

        return new Response($outcome);
    }

    /**
     * Decodes the raw request body as JSON, with objects turned into
     * associative arrays. The result is not validated here.
     *
     * @param Request $request
     *
     * @return mixed Whatever json_decode() produces for the body.
     */
    private function decodeBody(Request $request)
    {
        return json_decode($request->getContent(), true);
    }
}

Controllers.yml


services:
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Service references are quoted: a plain scalar may not start with "@".
            - '@application_frontend.service.order'

OrderService


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
 * Stores new car orders and announces each one through the event dispatcher.
 */
class OrderService
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @param EntityManagerInterface   $entityManager
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Stores the order and dispatches OrderEvent::CREATE_BMW for it.
     *
     * @param array $newOrder Must contain "customer_name", "car_make" and "car_model".
     *
     * @return int Identifier of the stored order, as reported by Order::getId().
     */
    public function createBmw(array $newOrder)
    {
        return $this->create($newOrder, OrderEvent::CREATE_BMW);
    }

    /**
     * Stores the order and dispatches OrderEvent::CREATE_AUDI_MERCEDES for it.
     *
     * @param array $newOrder Must contain "customer_name", "car_make" and "car_model".
     *
     * @return int Identifier of the stored order, as reported by Order::getId().
     */
    public function createAudiMercedes(array $newOrder)
    {
        return $this->create($newOrder, OrderEvent::CREATE_AUDI_MERCEDES);
    }

    /**
     * Shared implementation of both public entry points: builds the entity,
     * flushes it, then dispatches the given event.
     *
     * @param array  $newOrder  Must contain "customer_name", "car_make" and "car_model".
     * @param string $eventName One of the OrderEvent::CREATE_* constants.
     *
     * @return int Identifier of the stored order, as reported by Order::getId().
     */
    private function create(array $newOrder, $eventName)
    {
        $order = new Order();
        $order->setCustomerName($newOrder['customer_name']);
        $order->setCarMake($newOrder['car_make']);
        $order->setCarModel($newOrder['car_model']);

        // Flush before dispatching so the event carries an order that is
        // already written and has its identifier.
        $this->entityManager->persist($order);
        $this->entityManager->flush();

        $this->eventDispatcher->dispatch($eventName, new OrderEvent($order));

        return $order->getId();
    }
}

Services.yml


services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Service references are quoted: a plain scalar may not start with "@".
            - '@doctrine.orm.entity_manager'
            - '@event_dispatcher'

OrderEvent


namespace Application\FrontendBundle\Event;

use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;

/**
 * Event object that carries a single Order and defines the names of the two
 * order-creation events.
 */
class OrderEvent extends Event
{
    /** Event name used for BMW orders. */
    const CREATE_BMW = 'application_frontend.event.order_create_bmw';

    /** Event name used for Audi and Mercedes orders. */
    const CREATE_AUDI_MERCEDES = 'application_frontend.event.order_create_audi_mercedes';

    /**
     * @var Order
     */
    private $order;

    /**
     * @param Order $order Order this event is about.
     */
    public function __construct(Order $order)
    {
        $this->order = $order;
    }

    /**
     * @return Order The order handed to the constructor.
     */
    public function getOrder()
    {
        return $this->order;
    }
}

OrderListener


namespace Application\FrontendBundle\EventListener;

use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateAudiMercedesProducer;
use Application\FrontendBundle\Producer\OrderCreateBmwProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Forwards order-creation events to the matching producer.
 */
class OrderListener
{
    /**
     * @var OrderCreateBmwProducer
     */
    private $orderCreateBmwProducer;

    /**
     * @var OrderCreateAudiMercedesProducer
     */
    private $orderCreateAudiMercedesProducer;

    /**
     * @param OrderCreateBmwProducer          $orderCreateBmwProducer
     * @param OrderCreateAudiMercedesProducer $orderCreateAudiMercedesProducer
     */
    public function __construct(
        OrderCreateBmwProducer $orderCreateBmwProducer,
        OrderCreateAudiMercedesProducer $orderCreateAudiMercedesProducer
    ) {
        $this->orderCreateBmwProducer = $orderCreateBmwProducer;
        $this->orderCreateAudiMercedesProducer = $orderCreateAudiMercedesProducer;
    }

    /**
     * Hands the event's order to the BMW producer, then stops propagation.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderCreateBmw(OrderEvent $orderEvent)
    {
        $order = $orderEvent->getOrder();
        $this->orderCreateBmwProducer->add($order);

        // Propagation is stopped only after the order was passed on.
        $orderEvent->stopPropagation();
    }

    /**
     * Hands the event's order to the Audi/Mercedes producer, then stops
     * propagation.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderCreateAudiMercedes(OrderEvent $orderEvent)
    {
        $order = $orderEvent->getOrder();
        $this->orderCreateAudiMercedesProducer->add($order);

        // Propagation is stopped only after the order was passed on.
        $orderEvent->stopPropagation();
    }
}

Listeners.yml


services:
    application_frontend.event_listener.order:
        class: Application\FrontendBundle\EventListener\OrderListener
        tags:
            - { name: kernel.event_listener, event: application_frontend.event.order_create_bmw, method: onOrderCreateBmw }
            - { name: kernel.event_listener, event: application_frontend.event.order_create_audi_mercedes, method: onOrderCreateAudiMercedes }
        arguments:
            # Service references are quoted: a plain scalar may not start with "@".
            - '@application_frontend.producer.order_create_bmw'
            - '@application_frontend.producer.order_create_audi_mercedes'

OrderCreateBmwProducer


namespace Application\FrontendBundle\Producer;

use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

/**
 * Turns an Order into a JSON message and publishes it through the injected
 * RabbitMQ producer.
 */
class OrderCreateBmwProducer
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer Producer the messages are published with.
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes the order as JSON, using the order's car make as routing key.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $fields = [
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            // Time the message was built, not the time the order was stored.
            'timestamp' => date('Y-m-d H:i:s'),
        ];

        $routingKey = $fields['car_make'];
        $body = json_encode($fields);

        $this->producer->publish($body, $routingKey);
    }
}

OrderCreateAudiMercedesProducer


namespace Application\FrontendBundle\Producer;

use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

/**
 * Turns an Order into a JSON message and publishes it through the injected
 * RabbitMQ producer.
 */
class OrderCreateAudiMercedesProducer
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer Producer the messages are published with.
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes the order as JSON, using the order's car make as routing key.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $fields = [
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            // Time the message was built, not the time the order was stored.
            'timestamp' => date('Y-m-d H:i:s'),
        ];

        $routingKey = $fields['car_make'];
        $body = json_encode($fields);

        $this->producer->publish($body, $routingKey);
    }
}

Producers.yml


services:
    # Each wrapper receives the bundle-generated producer service of the same name.
    application_frontend.producer.order_create_bmw:
        class: Application\FrontendBundle\Producer\OrderCreateBmwProducer
        arguments:
            - '@old_sound_rabbit_mq.order_create_bmw_producer'

    application_frontend.producer.order_create_audi_mercedes:
        class: Application\FrontendBundle\Producer\OrderCreateAudiMercedesProducer
        arguments:
            - '@old_sound_rabbit_mq.order_create_audi_mercedes_producer'

OrderCreateBmwConsumer


İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.


namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Consumer callback that records every received order message as an OrderLog
 * row and prints a progress line to standard output.
 */
class OrderCreateBmwConsumer implements ConsumerInterface
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager
     * @param LoggerInterface        $logger
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        LoggerInterface $logger
    ) {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Decodes the message body, stores it and echoes a summary. Any exception
     * raised while doing so is caught and written to the logger, not rethrown.
     *
     * @param AMQPMessage $message
     */
    public function execute(AMQPMessage $message)
    {
        $payload = json_decode($message->body, true);

        try {
            $this->log($payload);

            $summary = sprintf('Order create - ID:%s @ %s ...', $payload['order_id'], date('Y-m-d H:i:s'));
            echo $summary.PHP_EOL;
            echo json_encode($message).PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists one OrderLog row for the decoded message.
     *
     * @param array $message Decoded message body; its "car_make" entry is read.
     */
    private function log($message)
    {
        $entry = new OrderLog();
        $entry->setAction(OrderLog::CREATE.' '.$message['car_make']);
        $entry->setMessage($message);

        $this->entityManager->persist($entry);
        $this->entityManager->flush();
    }

    /**
     * Writes a JSON-encoded error record to the logger.
     *
     * @param AMQPMessage $message Message that could not be processed.
     * @param string      $error   Text of the caught exception.
     */
    private function logError($message, $error)
    {
        $record = json_encode([
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message,
        ]);

        $this->logger->error($record);
    }
}

OrderCreateAudiMercedesConsumer


İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.


namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Consumer callback that records every received order message as an OrderLog
 * row and prints a progress line to standard output.
 */
class OrderCreateAudiMercedesConsumer implements ConsumerInterface
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager
     * @param LoggerInterface        $logger
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        LoggerInterface $logger
    ) {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Decodes the message body, stores it and echoes a summary. Any exception
     * raised while doing so is caught and written to the logger, not rethrown.
     *
     * @param AMQPMessage $message
     */
    public function execute(AMQPMessage $message)
    {
        $payload = json_decode($message->body, true);

        try {
            $this->log($payload);

            $summary = sprintf('Order create - ID:%s @ %s ...', $payload['order_id'], date('Y-m-d H:i:s'));
            echo $summary.PHP_EOL;
            echo json_encode($message).PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists one OrderLog row for the decoded message.
     *
     * @param array $message Decoded message body; its "car_make" entry is read.
     */
    private function log($message)
    {
        $entry = new OrderLog();
        $entry->setAction(OrderLog::CREATE.' '.$message['car_make']);
        $entry->setMessage($message);

        $this->entityManager->persist($entry);
        $this->entityManager->flush();
    }

    /**
     * Writes a JSON-encoded error record to the logger.
     *
     * @param AMQPMessage $message Message that could not be processed.
     * @param string      $error   Text of the caught exception.
     */
    private function logError($message, $error)
    {
        $record = json_encode([
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message,
        ]);

        $this->logger->error($record);
    }
}

Eğer bir mesajın işlenmesi sırasında herhangi bir hata oluşur ise, aşağıdaki gibi bir kayıt oluşturulur.


# app/log/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateBmwConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"bmw"}}} [] []

# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateBmwConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":"bmw"}}}

Consumers.yml


services:
    # These service ids are the "callback" values of the consumers in config.yml.
    application_frontend.consumer.order_create_bmw:
        class: Application\FrontendBundle\Consumer\OrderCreateBmwConsumer
        arguments:
            - '@doctrine.orm.entity_manager'
            - '@logger'

    application_frontend.consumer.order_create_audi_mercedes:
        class: Application\FrontendBundle\Consumer\OrderCreateAudiMercedesConsumer
        arguments:
            - '@doctrine.orm.entity_manager'
            - '@logger'

Order entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * A car order placed by a customer, mapped to the "orders" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    // The accessors below are the ones OrderService and the producers call.

    /**
     * @return int|null Generated identifier; null until a value has been assigned.
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $customerName
     *
     * @return $this
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $carMake
     *
     * @return $this
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carModel
     *
     * @return $this
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarModel()
    {
        return $this->carModel;
    }
}

OrderLog entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * Log row written for a processed order message, mapped to the "order_log"
 * table.
 *
 * @ORM\Entity
 * @ORM\Table(name="order_log")
 */
class OrderLog
{
    /** Action prefix the consumers combine with the car make. */
    const CREATE = 'Create';

    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="action", type="string", length=20)
     */
    private $action;

    /**
     * @ORM\Column(name="message", type="json_array")
     */
    private $message;

    // The setters below are the ones the consumers call; getters are their
    // read counterparts.

    /**
     * @return int|null Generated identifier; null until a value has been assigned.
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $action Short label; the column is mapped with length 20.
     *
     * @return $this
     */
    public function setAction($action)
    {
        $this->action = $action;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getAction()
    {
        return $this->action;
    }

    /**
     * @param array $message Decoded message body, stored through the json_array column.
     *
     * @return $this
     */
    public function setMessage($message)
    {
        $this->message = $message;

        return $this;
    }

    /**
     * @return array|null
     */
    public function getMessage()
    {
        return $this->message;
    }
}

Testler


Yeni araba siparişi verirken, aşağıdakine benzer istekler göndeririz.


# POST http://rabbitmq.dev/app_dev.php/order/create/bmw
{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}

# POST http://rabbitmq.dev/app_dev.php/order/create/audi_mercedes
{
"customer_name": "inanzzz",
"car_make": "audi",
"car_model": "a3"
}
{
"customer_name": "inanzzz",
"car_make": "mercedes",
"car_model": "sl500"
}

Terminalde bir tane consumer çalıştırıp durdurarak, RabbitMQ için gerekli olan tüm bileşenler (exchange, queue vs.) yaratılıp, birbirlerine bağlanmış olurlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.




Test 1: Consumer çalışmıyor


Sadece 3 tane istek gönderdim.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 3.18 |
| 2 | inanzzz | audi | a3 |
| 3 | inanzzz | mercedes | sl500 |
+----+---------------+----------+-----------+
3 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
Empty set (0.00 sec)



Yukarıda da gördüğümüz gibi, "bmw" sırasında 1, "audi_mercedes" sırasında ise 2 tane mesaj bekliyor. Birer tane consumer çalıştırılması durumunda ne olacağını göreceğiz. RabbitMQ içindeki kırık olan bağlantıları silmezseniz, mesajlar işlenmeyebilirler.


$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:1 @ 2016-02-13 12:19:41 ...

$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:2 @ 2016-02-13 12:19:55 ...
Order create - ID:3 @ 2016-02-13 12:19:59 ...

mysql> SELECT * FROM order_log;
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| 1 | Create bmw | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.18","timestamp":"2016-02-13 12:19:41"} |
| 2 | Create audi | {"order_id":2,"customer_name":"inanzzz","car_make":"audi","car_model":"a3","timestamp":"2016-02-13 12:19:55"} |
| 3 | Create mercedes | {"order_id":3,"customer_name":"inanzzz","car_make":"mercedes","car_model":"sl500","timestamp":"2016-02-13 12:19:59"} |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.00 sec)



Yukarıda da gördüğümüz gibi, tüm mesajlar işlendi.


Test 2: Her iki sıra için ikişer consumer çalışıyor


3 tane "bmw" ve 2 tane de "audi/mercedes" isteği gönderdim.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 3.18 |
| 2 | inanzzz | bmw | 3.20 |
| 3 | inanzzz | bmw | 5.00 |
| 4 | inanzzz | audi | a3 |
| 5 | inanzzz | mercedes | sl500 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
| 1 | Create bmw | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.18","timestamp":"2016-02-13 13:01:12"} |
| 2 | Create bmw | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"3.20","timestamp":"2016-02-13 13:01:20"} |
| 3 | Create bmw | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"5.00","timestamp":"2016-02-13 13:01:28"} |
| 4 | Create audi | {"order_id":4,"customer_name":"inanzzz","car_make":"audi","car_model":"a3","timestamp":"2016-02-13 13:01:37"} |
| 5 | Create mercedes | {"order_id":5,"customer_name":"inanzzz","car_make":"mercedes","car_model":"sl500","timestamp":"2016-02-13 13:01:47"} |
+----+-----------------+----------------------------------------------------------------------------------------------------------------------+
5 rows in set (0.00 sec)


# Create bmw Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:1 @ 2016-02-13 13:01:12 ...
Order create - ID:3 @ 2016-02-13 13:01:28 ...

# Create bmw Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_bmw
Order create - ID:2 @ 2016-02-13 13:01:20 ...

# Create audi/mercedes Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:4 @ 2016-02-13 13:01:37 ...

# Create audi/mercedes Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create_audi_mercedes
Order create - ID:5 @ 2016-02-13 13:01:47 ...

Yukarıda da gördüğümüz gibi, mesajların hepsi hiç bekletilmeden dört consumer tarafından round-robin dispatching yöntemi ile işlendiler.


RabbitMQ Management


Exchange




Queue





İlişkiler