Follow the example below to test RabbitMQ messages that are published by a producer. We're going to create two custom Behat step definitions to handle the process. The example relies on RabbitMqBundle.


Config.yml


Let's assume that this is the normal config for the production environment.


# Production RabbitMqBundle configuration: one connection, one producer and
# one consumer that share the same fanout exchange.
old_sound_rabbit_mq:
    connections:
        default:
            # Container parameters are quoted: a YAML plain scalar must not
            # start with "%", and Symfony no longer accepts the unquoted form.
            host: '%rabbit_mq_host%'
            port: '%rabbit_mq_port%'
            user: '%rabbit_mq_user%'
            password: '%rabbit_mq_pswd%'
            vhost: /
            lazy: true
    producers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
    consumers:
        order_create:
            connection: default
            exchange_options: { name: 'order_create_ex', type: fanout }
            queue_options: { name: 'order_create_qu' }
            callback: application_frontend.consumer.order_create

Config_test.yml


As you can see below, we have a trimmed-down version of the config file for the test environment. We know that the producer service will be created automatically and named old_sound_rabbit_mq.order_create_producer. This is what we will need in the Behat context below.


# Test-environment override: gives the "order_create" producer a queue whose
# name follows the "<producer>_queue" pattern built by
# RabbitMqContext::getQueueName().
old_sound_rabbit_mq:
    producers:
        order_create:
            queue_options:
                name: 'order_create_queue'

If you have routing keys, you can configure them this way.


# Test-environment override with routing keys: same producer queue as the
# plain variant, plus the routing keys declared in its queue options.
old_sound_rabbit_mq:
    producers:
        order_create:
            queue_options:
                name: 'order_create_queue'
                routing_keys: [my_routing_key, your_routing_key]

RabbitMqContext.php


namespace My\APIBundle\Features\Context;

use Behat\Gherkin\Node\TableNode;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use LogicException;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\KernelInterface;

/**
 * Behat context with step definitions for asserting on the RabbitMQ queue
 * that belongs to a RabbitMqBundle producer.
 *
 * The queue is expected to be named "<producerName>_queue" and the producer
 * service "old_sound_rabbit_mq.<producerName>_producer".
 */
class RabbitMqContext implements KernelAwareContext
{
    /** @var KernelInterface */
    private $kernel;

