In this example we avoid writing a Compiler Pass by injecting every service that carries a specific tag directly into another service. The example implements the Strategy pattern.


Design


We have one console command, one main consumer and two queue-specific consumers. The console command watches the messages in one of two queues (image or upload), selected with the --queue option, and passes them to the main consumer (Consumer). The main consumer then hands each message to the relevant consumer (ImageConsumer or UploadConsumer), where it is processed. The main aim is to avoid creating a separate command for each queue.
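The dispatch flow described above can be sketched without the framework. This is only a minimal illustration: the names QueueStrategy, ImageStrategy, UploadStrategy and Dispatcher are hypothetical stand-ins, and an ArrayIterator plays the role of the collection that the container would inject. The real classes are listed in the Files section.

```php
<?php

declare(strict_types=1);

// Hypothetical names for illustration only; they are not the classes used in this post.
interface QueueStrategy
{
    public function canConsume(string $queue): bool;

    public function consume(string $message): string;
}

final class ImageStrategy implements QueueStrategy
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'image';
    }

    public function consume(string $message): string
    {
        return sprintf('image handled "%s"', $message);
    }
}

final class UploadStrategy implements QueueStrategy
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'upload';
    }

    public function consume(string $message): string
    {
        return sprintf('upload handled "%s"', $message);
    }
}

final class Dispatcher
{
    /** @var iterable|QueueStrategy[] */
    private $strategies;

    public function __construct(iterable $strategies)
    {
        $this->strategies = $strategies;
    }

    // Returns the result of the first strategy that accepts the queue, or null if none does.
    public function dispatch(string $queue, string $message): ?string
    {
        foreach ($this->strategies as $strategy) {
            if ($strategy->canConsume($queue)) {
                return $strategy->consume($message);
            }
        }

        return null;
    }
}

// Assumption: in the real application the container builds this collection from tagged services.
$dispatcher = new Dispatcher(new ArrayIterator([new ImageStrategy(), new UploadStrategy()]));

echo $dispatcher->dispatch('image', 'I am the message'), PHP_EOL;
echo $dispatcher->dispatch('upload', 'I am the message'), PHP_EOL;
var_dump($dispatcher->dispatch('video', 'I am the message'));
```

The dispatcher never names a concrete strategy, so supporting another queue only means adding one more class to the collection. The command and the dispatcher stay untouched.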



Files


services.yml


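services:
    # CONSUMER
    # "!tagged consumer" injects every service tagged "consumer" as an iterable.
    # Since Symfony 4.4 the preferred spelling is "!tagged_iterator consumer".
    App\Consumer\Consumer:
        arguments: [!tagged consumer]

    App\Consumer\ImageConsumer:
        tags:
            - { name: consumer }

    App\Consumer\UploadConsumer:
        tags:
            - { name: consumer }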

QueueWorkerCommand


declare(strict_types=1);

namespace App\Command;

use App\Consumer\Consumer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class QueueWorkerCommand extends Command
{
    private $consumer;

    public function __construct(Consumer $consumer)
    {
        parent::__construct();

        $this->consumer = $consumer;
    }

    protected function configure()
    {
        $this
            ->setName('app:queue-worker')
            ->setDescription('Helps consumer consume messages in queues.')
            ->setHelp('Watches messages in queues and passes them to the consumer.')
            ->addOption(
                'queue',
                null,
                InputOption::VALUE_REQUIRED,
                'What is the name of the queue to watch?'
            );
    }

    // Returns an exit code: newer Symfony versions require execute() to return an int.
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // getOption() returns null when --queue is omitted, and trim(null)
        // throws a TypeError under strict_types, so cast to string first.
        $queue = trim((string) $input->getOption('queue'));
        if ($queue === '') {
            return 1;
        }

        $output->writeln(sprintf('Watching the messages in "%s" queue...', $queue));

        $this->watch($queue);

        return 0;
    }

    private function watch(string $queue): void
    {
        // Grab the message from the queue. e.g. RabbitMQ, Beanstalk etc.
        $message = 'I am the message';

        $result = $this->consumer->consume($queue, $message);

        // Do something next
    }
}

Consumer


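declare(strict_types=1);

namespace App\Consumer;

use Traversable;

class Consumer
{
    private $consumers;

    // Receives every service tagged "consumer" as a lazy, iterable collection.
    public function __construct(Traversable $consumers)
    {
        $this->consumers = $consumers;
    }

    public function consume(string $queue, $message): bool
    {
        /** @var ConsumerInterface $consumer */
        foreach ($this->consumers as $consumer) {
            if ($consumer->canConsume($queue)) {
                return $consumer->consume($message);
            }
        }

        // No consumer handles this queue. Without an explicit return the
        // method would return null and violate its bool return type.
        return false;
    }
}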

ConsumerInterface


declare(strict_types=1);

namespace App\Consumer;

interface ConsumerInterface
{
    public function canConsume(string $queue): bool;

    public function consume($message): bool;
}

ImageConsumer


declare(strict_types=1);

namespace App\Consumer;

class ImageConsumer implements ConsumerInterface
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'image';
    }

    public function consume($message): bool
    {
        echo sprintf('Consumed message "%s" in queue "image".'.PHP_EOL, $message);

        return true;
    }
}

UploadConsumer


declare(strict_types=1);

namespace App\Consumer;

class UploadConsumer implements ConsumerInterface
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'upload';
    }

    public function consume($message): bool
    {
        echo sprintf('Consumed message "%s" in queue "upload".'.PHP_EOL, $message);

        return true;
    }
}

Test


$ bin/console app:queue-worker --queue=image
Watching the messages in "image" queue...
Consumed message "I am the message" in queue "image".

$ bin/console app:queue-worker --queue=upload
Watching the messages in "upload" queue...
Consumed message "I am the message" in queue "upload".