Hello everyone!

We have been investing plenty of personal time and energy for many years to share our knowledge with you all. However, we now need your help to keep this blog running. All you have to do is just click one of the adverts on the site, otherwise it will sadly be taken down due to hosting etc. costs. Thank you.

In this example we are going to create a single worker and multiple consumers to process Amazon Simple Queue Service (AWS SQS) messages. Each queue consumer will treat unprocessable messages differently.


Queues



Files


Our consumers use the "Strategy Pattern", so all we need to inject into the worker command is the main consumer; it then works out which consumer strategy is to be used for the given queue.


parameters.yaml


Make sure the credentials below belong to an AWS IAM user who has the "AmazonSQSFullAccess" permission.


# Container parameters; referenced from services.yaml as %parameter_name%.
parameters:

    # Upper-cased and used as the first segment of every queue name.
    application_name: 'APP'

    # Base AWS SDK client options; merged with the credentials below when the SQS client is created.
    aws_sdk.config.default:
        version: 'latest'
        region: 'eu-west-1'

    # Placeholders only - replace with the real IAM user's key and secret.
    aws_sdk.credentials.default:
        credentials:
            key: 'AWS_KEY'
            secret: 'AWS_SECRET'

services.yaml


services:

    Aws\Sdk: ~

    # The SQS client is created right after instantiation through the "createClient" setter call.
    App\Util\AwsSqsUtil:
        arguments:
            $applicationName: '%application_name%'
            $env: '%kernel.environment%'
        calls:
            - [createClient, ['%aws_sdk.config.default%', '%aws_sdk.credentials.default%']]

    App\Command\AwsSqsWorkerCommand:
        tags:
            - { name: console.command }

    # Receives every service tagged "consumer_strategy" as a single iterable argument.
    App\Message\Consumer\Consumer:
        arguments: [!tagged consumer_strategy]

    App\Message\Consumer\Strategy\ImageStrategy:
        tags:
            - { name: consumer_strategy }

    App\Message\Consumer\Strategy\FileStrategy:
        tags:
            - { name: consumer_strategy }

    App\Message\Consumer\Strategy\CommentStrategy:
        tags:
            - { name: consumer_strategy }

AwsSqsWorkerException


declare(strict_types=1);

namespace App\Exception;

use RuntimeException;

/**
 * Raised by the SQS worker command when it cannot start or has to stop:
 * invalid options, an interruption signal, or exceeding the memory ceiling.
 */
class AwsSqsWorkerException extends RuntimeException {}

AwsSqsUtilInterface


declare(strict_types=1);

namespace App\Util;

use App\Model\Message;

/**
 * Contract for the small set of SQS operations the worker and its strategies rely on.
 */
interface AwsSqsUtilInterface
{
    /**
     * Builds the underlying SQS client from the given SDK options and credentials.
     */
    public function createClient(iterable $config, iterable $credentials): void;

    /**
     * Resolves the URL of the queue identified by its short name.
     *
     * @return string|null The queue URL, or null when none is available.
     */
    public function getQueueUrl(string $name): ?string;

    /**
     * Fetches a message from the queue at the given URL.
     *
     * @return Message|null The received message, or null when there is nothing to receive.
     */
    public function receiveMessage(string $url): ?Message;

    /**
     * Removes the given message from its queue.
     */
    public function deleteMessage(Message $message): void;

    /**
     * Hands the given message back to its queue so that it can be received again.
     */
    public function requeueMessage(Message $message): void;
}

AwsSqsUtil


declare(strict_types=1);

namespace App\Util;

use App\Model\Message;
use Aws\Result;
use Aws\Sdk;
use Aws\Sqs\SqsClient;

/**
 * Wrapper around the AWS SDK SQS client exposing only the operations the worker needs.
 *
 * The client is not built in the constructor; createClient() must be called
 * before any of the queue operations are used.
 */
class AwsSqsUtil implements AwsSqsUtilInterface
{
    /** @var SqsClient */
    private $client;

    /** @var Sdk */
    private $sdk;

    /** @var string */
    private $applicationName;

    /** @var string */
    private $env;

    public function __construct(Sdk $sdk, string $applicationName, string $env)
    {
        $this->sdk = $sdk;
        $this->applicationName = $applicationName;
        $this->env = $env;
    }

    /**
     * Creates the SQS client from the merged SDK options and credentials.
     *
     * On duplicate keys the value from $config wins over the one from $credentials.
     */
    public function createClient(iterable $config, iterable $credentials): void
    {
        // The "+" union operator is only defined for arrays, so Traversable input is normalised first.
        $this->client = $this->sdk->createSqs($this->toArray($config) + $this->toArray($credentials));
    }

