Bu örneğimizde behat ile bir beanstalk symfony uygulamasını test edeceğiz. Testimizde bir kullanıcı yaratacağız. Producer (yayınlayıcı) işleri tube (sıra) içine atacak, terminal worker (işçi) komutu arka planda çalışarak sırayı dinleyip aldığı işleri consumer'e (tüketici) işlenmeleri için verecek. Bu örnek LeezyPheanstalkBundle kütüphanesi kullanıyor.


Kurulum


Composer


Aşağıdaki kütüphaneleri yükleyin.


"leezy/pheanstalk-bundle": "3.2.0"
"behat/behat": "3.2.2"
"behat/symfony2-extension": "2.1.1"
"behat/mink": "1.7.1"
"behat/mink-extension": "2.2"
"behat/mink-browserkit-driver": "1.3.2"

Paramaters.yml


parameters:
tube_consumer_service_id_prefix: "inanzzz_application.consumer."
user_create_tube: "user_create_%kernel.environment%"

DefaultController


Bir kullanıcı yaratmak için aslında içinde json içeriği olan bir POST isteği göndermeniz lazım, ama ben bu seferlik tembellik yapıyorum!


namespace Inanzzz\ApplicationBundle\Controller;

use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Inanzzz\ApplicationBundle\Producer\ProducerInterface;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Response;

/**
* @Route("jobs/", service="inanzzz_application.controller.default")
*/
class DefaultController
{
private $userCreateProducer;

public function __construct(
ProducerInterface $userCreateProducer
) {
$this->userCreateProducer = $userCreateProducer;
}

/**
* @Method({"GET"})
* @Route("")
*
* @return Response
*/
public function indexAction()
{
return new Response('This is a beanstalk application!');
}

/**
* @Method({"GET"})
* @Route("create/{name}")
*
* @return Response
*/
public function createAction($name)
{
try {
$this->userCreateProducer->produce($name);
} catch (ProducerException $e) {
return new Response(null, Response::HTTP_BAD_REQUEST);
}

return new Response(null, Response::HTTP_ACCEPTED);
}
}

services:
inanzzz_application.controller.default:
class: Inanzzz\ApplicationBundle\Controller\DefaultController
arguments:
- '@inanzzz_application.producer.user_create'

UserCreateProducer


namespace Inanzzz\ApplicationBundle\Producer;

interface ProducerInterface
{
public function produce($name);
}

namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

