In this example, we're going to create a Symfony beanstalk application. The producer will put jobs into a tube, and a terminal command running in the background will listen to the tube, pick up the jobs and pass them on to the consumer, where the jobs will be processed. This is how our example works. The example uses the LeezyPheanstalkBundle library.


Note


The suffix of consumer service names and the prefix of tube names must match each other because the worker commands depend on them. For example:


# Parameter
user_create_tube: "user_create_%kernel.environment%" # "user_create ......."
# Service
inanzzz_application.consumer.user_create # ....... user_create
# Worker commands
$ php bin/console job:processor user_create --env=test # ....... user_create .......

Logic


We're going to create and delete users, so each of these two operations will have 1 producer, 1 tube, 1 worker and 1 consumer dedicated to it.



Setup


Run $ composer require "leezy/pheanstalk-bundle":"3.2.0" to install LeezyPheanstalkBundle package and enable it by adding new Leezy\PheanstalkBundle\LeezyPheanstalkBundle() to AppKernel.php file.


Configuration


LeezyPheanstalkBundle


# app/config/config.yml
# Declares a single beanstalkd connection named "primary" and marks it as the default one.
leezy_pheanstalk:
    pheanstalks:
        primary:
            server: 127.0.0.1
            default: true

Application


# app/config/parameters.yml
# Tube names are suffixed with the kernel environment, e.g. "user_create_test".
parameters:
    user_create_tube: "user_create_%kernel.environment%"
    user_delete_tube: "user_delete_%kernel.environment%"

Available commands


These are LeezyPheanstalkBundle related commands so they have nothing to do with our application. Visit LeezyPheanstalkBundle Command Line Tools page for full list.


# Get stats about beanstalkd server, tubes and jobs
$ php bin/console leezy:pheanstalk:stats

# List all available tubes
$ php bin/console leezy:pheanstalk:list-tube
Pheanstalk: default
- default
- user_create_test

# Get the first ready job and associated data in given tube
$ php bin/console leezy:pheanstalk:peek-tube user_create_test
Pheanstalk: default
Tube: user_create_test
Job id: 1
Data: a:1:{s:4:"name";s:3:"one";}

# Get the given job's data
$ php bin/console leezy:pheanstalk:peek 1
Pheanstalk: default
Job id: 1
Data: a:1:{s:4:"name";s:3:"one";}

# Delete the given job
$ php bin/console leezy:pheanstalk:delete-job 1
Pheanstalk: default
Job 1 deleted.

# Flush a tube. When you flush a tube, it will be removed from the beanstalkd server.
$ php bin/console leezy:pheanstalk:flush-tube user_create_test
Pheanstalk: default
Jobs deleted: 1.

DefaultController


namespace Inanzzz\ApplicationBundle\Controller;

use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Inanzzz\ApplicationBundle\Producer\ProducerInterface;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
 * HTTP entry point of the example: every write request is handed to a producer
 * instead of being processed inside the request itself.
 *
 * @Route("jobs/", service="inanzzz_application.controller.default")
 */
class DefaultController
{
    /** @var ProducerInterface Producer used by createAction(). */
    private $createProducer;

    /** @var ProducerInterface Producer used by deleteAction(). */
    private $deleteProducer;

    /**
     * @param ProducerInterface $userCreateProducer Producer that receives the raw request body.
     * @param ProducerInterface $userDeleteProducer Producer that receives the "id" route value.
     */
    public function __construct(
        ProducerInterface $userCreateProducer,
        ProducerInterface $userDeleteProducer
    ) {
        $this->createProducer = $userCreateProducer;
        $this->deleteProducer = $userDeleteProducer;
    }

    /**
     * Returns a fixed plain-text greeting.
     *
     * @Method({"GET"})
     * @Route("")
     *
     * @return Response
     */
    public function indexAction()
    {
        $greeting = 'This is a beanstalk application!';

        return new Response($greeting);
    }

