Aşağıdaki örneği takip ederek, producerler tarafından yayınlanan RabbitMQ mesajlarını test edebilirsiniz. İşlemi kontrol etmek için iki tane özel behat adımı geliştireceğiz. Bu örnek RabbitMqBundle kullanmaktadır.


Config.yml


Bunun asıl konfigürasyon dosyası olduğunu varsayalım.


old_sound_rabbit_mq:
connections:
default:
host: %rabbit_mq_host%
port: %rabbit_mq_port%
user: %rabbit_mq_user%
password: %rabbit_mq_pswd%
vhost: /
lazy: true
producers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
consumers:
order_create:
connection: default
exchange_options: { name: 'order_create_ex', type: fanout }
queue_options: { name: 'order_create_qu' }
callback: application_frontend.consumer.order_create

Config_test.yml


Aşağıda da gördüğümüz gibi, test ortamındaki konfigürasyon dosyası bayağı basit tutulmuş durumda. Producer servisinin otomatik olarak old_sound_rabbit_mq.order_create_producer olarak yaratılacağını biliyoruz. Bunu aşağıdaki context dosyasında kullanacağız.


old_sound_rabbit_mq:
producers:
order_create:
queue_options: { name: 'order_create_queue' }

Eğer uygulamanızda routing keys varsa aşağıdaki örneği kullanabilirsiniz.


old_sound_rabbit_mq:
producers:
order_create:
queue_options:
name: 'order_create_queue'
routing_keys:
- my_routing_key
- your_routing_key

RabbitMqContext.php


namespace My\APIBundle\Features\Context;

use Behat\Gherkin\Node\TableNode;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use LogicException;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\KernelInterface;

class RabbitMqContext implements KernelAwareContext
{
/** @var KernelInterface */
private $kernel;

public function setKernel(KernelInterface $kernel)
{
$this->kernel = $kernel;
}

/**
* @param string $producerName
*
* @When /^the queue associated to "([^"]*)" producer is empty$/
*/
public function theQueueAssociatedToProducerIsEmpty($producerName)
{
$queueName = $this->getQueueName($producerName);

$channel = $this->getChannel($producerName);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_purge($queueName);

if ($channel->basic_get($queueName)) {
throw new LogicException(sprintf('The queue %s does not seem to be empty.', $queueName));
}
}

/**
* @param string $producerName
* @param TableNode $tableNode
*
* @When /^the queue associated to "([^"]*)" producer has messages below:$/
*/
public function theQueueAssociatedToProducerHasMessagesBelow($producerName, TableNode $tableNode)
{
$expectedMessages = $this->getExpectedMessages($tableNode);
$queuedMessages = $this->getQueuedMessages($producerName);

if ($expectedMessages != $queuedMessages) {
throw new LogicException(sprintf(
'Message mismatch. Queue contains:%s%s',
PHP_EOL,
json_encode($queuedMessages)
));
}
}

/**
* @param TableNode $tableNode
*
* @return array
*/
private function getExpectedMessages(TableNode $tableNode)
{
$expectedMessages = [];
foreach ($tableNode->getRowsHash() as $message) {
$expectedMessages[] = $this->replaceDynamicValues($message);
}

return $expectedMessages;
}

/**
* @param string $producerName
*
* @return array
*/
private function getQueuedMessages($producerName)
{
$channel = $this->getChannel($producerName);

$queuedMessages = [];
do {
/** @var AMQPMessage $message */
$message = $channel->basic_get($this->getQueueName($producerName));
if (!$message instanceof AMQPMessage) {
break;
}

$queuedMessages[] = $this->replaceDynamicValues($message->getBody());

if ($message->get('message_count') == 0) {
break;
}
} while (true);

return $queuedMessages;
}

/**
* @param string $producerName
*
* @return AMQPChannel
*/
private function getChannel($producerName)
{
$container = $this->kernel->getContainer();

$producerService = sprintf('old_sound_rabbit_mq.%s_producer', $producerName);
$producer = $container->get($producerService);

return $producer->getChannel();
}

/**
* @param string $producerName
*
* @return string
*/
private function getQueueName($producerName)
{
return sprintf('%s_queue', $producerName);
}

/**
* @param string $data
*
* @return string
*/
private function replaceDynamicValues($data)
{
return preg_replace(
[
'/\b(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\+(\d{2}):(\d{2})\b/',
'#:\d{10}(,|})#',
],
[
'ISO8601_TIMESTAMP',
':"UNIX_TIMESTAMP"$1',
],
$data
);
}
}

Test.feature


Feature: Whatever

Scenario: Whatever
Given .......
And the queue associated to "order_create" producer is empty
And .....
And ..... # Assume that one of these steps publishes 2 messages in symfony application
And .....
And the queue associated to "order_create" producer has messages below:
| 1 | {"id":"123","url":"http:\/\/www.a.com\/123.png","created_at":"2016-08-03T09:45:38+01:00"} |
| 2 | {"id":"321","url":"http:\/\/www.a.com\/321.png","created_at":"2016-08-03T09:45:38+01:00"} |