class UserCreateProducer implements ProducerInterface
{
private $pheanstalk;
private $tube;

public function __construct(
PheanstalkInterface $pheanstalk,
$tube
) {
$this->pheanstalk = $pheanstalk;
$this->tube = $tube;
}

public function produce($name)
{
try {
$this->pheanstalk->useTube($this->tube)->put($name);
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.producer.user_create:
class: Inanzzz\ApplicationBundle\Producer\UserCreateProducer
arguments:
- '@leezy.pheanstalk'
- '%user_create_tube%'

UserCreateConsumer


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

class ProducerException extends RuntimeException
{
}

namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

class UserCreateConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function consume($name)
{
try {
// Do something with the $name

$this->logger->info(sprintf('User [%s] has been created.', $name));
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.consumer.user_create:
class: Inanzzz\ApplicationBundle\Consumer\UserCreateConsumer
arguments:
- '@logger'

JobProcessorCommand


namespace Inanzzz\ApplicationBundle\Command;

use Doctrine\DBAL\DBALException;
use Doctrine\ORM\ORMException;
use Exception;
use Inanzzz\ApplicationBundle\Consumer\ConsumerInterface;
use PDOException;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

class JobProcessorCommand extends Command implements ContainerAwareInterface
{
const MAXIMUM_JOB_LIMIT = 250;
const MAXIMUM_MEMORY_USAGE_LIMIT = 104857600; // As in Bytes = 100MB
const SLEEP_TIME = 250000; // As in Microseconds = 0.25sec
const WATCH_TIMEOUT = 60; // As in seconds

/** @var ContainerInterface */
private $container;
private $pheanstalk;
private $logger;
private $consumerServiceIdPrefix;

public function __construct(
PheanstalkInterface $pheanstalk,
LoggerInterface $logger,
$consumerServiceIdPrefix
) {
parent::__construct();

$this->pheanstalk = $pheanstalk;
$this->logger = $logger;
$this->consumerServiceIdPrefix = $consumerServiceIdPrefix;
}

public function setContainer(ContainerInterface $container = null)
{
$this->container = $container;
}

protected function configure()
{
$this
->setName('job:processor')
->setDescription('Processing beanstalk jobs for given tubes.')
->addArgument(
'tube',
InputOption::VALUE_REQUIRED,
'Name of the tube to process jobs from?'
)
->addOption(
'limit',
null,
InputOption::VALUE_REQUIRED,
'Maximum how many jobs should be consumed?',
self::MAXIMUM_JOB_LIMIT
);
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$environment = $input->getOption('env');
$tube = $input->getArgument('tube');
$limit = $input->getOption('limit');

$consumer = $this->container->get($this->consumerServiceIdPrefix.$tube);
if (!$consumer instanceof ConsumerInterface) {
$output->writeln(sprintf('Consumer "%s" cannot be found.', $consumer));

return;
}

$tube = $tube.'_'.$environment;
if (!in_array($tube, $this->pheanstalk->listTubes())) {
$output->writeln(sprintf('Tube "%s" cannot be found.', $tube));

return;
}

$limit = $limit > self::MAXIMUM_JOB_LIMIT ? self::MAXIMUM_JOB_LIMIT : $limit;

$output->writeln(sprintf('Processing "%d" jobs from "%s" tube...', $limit, $tube));

$this->process($output, $consumer, $tube, $limit);
}

private function process(OutputInterface $output, ConsumerInterface $consumer, $tube, $limit)
{
$counter = 0;
$memoryUsage = memory_get_peak_usage(true);

while (true) {
if ($counter >= $limit) {
return;
}

if ($memoryUsage > self::MAXIMUM_MEMORY_USAGE_LIMIT) {
$this->logger->info(sprintf(
'Processing "%s" tube exhausted memory with "%sMB" of usage.',
$limit,
($memoryUsage/1024/1024)
));

return;
}

try {
/** @var Job $job */
$job = $this->pheanstalk->watch($tube)->ignore('default')->reserve(5);
if ($job instanceof Job) {
$data = $job->getData();
$consumer->consume($data);
$this->pheanstalk->delete($job);

$memoryUsage = memory_get_peak_usage(true);
++$counter;

$output->writeln(sprintf('Job "%d" of tube "%s" completed.', $counter, $tube));
}
} catch (Exception $e) {
$this->pheanstalk->bury($job);

$this->logger->error(sprintf(
'Consuming "%s" tube resulted in "%s" exception: [%s:%s]',
$tube,
get_class($e),
$e->getCode(),
$e->getMessage()
));

if ($e instanceof DBALException || $e instanceof ORMException || $e instanceof PDOException) {
return;
}
}

usleep(self::SLEEP_TIME);
}
}
}

services:
inanzzz_application.command.job_processor:
class: Inanzzz\ApplicationBundle\Command\JobProcessorCommand
tags:
- { name: console.command }
arguments:
- '@leezy.pheanstalk'
- '@logger'
- '%tube_consumer_service_id_prefix%'

Behat


Behat.yml


default:
extensions:
Behat\Symfony2Extension: ~
Behat\MinkExtension:
base_url: http://inanzzz.dev:8082/app_test.php
sessions:
symfony2:
symfony2: ~
suites:
app:
type: symfony_bundle
bundle: InanzzzApplicationBundle
mink_session: symfony2
contexts:
- Inanzzz\ApplicationBundle\Features\Context\FeatureContext
- Inanzzz\ApplicationBundle\Features\Context\BeanstalkContext

FeatureContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\MinkExtension\Context\MinkContext;
use Behat\Symfony2Extension\Context\KernelAwareContext;
use Symfony\Component\HttpKernel\KernelInterface;

class FeatureContext extends MinkContext implements KernelAwareContext
{
/** @var KernelInterface */
private $kernel;

/**
* @param KernelInterface $kernel
*/
public function setKernel(KernelInterface $kernel)
{
$this->kernel = $kernel;
}

/**
* @return KernelInterface
*/
public function getKernel()
{
return $this->kernel;
}
}

BeanstalkContext


namespace Inanzzz\ApplicationBundle\Features\Context;

use Behat\Behat\Context\Context;
use Behat\Behat\Hook\Scope\BeforeScenarioScope;
use Behat\Gherkin\Node\TableNode;
use LogicException;
use Pheanstalk\PheanstalkInterface;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

class BeanstalkContext implements Context
{
const WORKER_COMMAND = 'php bin/console job:processor %s --limit=%d --env=test';
const CURRENT_JOB_PREFIX = 'current-jobs-';

/** @var FeatureContext $featureContext */
private $featureContext;
private $jobItems = [
'current-jobs-urgent',
'current-jobs-ready',
'current-jobs-reserved',
'current-jobs-delayed',
'current-jobs-buried',
];

/**
* @param BeforeScenarioScope $scope
*
* @BeforeScenario
*/
public function gatherContexts(BeforeScenarioScope $scope)
{
$environment = $scope->getEnvironment();

$this->featureContext = $environment->getContext(FeatureContext::class);
}

/**
* @param string $tube
*
* @When /^the "([^"]*)" tube is empty$/
*/
public function theTubeIsEmpty($tube)
{
$tube = $tube.'_test';

/** @var PheanstalkInterface $pheanstalk */
$pheanstalk = $this->getPheanstalk($tube);

$totalJobs = 0;
foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
if (in_array($item, $this->jobItems)) {
$totalJobs += $value;
}
}

if ($totalJobs != 0) {
throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
}
}

/**
* @param int $total
* @param string $state
* @param string $tube
*
* @When /^there should be (\d+) "(urgent|ready|reserved|delayed|buried)" job in "([^"]*)" tube$/
*/
public function thereShouldBeJobInTube($total, $state, $tube)
{
$tube = $tube.'_test';

/** @var PheanstalkInterface $pheanstalk */
$pheanstalk = $this->getPheanstalk($tube);

$totalJobs = 0;
foreach ($pheanstalk->statsTube($tube)->getIterator() as $item => $value) {
if ($item == self::CURRENT_JOB_PREFIX.$state) {
$totalJobs = $value;

break;
}
}

if ($totalJobs != $total) {
throw new LogicException(sprintf('Tube contains "%d" jobs.', $totalJobs));
}
}

/**
* @param int $total
* @param string $tube
* @param TableNode $tableNode
*
* @When /^I consume (\d+) job in "([^"]*)" tube:$/
*/
public function iConsumeJobInTube($total, $tube, TableNode $tableNode)
{
$process = new Process(sprintf(self::WORKER_COMMAND, $tube, $total));
$process->run();
if (!$process->isSuccessful()) {
throw new ProcessFailedException($process);
}

$expectedOutput = array_keys($tableNode->getRowsHash());
$expectedOutput = implode(PHP_EOL, $expectedOutput);

if (trim($process->getOutput()) != $expectedOutput) {
throw new LogicException('Output mismatch.');
}
}

/**
* @param string $tube
*
* @return PheanstalkInterface
*/
private function getPheanstalk($tube)
{
/** @var PheanstalkInterface $pheanstalk */
$pheanstalk = $this->featureContext->getKernel()->getContainer()->get('leezy.pheanstalk');

if (!in_array($tube, $pheanstalk->listTubes())) {
throw new LogicException('Tube not found.');
}

return $pheanstalk;
}
}

Users.feature


Feature: Working with users.
In order to interact with users
As a user
I should be able to make requests and consume beanstalk tubes

Scenario: Successfully creating user and consuming beanstalk tube.
Given I am on "/jobs"
Then I should see "This is a beanstalk application!"
And the "user_create" tube is empty
When I am on "/jobs/create/inanzzz"
Then the response status code should be 202
And there should be 1 "ready" job in "user_create" tube
And I consume 1 job in "user_create" tube:
| Processing "1" jobs from "user_create_test" tube... |
| Job "1" of tube "user_create_test" completed. |
And the "user_create" tube is empty

Test


$ vendor/bin/behat --suite=app
Feature: Working with users.
In order to interact with users
As a user
I should be able to make requests and consume beanstalk tubes

Scenario: Successfully creating user and consuming beanstalk tube.
Given I am on "/jobs"
Then I should see "This is a beanstalk application!"
And the "user_create" tube is empty
When I am on "/jobs/create/inanzzz"
Then the response status code should be 202
And there should be 1 "ready" job in "user_create" tube
And I consume 1 job in "user_create" tube:
| Processing "1" jobs from "user_create_test" tube... |
| Job "1" of tube "user_create_test" completed. |
And the "user_create" tube is empty

1 scenario (1 passed)
8 steps (8 passed)
0m1.38s (26.11Mb)

Ek


Eğer sırada bekleyen bir işin içeriğini kontrol etmek isterseniz, aşağıdaki methodu kullanabilirsiniz.


...
/**
* @Then /^the job in "([^"]*)" tube contains:$/
*/
public function theJobInTubeContains($tube, PyStringNode $string)
{
$tube = $tube.'_test';

/** @var PheanstalkInterface $pheanstalk */
$pheanstalk = $this->getPheanstalk($tube);

$job = $pheanstalk->watch($tube)->ignore('default')->peekReady();
if (!$job instanceof Job) {
throw new LogicException('Tube is empty.');
}

if ($job->getData() != $string->getRaw()) {
throw new LogicException('Job content mismatch.');
}
}
...

And the job in "user_create" tube contains:
"""
{"id":"1","type":"whatever"}
"""