Hello everyone!

We have been investing plenty of personal time and energy for many years to share our knowledge with you all. However, we now need your help to keep this blog running. All you have to do is just click one of the adverts on the site, otherwise it will sadly be taken down due to hosting etc. costs. Thank you.

In this example we're going to test a Beanstalk application with Behat. The test is based on a user being created: the job is put into a tube, the worker command picks it up and passes it on to a consumer to be consumed. The example uses the LeezyPheanstalkBundle library.


Setup


Composer


Install the libraries below.


"leezy/pheanstalk-bundle": "3.2.0"
"behat/behat": "3.2.2"
"behat/symfony2-extension": "2.1.1"
"behat/mink": "1.7.1"
"behat/mink-extension": "2.2"
"behat/mink-browserkit-driver": "1.3.2"

Parameters.yml


# Shared parameters for the producer (tube name) and the worker command (consumer lookup prefix).
parameters:
    # Prepended to the tube argument of "job:processor" to build the consumer service id.
    tube_consumer_service_id_prefix: "inanzzz_application.consumer."
    # Tube names are suffixed with the kernel environment, e.g. "user_create_test".
    user_create_tube: "user_create_%kernel.environment%"

DefaultController


You should normally use a POST method with a JSON request body to create a user, but I'm being lazy this time!


namespace Inanzzz\ApplicationBundle\Controller;

use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Inanzzz\ApplicationBundle\Producer\ProducerInterface;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Response;

/**
 * HTTP entry points of the example: a landing page and an endpoint that queues a
 * "create user" job through the injected producer.
 *
 * @Route("jobs/", service="inanzzz_application.controller.default")
 */
class DefaultController
{
    /** @var ProducerInterface */
    private $userCreateProducer;

    /**
     * @param ProducerInterface $userCreateProducer Producer that receives the user name in createAction().
     */
    public function __construct(ProducerInterface $userCreateProducer)
    {
        $this->userCreateProducer = $userCreateProducer;
    }

    /**
     * Landing page returning a fixed text body.
     *
     * @Method({"GET"})
     * @Route("")
     *
     * @return Response
     */
    public function indexAction()
    {
        $body = 'This is a beanstalk application!';

        return new Response($body);
    }

    /**
     * Hands the given name to the producer and reports the outcome through the status code only:
     * 202 when the producer accepted it, 400 when it raised a ProducerException.
     *
     * @Method({"GET"})
     * @Route("create/{name}")
     *
     * @return Response
     */
    public function createAction($name)
    {
        $status = Response::HTTP_ACCEPTED;

        try {
            $this->userCreateProducer->produce($name);
        } catch (ProducerException $failure) {
            $status = Response::HTTP_BAD_REQUEST;
        }

        return new Response(null, $status);
    }
}

services:
    # Controller registered as a service so the producer can be constructor-injected.
    inanzzz_application.controller.default:
        class: Inanzzz\ApplicationBundle\Controller\DefaultController
        arguments:
            - '@inanzzz_application.producer.user_create'

UserCreateProducer


namespace Inanzzz\ApplicationBundle\Producer;

/**
 * Contract for services that accept a value and produce a job from it.
 */
interface ProducerInterface
{
    /**
     * Produces a job for the given value.
     *
     * @param mixed $name Value handed over by the caller (the controller passes the user name).
     */
    public function produce($name);
}

namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

/**
 * Producer that puts the given user name as a job into the configured beanstalk tube.
 */
class UserCreateProducer implements ProducerInterface
{
    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var string Name of the tube jobs are put into. */
    private $tube;

    /**
     * @param PheanstalkInterface $pheanstalk Beanstalk client used to queue jobs.
     * @param string              $tube       Tube to put jobs into.
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        $tube
    ) {
        $this->pheanstalk = $pheanstalk;
        $this->tube = $tube;
    }

    /**
     * Puts $name as job data into the configured tube.
     *
     * @param string $name Job payload.
     *
     * @throws ProducerException When the client raises any exception; the original exception is
     *                           available through getPrevious().
     */
    public function produce($name)
    {
        try {
            $this->pheanstalk->useTube($this->tube)->put($name);
        } catch (Exception $e) {
            // Keep the cause attached so the real failure (class, trace) is not lost when wrapping.
            // The code stays 0 as before; some exception classes expose non-integer codes.
            throw new ProducerException($e->getMessage(), 0, $e);
        }
    }
}

services:
    # Producer wired with the beanstalk client and the environment-specific tube name.
    inanzzz_application.producer.user_create:
        class: Inanzzz\ApplicationBundle\Producer\UserCreateProducer
        arguments:
            - '@leezy.pheanstalk'
            - '%user_create_tube%'

UserCreateConsumer


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

/**
 * Exception type raised by the producer and consumer classes of this example when the
 * wrapped operation fails.
 */
class ProducerException extends RuntimeException
{
    // Intentionally empty: the type itself is what callers catch.
}

namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

/**
 * Consumer for "create user" jobs: receives the job payload and logs that the user was created.
 */
class UserCreateConsumer implements ConsumerInterface
{
    /** @var LoggerInterface */
    private $logger;

    /**
     * @param LoggerInterface $logger Receives an info entry for every consumed job.
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Consumes a single job payload.
     *
     * @param string $name Job data.
     *
     * @throws ProducerException When processing raises any exception; the original exception is
     *                           available through getPrevious().
     */
    public function consume($name)
    {
        try {
            // Do something with the $name

            $this->logger->info(sprintf('User [%s] has been created.', $name));
        } catch (Exception $e) {
            // NOTE(review): a consumer raising ProducerException looks like a leftover; the type is kept
            // so existing catch blocks keep working. The cause is chained so it can still be inspected.
            throw new ProducerException($e->getMessage(), 0, $e);
        }
    }
}

services:
    # The id is "<tube_consumer_service_id_prefix><tube>", which is how the worker command finds it.
    inanzzz_application.consumer.user_create:
        class: Inanzzz\ApplicationBundle\Consumer\UserCreateConsumer
        arguments:
            - '@logger'

JobProcessorCommand


namespace Inanzzz\ApplicationBundle\Command;

use Doctrine\DBAL\DBALException;
use Doctrine\ORM\ORMException;
use Exception;
use Inanzzz\ApplicationBundle\Consumer\ConsumerInterface;
use PDOException;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Worker command that reserves jobs from one beanstalk tube and hands each payload to the
 * consumer service registered for that tube.
 *
 * The consumer is looked up in the container as "[consumerServiceIdPrefix][tube]" and the tube
 * that is actually watched is "[tube]_[env]".
 */
class JobProcessorCommand extends Command implements ContainerAwareInterface
{
    const MAXIMUM_JOB_LIMIT = 250;
    const MAXIMUM_MEMORY_USAGE_LIMIT = 104857600; // As in Bytes = 100MB
    const SLEEP_TIME = 250000; // As in Microseconds = 0.25sec
    // NOTE(review): not referenced in this class; the reserve timeout below is hard-coded to 5 seconds.
    const WATCH_TIMEOUT = 60; // As in seconds

    /** @var ContainerInterface */
    private $container;

    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var LoggerInterface */
    private $logger;

    /** @var string */
    private $consumerServiceIdPrefix;

    /**
     * @param PheanstalkInterface $pheanstalk              Beanstalk client used to reserve, delete and bury jobs.
     * @param LoggerInterface     $logger                  Receives memory-limit and failure entries.
     * @param string              $consumerServiceIdPrefix Prefix prepended to the tube argument to form the
     *                                                     consumer service id.
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        LoggerInterface $logger,
        $consumerServiceIdPrefix
    ) {
        parent::__construct();

        $this->pheanstalk = $pheanstalk;
        $this->logger = $logger;
        $this->consumerServiceIdPrefix = $consumerServiceIdPrefix;
    }

    /**
     * {@inheritdoc}
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * Declares the command name, the required "tube" argument and the "limit" option.
     */
    protected function configure()
    {
        $this
            ->setName('job:processor')
            ->setDescription('Processing beanstalk jobs for given tubes.')
            ->addArgument(
                'tube',
                // Argument modes come from InputArgument; InputOption::VALUE_REQUIRED has the same
                // integer value as InputArgument::OPTIONAL and silently made the tube optional.
                InputArgument::REQUIRED,
                'Name of the tube to process jobs from?'
            )
            ->addOption(
                'limit',
                null,
                InputOption::VALUE_REQUIRED,
                'Maximum how many jobs should be consumed?',
                self::MAXIMUM_JOB_LIMIT
            );
    }

