09/09/2018 - PHP, SYMFONY
In this example we are going to create a single worker and multiple consumers to process Amazon Simple Queue Service (AWS SQS) messages. Each queue consumer will treat unprocessable messages differently.
Our consumers use the "Strategy Pattern", so all we need to inject into the worker command is the main consumer; it then works out which consumer strategy should be used for the given queue.
Make sure the credentials below belong to an AWS IAM user who has the "AmazonSQSFullAccess" permission.
# Parameters referenced by the service definitions below.
parameters:
# Injected into App\Util\AwsSqsUtil as $applicationName.
application_name: 'APP'
# First argument of the AwsSqsUtil::createClient() call configured below.
aws_sdk.config.default:
version: 'latest'
region: 'eu-west-1'
# Second argument of the AwsSqsUtil::createClient() call configured below.
# NOTE(review): 'AWS_KEY' / 'AWS_SECRET' look like placeholders - supply real values and avoid committing them in plain text.
aws_sdk.credentials.default:
credentials:
key: 'AWS_KEY'
secret: 'AWS_SECRET'
services:
Aws\Sdk: ~
# SQS helper: receives the application name and kernel environment, then has createClient() called on it with the two parameter sets above.
App\Util\AwsSqsUtil:
arguments:
$applicationName: '%application_name%'
$env: '%kernel.environment%'
calls:
- [createClient, ['%aws_sdk.config.default%', '%aws_sdk.credentials.default%']]
# Registered as a console command through the console.command tag.
App\Command\AwsSqsWorkerCommand:
tags:
- { name: console.command }
# The main consumer receives every service tagged "consumer_strategy".
App\Message\Consumer\Consumer:
arguments: [!tagged consumer_strategy]
# Strategies tagged so they are collected into the Consumer above.
App\Message\Consumer\Strategy\ImageStrategy:
tags:
- { name: consumer_strategy }
App\Message\Consumer\Strategy\FileStrategy:
tags:
- { name: consumer_strategy }
App\Message\Consumer\Strategy\CommentStrategy:
tags:
- { name: consumer_strategy }
declare(strict_types=1);
namespace App\Exception;
use RuntimeException;
/**
 * Exception raised by the SQS worker command.
 *
 * AwsSqsWorkerCommand throws it for a missing "--queue" value, for the
 * SIGINT/SIGTERM signals and when the memory ceiling is exceeded.
 */
class AwsSqsWorkerException extends RuntimeException
{
}
declare(strict_types=1);
namespace App\Util;
use App\Model\Message;
/**
 * Contract for the SQS helper used by the worker command and the consumer strategies.
 */
interface AwsSqsUtilInterface
{
/**
 * Builds the SQS client from the given client configuration and credentials.
 *
 * Expected to be called before any other method (the service definition wires it as a setup call).
 */
public function createClient(iterable $config, iterable $credentials): void;
/**
 * Resolves the URL of the queue identified by its short name (e.g. "comment").
 *
 * @return string|null The queue URL, or null when none is available.
 */
public function getQueueUrl(string $name): ?string;
/**
 * Fetches a single message from the queue at the given URL.
 *
 * @return Message|null The received message, or null when no message was returned.
 */
public function receiveMessage(string $url): ?Message;
/**
 * Deletes the given message from the queue it was received from.
 */
public function deleteMessage(Message $message): void;
/**
 * Makes the given message available for another delivery attempt.
 */
public function requeueMessage(Message $message): void;
}
declare(strict_types=1);
namespace App\Util;
use App\Model\Message;
use Aws\Result;
use Aws\Sdk;
use Aws\Sqs\SqsClient;
/**
 * Thin wrapper around the AWS SDK SQS client.
 *
 * Queue names are derived from the application name and the kernel environment,
 * so callers only deal with short names such as "comment" or "image".
 * createClient() must be called before any other method.
 */
class AwsSqsUtil implements AwsSqsUtilInterface
{
    /** Seconds a requeued message stays invisible before it can be received again. */
    private const REQUEUE_VISIBILITY_TIMEOUT = 30;

    /** @var SqsClient */
    private $client;

    /** @var Sdk */
    private $sdk;

    /** @var string */
    private $applicationName;

    /** @var string */
    private $env;

    public function __construct(Sdk $sdk, string $applicationName, string $env)
    {
        $this->sdk = $sdk;
        $this->applicationName = $applicationName;
        $this->env = $env;
    }

    /**
     * Creates the SQS client from the merged configuration.
     *
     * Keys present in $config take precedence over the same keys in $credentials.
     */
    public function createClient(iterable $config, iterable $credentials): void
    {
        // The array union operator does not accept Traversable operands, so both
        // iterables are normalised to arrays before being merged.
        $this->client = $this->sdk->createSqs($this->toArray($config) + $this->toArray($credentials));
    }

    /**
     * Resolves the URL of the queue with the given short name.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#getqueueurl
     */
    public function getQueueUrl(string $name): ?string
    {
        /** @var Result $result */
        $result = $this->client->getQueueUrl([
            'QueueName' => $this->createQueueName($name),
        ]);

        return $result->get('QueueUrl');
    }

    /**
     * Receives at most one message from the given queue.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @return Message|null The received message, or null when the queue returned nothing.
     */
    public function receiveMessage(string $url): ?Message
    {
        /** @var Result $result */
        $result = $this->client->receiveMessage([
            'QueueUrl' => $url,
            'MaxNumberOfMessages' => 1,
        ]);

        // Covers both a missing "Messages" key and an empty list, so index 0 is
        // never read from an empty result.
        $messages = $result->get('Messages');
        if (empty($messages)) {
            return null;
        }

        $received = $messages[0];

        $message = new Message();
        $message->url = $url;
        $message->id = $received['MessageId'];
        $message->body = $received['Body'];
        $message->receiptHandle = $received['ReceiptHandle'];

        return $message;
    }

    /**
     * Permanently removes the message from its queue.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletemessage
     */
    public function deleteMessage(Message $message): void
    {
        $this->client->deleteMessage([
            'QueueUrl' => $message->url,
            'ReceiptHandle' => $message->receiptHandle,
        ]);
    }

    /**
     * Resets the visibility timeout of the message so it is delivered again later.
     *
     * @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#changemessagevisibility
     */
    public function requeueMessage(Message $message): void
    {
        $this->client->changeMessageVisibility([
            'QueueUrl' => $message->url,
            'ReceiptHandle' => $message->receiptHandle,
            'VisibilityTimeout' => self::REQUEUE_VISIBILITY_TIMEOUT,
        ]);
    }

    /**
     * Builds the full queue name: APP_ENV_name, with a "_DL" suffix for dead-letter queues.
     */
    private function createQueueName(string $name, bool $isDeadLetter = false): string
    {
        return sprintf(
            '%s_%s_%s%s',
            strtoupper($this->applicationName),
            strtoupper($this->env),
            $name,
            $isDeadLetter ? '_DL' : ''
        );
    }

    /**
     * Converts any iterable into a plain array, preserving keys.
     */
    private function toArray(iterable $values): array
    {
        return is_array($values) ? $values : iterator_to_array($values);
    }
}
declare(strict_types=1);
namespace App\Command;
use App\Exception\AwsSqsWorkerException;
use App\Message\Consumer\ConsumerInterface;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console worker that polls a single SQS queue and hands every received
 * message to the consumer until the requested number of messages is processed.
 */
class AwsSqsWorkerCommand extends Command
{
    private const LIMIT_MIN = 1;
    private const LIMIT_MAX = 50;

    private $awsSqsUtil;
    private $consumer;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        ConsumerInterface $consumer,
        LoggerInterface $logger
    ) {
        parent::__construct();

        $this->awsSqsUtil = $awsSqsUtil;
        $this->consumer = $consumer;
        $this->logger = $logger;
    }

    /**
     * Declares the command name and its "--queue" / "--limit" options.
     */
    protected function configure()
    {
        $this
            ->setName('app:aws-sqs-worker')
            ->setDescription('Watches AWS SQS queues for messages.')
            ->addOption(
                'queue',
                null,
                InputOption::VALUE_REQUIRED,
                'The name of the queue to watch.'
            )
            ->addOption(
                'limit',
                null,
                InputOption::VALUE_OPTIONAL,
                'The maximum amount of messages a worker should consume.',
                self::LIMIT_MAX
            );
    }

    /**
     * Polls the queue until "limit" messages have been consumed.
     *
     * @return int 0 once the limit has been reached.
     *
     * @throws AwsSqsWorkerException On invalid options, an unresolvable queue,
     *                               a termination signal or excessive memory usage.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $parameters = $this->extractParameters($input);

        $queueUrl = $this->awsSqsUtil->getQueueUrl($parameters['queue']);
        if (null === $queueUrl) {
            // Fail with a meaningful error instead of a TypeError from receiveMessage().
            throw new AwsSqsWorkerException(
                sprintf('The URL of the "%s" queue could not be resolved.', $parameters['queue'])
            );
        }

        $this->handleInterruption();

        $output->writeln(
            sprintf('Watching the "%s" queue for "%d" messages ...', $parameters['queue'], $parameters['limit'])
        );

        $processed = 0;
        while ($processed < $parameters['limit']) {
            $this->handleMemory($parameters['queue'], $parameters['limit']);

            $message = $this->awsSqsUtil->receiveMessage($queueUrl);
            if ($message instanceof Message) {
                $this->consumer->consume($message, $parameters['queue']);
                ++$processed;

                continue;
            }

            $output->writeln('Sleeping for 10 seconds due to no message ...');
            sleep(10);
        }

        // Returning (rather than calling exit) lets the console application
        // finish its normal shutdown sequence and report the exit code itself.
        return 0;
    }

    /**
     * Reads and validates the command options.
     *
     * A limit outside the LIMIT_MIN..LIMIT_MAX range falls back to LIMIT_MAX.
     *
     * @return array{queue: string, limit: int}
     *
     * @throws AwsSqsWorkerException When "--queue" is missing or blank.
     */
    private function extractParameters(InputInterface $input): array
    {
        $queue = (string) $input->getOption('queue');
        // Compare against the empty string explicitly: a plain truthiness check
        // would also reject a queue literally named "0".
        if ('' === trim($queue)) {
            throw new AwsSqsWorkerException('The "--queue" option requires a value.');
        }

        $limit = (int) $input->getOption('limit');
        if ($limit < self::LIMIT_MIN || $limit > self::LIMIT_MAX) {
            $limit = self::LIMIT_MAX;
        }

        return ['queue' => $queue, 'limit' => $limit];
    }

    /**
     * Converts SIGINT and SIGTERM into AwsSqsWorkerException so the worker stops.
     */
    private function handleInterruption(): void
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGINT, function () {
            throw new AwsSqsWorkerException('Process has been terminated with the "ctrl+c" signal.');
        });

        pcntl_signal(SIGTERM, function () {
            throw new AwsSqsWorkerException('Process has been terminated with the "kill" signal.');
        });
    }

    /**
     * Aborts the worker once its peak memory usage exceeds 100 megabytes.
     *
     * @throws AwsSqsWorkerException When the ceiling has been exceeded.
     */
    private function handleMemory(string $queue, int $limit): void
    {
        // 104857600 bytes = 100 megabytes
        if (104857600 < memory_get_peak_usage(true)) {
            throw new AwsSqsWorkerException(
                sprintf('Run out of memory while watching the "%s" queue for "%d" messages.', $queue, $limit)
            );
        }
    }
}
$ bin/console app:aws-sqs-worker --help
Description:
Watches AWS SQS queues for messages.
Usage:
app:aws-sqs-worker [options]
Options:
--queue=QUEUE The name of the queue to watch.
--limit[=LIMIT] The maximum amount of messages a worker should consume. [default: 50]
declare(strict_types=1);
namespace App\Model;
/**
 * Simple data holder for one received SQS message.
 *
 * All properties are null until AwsSqsUtil::receiveMessage() fills them in.
 */
class Message
{
/** @var string|null URL of the queue the message was received from. */
public $url;
/** @var string|null The "MessageId" value of the received message. */
public $id;
/** @var string|null The "Body" value of the received message (the strategies decode it as JSON). */
public $body;
/** @var string|null The "ReceiptHandle" value, used when deleting or requeueing the message. */
public $receiptHandle;
}
declare(strict_types=1);
namespace App\Message\Consumer;
use App\Model\Message;
/**
 * Entry point the worker command uses to hand over a received message.
 */
interface ConsumerInterface
{
/**
 * Consumes a message that was received from the queue with the given short name.
 */
public function consume(Message $message, string $queue): void;
}
declare(strict_types=1);
namespace App\Message\Consumer;
use App\Message\Consumer\Strategy\StrategyInterface;
use App\Model\Message;
use Traversable;
/**
 * Main consumer: forwards a message to the first strategy that declares
 * itself responsible for the queue the message came from.
 */
class Consumer implements ConsumerInterface
{
    /** @var Traversable Collection of StrategyInterface instances. */
    private $strategies;

    public function __construct(Traversable $strategies)
    {
        $this->strategies = $strategies;
    }

    /**
     * Lets the first matching strategy process the message.
     *
     * When no strategy accepts the queue, the message is left untouched.
     */
    public function consume(Message $message, string $queue): void
    {
        /** @var StrategyInterface $candidate */
        foreach ($this->strategies as $candidate) {
            if (!$candidate->canProcess($queue)) {
                continue;
            }

            $candidate->process($message);

            return;
        }
    }
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
/**
 * A queue-specific way of handling messages, selected by the main consumer.
 */
interface StrategyInterface
{
/**
 * Tells whether this strategy handles messages of the given queue.
 */
public function canProcess(string $queue): bool;
/**
 * Processes a single message.
 */
public function process(Message $message): void;
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
/**
 * Strategy for the "comment" queue: every message is deleted, whether or not
 * it could be processed.
 */
class CommentStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'comment';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Deletes the message and logs whether it was a good or an unprocessable one.
     */
    public function process(Message $message): void
    {
        $isGood = $this->isGoodMessage($message);

        // Delete first so the log entry is only written once the removal succeeded.
        $this->awsSqsUtil->deleteMessage($message);

        if ($isGood) {
            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));
        } else {
            $this->logger->alert(sprintf('The message "%s" has been deleted.', $message->id));
        }
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * Invalid JSON, a non-array payload or a missing flag all count as "not good"
     * instead of raising a notice or warning.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
/**
 * Strategy for the "file" queue: good messages are deleted, unprocessable ones
 * are requeued for another delivery attempt.
 */
class FileStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'file';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Deletes a good message, or requeues an unprocessable one.
     */
    public function process(Message $message): void
    {
        if ($this->isGoodMessage($message)) {
            $this->awsSqsUtil->deleteMessage($message);
            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));

            return;
        }

        $this->awsSqsUtil->requeueMessage($message);
        $this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * Invalid JSON, a non-array payload or a missing flag all count as "not good"
     * instead of raising a notice or warning.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
/**
 * Strategy for the "image" queue: good messages are deleted, unprocessable
 * ones are left alone so the queue itself decides what happens to them.
 */
class ImageStrategy implements StrategyInterface
{
    public const QUEUE_NAME = 'image';

    private $awsSqsUtil;
    private $logger;

    public function __construct(
        AwsSqsUtilInterface $awsSqsUtil,
        LoggerInterface $logger
    ) {
        $this->awsSqsUtil = $awsSqsUtil;
        $this->logger = $logger;
    }

    public function canProcess(string $queue): bool
    {
        return self::QUEUE_NAME === $queue;
    }

    /**
     * Deletes a good message; an unprocessable one is only logged.
     */
    public function process(Message $message): void
    {
        if ($this->isGoodMessage($message)) {
            $this->awsSqsUtil->deleteMessage($message);
            $this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));

            return;
        }

        // Deliberately neither deleted nor requeued here.
        $this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
    }

    /**
     * Tells whether the JSON body carries a truthy "is_good_message" flag.
     *
     * Invalid JSON, a non-array payload or a missing flag all count as "not good"
     * instead of raising a notice or warning.
     */
    private function isGoodMessage(Message $message): bool
    {
        $body = json_decode((string) $message->body, true);

        return is_array($body) && !empty($body['is_good_message']);
    }
}
We have the same messages in all of the queues below.
{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":false}
{"name":"Inanzzz","is_good_message":true}
Whether the message is processed or not, it is deleted from the queue.
Running command.
$ bin/console app:aws-sqs-worker --queue=comment --limit=10
Watching the "comment" queue for "10" messages ...
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []
Application logs.
[2018-09-08 21:12:30] app.INFO: The message "1edd4903-edb4-4c4b-9d92-2cd5f6209e7b" has been consumed. [] []
[2018-09-08 21:12:31] app.INFO: The message "333023fe-bafa-46ae-8b14-0e01d08486e5" has been consumed. [] []
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []
[2018-09-08 21:12:31] app.INFO: The message "444c05c7-7255-4b9b-9a10-b536d859b4c5" has been consumed. [] []
If the message is processed, it is deleted from the queue; otherwise it is requeued again and again until it gets processed. The problem is that if there are a lot of unprocessable messages, the queue gets clogged up, which is not a good thing.
Running command.
$ bin/console app:aws-sqs-worker --queue=file --limit=10
Watching the "file" queue for "10" messages ...
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Application logs.
[2018-09-08 21:24:57] app.INFO: The message "888a91cb-fe3c-4f42-95bb-7b98bebfd0cc" has been consumed. [] []
[2018-09-08 21:24:58] app.INFO: The message "9ceafd32-8adf-4f8b-af6d-598725203799" has been consumed. [] []
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:24:58] app.INFO: The message "16291e18-c618-41c4-a4db-f83be417dbf5" has been consumed. [] []
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
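If the message is processed, it is deleted from the queue; otherwise it is kept in "flight" mode for 1 minute and then processing is retried. This is repeated 3 times. If it is still unsuccessful, the message is moved to the associated dead-letter queue, which is the ideal scenario. Note that the strategy itself does nothing with an unprocessable message, so the retry interval, the retry count and the dead-letter queue have to be configured on the queue itself.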
Running command.
$ bin/console app:aws-sqs-worker --queue=image --limit=10
Watching the "image" queue for "10" messages ...
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Application logs.
[2018-09-08 21:47:46] app.INFO: The message "eb9b7d45-db12-472e-bc1f-c4557743592a" has been consumed. [] []
[2018-09-08 21:47:46] app.INFO: The message "3b2156ba-17bb-41ea-b0b4-a235fc8a5b83" has been consumed. [] []
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:47:46] app.INFO: The message "d3eb70b4-c552-4144-82cc-9fcf771686c4" has been consumed. [] []
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []