Bu örneğimizde araba satın almak için bir istek oluşturup, ayrıca isteği kayıt altına alacağız. Örnek 1 Producer & 1 Exchange & 1 Queue & N Worker & 1 Consumer bileşenlerinden oluşuyor ve RabbitMqBundle kullanıyor.



Çalışma mantığı


  1. Request - Kullanıcı satın alma isteği gönderiyor.

  2. Controller - İstek onaylandıktan sonra Service'e gönderilir.

  3. Service - İstek veritabanına işlenir, satın alma uyarısı oluşturulur ve onay kullanıcıya geri gönderilir. İşte tam bu anda uygulamamız üzerine düşeni yapmış olur ve kullanıcıyı daha fazla bekletmez.

  4. Event Dispatcher - Uyarıyı alır ve sevk işlemini yapar.

  5. Event Listener - Sevk'i alır ve Producer'e iletir.

  6. Producer - Mesaj oluşturulur ve sıraya koyulur.

  7. Consumer - Eğer terminalde aktif olarak çalışan bir "worker" var ise, mesajı sıradan alıp yapması gerekeni yapar. Eğer yok ise, aktif hale geldiğinde gerekeni yapar.


Konfigürasyon


Bundle yükleme


Composer ile "oldsound/rabbitmq-bundle":"1.8.0"'i yükleyin ve AppKernel içine new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle() satırını ekleyin.


Parameters.yml


# RabbitMQ connection settings; config.yml reads them through %...% placeholders.
parameters:
    rabbit_mq_host: 127.0.0.1
    rabbit_mq_port: 5672
    rabbit_mq_user: guest
    rabbit_mq_pswd: guest

Config.yml


old_sound_rabbit_mq:
    connections:
        default:
            # Placeholders are quoted: a plain scalar starting with "%" is
            # deprecated by the Symfony YAML parser.
            host: "%rabbit_mq_host%"
            port: "%rabbit_mq_port%"
            user: "%rabbit_mq_user%"
            password: "%rabbit_mq_pswd%"
            vhost: /
            lazy: true
    producers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: topic }
    consumers:
        order_create:
            connection: default
            # Same exchange as the producer above; the queue is declared on the consumer side.
            exchange_options: { name: 'order_create_ex', type: topic }
            queue_options: { name: 'order_create_qu' }
            # Service id of the class whose execute() receives each message.
            callback: application_frontend.consumer.order_create

OrderController


Aslında isteği doğrulama ve bir modele bağlama işlemini yapmanız lazım, ama ben bu seferlik yapmayacağım. Bu konuyla ilgili yazılarımı bulabilirsiniz.


namespace Application\FrontendBundle\Controller;

use Application\FrontendBundle\Service\OrderService;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
 * Accepts order requests over HTTP and delegates them to the order service.
 *
 * @Route("order", service="application_frontend.controller.order")
 */
class OrderController
{
    /**
     * @var OrderService
     */
    private $orderService;

    /**
     * @param OrderService $orderService
     */
    public function __construct(OrderService $orderService)
    {
        $this->orderService = $orderService;
    }

    /**
     * Decodes the JSON request body and hands it to the order service.
     *
     * @param Request $request
     *
     * @Method({"POST"})
     * @Route("/create")
     *
     * @return Response The service result on success, or a 400 response when the body does not decode to an array
     */
    public function createAction(Request $request)
    {
        $payload = json_decode($request->getContent(), true);

        // json_decode() yields null for malformed or empty input and a scalar for
        // bare JSON values; neither satisfies the array type-hint on the service,
        // so reject the request instead of letting it end in a fatal error.
        if (!is_array($payload)) {
            return new Response('Request body must be a JSON object.', 400);
        }

        return new Response($this->orderService->create($payload));
    }
}

Controllers.yml


services:
    # The controller is registered as a service; its dependency is constructor-injected.
    application_frontend.controller.order:
        class: Application\FrontendBundle\Controller\OrderController
        arguments:
            # Quoted: a plain scalar starting with "@" is deprecated by the Symfony YAML parser.
            - "@application_frontend.service.order"

OrderService


namespace Application\FrontendBundle\Service;

use Application\FrontendBundle\Entity\Order;
use Application\FrontendBundle\Event\OrderEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
 * Persists new orders and announces their creation through the event dispatcher.
 */
class OrderService
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @param EntityManagerInterface   $entityManager
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Stores a new order and dispatches OrderEvent::CREATE for it.
     *
     * @param array $newOrder Expected keys: 'customer_name', 'car_make', 'car_model'
     *
     * @return int Identifier of the persisted order (the id, not the Order object)
     */
    public function create(array $newOrder)
    {
        $order = new Order();
        $order->setCustomerName($newOrder['customer_name']);
        $order->setCarMake($newOrder['car_make']);
        $order->setCarModel($newOrder['car_model']);

        // Flush before dispatching: the message built for the queue includes the order id.
        $this->entityManager->persist($order);
        $this->entityManager->flush();

        $this->eventDispatcher->dispatch(OrderEvent::CREATE, new OrderEvent($order));

        return $order->getId();
    }
}

Services.yml


services:
    application_frontend.service.order:
        class: Application\FrontendBundle\Service\OrderService
        arguments:
            # Quoted: a plain scalar starting with "@" is deprecated by the Symfony YAML parser.
            - "@doctrine.orm.entity_manager"
            - "@event_dispatcher"

OrderEvent


namespace Application\FrontendBundle\Event;

use Application\FrontendBundle\Entity\Order;
use Symfony\Component\EventDispatcher\Event;

/**
 * Event object that carries an order to the listeners of OrderEvent::CREATE.
 */
class OrderEvent extends Event
{
    /**
     * Name under which the event is dispatched and listened for.
     */
    const CREATE = 'application_frontend.event.order_create';

    /**
     * @var Order
     */
    private $order;

    /**
     * @param Order $order The order this event is about
     */
    public function __construct(Order $order)
    {
        $this->order = $order;
    }

    /**
     * @return Order The order passed in at construction
     */
    public function getOrder()
    {
        return $this->order;
    }
}

OrderListener


namespace Application\FrontendBundle\EventListener;

use Application\FrontendBundle\Event\OrderEvent;
use Application\FrontendBundle\Producer\OrderCreateProducer;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Forwards order-creation events to the message producer.
 */
class OrderListener
{
    /**
     * @var OrderCreateProducer
     */
    private $orderCreateProducer;

    /**
     * @param OrderCreateProducer $orderCreateProducer
     */
    public function __construct(OrderCreateProducer $orderCreateProducer)
    {
        $this->orderCreateProducer = $orderCreateProducer;
    }

    /**
     * Hands the event's order to the producer, then stops the event.
     *
     * @param OrderEvent $orderEvent
     */
    public function onOrderCreate(OrderEvent $orderEvent)
    {
        $order = $orderEvent->getOrder();
        $this->orderCreateProducer->add($order);

        // Listeners that would run after this one no longer receive the event.
        $orderEvent->stopPropagation();
    }
}

Listeners.yml


services:
    application_frontend.event_listener.order:
        class: Application\FrontendBundle\EventListener\OrderListener
        tags:
            # The event name matches the OrderEvent::CREATE constant.
            - { name: kernel.event_listener, event: application_frontend.event.order_create, method: onOrderCreate }
        arguments:
            # Quoted: a plain scalar starting with "@" is deprecated by the Symfony YAML parser.
            - "@application_frontend.producer.order_create"

OrderCreateProducer


namespace Application\FrontendBundle\Producer;

use Application\FrontendBundle\Entity\Order;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;

/**
 * Turns an order into a JSON message and publishes it through the injected producer.
 */
class OrderCreateProducer
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Publishes the order's id, customer name, car make/model and the current time.
     *
     * @param Order $order
     */
    public function add(Order $order)
    {
        $payload = json_encode([
            'order_id' => $order->getId(),
            'customer_name' => $order->getCustomerName(),
            'car_make' => $order->getCarMake(),
            'car_model' => $order->getCarModel(),
            // Time of publishing, not of order creation.
            'timestamp' => date('Y-m-d H:i:s'),
        ]);

        $this->producer->publish($payload);
    }
}

Producers.yml


services:
    application_frontend.producer.order_create:
        class: Application\FrontendBundle\Producer\OrderCreateProducer
        arguments:
            # Quoted: a plain scalar starting with "@" is deprecated by the Symfony YAML parser.
            - "@old_sound_rabbit_mq.order_create_producer"

OrderCreateConsumer


İşlenemeyen mesajların kaydını almaktan başka bir şey yapmamamızın bir nedeni var. Buna cevap veren yazımı bulabilirsiniz.


namespace Application\FrontendBundle\Consumer;

use Application\FrontendBundle\Entity\OrderLog;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