    /**
     * Hands the raw request body to the "create" producer.
     *
     * @Method({"POST"})
     * @Route("")
     *
     * @param Request $request
     *
     * @return Response
     */
    public function createAction(Request $request)
    {
        return $this->enqueue($this->createProducer, $request->getContent());
    }

    /**
     * Hands the given id to the "delete" producer.
     *
     * @Method({"DELETE"})
     * @Route("{id}")
     *
     * @param mixed $id Value of the "id" route placeholder.
     *
     * @return Response
     */
    public function deleteAction($id)
    {
        return $this->enqueue($this->deleteProducer, $id);
    }

    /**
     * Passes the payload to the producer and maps the outcome to an empty response:
     * Response::HTTP_ACCEPTED on success, Response::HTTP_BAD_REQUEST when the
     * producer throws a ProducerException.
     *
     * @param ProducerInterface $producer
     * @param mixed             $payload
     *
     * @return Response
     */
    private function enqueue(ProducerInterface $producer, $payload)
    {
        $status = Response::HTTP_ACCEPTED;

        try {
            $producer->produce($payload);
        } catch (ProducerException $e) {
            $status = Response::HTTP_BAD_REQUEST;
        }

        return new Response(null, $status);
    }
}

# Controller registered as a service; the two producers are injected in constructor order.
services:
    inanzzz_application.controller.default:
        class: Inanzzz\ApplicationBundle\Controller\DefaultController
        arguments:
            - '@inanzzz_application.producer.user_create'
            - '@inanzzz_application.producer.user_delete'

Producers


ProducerInterface


namespace Inanzzz\ApplicationBundle\Producer;

/**
 * Contract shared by every producer in this example.
 */
interface ProducerInterface
{
    /**
     * Hands the given payload over to the producer.
     *
     * @param mixed $user Payload to produce a job from.
     */
    public function produce($user);
}

ProducerException


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

/**
 * Marker exception for failures raised on the producing side.
 *
 * Adds nothing to RuntimeException; it exists so callers can catch producer
 * failures by type.
 */
class ProducerException extends RuntimeException {}

UserCreateProducer


namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

/**
 * Puts serialized "user create" payloads into the configured beanstalk tube.
 */
class UserCreateProducer implements ProducerInterface
{
    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var string Name of the tube jobs are put into. */
    private $tube;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param string              $tube       Name of the tube jobs are put into.
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        $tube
    ) {
        $this->pheanstalk = $pheanstalk;
        $this->tube = $tube;
    }

    /**
     * Serializes the payload and puts it into the tube as a new job.
     *
     * @param mixed $user Payload to queue; it is passed through serialize() first.
     *
     * @throws ProducerException When serializing or queueing fails.
     */
    public function produce($user)
    {
        try {
            $this->pheanstalk->useTube($this->tube)->put(serialize($user));
        } catch (Exception $e) {
            // Chain the original exception so its class and stack trace are not lost.
            throw new ProducerException($e->getMessage(), 0, $e);
        }
    }
}

# Producer bound to the "user_create_<env>" tube.
services:
    inanzzz_application.producer.user_create:
        class: Inanzzz\ApplicationBundle\Producer\UserCreateProducer
        arguments:
            - '@leezy.pheanstalk'
            - '%user_create_tube%'

UserDeleteProducer


namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

/**
 * Puts "user delete" jobs into the configured beanstalk tube.
 */
class UserDeleteProducer implements ProducerInterface
{
    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var string Name of the tube jobs are put into. */
    private $tube;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param string              $tube       Name of the tube jobs are put into.
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        $tube
    ) {
        $this->pheanstalk = $pheanstalk;
        $this->tube = $tube;
    }

    /**
     * Puts the given id into the tube as the job data, unmodified.
     *
     * @param mixed $id Identifier to queue as-is (no serialization is applied).
     *
     * @throws ProducerException When queueing fails.
     */
    public function produce($id)
    {
        try {
            $this->pheanstalk->useTube($this->tube)->put($id);
        } catch (Exception $e) {
            // Chain the original exception so its class and stack trace are not lost.
            throw new ProducerException($e->getMessage(), 0, $e);
        }
    }
}