    /**
     * Resolves the consumer and the environment-specific tube, then starts processing.
     *
     * Prints a message and stops without processing when either the consumer service or the tube
     * cannot be found.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $environment = $input->getOption('env');
        $tube = $input->getArgument('tube');
        $limit = (int) $input->getOption('limit');

        // Check has() first: get() throws for unknown ids, which would bypass the message below.
        $consumerServiceId = $this->consumerServiceIdPrefix.$tube;
        $consumer = $this->container->has($consumerServiceId)
            ? $this->container->get($consumerServiceId)
            : null;
        if (!$consumer instanceof ConsumerInterface) {
            // Report the service id; the resolved value may be null or an object that is not printable.
            $output->writeln(sprintf('Consumer "%s" cannot be found.', $consumerServiceId));

            return;
        }

        $tube = $tube.'_'.$environment;
        if (!in_array($tube, $this->pheanstalk->listTubes(), true)) {
            $output->writeln(sprintf('Tube "%s" cannot be found.', $tube));

            return;
        }

        // Never process more than the hard cap in a single run.
        $limit = min($limit, self::MAXIMUM_JOB_LIMIT);

        $output->writeln(sprintf('Processing "%d" jobs from "%s" tube...', $limit, $tube));

        $this->process($output, $consumer, $tube, $limit);
    }

    /**
     * Reserves and consumes jobs until the limit is reached, peak memory exceeds the cap, the queue
     * cannot be reached, or a database-level failure occurs.
     *
     * @param OutputInterface   $output   Receives one line per completed job.
     * @param ConsumerInterface $consumer Consumer that gets each job's data.
     * @param string            $tube     Environment-specific tube name to watch.
     * @param int               $limit    Maximum number of jobs to complete.
     */
    private function process(OutputInterface $output, ConsumerInterface $consumer, $tube, $limit)
    {
        $counter = 0;
        $memoryUsage = memory_get_peak_usage(true);

        while (true) {
            if ($counter >= $limit) {
                return;
            }

            if ($memoryUsage > self::MAXIMUM_MEMORY_USAGE_LIMIT) {
                $this->logger->info(sprintf(
                    'Processing "%s" tube exhausted memory with "%sMB" of usage.',
                    $tube,
                    ($memoryUsage/1024/1024)
                ));

                return;
            }

            // Reset on every pass so the catch block never sees a job left over from an earlier iteration.
            $job = null;

            try {
                $job = $this->pheanstalk->watch($tube)->ignore('default')->reserve(5);
                // Anything that is not a Job (e.g. nothing arrived within the timeout) is skipped.
                if ($job instanceof Job) {
                    $data = $job->getData();
                    $consumer->consume($data);
                    $this->pheanstalk->delete($job);

                    $memoryUsage = memory_get_peak_usage(true);
                    ++$counter;

                    $output->writeln(sprintf('Job "%d" of tube "%s" completed.', $counter, $tube));
                }
            } catch (Exception $e) {
                $this->logger->error(sprintf(
                    'Consuming "%s" tube resulted in "%s" exception: [%s:%s]',
                    $tube,
                    get_class($e),
                    $e->getCode(),
                    $e->getMessage()
                ));

                if (!$job instanceof Job) {
                    // Nothing was reserved, so the failure came from talking to the queue itself: there is
                    // no job to bury, and retrying every SLEEP_TIME would only flood the log.
                    return;
                }

                $this->pheanstalk->bury($job);

                if ($this->isDatabaseFailure($e)) {
                    return;
                }
            }

            usleep(self::SLEEP_TIME);
        }
    }

    /**
     * Tells whether the exception, or any exception chained to it as "previous", is a database-level one.
     *
     * Consumers may wrap the original exception, so the whole chain is inspected.
     *
     * @param Exception $e
     *
     * @return bool
     */
    private function isDatabaseFailure(Exception $e)
    {
        for ($current = $e; $current !== null; $current = $current->getPrevious()) {
            if ($current instanceof DBALException
                || $current instanceof ORMException
                || $current instanceof PDOException
            ) {
                return true;
            }
        }

        return false;
    }
}

services:
    # Worker command; the console.command tag registers it with the console application.
    inanzzz_application.command.job_processor:
        class: Inanzzz\ApplicationBundle\Command\JobProcessorCommand
        tags:
            - { name: console.command }
        arguments:
            - '@leezy.pheanstalk'
            - '@logger'
            - '%tube_consumer_service_id_prefix%'

