Bu örneğimizde Symfony Compiler Pass kullanmadan, belirli bir etiket ile işaretlenmiş olan tüm servisleri diğer bir servise enjekte edeceğiz. Aşağıdaki örnek Strategy Pattern kullanıyor.


Dizayn


Elimizde iki tane "consumer" servisi ve bir tane de konsol "command" class var. Console command iki ayrı sıra (image ve upload) içindeki mesajları izler ve ana consumer'e (Consumer) iletir. Ana consumer daha sonra bu mesajları işlenmeleri için alakalı consumerlere (ImageConsumer ve UploadConsumer) iletir. Buradaki ana amaç birden fazla command yaratmamak.



Dosyalar


service.yml


service:
# CONSUMER
App\Consumer\Consumer:
arguments: [!tagged consumer]

App\Consumer\ImageConsumer:
tags:
- { name: consumer }

App\Consumer\UploadConsumer:
tags:
- { name: consumer }

QueueWorkerCommand


declare(strict_types=1);

namespace App\Command;

use App\Consumer\Consumer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class QueueWorkerCommand extends Command
{
private $consumer;

public function __construct(Consumer $consumer)
{
parent::__construct();

$this->consumer = $consumer;
}

protected function configure()
{
$this
->setName('app:queue-worker')
->setDescription('Helps consumer consume messages in queues.')
->setHelp('Watches messages in queues and passes them to the consumer.')
->addOption(
'queue',
null,
InputOption::VALUE_REQUIRED,
'What is the name of the queue to watch?'
);
}

protected function execute(InputInterface $input, OutputInterface $output): void
{
$queue = trim($input->getOption('queue'));
if (!$queue) {
return;
}

$output->writeln(sprintf('Watching the messages in "%s" queue...', $queue));

$this->watch($queue);
}

private function watch(string $queue): void
{
// Grab the message from the queue. e.g. RabbitMQ, Beanstalk etc.
$message = 'I am the message';

$result = $this->consumer->consume($queue, $message);

// Do something next
}
}

Consumer


declare(strict_types=1);

namespace App\Consumer;

use Traversable;

class Consumer
{
private $consumers;

public function __construct(Traversable $consumers)
{
$this->consumers = $consumers;
}

public function consume(string $queue, $message): bool
{
/** @var ConsumerInterface $consumer */
foreach ($this->consumers as $consumer) {
if ($consumer->canConsume($queue)) {
return $consumer->consume($message);
}
}
}
}

ConsumerInterface


declare(strict_types=1);

namespace App\Consumer;

interface ConsumerInterface
{
public function canConsume(string $queue): bool;

public function consume($message): bool;
}

ImageConsumer


declare(strict_types=1);

namespace App\Consumer;

class ImageConsumer implements ConsumerInterface
{
public function canConsume(string $queue): bool
{
return $queue === 'image';
}

public function consume($message): bool
{
echo sprintf('Consumed message "%s" in queue "image".'.PHP_EOL, $message);

return true;
}
}

UploadConsumer


declare(strict_types=1);

namespace App\Consumer;

class UploadConsumer implements ConsumerInterface
{
public function canConsume(string $queue): bool
{
return $queue === 'upload';
}

public function consume($message): bool
{
echo sprintf('Consumed message "%s" in queue "upload".'.PHP_EOL, $message);

return true;
}
}

Test


$ bin/console app:queue-worker --queue=image
Watching the messages in "image" queue...
Consumed message "I am the message" in queue "image".

$ bin/console app:queue-worker --queue=upload
Watching the messages in "upload" queue...
Consumed message "I am the message" in queue "upload".