    /**
     * Resolves the URL of the queue whose full name is derived from the given short name.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#getqueueurl
     */
    public function getQueueUrl(string $name): ?string
    {
        /** @var Result $result */
        $result = $this->client->getQueueUrl([
            'QueueName' => $this->createQueueName($name),
        ]);

        return $result->get('QueueUrl');
    }

    /**
     * Receives at most one message from the queue.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @return Message|null Null when the response carries no message.
     */
    public function receiveMessage(string $url): ?Message
    {
        /** @var Result $result */
        $result = $this->client->receiveMessage([
            'QueueUrl' => $url,
            'MaxNumberOfMessages' => 1,
        ]);

        // Covers both a missing "Messages" entry and an empty list.
        $messages = $result->get('Messages');
        if (empty($messages[0])) {
            return null;
        }

        $received = $messages[0];

        $message = new Message();
        $message->url = $url;
        $message->id = $received['MessageId'];
        $message->body = $received['Body'];
        $message->receiptHandle = $received['ReceiptHandle'];

        return $message;
    }

    /**
     * Deletes the message from the queue it was received from.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletemessage
     */
    public function deleteMessage(Message $message): void
    {
        $this->client->deleteMessage([
            'QueueUrl' => $message->url,
            'ReceiptHandle' => $message->receiptHandle,
        ]);
    }

    /**
     * Sets the visibility timeout of the message to 30 seconds.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#changemessagevisibility
     */
    public function requeueMessage(Message $message): void
    {
        $this->client->changeMessageVisibility([
            'QueueUrl' => $message->url,
            'ReceiptHandle' => $message->receiptHandle,
            'VisibilityTimeout' => 30,
        ]);
    }

    /**
     * Builds the full queue name: "<APPLICATION>_<ENV>_<name>", with a "_DL" suffix for dead-letter queues.
     */
    private function createQueueName(string $name, bool $isDeadLetter = false): string
    {
        return sprintf(
            '%s_%s_%s%s',
            strtoupper($this->applicationName),
            strtoupper($this->env),
            $name,
            $isDeadLetter ? '_DL' : ''
        );
    }

    /**
     * Converts any iterable into a plain array, preserving its keys.
     */
    private function toArray(iterable $values): array
    {
        return is_array($values) ? $values : iterator_to_array($values);
    }
}

AwsSqsWorkerCommand


declare(strict_types=1);

namespace App\Command;

use App\Exception\AwsSqsWorkerException;
use App\Message\Consumer\ConsumerInterface;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Console worker that polls a single SQS queue and hands every received message to the consumer.
 *
 * The worker stops after consuming the configured number of messages, and aborts with an
 * AwsSqsWorkerException on SIGINT/SIGTERM or when peak memory usage exceeds the ceiling.
 */
class AwsSqsWorkerCommand extends Command
{
    private const LIMIT_MIN = 1;
    private const LIMIT_MAX = 50;

    // 104857600 bytes = 100 megabytes
    private const MEMORY_LIMIT_BYTES = 104857600;

    // Pause between polls when the queue returned no message.
    private const IDLE_SLEEP_SECONDS = 10;