    /**
     * Stores the kernel so the service container can be reached from the steps.
     *
     * @param KernelInterface $kernel
     */
    public function setKernel(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * Declares the producer's queue, purges it and verifies nothing is left.
     *
     * @param string $producerName
     *
     * @throws LogicException When a message can still be fetched after the purge.
     *
     * @When /^the queue associated to "([^"]*)" producer is empty$/
     */
    public function theQueueAssociatedToProducerIsEmpty($producerName)
    {
        $queueName = $this->getQueueName($producerName);

        $channel = $this->getChannel($producerName);
        // Arguments after the name: passive=false, durable=true, exclusive=false, auto_delete=false.
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_purge($queueName);

        // Fetch with no_ack=true so a stray message is not left unacknowledged on the channel.
        if ($channel->basic_get($queueName, true)) {
            throw new LogicException(sprintf('The queue %s does not seem to be empty.', $queueName));
        }
    }

    /**
     * Compares the messages in the producer's queue with the given table.
     *
     * Both sides are normalised with replaceDynamicValues() first, so dates and
     * Unix timestamps never cause a mismatch. Order matters.
     *
     * @param string    $producerName
     * @param TableNode $tableNode    Two-column table: an arbitrary key and the expected message body.
     *
     * @throws LogicException When the queued messages differ from the expected ones.
     *
     * @When /^the queue associated to "([^"]*)" producer has messages below:$/
     */
    public function theQueueAssociatedToProducerHasMessagesBelow($producerName, TableNode $tableNode)
    {
        $expectedMessages = $this->getExpectedMessages($tableNode);
        $queuedMessages = $this->getQueuedMessages($producerName);

        // Strict comparison: loose array comparison would treat numeric-looking
        // strings such as "1e3" and "1000" as equal.
        if ($expectedMessages !== $queuedMessages) {
            throw new LogicException(sprintf(
                'Message mismatch. Queue contains:%s%s',
                PHP_EOL,
                json_encode($queuedMessages)
            ));
        }
    }

    /**
     * Extracts the expected message bodies from the table, normalised.
     *
     * @param TableNode $tableNode
     *
     * @return array List of normalised message bodies in table order.
     */
    private function getExpectedMessages(TableNode $tableNode)
    {
        $expectedMessages = [];
        foreach ($tableNode->getRowsHash() as $message) {
            $expectedMessages[] = $this->replaceDynamicValues($message);
        }

        return $expectedMessages;
    }

    /**
     * Drains the producer's queue and returns the normalised message bodies.
     *
     * @param string $producerName
     *
     * @return array List of normalised message bodies in delivery order.
     */
    private function getQueuedMessages($producerName)
    {
        $channel = $this->getChannel($producerName);
        $queueName = $this->getQueueName($producerName);

        $queuedMessages = [];
        while (true) {
            // no_ack=true: the broker removes the message on delivery, so it is not
            // re-queued when the channel closes and cannot leak into a later scenario.
            $message = $channel->basic_get($queueName, true);
            if (!$message instanceof AMQPMessage) {
                break;
            }

            $queuedMessages[] = $this->replaceDynamicValues($message->getBody());

            // "message_count" is the number of messages still waiting in the queue;
            // stopping at zero saves one extra round trip to the broker.
            if ((int) $message->get('message_count') === 0) {
                break;
            }
        }

        return $queuedMessages;
    }

    /**
     * Returns the AMQP channel of the producer service for the given producer.
     *
     * @param string $producerName
     *
     * @return AMQPChannel
     */
    private function getChannel($producerName)
    {
        $container = $this->kernel->getContainer();

        $producerService = sprintf('old_sound_rabbit_mq.%s_producer', $producerName);
        $producer = $container->get($producerService);

        return $producer->getChannel();
    }

    /**
     * Builds the queue name that belongs to the given producer.
     *
     * @param string $producerName
     *
     * @return string
     */
    private function getQueueName($producerName)
    {
        return sprintf('%s_queue', $producerName);
    }

    /**
     * Replaces values that change on every run with fixed placeholders.
     *
     * ISO-8601 date-times (with a "+hh:mm" or "-hh:mm" offset, or "Z") become
     * ISO8601_TIMESTAMP; a 10-digit number after ":" and before "," or "}"
     * becomes "UNIX_TIMESTAMP".
     *
     * @param string $data
     *
     * @return string
     *
     * @throws LogicException When the regular expression engine reports an error.
     */
    private function replaceDynamicValues($data)
    {
        $normalised = preg_replace(
            [
                '/\b(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:[+-](\d{2}):(\d{2})|Z)\b/',
                '#:\d{10}(,|})#',
            ],
            [
                'ISO8601_TIMESTAMP',
                ':"UNIX_TIMESTAMP"$1',
            ],
            $data
        );

        // preg_replace() returns null on failure; fail loudly instead of comparing nulls.
        if (null === $normalised) {
            throw new LogicException(sprintf(
                'Unable to normalise dynamic values (PCRE error code %d).',
                preg_last_error()
            ));
        }

        return $normalised;
    }
}

Test.feature


Feature: Whatever

  Scenario: Whatever
    Given .......
    And the queue associated to "order_create" producer is empty
    # Assume that one of the next three steps publishes 2 messages in symfony application.
    # (Gherkin only treats "#" as a comment at the start of a line, so the note sits on its own lines.)
    And .....
    And .....
    And .....
    # First column is an arbitrary row key; second column is the expected message body.
    And the queue associated to "order_create" producer has messages below:
      | 1 | {"id":"123","url":"http:\/\/www.a.com\/123.png","created_at":"2016-08-03T09:45:38+01:00"} |
      | 2 | {"id":"321","url":"http:\/\/www.a.com\/321.png","created_at":"2016-08-03T09:45:38+01:00"} |