Behat


Behat.yml


default:
    extensions:
        Behat\Symfony2Extension: ~
        Behat\MinkExtension:
            base_url: http://inanzzz.dev:8082/app_test.php
            sessions:
                symfony2:
                    symfony2: ~
    suites:
        app:
            type: symfony_bundle
            bundle: InanzzzApplicationBundle
            mink_session: symfony2
            # BeanstalkContext fetches FeatureContext from the environment, so both must be listed.
            contexts:
                - Inanzzz\ApplicationBundle\Features\Context\FeatureContext
                - Inanzzz\ApplicationBundle\Features\Context\BeanstalkContext

FeatureContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\MinkExtension\Context\MinkContext;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use Symfony\Component\HttpKernel\KernelInterface;

/**
 * Mink-based context that also keeps the kernel it is given, so other contexts can reach it
 * through getKernel().
 */
class FeatureContext extends MinkContext implements KernelAwareContext
{
    /** @var KernelInterface */
    private $kernel;

    /**
     * Stores the kernel handed over through the KernelAwareContext contract.
     *
     * @param KernelInterface $kernel
     */
    public function setKernel(KernelInterface $kernel)
    {
        $this->kernel = $kernel;
    }

    /**
     * Returns the kernel previously stored by setKernel().
     *
     * @return KernelInterface
     */
    public function getKernel()
    {
        return $this->kernel;
    }
}

BeanstalkContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\Behat\Context\Context;
use Behat\Behat\Hook\Scope\BeforeScenarioScope;
use Behat\Gherkin\Node\TableNode;
use LogicException;
use Pheanstalk\PheanstalkInterface;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

/**
 * Behat steps for inspecting beanstalk tubes and running the worker command against them.
 *
 * Every tube name given in a step is suffixed with "_test" before it is looked up.
 */
class BeanstalkContext implements Context
{
    const WORKER_COMMAND = 'php bin/console job:processor %s --limit=%d --env=test';
    const CURRENT_JOB_PREFIX = 'current-jobs-';

    /** @var FeatureContext $featureContext */
    private $featureContext;

    /** @var string[] Tube statistic keys that are summed up to decide whether a tube is empty. */
    private $jobItems = [
        'current-jobs-urgent',
        'current-jobs-ready',
        'current-jobs-reserved',
        'current-jobs-delayed',
        'current-jobs-buried',
    ];

    /**
     * Grabs the FeatureContext of the running scenario; it provides access to the kernel.
     *
     * @param BeforeScenarioScope $scope
     *
     * @BeforeScenario
     */
    public function gatherContexts(BeforeScenarioScope $scope)
    {
        $environment = $scope->getEnvironment();

        $this->featureContext = $environment->getContext(FeatureContext::class);
    }

    /**
     * Fails when the tube holds any urgent, ready, reserved, delayed or buried job.
     *
     * @param string $tube
     *
     * @throws LogicException
     *
     * @When /^the "([^"]*)" tube is empty$/
     */
    public function theTubeIsEmpty($tube)
    {
        $tube = $tube.'_test';

        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->getPheanstalk($tube);

        $totalJobs = 0;
        foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
            if (in_array($item, $this->jobItems, true)) {
                // Cast explicitly so the strict comparison below does not depend on the value's type.
                $totalJobs += (int) $value;
            }
        }