    private $awsSqsUtil;
    private $consumer;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        ConsumerInterface $consumer,
        LoggerInterface $logger
    ) {
        parent::__construct();

        $this->awsSqsUtil = $awsSqsUtil;
        $this->consumer = $consumer;
        $this->logger = $logger;
    }

    protected function configure(): void
    {
        $this
            ->setName('app:aws-sqs-worker')
            ->setDescription('Watches AWS SQS queues for messages.')
            ->addOption(
                'queue',
                null,
                InputOption::VALUE_REQUIRED,
                'The name of the queue to watch.'
            )
            ->addOption(
                'limit',
                null,
                InputOption::VALUE_OPTIONAL,
                'The maximum amount of messages a worker should consume.',
                self::LIMIT_MAX
            );
    }

    /**
     * Polls the queue until the message limit is reached.
     *
     * @return int 0 once the limit has been consumed.
     *
     * @throws AwsSqsWorkerException On invalid options, an unresolvable queue URL,
     *                               an interruption signal, or excessive memory usage.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $parameters = $this->extractParameters($input);

        // Fail with a clear message instead of passing null on to receiveMessage(string).
        $queueUrl = $this->awsSqsUtil->getQueueUrl($parameters['queue']);
        if (null === $queueUrl) {
            throw new AwsSqsWorkerException(
                sprintf('The URL of the "%s" queue could not be resolved.', $parameters['queue'])
            );
        }

        $this->handleInterruption();

        $output->writeln(
            sprintf('Watching the "%s" queue for "%d" messages ...', $parameters['queue'], $parameters['limit'])
        );

        $processed = 0;
        while ($processed < $parameters['limit']) {
            $this->handleMemory($parameters['queue'], $parameters['limit']);

            $message = $this->awsSqsUtil->receiveMessage($queueUrl);
            if ($message instanceof Message) {
                $this->consumer->consume($message, $parameters['queue']);

                ++$processed;

                continue;
            }

            $output->writeln('Sleeping for 10 seconds due to no message ...');

            sleep(self::IDLE_SLEEP_SECONDS);
        }

        // Return instead of calling exit so the console application can finish normally.
        return 0;
    }

    /**
     * Reads and validates the command options.
     *
     * A limit outside LIMIT_MIN..LIMIT_MAX falls back to LIMIT_MAX.
     *
     * @return array Array with the keys "queue" (string) and "limit" (int).
     *
     * @throws AwsSqsWorkerException When the "--queue" option is missing or blank.
     */
    private function extractParameters(InputInterface $input): array
    {
        $queue = (string) $input->getOption('queue');
        // Strict comparison so that a queue literally named "0" is not treated as blank.
        if ('' === trim($queue)) {
            throw new AwsSqsWorkerException('The "--queue" option requires a value.');
        }

        $limit = (int) $input->getOption('limit');
        if ($limit < self::LIMIT_MIN || $limit > self::LIMIT_MAX) {
            $limit = self::LIMIT_MAX;
        }

        return ['queue' => $queue, 'limit' => $limit];
    }

    /**
     * Registers SIGINT and SIGTERM handlers that abort the worker with an exception.
     */
    private function handleInterruption(): void
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGINT, function () {
            throw new AwsSqsWorkerException('Process has been terminated with the "ctrl+c" signal.');
        });

        pcntl_signal(SIGTERM, function () {
            throw new AwsSqsWorkerException('Process has been terminated with the "kill" signal.');
        });
    }

    /**
     * Aborts the worker once peak memory usage has exceeded the ceiling.
     *
     * @throws AwsSqsWorkerException When more than MEMORY_LIMIT_BYTES have been allocated.
     */
    private function handleMemory(string $queue, int $limit): void
    {
        if (self::MEMORY_LIMIT_BYTES < memory_get_peak_usage(true)) {
            throw new AwsSqsWorkerException(
                sprintf('Run out of memory while watching the "%s" queue for "%d" messages.', $queue, $limit)
            );
        }
    }
}

$ bin/console app:aws-sqs-worker --help
Description:
Watches AWS SQS queues for messages.

Usage:
app:aws-sqs-worker [options]

Options:
--queue=QUEUE The name of the queue to watch.
--limit[=LIMIT] The maximum amount of messages a worker should consume. [default: 50]

Message


declare(strict_types=1);

namespace App\Model;

/**
 * Plain data holder describing a single message received from a queue.
 */
class Message
{
    /** @var string|null URL of the queue the message was received from. */
    public $url;

    /** @var string|null Identifier of the message. */
    public $id;

    /** @var string|null Raw message body. */
    public $body;

    /** @var string|null Receipt handle used when deleting or requeueing the message. */
    public $receiptHandle;
}

ConsumerInterface


declare(strict_types=1);

namespace App\Message\Consumer;

use App\Model\Message;

/**
 * Entry point the worker uses to hand over a received message.
 */
interface ConsumerInterface
{
    /**
     * Consumes the given message.
     *
     * @param Message $message The received message.
     * @param string  $queue   Name of the queue the message came from.
     */
    public function consume(Message $message, string $queue): void;
}

Consumer


declare(strict_types=1);

namespace App\Message\Consumer;

use App\Message\Consumer\Strategy\StrategyInterface;
use App\Model\Message;
use Traversable;

/**
 * Dispatches a message to the first registered strategy that accepts its queue.
 */
class Consumer implements ConsumerInterface
{
    /** @var Traversable */
    private $strategies;

    /**
     * @param Traversable $strategies The available consumer strategies.
     */
    public function __construct(Traversable $strategies)
    {
        $this->strategies = $strategies;
    }

    /**
     * Lets the first strategy that can handle the queue process the message.
     * Nothing happens when no strategy accepts the queue.
     */
    public function consume(Message $message, string $queue): void
    {
        /** @var StrategyInterface $candidate */
        foreach ($this->strategies as $candidate) {
            if (!$candidate->canProcess($queue)) {
                continue;
            }

            $candidate->process($message);

            return;
        }
    }
}

StrategyInterface


declare(strict_types=1);

namespace App\Message\Consumer\Strategy;

use App\Model\Message;

/**
 * A queue-specific way of handling messages.
 */
