06/05/2018 - SYMFONY
Bu örneğimizde Symfony Compiler Pass kullanmadan, belirli bir etiket ile işaretlenmiş olan tüm servisleri diğer bir servise enjekte edeceğiz. Aşağıdaki örnek Strategy Pattern kullanıyor.
Elimizde iki tane "consumer" servisi ve bir tane de konsol "command" class var. Console command iki ayrı sıra (image
ve upload
) içindeki mesajları izler ve ana consumer'e (Consumer
) iletir. Ana consumer daha sonra bu mesajları işlenmeleri için alakalı consumerlere (ImageConsumer
ve UploadConsumer
) iletir. Buradaki ana amaç birden fazla command yaratmamak.
service:
# CONSUMER
App\Consumer\Consumer:
arguments: [!tagged consumer]
App\Consumer\ImageConsumer:
tags:
- { name: consumer }
App\Consumer\UploadConsumer:
tags:
- { name: consumer }
declare(strict_types=1);
namespace App\Command;
use App\Consumer\Consumer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class QueueWorkerCommand extends Command
{
private $consumer;
public function __construct(Consumer $consumer)
{
parent::__construct();
$this->consumer = $consumer;
}
protected function configure()
{
$this
->setName('app:queue-worker')
->setDescription('Helps consumer consume messages in queues.')
->setHelp('Watches messages in queues and passes them to the consumer.')
->addOption(
'queue',
null,
InputOption::VALUE_REQUIRED,
'What is the name of the queue to watch?'
);
}
protected function execute(InputInterface $input, OutputInterface $output): void
{
$queue = trim($input->getOption('queue'));
if (!$queue) {
return;
}
$output->writeln(sprintf('Watching the messages in "%s" queue...', $queue));
$this->watch($queue);
}
private function watch(string $queue): void
{
// Grab the message from the queue. e.g. RabbitMQ, Beanstalk etc.
$message = 'I am the message';
$result = $this->consumer->consume($queue, $message);
// Do something next
}
}
declare(strict_types=1);
namespace App\Consumer;
use Traversable;
class Consumer
{
private $consumers;
public function __construct(Traversable $consumers)
{
$this->consumers = $consumers;
}
public function consume(string $queue, $message): bool
{
/** @var ConsumerInterface $consumer */
foreach ($this->consumers as $consumer) {
if ($consumer->canConsume($queue)) {
return $consumer->consume($message);
}
}
}
}
declare(strict_types=1);
namespace App\Consumer;
interface ConsumerInterface
{
public function canConsume(string $queue): bool;
public function consume($message): bool;
}
declare(strict_types=1);
namespace App\Consumer;
class ImageConsumer implements ConsumerInterface
{
public function canConsume(string $queue): bool
{
return $queue === 'image';
}
public function consume($message): bool
{
echo sprintf('Consumed message "%s" in queue "image".'.PHP_EOL, $message);
return true;
}
}
declare(strict_types=1);
namespace App\Consumer;
class UploadConsumer implements ConsumerInterface
{
public function canConsume(string $queue): bool
{
return $queue === 'upload';
}
public function consume($message): bool
{
echo sprintf('Consumed message "%s" in queue "upload".'.PHP_EOL, $message);
return true;
}
}
$ bin/console app:queue-worker --queue=image
Watching the messages in "image" queue...
Consumed message "I am the message" in queue "image".
$ bin/console app:queue-worker --queue=upload
Watching the messages in "upload" queue...
Consumed message "I am the message" in queue "upload".