        if ($totalJobs !== 0) {
            throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
        }
    }

    /**
     * Fails when the number of jobs in the given state differs from the expected total.
     *
     * @param int    $total
     * @param string $state
     * @param string $tube
     *
     * @throws LogicException
     *
     * @When /^there should be (\d+) "(urgent|ready|reserved|delayed|buried)" job in "([^"]*)" tube$/
     */
    public function thereShouldBeJobInTube($total, $state, $tube)
    {
        $tube = $tube.'_test';

        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->getPheanstalk($tube);

        $totalJobs = 0;
        foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
            if ($item === self::CURRENT_JOB_PREFIX.$state) {
                $totalJobs = (int) $value;

                break;
            }
        }

        // The step argument arrives as captured text, so normalise it before comparing strictly.
        if ($totalJobs !== (int) $total) {
            throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
        }
    }

    /**
     * Runs the worker command for the tube and compares its trimmed output with the first column
     * of the given table, joined line by line.
     *
     * @param int       $total
     * @param string    $tube
     * @param TableNode $tableNode
     *
     * @throws ProcessFailedException When the worker process is not successful.
     * @throws LogicException         When the output differs from the expected lines.
     *
     * @When /^I consume (\d+) job in "([^"]*)" tube:$/
     */
    public function iConsumeJobInTube($total, $tube, TableNode $tableNode)
    {
        // The tube name comes from the feature file and ends up in a shell command line: quote it.
        $command = sprintf(self::WORKER_COMMAND, escapeshellarg($tube), $total);

        $process = new Process($command);
        $process->run();
        if (!$process->isSuccessful()) {
            throw new ProcessFailedException($process);
        }

        $expectedOutput = array_keys($tableNode->getRowsHash());
        $expectedOutput = implode(PHP_EOL, $expectedOutput);

        if (trim($process->getOutput()) !== $expectedOutput) {
            throw new LogicException('Output mismatch.');
        }
    }

    /**
     * Fetches the beanstalk client from the kernel's container after checking that the tube exists.
     *
     * @param string $tube
     *
     * @return PheanstalkInterface
     *
     * @throws LogicException When the tube is not listed by the server.
     */
    private function getPheanstalk($tube)
    {
        /** @var PheanstalkInterface $pheanstalk */
        $pheanstalk = $this->featureContext->getKernel()->getContainer()->get('leezy.pheanstalk');

        if (!in_array($tube, $pheanstalk->listTubes(), true)) {
            throw new LogicException('Tube not found.');
        }

        return $pheanstalk;
    }
}

Users.feature


Feature: Working with users.
  In order to interact with users
  As a user
  I should be able to make requests and consume beanstalk tubes

  Scenario: Successfully creating user and consuming beanstalk tube.
    # Landing page responds and the tube starts out empty.
    Given I am on "/jobs"
    Then I should see "This is a beanstalk application!"
    And the "user_create" tube is empty
    # Requesting the create endpoint queues exactly one job.
    When I am on "/jobs/create/inanzzz"
    Then the response status code should be 202
    And there should be 1 "ready" job in "user_create" tube
    # The worker consumes it; each table row is one expected output line.
    And I consume 1 job in "user_create" tube:
      | Processing "1" jobs from "user_create_test" tube... |
      | Job "1" of tube "user_create_test" completed. |
    And the "user_create" tube is empty

Test


$ vendor/bin/behat --suite=app
Feature: Working with users.
In order to interact with users
As a user
I should be able to make requests and consume beanstalk tubes

Scenario: Successfully creating user and consuming beanstalk tube.
Given I am on "/jobs"
Then I should see "This is a beanstalk application!"
And the "user_create" tube is empty
When I am on "/jobs/create/inanzzz"
Then the response status code should be 202
And there should be 1 "ready" job in "user_create" tube
And I consume 1 job in "user_create" tube:
| Processing "1" jobs from "user_create_test" tube... |
| Job "1" of tube "user_create_test" completed. |
And the "user_create" tube is empty

1 scenario (1 passed)
8 steps (8 passed)
0m1.38s (26.11Mb)

Addition


If you want to check the content of a job, you can add the method below.


...
/**
 * Fails unless the next ready job of the tube carries exactly the given text as its data.
 *
 * @param string                           $tube
 * @param \Behat\Gherkin\Node\PyStringNode $string
 *
 * @throws LogicException
 *
 * @Then /^the job in "([^"]*)" tube contains:$/
 */
public function theJobInTubeContains($tube, \Behat\Gherkin\Node\PyStringNode $string)
{
    $tube = $tube.'_test';

    /** @var PheanstalkInterface $pheanstalk */
    $pheanstalk = $this->getPheanstalk($tube);

    // Peeking looks at the tube currently in *use*, not at the watched tubes, so name the tube
    // explicitly instead of relying on watch()/ignore().
    $job = $pheanstalk->peekReady($tube);
    // Type names are fully qualified because the context class does not import them.
    if (!$job instanceof \Pheanstalk\Job) {
        throw new LogicException('Tube is empty.');
    }

    if ($job->getData() !== $string->getRaw()) {
        throw new LogicException('Job content mismatch.');
    }
}
...

And the job in "user_create" tube contains:
"""
{"id":"1","type":"whatever"}
"""