/**
 * Consumes order-creation messages and records each one as an OrderLog row.
 */
class OrderCreateConsumer implements ConsumerInterface
{
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EntityManagerInterface $entityManager
     * @param LoggerInterface        $logger
     */
    public function __construct(
        EntityManagerInterface $entityManager,
        LoggerInterface $logger
    ) {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }

    /**
     * Stores the decoded message body and prints a progress line.
     *
     * Any Exception raised while doing so is written to the logger and not
     * rethrown; the method returns nothing in either case.
     * NOTE(review): how the bundle treats a missing return value (ack/requeue)
     * is not visible here - confirm against the bundle documentation.
     *
     * @param AMQPMessage $message
     */
    public function execute(AMQPMessage $message)
    {
        $body = json_decode($message->body, true);

        try {
            $this->log($body);

            $line = sprintf('Order create - ID:%s @ %s ...', $body['order_id'], date('Y-m-d H:i:s'));
            echo $line, PHP_EOL;
        } catch (Exception $e) {
            $this->logError($message, $e->getMessage());
        }
    }

    /**
     * Persists one OrderLog entry with the CREATE action and flushes it immediately.
     *
     * @param mixed $message Decoded message body
     */
    private function log($message)
    {
        $entry = new OrderLog();
        $entry->setAction(OrderLog::CREATE);
        $entry->setMessage($message);

        $this->entityManager->persist($entry);
        $this->entityManager->flush();
    }

    /**
     * Writes a JSON-encoded error record containing the error text, this class name and the message.
     *
     * @param mixed  $message The message that could not be processed
     * @param string $error   Error description
     */
    private function logError($message, $error)
    {
        $record = json_encode([
            'error' => $error,
            'class' => __CLASS__,
            'message' => $message,
        ]);

        $this->logger->error($record);
    }
}

Eğer bir mesajın işlenmesi sırasında herhangi bir hata oluşur ise, aşağıdaki gibi bir kayıt oluşturulur.


# app/logs/dev.log
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}} [] []