# Producer bound to the "user_delete_<env>" tube.
services:
    inanzzz_application.producer.user_delete:
        class: Inanzzz\ApplicationBundle\Producer\UserDeleteProducer
        arguments:
            - '@leezy.pheanstalk'
            - '%user_delete_tube%'

Consumers


ConsumerInterface


namespace Inanzzz\ApplicationBundle\Consumer;

/**
 * Contract shared by every consumer in this example.
 */
interface ConsumerInterface
{
    /**
     * Processes the data of a single job.
     *
     * @param mixed $data Job data handed over by the worker.
     */
    public function consume($data);
}

ConsumerException


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

/**
 * Marker exception for failures raised on the consuming side.
 *
 * Adds nothing to RuntimeException; it exists so callers can catch consumer
 * failures by type.
 */
class ConsumerException extends RuntimeException {}

UserCreateConsumer


namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

/**
 * Consumes "user create" jobs: decodes the job data and logs the created user's name.
 */
class UserCreateConsumer implements ConsumerInterface
{
    /** @var LoggerInterface */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Decodes one job payload (a PHP-serialized JSON string) and logs the user name.
     *
     * @param string $user Raw job data, i.e. serialize() applied to a JSON document.
     *
     * @throws \Inanzzz\ApplicationBundle\Exception\ConsumerException When the payload
     *         cannot be decoded into an array with a "name" key, or processing fails.
     */
    public function consume($user)
    {
        try {
            // NOTE(review): unserialize() must only ever see data written by our own
            // producer; if the tube can be written to by untrusted parties, switch the
            // wire format to plain JSON instead.
            $json = unserialize($user);
            $decoded = is_string($json) ? json_decode($json, true) : null;

            // json_decode() signals failure by returning null rather than throwing, so
            // a malformed payload has to be rejected explicitly.
            if (!is_array($decoded) || !isset($decoded['name'])) {
                throw new \InvalidArgumentException('Job payload must be a JSON object with a "name" key.');
            }

            // Do something with the $decoded user

            $this->logger->info(sprintf('User [%s] has been created.', $decoded['name']));
        } catch (Exception $e) {
            // Fully-qualified on purpose: only ProducerException is imported in this file,
            // but a consumer failure must surface as a ConsumerException.
            throw new \Inanzzz\ApplicationBundle\Exception\ConsumerException($e->getMessage(), 0, $e);
        }
    }
}

# The service id suffix ("user_create") must match the tube name prefix used by the worker command.
services:
    inanzzz_application.consumer.user_create:
        class: Inanzzz\ApplicationBundle\Consumer\UserCreateConsumer
        arguments:
            - '@logger'

UserDeleteConsumer


namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

/**
 * Consumes "user delete" jobs: logs the id of the deleted user.
 */
class UserDeleteConsumer implements ConsumerInterface
{
    /** @var LoggerInterface */
    private $logger;

    /**
     * @param LoggerInterface $logger
     */
    public function __construct(
        LoggerInterface $logger
    ) {
        $this->logger = $logger;
    }

    /**
     * Processes one "user delete" job.
     *
     * @param mixed $id Job data; used as-is as the user identifier.
     *
     * @throws \Inanzzz\ApplicationBundle\Exception\ConsumerException When processing fails.
     */
    public function consume($id)
    {
        try {
            // Do something with the $id

            $this->logger->info(sprintf('User [%s] has been deleted.', $id));
        } catch (Exception $e) {
            // Fully-qualified on purpose: only ProducerException is imported in this file,
            // but a consumer failure must surface as a ConsumerException.
            throw new \Inanzzz\ApplicationBundle\Exception\ConsumerException($e->getMessage(), 0, $e);
        }
    }
}

# The service id suffix ("user_delete") must match the tube name prefix used by the worker command.
services:
    inanzzz_application.consumer.user_delete:
        class: Inanzzz\ApplicationBundle\Consumer\UserDeleteConsumer
        arguments:
            - '@logger'

JobProcessorCommand


This worker command:



Usage example: $ php bin/console job:processor {user_create|user_delete} --limit={integer} --env={test|dev|prod}. The --limit and --env flags are not compulsory, so you can see what default values they contain by testing it yourself.


Note: This note applies only if you don't remove the IF statement from the execute() method below. To be able to run the command, the tube must exist. If there is no job in the tube, the tube obviously won't exist. If a new job comes in, the tube will be created automatically, but the command won't be running! This is where supervisor comes in handy: supervisor will keep trying to run the command, so the jobs will be consumed.


namespace Inanzzz\ApplicationBundle\Command;

use Doctrine\DBAL\DBALException;
use Doctrine\ORM\ORMException;
use Exception;
use Inanzzz\ApplicationBundle\Consumer\ConsumerInterface;
use PDOException;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Worker command that reserves jobs from a single beanstalk tube and hands each
 * job's data to the consumer service whose id is "<prefix><tube argument>".
 *
 * The loop stops when the job limit is reached, when peak memory usage exceeds
 * MAXIMUM_MEMORY_USAGE_LIMIT, or when a database-level exception is caught.
 */
class JobProcessorCommand extends Command implements ContainerAwareInterface
{
    const MAXIMUM_JOB_LIMIT = 250;
    const MAXIMUM_MEMORY_USAGE_LIMIT = 104857600; // As in Bytes = 100MB
    const SLEEP_TIME = 250000; // As in Microseconds = 0.25sec
    const WATCH_TIMEOUT = 60; // As in seconds (not referenced by this class; kept for compatibility)
    const RESERVE_TIMEOUT = 5; // As in seconds, how long a single reserve() call waits for a job

    /** @var ContainerInterface */
    private $container;

    /** @var PheanstalkInterface */
    private $pheanstalk;

    /** @var LoggerInterface */
    private $logger;

    /** @var string Prefix prepended to the tube argument to build the consumer service id. */
    private $consumerServiceIdPrefix;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param LoggerInterface     $logger
     * @param string              $consumerServiceIdPrefix
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        LoggerInterface $logger,
        $consumerServiceIdPrefix
    ) {
        parent::__construct();

        $this->pheanstalk = $pheanstalk;
        $this->logger = $logger;
        $this->consumerServiceIdPrefix = $consumerServiceIdPrefix;
    }

    /**
     * {@inheritdoc}
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * Declares the required "tube" argument and the optional "--limit" option.
     */
    protected function configure()
    {
        $this
            ->setName('job:processor')
            ->setDescription('Processing beanstalk jobs for given tubes.')
            ->addArgument(
                'tube',
                // Arguments take InputArgument modes. InputOption::VALUE_REQUIRED has the
                // same integer value as InputArgument::OPTIONAL, which silently made the
                // tube name optional.
                InputArgument::REQUIRED,
                'Name of the tube to process jobs from?'
            )
            ->addOption(
                'limit',
                null,
                InputOption::VALUE_REQUIRED,
                'Maximum how many jobs should be consumed?',
                self::MAXIMUM_JOB_LIMIT
            );
    }

    /**
     * Resolves the consumer and the environment-suffixed tube name, then starts processing.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $environment = $input->getOption('env');
        $tube = $input->getArgument('tube');
        $limit = $input->getOption('limit');

        // Look the service up defensively: get() on an unknown id throws before we
        // could print a friendly message, and the message needs the id, not the object.
        $consumerServiceId = $this->consumerServiceIdPrefix.$tube;
        $consumer = $this->container->has($consumerServiceId)
            ? $this->container->get($consumerServiceId)
            : null;
        if (!$consumer instanceof ConsumerInterface) {
            $output->writeln(sprintf('Consumer "%s" cannot be found.', $consumerServiceId));

            return;
        }

        $tube = $tube.'_'.$environment;

        // IF YOU WANT YOUR COMMAND TO RUN EVEN IF THE TUBE DOESN'T EXIST THEN COMMENT THIS IF STATEMENT.
        // I SUGGEST YOU TO REMOVE IT.
        if (!in_array($tube, $this->pheanstalk->listTubes(), true)) {
            $output->writeln(sprintf('Tube "%s" cannot be found.', $tube));

            return;
        }
        // END

        // Never process more than MAXIMUM_JOB_LIMIT jobs in a single run.
        $limit = $limit > self::MAXIMUM_JOB_LIMIT ? self::MAXIMUM_JOB_LIMIT : $limit;

        $output->writeln(sprintf('Processing "%d" jobs from "%s" tube...', $limit, $tube));

        $this->process($output, $consumer, $tube, $limit);
    }

    /**
     * Reserve/consume/delete loop for a single tube.
     *
     * @param OutputInterface   $output
     * @param ConsumerInterface $consumer
     * @param string            $tube     Environment-suffixed tube name.
     * @param int|string        $limit    Maximum number of jobs to complete before returning.
     */
    private function process(OutputInterface $output, ConsumerInterface $consumer, $tube, $limit)
    {
        $counter = 0;
        $memoryUsage = memory_get_peak_usage(true);

        while (true) {
            if ($counter >= $limit) {
                return;
            }

            if ($memoryUsage > self::MAXIMUM_MEMORY_USAGE_LIMIT) {
                $this->logger->info(sprintf(
                    'Processing "%s" tube exhausted memory with "%sMB" of usage.',
                    $tube,
                    ($memoryUsage/1024/1024)
                ));

                return;
            }

            // Reset per iteration so the catch block never sees a job from a previous
            // iteration (which has already been deleted or buried).
            $job = null;

            try {
                /** @var Job|false $job */
                $job = $this->pheanstalk->watch($tube)->ignore('default')->reserve(self::RESERVE_TIMEOUT);
                if ($job instanceof Job) {
                    $data = $job->getData();
                    $consumer->consume($data);
                    $this->pheanstalk->delete($job);

                    $memoryUsage = memory_get_peak_usage(true);
                    ++$counter;

                    $output->writeln(sprintf('Job "%d" of tube "%s" completed.', $counter, $tube));
                }
            } catch (Exception $e) {
                // Only bury when a job was actually reserved in this iteration; if
                // watch()/reserve() itself failed there is nothing to bury.
                if ($job instanceof Job) {
                    $this->pheanstalk->bury($job);
                }

                $this->logger->error(sprintf(
                    'Consuming "%s" tube resulted in "%s" exception: [%s:%s]',
                    $tube,
                    get_class($e),
                    $e->getCode(),
                    $e->getMessage()
                ));

                // Stop the worker on database-level failures instead of carrying on with
                // the next job.
                if ($e instanceof DBALException || $e instanceof ORMException || $e instanceof PDOException) {
                    return;
                }
            }

            usleep(self::SLEEP_TIME);
        }
    }
}

# Worker command registered as a console command; the last argument is the consumer service id prefix.
services:
    inanzzz_application.command.job_processor:
        class: Inanzzz\ApplicationBundle\Command\JobProcessorCommand
        tags:
            - { name: console.command }
        arguments:
            - '@leezy.pheanstalk'
            - '@logger'
            - 'inanzzz_application.consumer.'

Tests


There are 3 jobs in "user_create_test" tube and we want to consume all of them.


$ php bin/console job:processor user_create --env=test
Processing "250" jobs from "user_create_test" tube...
Job "1" of tube "user_create_test" completed.
Job "2" of tube "user_create_test" completed.
Job "3" of tube "user_create_test" completed.

# This worker is still listening to the tube

There are 20 jobs in "user_delete_dev" tube and we want to consume only 2 of them.


$ php bin/console job:processor user_delete --limit=2 --env=dev
Processing "2" jobs from "user_delete_dev" tube...
Job "1" of tube "user_delete_dev" completed.
Job "2" of tube "user_delete_dev" completed.

# This worker quit listening.