Bu yazımızda bir tane symfony beanstalk örneği yapacağız. Producer (yayınlayıcı) işleri tube (sıra) içine atacak, terminal worker (işçi) komutu arka planda çalışarak sırayı dinleyip aldığı işleri consumer'e (tüketici) işlenmeleri için verecek. Örnek bu kadar basit. Bu örnek LeezyPheanstalkBundle kütüphanesi kullanıyor.


Not


Consumer servis isimlerinin sonları ve sıra isimlerinin başları birbirlerine uymak zorunda, çünkü worker komutlarının çalışmaları onlara bağlıdır. Örnek:


# Parameter
user_create_tube: "user_create_%kernel.environment%" # "user_create ......."
# Service
inanzzz_application.consumer.user_create # ....... user_create
# Worker commands
$ php bin/console job:processor user_create --env=test # ....... user_create .......

Mantık


Örneğimizde kullanıcı yaratma ve silme işlemleri yapacağız yani her iki mantık için 1 producer, 1 tube, 1 worker ve 1 consumer olacak.



Kurulum


Terminalde $ composer require "leezy/pheanstalk-bundle":"3.2.0" komutunu çalıştırarak LeezyPheanstalkBundle paketini yükledikten sonra, new Leezy\PheanstalkBundle\LeezyPheanstalkBundle() satırını AppKernel.php dosyasına ekleyerek aktifleştirin.


Konfigürasyon


LeezyPheanstalkBundle


# app/config/config.yml
leezy_pheanstalk:
pheanstalks:
primary:
server: 127.0.0.1
default: true

Uygulama


# app/config/parameters.yml
parameters:
user_create_tube: "user_create_%kernel.environment%"
user_delete_tube: "user_delete_%kernel.environment%"

Mevcut komutlar


Bu komutlar LeezyPheanstalkBundle kütüphanesine aittir yani bizim uygulamamızla alakası yoktur. Daha fazla bilgi için LeezyPheanstalkBundle Command Line Tools sayfasını ziyaret edin.


# Get stats about beanstalkd server, tubes and jobs
$ php bin/console leezy:pheanstalk:stats

# List all available tubes
$ php bin/console leezy:pheanstalk:list-tube
Pheanstalk: default
- default
- user_create_test

# Get the first ready job and associated data in given tube
$ php bin/console leezy:pheanstalk:peek-tube user_create_test
Pheanstalk: default
Tube: user_create_test
Job id: 1
Data: a:1:{s:4:"name";s:3:"one";}

# Get the given job's data
$ php bin/console leezy:pheanstalk:peek 1
Pheanstalk: default
Job id: 1
Data: a:1:{s:4:"name";s:3:"one";}

# Delete the given job
$ php bin/console leezy:pheanstalk:delete-job 1
Pheanstalk: default
Job 1 deleted.

# Flush a tube. When you flush a tube, it will be removed from the beanstalkd server.
$ php bin/console leezy:pheanstalk:flush-tube user_create_test
Pheanstalk: default
Jobs deleted: 1.

DefaultController


namespace Inanzzz\ApplicationBundle\Controller;

use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Inanzzz\ApplicationBundle\Producer\ProducerInterface;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Route;
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Method;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

/**
* @Route("jobs/", service="inanzzz_application.controller.default")
*/
class DefaultController
{
private $userCreateProducer;
private $userDeleteProducer;

public function __construct(
ProducerInterface $userCreateProducer,
ProducerInterface $userDeleteProducer
) {
$this->userCreateProducer = $userCreateProducer;
$this->userDeleteProducer = $userDeleteProducer;
}

/**
* @Method({"GET"})
* @Route("")
*
* @return Response
*/
public function indexAction()
{
return new Response('This is a beanstalk application!');
}

/**
* @Method({"POST"})
* @Route("")
*
* @return Response
*/
public function createAction(Request $request)
{
try {
$this->userCreateProducer->produce($request->getContent());
} catch (ProducerException $e) {
return new Response(null, Response::HTTP_BAD_REQUEST);
}

return new Response(null, Response::HTTP_ACCEPTED);
}

/**
* @Method({"DELETE"})
* @Route("{id}")
*
* @return Response
*/
public function deleteAction($id)
{
try {
$this->userDeleteProducer->produce($id);
} catch (ProducerException $e) {
return new Response(null, Response::HTTP_BAD_REQUEST);
}

return new Response(null, Response::HTTP_ACCEPTED);
}
}

