06/05/2018 - SYMFONY
In this example we inject every service tagged with a specific tag into another service, so we do not need to write a Compiler Pass. The example implements the Strategy pattern.
We have two consumer services and one console command. The console command watches messages in two different queues (image and upload) and passes them to the main consumer (Consumer). The main consumer then passes each message to the relevant consumer (ImageConsumer or UploadConsumer), where it is processed. The main aim here is to avoid creating more than one command.
services:
    # CONSUMER
    App\Consumer\Consumer:
        arguments: [!tagged consumer]

    App\Consumer\ImageConsumer:
        tags:
            - { name: consumer }

    App\Consumer\UploadConsumer:
        tags:
            - { name: consumer }
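The `!tagged` argument hands the main consumer an iterable rather than a plain array, and Symfony builds each tagged service only when the loop reaches it (from Symfony 4.4 onwards the same tag is spelled `!tagged_iterator`). The sketch below imitates that behaviour in plain PHP so it can run without the framework. `TrackedService` and `LazyServices` are hypothetical names invented for this illustration and are not Symfony classes.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a tagged service; it records every instantiation.
final class TrackedService
{
    /** @var string[] */
    public static $created = [];

    /** @var string */
    public $queue;

    public function __construct(string $queue)
    {
        $this->queue = $queue;
        self::$created[] = $queue;
    }
}

// Hypothetical lazy collection: a service is built only when the loop reaches it.
final class LazyServices implements IteratorAggregate
{
    /** @var callable[] */
    private $factories;

    public function __construct(callable ...$factories)
    {
        $this->factories = $factories;
    }

    public function getIterator(): Traversable
    {
        foreach ($this->factories as $factory) {
            yield $factory();
        }
    }
}

$services = new LazyServices(
    function (): TrackedService { return new TrackedService('image'); },
    function (): TrackedService { return new TrackedService('upload'); }
);

// Stop at the first match, as the main consumer does.
$found = null;
foreach ($services as $service) {
    if ($service->queue === 'image') {
        $found = $service;
        break;
    }
}

// Lists the services that were actually built.
echo implode(', ', TrackedService::$created), PHP_EOL;
```

Because the loop stops at the first match, the second factory is never called. In this sketch that means a strategy listed after the matching one costs nothing for that message.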

<?php

declare(strict_types=1);

namespace App\Command;

use App\Consumer\Consumer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class QueueWorkerCommand extends Command
{
    private $consumer;

    public function __construct(Consumer $consumer)
    {
        parent::__construct();

        $this->consumer = $consumer;
    }

    protected function configure(): void
    {
        $this
            ->setName('app:queue-worker')
            ->setDescription('Helps consumer consume messages in queues.')
            ->setHelp('Watches messages in queues and passes them to the consumer.')
            ->addOption(
                'queue',
                null,
                InputOption::VALUE_REQUIRED,
                'What is the name of the queue to watch?'
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // Cast first: the option is null when omitted, and trim(null) throws under strict_types.
        $queue = trim((string) $input->getOption('queue'));
        if ($queue === '') {
            // Nothing to watch; a non-zero exit code signals the failure.
            return 1;
        }

        $output->writeln(sprintf('Watching the messages in "%s" queue...', $queue));

        $this->watch($queue);

        // Returning an int exit code keeps the command valid on newer Symfony versions.
        return 0;
    }

    private function watch(string $queue): void
    {
        // Grab the message from the queue. e.g. RabbitMQ, Beanstalk etc.
        $message = 'I am the message';

        $result = $this->consumer->consume($queue, $message);

        // Do something next
    }
}
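
<?php

declare(strict_types=1);

namespace App\Consumer;

use Traversable;

class Consumer
{
    private $consumers;

    public function __construct(Traversable $consumers)
    {
        $this->consumers = $consumers;
    }

    public function consume(string $queue, $message): bool
    {
        /** @var ConsumerInterface $consumer */
        foreach ($this->consumers as $consumer) {
            // The first consumer that accepts the queue handles the message.
            if ($consumer->canConsume($queue)) {
                return $consumer->consume($message);
            }
        }

        // No consumer accepts this queue. Without this return the method
        // would violate its bool return type and throw a TypeError.
        return false;
    }
}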

<?php

declare(strict_types=1);

namespace App\Consumer;

interface ConsumerInterface
{
    public function canConsume(string $queue): bool;

    public function consume($message): bool;
}

<?php

declare(strict_types=1);

namespace App\Consumer;

class ImageConsumer implements ConsumerInterface
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'image';
    }

    public function consume($message): bool
    {
        echo sprintf('Consumed message "%s" in queue "image".'.PHP_EOL, $message);

        return true;
    }
}

<?php

declare(strict_types=1);

namespace App\Consumer;

class UploadConsumer implements ConsumerInterface
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'upload';
    }

    public function consume($message): bool
    {
        echo sprintf('Consumed message "%s" in queue "upload".'.PHP_EOL, $message);

        return true;
    }
}

$ bin/console app:queue-worker --queue=image
Watching the messages in "image" queue...
Consumed message "I am the message" in queue "image".
$ bin/console app:queue-worker --queue=upload
Watching the messages in "upload" queue...
Consumed message "I am the message" in queue "upload".
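
Since the aim is to keep a single command, supporting another queue should only need one more consumer class tagged with `consumer`. The sketch below shows this with a hypothetical `VideoConsumer` and `video` queue, neither of which is part of the example above. To run on its own it repeats compact copies of the interface and the main consumer, and wires them by hand with an `ArrayIterator` in place of the container. It also assumes that a queue with no matching consumer is reported as `false`.

```php
<?php

declare(strict_types=1);

// Compact copy of the article's interface, repeated so the sketch runs alone.
interface ConsumerInterface
{
    public function canConsume(string $queue): bool;

    public function consume($message): bool;
}

// Compact copy of the main consumer.
final class Consumer
{
    private $consumers;

    public function __construct(Traversable $consumers)
    {
        $this->consumers = $consumers;
    }

    public function consume(string $queue, $message): bool
    {
        foreach ($this->consumers as $consumer) {
            if ($consumer->canConsume($queue)) {
                return $consumer->consume($message);
            }
        }

        // Assumption: an unmatched queue is reported as "not consumed".
        return false;
    }
}

// Hypothetical third strategy; not part of the original example.
final class VideoConsumer implements ConsumerInterface
{
    public function canConsume(string $queue): bool
    {
        return $queue === 'video';
    }

    public function consume($message): bool
    {
        printf('Consumed message "%s" in queue "video".' . PHP_EOL, $message);

        return true;
    }
}

// Hand wiring in place of the container: ArrayIterator satisfies the Traversable type hint.
$consumer = new Consumer(new ArrayIterator([new VideoConsumer()]));

$handled = $consumer->consume('video', 'I am the message');
$ignored = $consumer->consume('audio', 'I am the message');
```

In the real application the class would be registered like the other two consumers, with a `consumer` tag, and neither the command nor the main consumer would need to change.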