# Terminal
[2016-01-31 22:32:10] app.ERROR: {"error":"Catchable Fatal Error: Object of class PhpAmqpLib\\Message\\AMQPMessage could not be converted to string","class":"Application\\FrontendBundle\\Consumer\\OrderCreateConsumer","message":{"body":"{\"order_id\":39,\"customer_name\":\"inanzzz\",\"car_make\":\"bmw\",\"car_model\":\"318\",\"timestamp\":\"2016-02-02 22:32:09\"}","body_size":"110","is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"PHPPROCESS-MacBook-Pro.local_93134":[{},"processMessage"]}},"consumer_tag":"PHPPROCESS-MacBook-Pro.local_93134","delivery_tag":"1","redelivered":false,"exchange":"order_create_ex","routing_key":""}}}

Consumers.yml


services:
    # Referenced as the consumer callback in the old_sound_rabbit_mq configuration.
    application_frontend.consumer.order_create:
        class: Application\FrontendBundle\Consumer\OrderCreateConsumer
        arguments:
            # Quoted: a plain scalar starting with "@" is deprecated by the Symfony YAML parser.
            - "@doctrine.orm.entity_manager"
            - "@logger"

Order entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * A car purchase order, mapped to the "orders" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order
{
    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="customer_name", type="string", length=50)
     */
    private $customerName;

    /**
     * @ORM\Column(name="car_make", type="string", length=50)
     */
    private $carMake;

    /**
     * @ORM\Column(name="car_model", type="string", length=50)
     */
    private $carModel;

    // The accessors below are the ones the order service and the producer call.

    /**
     * @return int|null Null until an identifier has been assigned
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $customerName
     *
     * @return $this
     */
    public function setCustomerName($customerName)
    {
        $this->customerName = $customerName;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCustomerName()
    {
        return $this->customerName;
    }

    /**
     * @param string $carMake
     *
     * @return $this
     */
    public function setCarMake($carMake)
    {
        $this->carMake = $carMake;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarMake()
    {
        return $this->carMake;
    }

    /**
     * @param string $carModel
     *
     * @return $this
     */
    public function setCarModel($carModel)
    {
        $this->carModel = $carModel;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getCarModel()
    {
        return $this->carModel;
    }
}

OrderLog entity


namespace Application\FrontendBundle\Entity;

use Doctrine\ORM\Mapping as ORM;

/**
 * A log entry for an order action, mapped to the "order_log" table.
 *
 * @ORM\Entity
 * @ORM\Table(name="order_log")
 */
class OrderLog
{
    /**
     * Action value stored for order-creation entries.
     */
    const CREATE = 'Create';

    /**
     * @ORM\Id
     * @ORM\Column(type="smallint")
     * @ORM\GeneratedValue(strategy="AUTO")
     */
    private $id;

    /**
     * @ORM\Column(name="action", type="string", length=10)
     */
    private $action;

    /**
     * @ORM\Column(name="message", type="json_array")
     */
    private $message;

    // The setters below are the ones the consumer calls; getters are provided for symmetry.

    /**
     * @return int|null Null until an identifier has been assigned
     */
    public function getId()
    {
        return $this->id;
    }

    /**
     * @param string $action For example OrderLog::CREATE
     *
     * @return $this
     */
    public function setAction($action)
    {
        $this->action = $action;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getAction()
    {
        return $this->action;
    }

    /**
     * @param mixed $message Value stored in the json_array column
     *
     * @return $this
     */
    public function setMessage($message)
    {
        $this->message = $message;

        return $this;
    }

    /**
     * @return mixed
     */
    public function getMessage()
    {
        return $this->message;
    }
}

Testler


Araba siparişi verirken, aşağıdakine benzer bir istek göndeririz.


# POST http://rabbitmq.dev/app_dev.php/order/create

{
"customer_name": "inanzzz",
"car_make": "bmw",
"car_model": "318"
}

Terminalde bir tane consumer çalıştırıp durdurarak, RabbitMQ için gerekli olan tüm bileşenler (exchange, queue vs.) yaratılıp, birbirlerine bağlanmış olurlar. Sonuç olarak aşağıdaki diyagrama sahip olursunuz.



Test 1: Consumer çalışmıyor


Sadece 2 tane istek gönderdim.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
2 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
Empty set (0.00 sec)


Yukarıda da gördüğümüz gibi, sırada 2 tane mesaj bekliyor. Bir tane consumer çalıştırılması durumunda ne olacağını göreceğiz. RabbitMQ içindeki kırık olan bağlantıları silmezseniz, mesajlar işlenmeyebilirler.


$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-02-07 15:32:13 ...
Order create - ID:2 @ 2016-02-07 15:32:33 ...

mysql> SELECT * FROM order_log;
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:33"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-02-07 15:28:34"} |
+----+--------+-----------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.00 sec)


Yukarıda da gördüğümüz gibi, tüm mesajlar işlendi.


Test 2: İki consumer çalışıyor


Sadece 5 tane istek gönderdim.


mysql> SELECT * FROM orders;
+----+---------------+----------+-----------+
| id | customer_name | car_make | car_model |
+----+---------------+----------+-----------+
| 1 | inanzzz | bmw | 318 |
| 2 | inanzzz | bmw | 318 |
| 3 | inanzzz | bmw | 318 |
| 4 | inanzzz | bmw | 318 |
| 5 | inanzzz | bmw | 318 |
+----+---------------+----------+-----------+
5 rows in set (0.00 sec)

mysql> SELECT * FROM order_log;
+----+--------+---------------------------------------------------------------------------------------------------------------+
| id | action | message |
+----+--------+---------------------------------------------------------------------------------------------------------------+
| 1 | Create | {"order_id":1,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:07"} |
| 2 | Create | {"order_id":2,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:08"} |
| 3 | Create | {"order_id":3,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:09"} |
| 4 | Create | {"order_id":4,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:10"} |
| 5 | Create | {"order_id":5,"customer_name":"inanzzz","car_make":"bmw","car_model":"318","timestamp":"2016-01-31 12:20:11"} |
+----+--------+---------------------------------------------------------------------------------------------------------------+
5 rows in set (0.00 sec)


# Consumer 1
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:1 @ 2016-01-31 12:20:07 ...
Order create - ID:3 @ 2016-01-31 12:20:09 ...
Order create - ID:5 @ 2016-01-31 12:20:11 ...

# Consumer 2
$ app/console rabbitmq:consumer -m 100 order_create
Order create - ID:2 @ 2016-01-31 12:20:08 ...
Order create - ID:4 @ 2016-01-31 12:20:10 ...

Yukarıda da gördüğümüz gibi, mesajların hepsi hiç bekletilmeden round-robin dispatching yöntemi ile işlendiler.


RabbitMQ Management


Exchange




Queue




İlişkiler