services:
inanzzz_application.controller.default:
class: Inanzzz\ApplicationBundle\Controller\DefaultController
arguments:
- '@inanzzz_application.producer.user_create'
- '@inanzzz_application.producer.user_delete'

Producerler (Yayınlayıcılar)


ProducerInterface


namespace Inanzzz\ApplicationBundle\Producer;

interface ProducerInterface
{
public function produce($user);
}

ProducerException


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

class ProducerException extends RuntimeException
{
}

UserCreateProducer


namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

class UserCreateProducer implements ProducerInterface
{
private $pheanstalk;
private $tube;

public function __construct(
PheanstalkInterface $pheanstalk,
$tube
) {
$this->pheanstalk = $pheanstalk;
$this->tube = $tube;
}

public function produce($user)
{
try {
$this->pheanstalk->useTube($this->tube)->put(serialize($user));
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.producer.user_create:
class: Inanzzz\ApplicationBundle\Producer\UserCreateProducer
arguments:
- '@leezy.pheanstalk'
- '%user_create_tube%'

UserDeleteProducer


namespace Inanzzz\ApplicationBundle\Producer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Pheanstalk\PheanstalkInterface;

class UserDeleteProducer implements ProducerInterface
{
private $pheanstalk;
private $tube;

public function __construct(
PheanstalkInterface $pheanstalk,
$tube
) {
$this->pheanstalk = $pheanstalk;
$this->tube = $tube;
}

public function produce($id)
{
try {
$this->pheanstalk->useTube($this->tube)->put($id);
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.producer.user_delete:
class: Inanzzz\ApplicationBundle\Producer\UserDeleteProducer
arguments:
- '@leezy.pheanstalk'
- '%user_delete_tube%'

Consumers (Tüketiciler)


ConsumerInterface


namespace Inanzzz\ApplicationBundle\Consumer;

interface ConsumerInterface
{
public function consume($data);
}

ConsumerException


namespace Inanzzz\ApplicationBundle\Exception;

use RuntimeException;

class ConsumerException extends RuntimeException
{
}

UserCreateConsumer


namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

class UserCreateConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function consume($user)
{
try {
// Do something with the $user
$user = unserialize($user);
$user = json_decode($user, true);

$this->logger->info(sprintf('User [%s] has been created.', $user['name']));
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.consumer.user_create:
class: Inanzzz\ApplicationBundle\Consumer\UserCreateConsumer
arguments:
- '@logger'

UserDeleteConsumer


namespace Inanzzz\ApplicationBundle\Consumer;

use Exception;
use Inanzzz\ApplicationBundle\Exception\ProducerException;
use Psr\Log\LoggerInterface;

class UserDeleteConsumer implements ConsumerInterface
{
private $logger;

public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}

public function consume($id)
{
try {
// Do something with the $id

$this->logger->info(sprintf('User [%s] has been deleted.', $id));
} catch (Exception $e) {
throw new ProducerException($e->getMessage());
}
}
}

services:
inanzzz_application.consumer.user_delete:
class: Inanzzz\ApplicationBundle\Consumer\UserDeleteConsumer
arguments:
- '@logger'

JobProcessorCommand


Bu işleyici komutu:



Kullanım örneği: $ php bin/console job:processor {user_create|user_delete} --limit={integer} --env={test|dev|prod}. Buradaki --limit ve --env seçenekleri mecburi değildirler. Varsayılan değerlerinin ne olacaklarını test ederek görebilirsiniz.


Not: Bu not sadece aşağıda bulunan execute() methodundaki IF ifadesini kaldırmazsanız geçerli olacaktır. İşleyici komutunun çalışabilmesi için, sıranın mevcut olması lazım. Eğer herhangi bir iş yoksa, sıra mevcut olamaz. Yeni bir iş sisteme düştüğü zaman, sıra otomatik olarak yaratılır ama işleyici komutu çalışmıyor durumdadır. İşte bu gibi durumlarda supervisor yardımımıza yetişir. Supervisor işçi komutunu çalıştıracak ve işler işlenmeye başlayacaktır.


namespace Inanzzz\ApplicationBundle\Command;

use Doctrine\DBAL\DBALException;
use Doctrine\ORM\ORMException;
use Exception;
use Inanzzz\ApplicationBundle\Consumer\ConsumerInterface;
use PDOException;
use Pheanstalk\Job;
use Pheanstalk\PheanstalkInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

class JobProcessorCommand extends Command implements ContainerAwareInterface
{
const MAXIMUM_JOB_LIMIT = 250;
const MAXIMUM_MEMORY_USAGE_LIMIT = 104857600; // As in Bytes = 100MB
const SLEEP_TIME = 250000; // As in Microseconds = 0.25sec
const WATCH_TIMEOUT = 60; // As in seconds

/** @var ContainerInterface */
private $container;
private $pheanstalk;
private $logger;
private $consumerServiceIdPrefix;

public function __construct(
PheanstalkInterface $pheanstalk,
LoggerInterface $logger,
$consumerServiceIdPrefix
) {
parent::__construct();

$this->pheanstalk = $pheanstalk;
$this->logger = $logger;
$this->consumerServiceIdPrefix = $consumerServiceIdPrefix;
}

public function setContainer(ContainerInterface $container = null)
{
$this->container = $container;
}

protected function configure()
{
$this
->setName('job:processor')
->setDescription('Processing beanstalk jobs for given tubes.')
->addArgument(
'tube',
InputOption::VALUE_REQUIRED,
'Name of the tube to process jobs from?'
)
->addOption(
'limit',
null,
InputOption::VALUE_REQUIRED,
'Maximum how many jobs should be consumed?',
self::MAXIMUM_JOB_LIMIT
);
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$environment = $input->getOption('env');
$tube = $input->getArgument('tube');
$limit = $input->getOption('limit');

$consumer = $this->container->get($this->consumerServiceIdPrefix.$tube);
if (!$consumer instanceof ConsumerInterface) {
$output->writeln(sprintf('Consumer "%s" cannot be found.', $consumer));

return;
}

// IF YOU WANT YOUR COMMAND TO RUN EVEN IF THE TUBE DOESN'T EXIST THEN COMMENT THIS IF STATEMENT.
// I SUGGEST YOU TO REMOVE IT.
if (!in_array($tube, $this->pheanstalk->listTubes())) {
$output->writeln(sprintf('Tube "%s" cannot be found.', $tube));

return;
}
// END

$limit = $limit > self::MAXIMUM_JOB_LIMIT ? self::MAXIMUM_JOB_LIMIT : $limit;

$output->writeln(sprintf('Processing "%d" jobs from "%s" tube...', $limit, $tube));

$this->process($output, $consumer, $tube, $limit);
}

private function process(OutputInterface $output, ConsumerInterface $consumer, $tube, $limit)
{
$counter = 0;
$memoryUsage = memory_get_peak_usage(true);

while (true) {
if ($counter >= $limit) {
return;
}

if ($memoryUsage > self::MAXIMUM_MEMORY_USAGE_LIMIT) {
$this->logger->info(sprintf(
'Processing "%s" tube exhausted memory with "%sMB" of usage.',
$limit,
($memoryUsage/1024/1024)
));

return;
}

try {
/** @var Job $job */
$job = $this->pheanstalk->watch($tube)->ignore('default')->reserve(5);
if ($job instanceof Job) {
$data = $job->getData();
$consumer->consume($data);
$this->pheanstalk->delete($job);

$memoryUsage = memory_get_peak_usage(true);
++$counter;

$output->writeln(sprintf('Job "%d" of tube "%s" completed.', $counter, $tube));
}
} catch (Exception $e) {
$this->pheanstalk->bury($job);

$this->logger->error(sprintf(
'Consuming "%s" tube resulted in "%s" exception: [%s:%s]',
$tube,
get_class($e),
$e->getCode(),
$e->getMessage()
));

if ($e instanceof DBALException || $e instanceof ORMException || $e instanceof PDOException) {
return;
}
}

usleep(self::SLEEP_TIME);
}
}
}

services:
inanzzz_application.command.job_processor:
class: Inanzzz\ApplicationBundle\Command\JobProcessorCommand
tags:
- { name: console.command }
arguments:
- '@leezy.pheanstalk'
- '@logger'
- 'inanzzz_application.consumer.'

Testler


Sıra "user_create_test" içinde toplam 3 tane iş var ve biz bunların hepsini işlemek istiyoruz.


$ php bin/console job:processor user_create--env=test
Processing "250" jobs from "user_create_test" tube...
Job "1" of tube "user_create_test" completed.
Job "2" of tube "user_create_test" completed.
Job "3" of tube "user_create_test" completed.

# This worker is still listening the tube

Sıra "user_delete_dev" içinde toplam 20 tane iş var ve biz bunların sadece 2 tanesini işlemek istiyoruz.


$ php bin/console job:processor user_delete --limit=2 --env=dev
Processing "2" jobs from "user_delete_dev" tube...
Job "1" of tube "user_delete_dev" completed.
Job "2" of tube "user_delete_dev" completed.

# This worker quit listening.