03/08/2016 - BEHAT, RABBITMQ, SYMFONY
Aşağıdaki örneği takip ederek, producer'lar tarafından yayınlanan RabbitMQ mesajlarını test edebilirsiniz. İşlemi kontrol etmek için iki tane özel Behat adımı geliştireceğiz. Bu örnek RabbitMqBundle kullanmaktadır.
Bunun asıl konfigürasyon dosyası olduğunu varsayalım.
# Main RabbitMqBundle configuration: one lazy connection used by both the
# "order_create" producer and the "order_create" consumer.
old_sound_rabbit_mq:
    connections:
        default:
            # Parameter placeholders are quoted because a plain YAML scalar
            # must not start with "%".
            host: '%rabbit_mq_host%'
            port: '%rabbit_mq_port%'
            user: '%rabbit_mq_user%'
            password: '%rabbit_mq_pswd%'
            vhost: /
            lazy: true
    producers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
    consumers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
            queue_options: { name: 'order_create_qu' }
            callback: application_frontend.consumer.order_create
Aşağıda da gördüğümüz gibi, test ortamındaki konfigürasyon dosyası bayağı basit tutulmuş durumda. Producer servisinin otomatik olarak old_sound_rabbit_mq.order_create_producer adıyla yaratılacağını biliyoruz. Bunu aşağıdaki context dosyasında kullanacağız.
# Test-environment override: only adds queue options to the "order_create" producer.
old_sound_rabbit_mq:
    producers:
        order_create:
            queue_options: { name: 'order_create_queue' }
Eğer uygulamanızda routing keys varsa aşağıdaki örneği kullanabilirsiniz.
# Variant of the producer override that also lists routing keys for the queue.
old_sound_rabbit_mq:
    producers:
        order_create:
            queue_options:
                name: 'order_create_queue'
                routing_keys:
                    - my_routing_key
                    - your_routing_key
namespace My\APIBundle\Features\Context;
use Behat\Gherkin\Node\TableNode;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use LogicException;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\KernelInterface;
/**
 * Behat context providing steps that inspect the queue associated with a
 * RabbitMqBundle producer.
 *
 * The producer service is looked up as "old_sound_rabbit_mq.<producer>_producer"
 * and its queue name is derived as "<producer>_queue".
 */
class RabbitMqContext implements KernelAwareContext
{
    /** @var KernelInterface */
    private $kernel;

    /**
     * Stores the kernel so that the steps can reach the service container.
     *
     * @param KernelInterface $kernel
     */
    public function setKernel(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * Declares and purges the producer's queue, then checks that no message can be fetched.
     *
     * @param string $producerName
     *
     * @throws LogicException When a message is still returned after the purge.
     *
     * @When /^the queue associated to "([^"]*)" producer is empty$/
     */
    public function theQueueAssociatedToProducerIsEmpty($producerName)
    {
        $queueName = $this->getQueueName($producerName);
        $channel = $this->getChannel($producerName);

        // Declare first so the purge below does not target a queue that was never created.
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_purge($queueName);

        if ($channel->basic_get($queueName)) {
            throw new LogicException(sprintf('The queue %s does not seem to be empty.', $queueName));
        }
    }

    /**
     * Compares the messages in the producer's queue with the rows of the given table.
     *
     * Both sides are passed through replaceDynamicValues() before the comparison.
     *
     * @param string    $producerName
     * @param TableNode $tableNode
     *
     * @throws LogicException When the queued messages differ from the expected ones.
     *
     * @When /^the queue associated to "([^"]*)" producer has messages below:$/
     */
    public function theQueueAssociatedToProducerHasMessagesBelow($producerName, TableNode $tableNode)
    {
        $expectedMessages = $this->getExpectedMessages($tableNode);
        $queuedMessages = $this->getQueuedMessages($producerName);

        // Strict comparison: both sides are sequential lists of strings, and loose
        // comparison would treat distinct numeric-looking strings as equal.
        if ($expectedMessages !== $queuedMessages) {
            throw new LogicException(sprintf(
                'Message mismatch. Queue contains:%s%s',
                PHP_EOL,
                json_encode($queuedMessages)
            ));
        }
    }

    /**
     * Builds the list of expected messages from the values of the table's row hash.
     *
     * @param TableNode $tableNode
     *
     * @return array
     */
    private function getExpectedMessages(TableNode $tableNode)
    {
        $expectedMessages = [];
        foreach ($tableNode->getRowsHash() as $message) {
            $expectedMessages[] = $this->replaceDynamicValues($message);
        }

        return $expectedMessages;
    }

    /**
     * Fetches messages from the producer's queue and returns their normalized bodies.
     *
     * @param string $producerName
     *
     * @return array
     */
    private function getQueuedMessages($producerName)
    {
        $channel = $this->getChannel($producerName);
        // The queue name does not change between iterations, so resolve it once.
        $queueName = $this->getQueueName($producerName);
        $queuedMessages = [];

        while (true) {
            // NOTE(review): basic_get() is called without a no-ack argument, so fetched
            // messages are presumably left unacknowledged — confirm this is intended.
            /** @var AMQPMessage $message */
            $message = $channel->basic_get($queueName);
            if (!$message instanceof AMQPMessage) {
                break;
            }

            $queuedMessages[] = $this->replaceDynamicValues($message->getBody());

            // Stop as soon as the fetched message reports a message_count of 0.
            if ($message->get('message_count') == 0) {
                break;
            }
        }

        return $queuedMessages;
    }

    /**
     * Returns the channel of the producer service registered for the given producer name.
     *
     * @param string $producerName
     *
     * @return AMQPChannel
     */
    private function getChannel($producerName)
    {
        $container = $this->kernel->getContainer();
        $producerService = sprintf('old_sound_rabbit_mq.%s_producer', $producerName);
        $producer = $container->get($producerService);

        return $producer->getChannel();
    }

    /**
     * Derives the queue name from the producer name by appending "_queue".
     *
     * @param string $producerName
     *
     * @return string
     */
    private function getQueueName($producerName)
    {
        return sprintf('%s_queue', $producerName);
    }

    /**
     * Replaces values that differ between runs with fixed placeholders.
     *
     * - Timestamps shaped like "2016-08-03T09:45:38+01:00", with either a positive or
     *   a negative UTC offset, become ISO8601_TIMESTAMP.
     * - A 10-digit number preceded by ":" and followed by "," or "}" becomes
     *   :"UNIX_TIMESTAMP" (the trailing "," or "}" is kept).
     *
     * @param string $data
     *
     * @return string
     */
    private function replaceDynamicValues($data)
    {
        return preg_replace(
            [
                // [+-] rather than \+ so that negative offsets such as -05:00 are normalized too.
                '/\b(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})[+-](\d{2}):(\d{2})\b/',
                '#:\d{10}(,|})#',
            ],
            [
                'ISO8601_TIMESTAMP',
                ':"UNIX_TIMESTAMP"$1',
            ],
            $data
        );
    }
}
Feature: Whatever

  Scenario: Whatever
    Given .......
    And the queue associated to "order_create" producer is empty
    And .....
    # Assume that one of these steps publishes 2 messages in symfony application
    And .....
    And .....
    And the queue associated to "order_create" producer has messages below:
      | 1 | {"id":"123","url":"http:\/\/www.a.com\/123.png","created_at":"2016-08-03T09:45:38+01:00"} |
      | 2 | {"id":"321","url":"http:\/\/www.a.com\/321.png","created_at":"2016-08-03T09:45:38+01:00"} |