interface StrategyInterface
{
    /**
     * Tells whether this strategy handles messages of the given queue.
     */
    public function canProcess(string $queue): bool;

    /**
     * Handles the given message.
     */
    public function process(Message $message): void;
}

CommentStrategy


declare(strict_types=1);

namespace App\Message\Consumer\Strategy;

use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;

/**
 * Strategy for the "comment" queue: every message is deleted, whether it is good or not.
 */
class CommentStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'comment';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Logs the outcome for the message and then deletes it from the queue in either case.
     */
    public function process(Message $message): void
    {
        if ($this->isGoodMessage($message)) {
            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));
        } else {
            $this->logger->alert(sprintf('The message "%s" has been deleted.', $message->id));
        }

        $this->awsSqsUtil->deleteMessage($message);
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * A body that is not valid JSON, is not a JSON object, or lacks the flag counts as
     * not good instead of triggering an undefined-index/offset error.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}

FileStrategy


declare(strict_types=1);

namespace App\Message\Consumer\Strategy;

use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;

/**
 * Strategy for the "file" queue: good messages are deleted, all others are requeued.
 */
class FileStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'file';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Deletes a good message; requeues any other message so that it is received again later.
     */
    public function process(Message $message): void
    {
        if ($this->isGoodMessage($message)) {
            $this->awsSqsUtil->deleteMessage($message);

            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));

            return;
        }

        $this->awsSqsUtil->requeueMessage($message);

        $this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * A body that is not valid JSON, is not a JSON object, or lacks the flag counts as
     * not good instead of triggering an undefined-index/offset error.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}

ImageStrategy


declare(strict_types=1);

namespace App\Message\Consumer\Strategy;

use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;

/**
 * Strategy for the "image" queue: good messages are deleted, all others are only logged.
 */
class ImageStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'image';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Deletes a good message; any other message is left untouched and only logged.
     */
    public function process(Message $message): void
    {
        if ($this->isGoodMessage($message)) {
            $this->awsSqsUtil->deleteMessage($message);

            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));

            return;
        }

        // No SQS call on purpose: presumably retries and the move to the dead-letter queue
        // are driven by the queue's own settings, which are configured outside this class.
        $this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * A body that is not valid JSON, is not a JSON object, or lacks the flag counts as
     * not good instead of triggering an undefined-index/offset error.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}

Tests


We have the same messages in all of the queues below.


{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":false}
{"name":"Inanzzz","is_good_message":true}

comment


Whether the message is processed or not, it is deleted from the queue.



Running command.


$ bin/console app:aws-sqs-worker --queue=comment --limit=10
Watching the "comment" queue for "10" messages ...
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []

Application logs.


[2018-09-08 21:12:30] app.INFO: The message "1edd4903-edb4-4c4b-9d92-2cd5f6209e7b" has been consumed. [] []
[2018-09-08 21:12:31] app.INFO: The message "333023fe-bafa-46ae-8b14-0e01d08486e5" has been consumed. [] []
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []
[2018-09-08 21:12:31] app.INFO: The message "444c05c7-7255-4b9b-9a10-b536d859b4c5" has been consumed. [] []

file


If the message is processed then it is deleted from the queue, otherwise it is requeued until it gets processed. The problem is that if there are a lot of unprocessable messages, the queue would get clogged up, which is not a good thing.



Running command.


$ bin/console app:aws-sqs-worker --queue=file --limit=10
Watching the "file" queue for "10" messages ...
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []

Application logs.


[2018-09-08 21:24:57] app.INFO: The message "888a91cb-fe3c-4f42-95bb-7b98bebfd0cc" has been consumed. [] []
[2018-09-08 21:24:58] app.INFO: The message "9ceafd32-8adf-4f8b-af6d-598725203799" has been consumed. [] []
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:24:58] app.INFO: The message "16291e18-c618-41c4-a4db-f83be417dbf5" has been consumed. [] []
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []

image


If the message is processed then it is deleted from the queue, otherwise it is kept in "flight" mode for 1 minute and then retried. This is repeated 3 times. If it is still unsuccessful, the message is moved to the associated dead-letter queue, which is the ideal scenario.






Running command.


$ bin/console app:aws-sqs-worker --queue=image --limit=10
Watching the "image" queue for "10" messages ...
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []

Application logs.


[2018-09-08 21:47:46] app.INFO: The message "eb9b7d45-db12-472e-bc1f-c4557743592a" has been consumed. [] []
[2018-09-08 21:47:46] app.INFO: The message "3b2156ba-17bb-41ea-b0b4-a235fc8a5b83" has been consumed. [] []
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:47:46] app.INFO: The message "d3eb70b4-c552-4144-82cc-9fcf771686c4" has been consumed. [] []
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []