The Symfony Messenger component helps us send messages to, and receive messages from, other applications. That includes putting jobs into message queues such as RabbitMQ. In this example we are going to put simple image resizing jobs into a RabbitMQ queue and consume them afterwards.


RabbitMQ server


Create a RabbitMQ Docker container with the docker-compose.yml file below.


version: '3'

services:

    rabbitmq:
        image: rabbitmq:3.7.5-management
        hostname: rabbitmq
        user: rabbitmq
        ports:
            - 5672:5672
            - 15672:15672
        volumes:
            - ./data/rabbitmq:/var/lib/rabbitmq/mnesia/rabbit@app-rabbitmq:cached
        environment:
            RABBITMQ_ERLANG_COOKIE: 6085e2412b6fa88647466c6a81c0cea0
            RABBITMQ_DEFAULT_USER: rabbitmq
            RABBITMQ_DEFAULT_PASS: rabbitmq
            RABBITMQ_DEFAULT_VHOST: /

Start the container with the docker-compose up -d command, then check that it is running.


$ docker-compose ps
Name                  Command                          State   Ports
---------------------------------------------------------------------------------------------------------------------------------------------------------
rabbitmq_rabbitmq_1   docker-entrypoint.sh rabbi ...   Up      15671/tcp, 0.0.0.0:15672->15672/tcp, 25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp

The machine running this container has the IP address 192.168.99.30, which is what we will use to reach RabbitMQ from our Symfony application and from a browser. The management GUI is at http://192.168.99.30:15672 and the credentials are rabbitmq:rabbitmq. The Symfony application, however, queues its messages over port 5672.


Symfony application


Preparation


Install the PHP AMQP extension so that the application can put messages into RabbitMQ queues.


$ sudo apt-get update
$ sudo apt-get -y install gcc make autoconf libc-dev pkg-config
$ sudo apt-get -y install libssl-dev
$ sudo apt-get -y install librabbitmq-dev
$ sudo pecl install amqp # Hit enter if a question is asked
$ sudo apt-get -y install php7.2-amqp

Add extension=amqp.so to the /etc/php/7.2/fpm/php.ini file and restart PHP-FPM with the sudo service php7.2-fpm restart command. Then verify that the amqp extension is active.


$ php -i | grep -i amqp
/etc/php/7.2/cli/conf.d/20-amqp.ini,
amqp
AMQP protocol version => 0-9-1
amqp.auto_ack => 0 => 0
amqp.cacert => no value => no value
amqp.cert => no value => no value
amqp.channel_max => 256 => 256
amqp.connect_timeout => 0 => 0
amqp.frame_max => 131072 => 131072
amqp.heartbeat => 0 => 0
amqp.host => localhost => localhost
amqp.key => no value => no value
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.read_timeout => 0 => 0
amqp.sasl_method => 0 => 0
amqp.timeout => no value => no value
amqp.verify => 1 => 1
amqp.vhost => / => /
amqp.write_timeout => 0 => 0

Messenger component


Run composer require symfony/messenger to install it. Symfony's built-in AMQP transport needs the Serializer component, which can be installed with the composer require symfony/serializer-pack command. Before running that command, though, check whether your application already works without it, because you might already have the necessary packages (serializers and normalizers) installed.


Configuration


# .env

MESSENGER_TRANSPORT_DSN=amqp://rabbitmq:rabbitmq@192.168.99.30:5672/%2f
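
The DSN packs several settings into one string, so it may help to see it taken apart. The snippet below is only a sketch in plain PHP: describeAmqpDsn() is a hypothetical helper written for this post, not something Symfony provides, and the guest/5672 fallbacks are simply the amqp extension defaults. It shows that the %2f at the end is nothing more than the URL-encoded form of the / virtual host.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, for illustration only: it is not part of Symfony.
 * Splits an AMQP DSN into the parts a client needs in order to connect.
 */
function describeAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException(sprintf('Invalid DSN "%s".', $dsn));
    }

    // Path segments after the host: the first one is the URL-encoded vhost.
    $segments = explode('/', trim($parts['path'] ?? '', '/'));

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? 5672,
        'user' => $parts['user'] ?? 'guest',
        'password' => $parts['pass'] ?? 'guest',
        // "%2f" is the URL-encoded form of "/", RabbitMQ's default vhost.
        'vhost' => $segments[0] !== '' ? urldecode($segments[0]) : '/',
        // Optional second segment, e.g. ".../%2f/image_resize"; null if absent.
        'name' => $segments[1] ?? null,
    ];
}

var_export(describeAmqpDsn('amqp://rabbitmq:rabbitmq@192.168.99.30:5672/%2f'));
```

For the DSN in .env the vhost comes out as / and the name as null. Appending /image_resize to the DSN, as the transport configuration further down does, fills in the name.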

Classes


This is where we put the message into the queue - you can think of it as the "message producer".


# Controller/ImageController.php

declare(strict_types=1);

namespace App\Controller;

use App\MessageBus\Message\Image\Resize;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

/**
 * @Route("/images")
 */
class ImageController
{
    private $messageBus;

    public function __construct(MessageBusInterface $messageBus)
    {
        $this->messageBus = $messageBus;
    }

    /**
     * @Route("", methods={"GET"})
     */
    public function getAll(): Response
    {
        $message = new Resize('ball.png');

        $this->messageBus->dispatch($message);

        return new Response('Good Job!');
    }
}

This is the actual message that goes into the queue - it is stored there in JSON-encoded format.


# MessageBus/Message/Image/Resize.php

declare(strict_types=1);

namespace App\MessageBus\Message\Image;

class Resize
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function getPath(): string
    {
        return $this->path;
    }
}
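
To make "JSON-encoded" a bit more concrete, here is a rough sketch of the round trip the message goes through. It is not the Serializer component's code: encodeResize() and decodeResize() are hypothetical stand-ins that use plain json_encode() and json_decode(), and the Resize class is a trimmed copy so that the snippet runs on its own. The real serializer also has to record which class a body belongs to, which this sketch skips.

```php
<?php

declare(strict_types=1);

// Trimmed copy of the message class so that the sketch runs on its own.
final class Resize
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function getPath(): string
    {
        return $this->path;
    }
}

/** Hypothetical stand-in for the serializer: turns the message into a JSON body. */
function encodeResize(Resize $message): string
{
    $json = json_encode(['path' => $message->getPath()]);
    if ($json === false) {
        throw new RuntimeException(json_last_error_msg());
    }

    return $json;
}

/** Rebuilds the message object from a JSON body, as the consuming side has to. */
function decodeResize(string $body): Resize
{
    $data = json_decode($body, true);
    if (!is_array($data) || !isset($data['path']) || !is_string($data['path'])) {
        throw new InvalidArgumentException('Unexpected message body: '.$body);
    }

    return new Resize($data['path']);
}

$body = encodeResize(new Resize('ball.png'));
echo $body.PHP_EOL;                           // {"path":"ball.png"}
echo decodeResize($body)->getPath().PHP_EOL;  // ball.png
```

The point is only that the queue holds data, not objects: whatever the getters expose is what the consumer gets back.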

This is where we process the messages in the queue - you can think of it as the "message consumer".


# MessageBus/Handler/Image/ResizeHandler.php

declare(strict_types=1);

namespace App\MessageBus\Handler\Image;

use App\MessageBus\Message\Image\Resize;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class ResizeHandler implements MessageHandlerInterface
{
    public function __invoke(Resize $message)
    {
        echo sprintf('The image "%s" is being resized...'.PHP_EOL, $message->getPath());
        sleep(2); // Assume that resizing the image takes 2 seconds
        echo 'The image has been resized!'.PHP_EOL;
        echo 'Moving on to next message in the queue.'.PHP_EOL.PHP_EOL;
    }
}
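
Notice that nothing in the handler says which message it is for, apart from the Resize type-hint on __invoke(). As far as I understand it, that type-hint is what Symfony reads to pair the handler with the message class. The sketch below imitates the idea in plain PHP with reflection. handledMessageClass() and dispatchTo() are hypothetical names made up for this post, and the handler is simplified to return a string instead of echoing and sleeping.

```php
<?php

declare(strict_types=1);

// Trimmed copy of the message class so that the sketch runs on its own.
final class Resize
{
    private $path;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    public function getPath(): string
    {
        return $this->path;
    }
}

// Simplified handler: returns a line of text instead of echoing and sleeping.
final class ResizeHandler
{
    public function __invoke(Resize $message): string
    {
        return sprintf('The image "%s" is being resized...', $message->getPath());
    }
}

/** Hypothetical helper: reads the message class from the __invoke() type-hint. */
function handledMessageClass(object $handler): string
{
    $parameters = (new ReflectionMethod($handler, '__invoke'))->getParameters();
    $type = isset($parameters[0]) ? $parameters[0]->getType() : null;
    if (!$type instanceof ReflectionNamedType || $type->isBuiltin()) {
        throw new LogicException('__invoke() must type-hint a message class.');
    }

    return $type->getName();
}

/** Hypothetical miniature bus: picks the handler by the class of the message. */
function dispatchTo(array $handlers, object $message)
{
    $map = [];
    foreach ($handlers as $handler) {
        $map[handledMessageClass($handler)] = $handler;
    }

    $class = get_class($message);
    if (!isset($map[$class])) {
        throw new LogicException(sprintf('No handler for "%s".', $class));
    }

    return $map[$class]($message);
}

echo dispatchTo([new ResizeHandler()], new Resize('ball.png')).PHP_EOL;
```

In the real application this pairing is done for us, so the only thing to remember is to keep the type-hint on __invoke() accurate.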

This is the final configuration.


# config/packages/messenger.yaml

# Version 1
framework:
    messenger:
        transports:
            amqp_image_resize: '%env(MESSENGER_TRANSPORT_DSN)%/image_resize'

        routing:
            'App\MessageBus\Message\Image\Resize': amqp_image_resize

# Version 2
framework:
    messenger:
        transports:
            amqp_image_resize:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: image_resize_ex
                        type: fanout
                    queue:
                        name: image_resize_qu

        routing:
            'App\MessageBus\Message\Image\Resize': amqp_image_resize

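Both configs above route App\MessageBus\Message\Image\Resize messages through the amqp_image_resize transport, and we are using the first one. The only difference is naming: in Version 1 the last DSN segment, image_resize, is used for both the exchange and the queue, whereas Version 2 names them explicitly as image_resize_ex and image_resize_qu.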


Test


Producing messages


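When we call GET /images for the first time, Symfony by default creates the image_resize exchange and the image_resize queue if they do not exist yet, binds the queue to the exchange, and puts the message into the queue. Every further call just adds another message to the queue.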



Consuming messages


$ bin/console messenger:consume-messages amqp_image_resize
The image "ball.png" is being resized...
The image has been resized!
Moving on to next message in the queue.

The image "ball.png" is being resized...
The image has been resized!
Moving on to next message in the queue.

The image "ball.png" is being resized...
The image has been resized!
Moving on to next message in the queue.