In this example we are going to test a beanstalk application with Behat. The test is based on a user being created: the job is put into a tube, then a worker command picks it up and passes it on to a consumer to be consumed. The example uses the LeezyPheanstalkBundle library.


Setup


Composer


Install libraries below.


"leezy/pheanstalk-bundle": "3.2.0"
"behat/behat": "3.2.2"
"behat/symfony2-extension": "2.1.1"
"behat/mink": "1.7.1"
"behat/mink-extension": "2.2"
"behat/mink-browserkit-driver": "1.3.2"

Parameters.yml


# Application parameters used by the producer and the worker command.
parameters:
    # Prefix the worker prepends to the tube argument to build a consumer service id.
    tube_consumer_service_id_prefix: "inanzzz_application.consumer."
    # Tube name the producer writes to; suffixed with the kernel environment.
    user_create_tube: "user_create_%kernel.environment%"

DefaultController


You should normally use a POST method with a JSON request body to create a user, but a simple GET route is used here to keep the example short.


namespace Inanzzz\ApplicationBundle\Controller;

use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Inanzzz\ApplicationBundle\Producer\ProducerInterface;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Response;

/**
 * HTTP entry points of the example: a landing page and an endpoint that
 * queues a "create user" job through the injected producer.
 *
 * @Route("jobs/", service="inanzzz_application.controller.default")
 */
class DefaultController
{
    /** @var ProducerInterface Producer that queues "create user" jobs. */
    private $userCreateProducer;

    /**
     * @param ProducerInterface $userCreateProducer
     */
    public function __construct(ProducerInterface $userCreateProducer)
    {
        $this->userCreateProducer = $userCreateProducer;
    }

    /**
     * Landing page that returns a fixed text body.
     *
     * @Method({"GET"})
     * @Route("")
     *
     * @return Response
     */
    public function indexAction()
    {
        $body = 'This is a beanstalk application!';

        return new Response($body);
    }

    /**
     * Hands the given name to the producer and answers with an empty body:
     * 202 when the producer accepted it, 400 when it raised a ProducerException.
     *
     * @Method({"GET"})
     * @Route("create/{name}")
     *
     * @param string $name Value taken from the {name} route placeholder.
     *
     * @return Response
     */
    public function createAction($name)
    {
        $statusCode = Response::HTTP_ACCEPTED;

        try {
            $this->userCreateProducer->produce($name);
        } catch (ProducerException $e) {
            $statusCode = Response::HTTP_BAD_REQUEST;
        }

        return new Response(null, $statusCode);
    }
}

# Controller registered as a service so its producer can be constructor-injected.
services:
    inanzzz_application.controller.default:
        class: Inanzzz\ApplicationBundle\Controller\DefaultController
        arguments:
            - '@inanzzz_application.producer.user_create'

UserCreateProducer


namespace Inanzzz\ApplicationBundle\Producer;

/**
* Contract for classes that accept a value and produce a job from it.
*/
interface ProducerInterface
{
/**
* Produces a job for the given name.
*
* @param mixed $name Payload handed over by the caller (the controller passes the route value).
*/
public function produce($name);
}

namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

/**
 * Puts "create user" jobs into the configured beanstalk tube.
 */
class UserCreateProducer implements ProducerInterface
{
    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var string Name of the tube jobs are put into. */
    private $tube;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param string              $tube
     */
    public function __construct(PheanstalkInterface $pheanstalk, $tube)
    {
        $this->pheanstalk = $pheanstalk;
        $this->tube = $tube;
    }

    /**
     * Puts $name as the job payload into the configured tube.
     *
     * @param string $name
     *
     * @throws ProducerException When the job cannot be put into the tube.
     */
    public function produce($name)
    {
        try {
            $this->pheanstalk->useTube($this->tube)->put($name);
        } catch (Exception $e) {
            // Keep the original code and exception attached so the root
            // cause is still visible to whoever logs the ProducerException.
            throw new ProducerException($e->getMessage(), (int) $e->getCode(), $e);
        }
    }
}

# Producer wired with the pheanstalk client and the environment-specific tube name.
services:
    inanzzz_application.producer.user_create:
        class: Inanzzz\ApplicationBundle\Producer\UserCreateProducer
        arguments:
            - '@leezy.pheanstalk'
            - '%user_create_tube%'

UserCreateConsumer


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

/**
* Raised by UserCreateProducer and UserCreateConsumer when the wrapped work fails.
*/
class ProducerException extends RuntimeException
{
}

namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

/**
 * Consumes "create user" jobs; in this example it only logs the created user.
 */
class UserCreateConsumer implements ConsumerInterface
{
    /** @var LoggerInterface */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Handles one job payload.
     *
     * @param string $name Job payload, i.e. the user name.
     *
     * @throws ProducerException When handling the payload fails.
     */
    public function consume($name)
    {
        try {
            // Do something with the $name

            $this->logger->info(sprintf('User [%s] has been created.', $name));
        } catch (Exception $e) {
            // Keep the original code and exception attached so the worker's
            // error log can still point at the root cause.
            throw new ProducerException($e->getMessage(), (int) $e->getCode(), $e);
        }
    }
}

# The id suffix ("user_create") is what the worker appends to the consumer prefix.
services:
    inanzzz_application.consumer.user_create:
        class: Inanzzz\ApplicationBundle\Consumer\UserCreateConsumer
        arguments:
            - '@logger'

JobProcessorCommand


namespace Inanzzz\ApplicationBundle\Command;

use Doctrine\DBAL\DBALException;
use Doctrine\ORM\ORMException;
use Exception;
use Inanzzz\ApplicationBundle\Consumer\ConsumerInterface;
use PDOException;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Console worker that reserves jobs from one beanstalk tube and hands each
 * job's payload to the consumer service registered for that tube.
 *
 * The consumer service id is "<consumerServiceIdPrefix><tube argument>" and the
 * tube that is actually watched is "<tube argument>_<env option>".
 */
class JobProcessorCommand extends Command implements ContainerAwareInterface
{
    const MAXIMUM_JOB_LIMIT = 250;
    const MAXIMUM_MEMORY_USAGE_LIMIT = 104857600; // As in Bytes = 100MB
    const SLEEP_TIME = 250000; // As in Microseconds = 0.25sec
    const WATCH_TIMEOUT = 60; // As in seconds

    /** @var ContainerInterface */
    private $container;

    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var LoggerInterface */
    private $logger;

    /** @var string */
    private $consumerServiceIdPrefix;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param LoggerInterface     $logger
     * @param string              $consumerServiceIdPrefix
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        LoggerInterface $logger,
        $consumerServiceIdPrefix
    ) {
        parent::__construct();

        $this->pheanstalk = $pheanstalk;
        $this->logger = $logger;
        $this->consumerServiceIdPrefix = $consumerServiceIdPrefix;
    }

    /**
     * @param ContainerInterface|null $container
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * Declares the command name, the mandatory "tube" argument and the
     * "limit" option (defaults to MAXIMUM_JOB_LIMIT).
     */
    protected function configure()
    {
        $this
            ->setName('job:processor')
            ->setDescription('Processing beanstalk jobs for given tubes.')
            ->addArgument(
                'tube',
                // Argument modes come from InputArgument. InputOption::VALUE_REQUIRED
                // has the same integer value as InputArgument::OPTIONAL, which
                // silently made the tube argument optional.
                InputArgument::REQUIRED,
                'Name of the tube to process jobs from?'
            )
            ->addOption(
                'limit',
                null,
                InputOption::VALUE_REQUIRED,
                'Maximum how many jobs should be consumed?',
                self::MAXIMUM_JOB_LIMIT
            );
    }

    /**
     * Resolves the consumer and the tube, then starts processing.
     *
     * Writes a message and stops early when either the consumer service or
     * the tube cannot be found.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $environment = $input->getOption('env');
        $tube = $input->getArgument('tube');
        $limit = (int) $input->getOption('limit');

        // Check for the service before fetching it, so an unknown tube gives
        // the message below instead of a container exception.
        $serviceId = $this->consumerServiceIdPrefix.$tube;
        $consumer = $this->container->has($serviceId) ? $this->container->get($serviceId) : null;
        if (!$consumer instanceof ConsumerInterface) {
            $output->writeln(sprintf('Consumer "%s" cannot be found.', $serviceId));

            return;
        }

        $tube = $tube.'_'.$environment;
        if (!in_array($tube, $this->pheanstalk->listTubes(), true)) {
            $output->writeln(sprintf('Tube "%s" cannot be found.', $tube));

            return;
        }

        $limit = min($limit, self::MAXIMUM_JOB_LIMIT);

        $output->writeln(sprintf('Processing "%d" jobs from "%s" tube...', $limit, $tube));

        $this->process($output, $consumer, $tube, $limit);
    }

    /**
     * Reserves and consumes jobs until the limit is reached, the peak memory
     * usage exceeds MAXIMUM_MEMORY_USAGE_LIMIT, or a database-level exception
     * is caught.
     *
     * @param OutputInterface   $output
     * @param ConsumerInterface $consumer
     * @param string            $tube
     * @param int               $limit
     */
    private function process(OutputInterface $output, ConsumerInterface $consumer, $tube, $limit)
    {
        $counter = 0;
        $memoryUsage = memory_get_peak_usage(true);

        while (true) {
            if ($counter >= $limit) {
                return;
            }

            if ($memoryUsage > self::MAXIMUM_MEMORY_USAGE_LIMIT) {
                $this->logger->info(sprintf(
                    'Processing "%s" tube exhausted memory with "%sMB" of usage.',
                    $tube,
                    ($memoryUsage/1024/1024)
                ));

                return;
            }

            // Reset on every iteration so the catch block can never act on a
            // job that belongs to a previous, already finished iteration.
            $job = null;

            try {
                $job = $this->pheanstalk->watch($tube)->ignore('default')->reserve(5);
                if ($job instanceof Job) {
                    $consumer->consume($job->getData());
                    $this->pheanstalk->delete($job);

                    $memoryUsage = memory_get_peak_usage(true);
                    ++$counter;

                    $output->writeln(sprintf('Job "%d" of tube "%s" completed.', $counter, $tube));
                }
            } catch (Exception $e) {
                // Log first: burying may fail as well (e.g. lost connection)
                // and the original error must not get lost in that case.
                $this->logger->error(sprintf(
                    'Consuming "%s" tube resulted in "%s" exception: [%s:%s]',
                    $tube,
                    get_class($e),
                    $e->getCode(),
                    $e->getMessage()
                ));

                // Only a job that was actually reserved can be buried; when
                // watch()/reserve() itself failed there is nothing to bury.
                if ($job instanceof Job) {
                    $this->pheanstalk->bury($job);
                }

                if ($e instanceof DBALException || $e instanceof ORMException || $e instanceof PDOException) {
                    return;
                }
            }

            usleep(self::SLEEP_TIME);
        }
    }
}

# Worker command registered as a console command service.
services:
    inanzzz_application.command.job_processor:
        class: Inanzzz\ApplicationBundle\Command\JobProcessorCommand
        tags:
            - { name: console.command }
        arguments:
            - '@leezy.pheanstalk'
            - '@logger'
            - '%tube_consumer_service_id_prefix%'

Behat


Behat.yml


# Behat profile: Symfony2 + Mink extensions and one bundle suite with two contexts.
default:
    extensions:
        Behat\Symfony2Extension: ~
        Behat\MinkExtension:
            base_url: http://inanzzz.dev:8082/app_test.php
            sessions:
                symfony2:
                    symfony2: ~
    suites:
        app:
            type: symfony_bundle
            bundle: InanzzzApplicationBundle
            mink_session: symfony2
            contexts:
                - Inanzzz\ApplicationBundle\Features\Context\FeatureContext
                - Inanzzz\ApplicationBundle\Features\Context\BeanstalkContext

FeatureContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\MinkExtension\Context\MinkContext;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use Symfony\Component\HttpKernel\KernelInterface;

/**
 * Mink based context that also keeps the kernel it is given, so other
 * contexts can read it back through getKernel().
 */
class FeatureContext extends MinkContext implements KernelAwareContext
{
    /**
     * Kernel received through setKernel().
     *
     * @var KernelInterface
     */
    private $kernel;

    /**
     * Stores the given kernel on the context.
     *
     * @param KernelInterface $kernel
     */
    public function setKernel(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * Returns the kernel stored by setKernel().
     *
     * @return KernelInterface
     */
    public function getKernel()
    {
        return $this->kernel;
    }
}

BeanstalkContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\Behat\Context\Context;
use Behat\Behat\Hook\Scope\BeforeScenarioScope;
use Behat\Gherkin\Node\TableNode;
use LogicException;
use Pheanstalk\PheanstalkInterface;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

/**
 * Behat steps for inspecting beanstalk tubes and running the worker command.
 *
 * Every step appends "_test" to the tube name given in the feature file.
 */
class BeanstalkContext implements Context
{
    const WORKER_COMMAND = 'php bin/console job:processor %s --limit=%d --env=test';
    const CURRENT_JOB_PREFIX = 'current-jobs-';

    /** @var FeatureContext $featureContext */
    private $featureContext;

    /**
     * Tube statistics that, added together, count every job held by a tube.
     *
     * "current-jobs-urgent" is left out on purpose: beanstalkd reports urgent
     * jobs as the part of "current-jobs-ready" with a priority below 1024, so
     * adding it as well would count those jobs twice.
     *
     * @var string[]
     */
    private $jobItems = [
        'current-jobs-ready',
        'current-jobs-reserved',
        'current-jobs-delayed',
        'current-jobs-buried',
    ];

    /**
     * Keeps a reference to the FeatureContext of the running scenario, which
     * is used to reach the kernel and its container.
     *
     * @param BeforeScenarioScope $scope
     *
     * @BeforeScenario
     */
    public function gatherContexts(BeforeScenarioScope $scope)
    {
        $environment = $scope->getEnvironment();

        $this->featureContext = $environment->getContext(FeatureContext::class);
    }

    /**
     * Fails when the tube holds any ready, reserved, delayed or buried job.
     *
     * @param string $tube
     *
     * @throws LogicException When the tube is unknown or not empty.
     *
     * @When /^the "([^"]*)" tube is empty$/
     */
    public function theTubeIsEmpty($tube)
    {
        $tube = $tube.'_test';

        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->getPheanstalk($tube);

        $totalJobs = 0;
        foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
            if (in_array($item, $this->jobItems, true)) {
                $totalJobs += (int) $value;
            }
        }

        if ($totalJobs !== 0) {
            throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
        }
    }

    /**
     * Fails when the number of jobs in the given state differs from $total.
     *
     * @param int    $total
     * @param string $state
     * @param string $tube
     *
     * @throws LogicException When the tube is unknown or the count differs.
     *
     * @When /^there should be (\d+) "(urgent|ready|reserved|delayed|buried)" job in "([^"]*)" tube$/
     */
    public function thereShouldBeJobInTube($total, $state, $tube)
    {
        $tube = $tube.'_test';

        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->getPheanstalk($tube);

        $totalJobs = 0;
        foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
            if ($item === self::CURRENT_JOB_PREFIX.$state) {
                $totalJobs = (int) $value;

                break;
            }
        }

        // $total arrives as a string captured by the step pattern; compare as integers.
        if ($totalJobs !== (int) $total) {
            throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
        }
    }

    /**
     * Runs the worker command for the tube and compares its trimmed output
     * with the lines listed in the step's table.
     *
     * @param int       $total
     * @param string    $tube
     * @param TableNode $tableNode
     *
     * @throws ProcessFailedException When the worker command exits unsuccessfully.
     * @throws LogicException         When the output differs from the table.
     *
     * @When /^I consume (\d+) job in "([^"]*)" tube:$/
     */
    public function iConsumeJobInTube($total, $tube, TableNode $tableNode)
    {
        // The tube name ends up in a shell command line, so quote it.
        $process = new Process(sprintf(self::WORKER_COMMAND, escapeshellarg($tube), $total));
        $process->run();
        if (!$process->isSuccessful()) {
            throw new ProcessFailedException($process);
        }

        // The first column of the table holds the expected output lines.
        $expectedOutput = array_keys($tableNode->getRowsHash());
        $expectedOutput = implode(PHP_EOL, $expectedOutput);

        if (trim($process->getOutput()) !== $expectedOutput) {
            throw new LogicException('Output mismatch.');
        }
    }

    /**
     * Fetches the pheanstalk client from the container after checking that
     * the tube is currently listed by the server.
     *
     * @param string $tube
     *
     * @throws LogicException When the tube is not listed.
     *
     * @return PheanstalkInterface
     */
    private function getPheanstalk($tube)
    {
        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->featureContext->getKernel()->getContainer()->get('leezy.pheanstalk');

        if (!in_array($tube, $pheanstalk->listTubes(), true)) {
            throw new LogicException('Tube not found.');
        }

        return $pheanstalk;
    }
}

Users.feature


# The tube related steps ("... tube is empty", "there should be ... job in ... tube", "I consume ... job in ... tube:") are defined in BeanstalkContext, which appends "_test" to the tube name.
Feature: Working with users.
In order to interact with users
As a user
I should be able to make requests and consume beanstalk tubes

Scenario: Successfully creating user and consuming beanstalk tube.
Given I am on "/jobs"
Then I should see "This is a beanstalk application!"
And the "user_create" tube is empty
When I am on "/jobs/create/inanzzz"
Then the response status code should be 202
And there should be 1 "ready" job in "user_create" tube
And I consume 1 job in "user_create" tube:
| Processing "1" jobs from "user_create_test" tube... |
| Job "1" of tube "user_create_test" completed. |
And the "user_create" tube is empty

Test


$ vendor/bin/behat --suite=app
Feature: Working with users.
In order to interact with users
As a user
I should be able to make requests and consume beanstalk tubes

Scenario: Successfully creating user and consuming beanstalk tube.
Given I am on "/jobs"
Then I should see "This is a beanstalk application!"
And the "user_create" tube is empty
When I am on "/jobs/create/inanzzz"
Then the response status code should be 202
And there should be 1 "ready" job in "user_create" tube
And I consume 1 job in "user_create" tube:
| Processing "1" jobs from "user_create_test" tube... |
| Job "1" of tube "user_create_test" completed. |
And the "user_create" tube is empty

1 scenario (1 passed)
8 steps (8 passed)
0m1.38s (26.11Mb)

Addition


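If you want to check the content of a job, you can add the method below to BeanstalkContext. It also needs the `Behat\Gherkin\Node\PyStringNode` and `Pheanstalk\Job` classes to be imported.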


...
/**
 * Fails when the next ready job of the tube does not carry exactly the
 * given content.
 *
 * @param string       $tube
 * @param PyStringNode $string
 *
 * @throws LogicException When the tube has no ready job or the content differs.
 *
 * @Then /^the job in "([^"]*)" tube contains:$/
 */
public function theJobInTubeContains($tube, PyStringNode $string)
{
    $tube = $tube.'_test';

    /** @var PheanstalkInterface $pheanstalk */
    $pheanstalk = $this->getPheanstalk($tube);

    try {
        // peek-ready looks at the tube that is currently *used*, not at the
        // watched ones, so the tube is passed to peekReady() directly.
        $job = $pheanstalk->peekReady($tube);
    } catch (\Pheanstalk\Exception\ServerException $e) {
        // Pheanstalk reports "no ready job" as a server exception.
        throw new LogicException('Tube is empty.', 0, $e);
    }

    if (!$job instanceof Job) {
        throw new LogicException('Tube is empty.');
    }

    if ($job->getData() !== $string->getRaw()) {
        throw new LogicException('Job content mismatch.');
    }
}
...

And the job in "user_create" tube contains:
"""
{"id":"1","type":"whatever"}
"""