09/09/2018 - PHP, SYMFONY
Bu örnekte, Amazon Simple Queue Service (AWS SQS) mesajlarını işlemek için tek bir işleyici ve birden fazla tüketici oluşturacağız. Her mesaj tüketicisi, sıralardaki işlenemez iletileri farklı şekilde ele alacak.
Tüketicilerimiz "Strateji Pattern" yöntemini kullanırlar, böylece işleyici komutuna enjekte edeceğimiz tek şey ana tüketicidir ve sonra verilen sıra için hangi tüketici stratejisinin kullanılacağını otomatik olarak belirlenecektir.
Aşağıdaki anahtarların sahibi olan AWS IAM kullanıcısının "AmazonSSQSFullAccess" hakkına sahip olduğundan emin olun.
parameters:
application_name: 'APP'
aws_sdk.config.default:
version: 'latest'
region: 'eu-west-1'
aws_sdk.credentials.default:
credentials:
key: 'AWS_KEY'
secret: 'AWS_SECRET'
services:
Aws\Sdk: ~
App\Util\AwsSqsUtil:
arguments:
$applicationName: '%application_name%'
$env: '%kernel.environment%'
calls:
- [createClient, ['%aws_sdk.config.default%', '%aws_sdk.credentials.default%']]
App\Command\AwsSqsWorkerCommand:
tags:
- { name: console.command }
App\Message\Consumer\Consumer:
arguments: [!tagged consumer_strategy]
App\Message\Consumer\Strategy\ImageStrategy:
tags:
- { name: consumer_strategy }
App\Message\Consumer\Strategy\FileStrategy:
tags:
- { name: consumer_strategy }
App\Message\Consumer\Strategy\CommentStrategy:
tags:
- { name: consumer_strategy }
declare(strict_types=1);
namespace App\Exception;
use RuntimeException;
class AwsSqsWorkerException extends RuntimeException
{
}
declare(strict_types=1);
namespace App\Util;
use App\Model\Message;
interface AwsSqsUtilInterface
{
public function createClient(iterable $config, iterable $credentials): void;
public function getQueueUrl(string $name): ?string;
public function receiveMessage(string $url): ?Message;
public function deleteMessage(Message $message): void;
public function requeueMessage(Message $message): void;
}
declare(strict_types=1);
namespace App\Util;
use App\Model\Message;
use Aws\Result;
use Aws\Sdk;
use Aws\Sqs\SqsClient;
class AwsSqsUtil implements AwsSqsUtilInterface
{
/** @var SqsClient */
private $client;
private $sdk;
private $applicationName;
private $env;
public function __construct(Sdk $sdk, string $applicationName, string $env)
{
$this->sdk = $sdk;
$this->applicationName = $applicationName;
$this->env = $env;
}
public function createClient(iterable $config, iterable $credentials): void
{
$this->client = $this->sdk->createSqs($config+$credentials);
}
/**
* @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#getqueueurl
*/
public function getQueueUrl(string $name): ?string
{
/** @var Result $result */
$result = $this->client->getQueueUrl([
'QueueName' => $this->createQueueName($name),
]);
return $result->get('QueueUrl');
}
/**
* @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
*/
public function receiveMessage(string $url): ?Message
{
/** @var Result $result */
$result = $this->client->receiveMessage([
'QueueUrl' => $url,
'MaxNumberOfMessages' => 1,
]);
$message = null;
if (null !== $result->get('Messages')) {
$message = new Message();
$message->url = $url;
$message->id = $result->get('Messages')[0]['MessageId'];
$message->body = $result->get('Messages')[0]['Body'];
$message->receiptHandle = $result->get('Messages')[0]['ReceiptHandle'];
}
return $message;
}
/**
* @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletemessage
*/
public function deleteMessage(Message $message): void
{
$this->client->deleteMessage([
'QueueUrl' => $message->url,
'ReceiptHandle' => $message->receiptHandle,
]);
}
/**
* @link https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#changemessagevisibility
*/
public function requeueMessage(Message $message): void
{
$this->client->changeMessageVisibility([
'QueueUrl' => $message->url,
'ReceiptHandle' => $message->receiptHandle,
'VisibilityTimeout' => 30,
]);
}
private function createQueueName(string $name, bool $isDeadLetter = null): string
{
return sprintf(
'%s_%s_%s%s',
strtoupper($this->applicationName),
strtoupper($this->env),
$name,
$isDeadLetter ? '_DL' : null
);
}
}
declare(strict_types=1);
namespace App\Command;
use App\Exception\AwsSqsWorkerException;
use App\Message\Consumer\ConsumerInterface;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class AwsSqsWorkerCommand extends Command
{
private const LIMIT_MIN = 1;
private const LIMIT_MAX = 50;
private $awsSqsUtil;
private $consumer;
private $logger;
public function __construct(
AwsSqsUtilInterface $awsSqsUtil,
ConsumerInterface $consumer,
LoggerInterface $logger
) {
parent::__construct();
$this->awsSqsUtil = $awsSqsUtil;
$this->consumer = $consumer;
$this->logger = $logger;
}
protected function configure()
{
$this
->setName('app:aws-sqs-worker')
->setDescription('Watches AWS SQS queues for messages.')
->addOption(
'queue',
null,
InputOption::VALUE_REQUIRED,
'The name of the queue to watch.'
)
->addOption(
'limit',
null,
InputOption::VALUE_OPTIONAL,
'The maximum amount of messages a worker should consume.',
self::LIMIT_MAX
);
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$parameters = $this->extractParameters($input);
$processed = 0;
$queueUrl = $this->awsSqsUtil->getQueueUrl($parameters['queue']);
$this->handleInterruption();
$output->writeln(
sprintf('Watching the "%s" queue for "%d" messages ...', $parameters['queue'], $parameters['limit'])
);
while (true) {
if ($processed === $parameters['limit']) {
exit;
}
$this->handleMemory($parameters['queue'], $parameters['limit']);
$message = $this->awsSqsUtil->receiveMessage($queueUrl);
if ($message instanceof Message) {
$this->consumer->consume($message, $parameters['queue']);
++$processed;
} else {
$output->writeln('Sleeping for 10 seconds due to no message ...');
sleep(10);
}
}
}
private function extractParameters(InputInterface $input): iterable
{
$queue = (string) $input->getOption('queue');
if (null === $queue || !trim($queue)) {
throw new AwsSqsWorkerException('The "--queue" option requires a value.');
}
$limit = (int) $input->getOption('limit');
if ($limit < self::LIMIT_MIN || $limit > self::LIMIT_MAX) {
$limit = self::LIMIT_MAX;
}
return ['queue' => $queue, 'limit' => $limit];
}
private function handleInterruption(): void
{
pcntl_async_signals(true);
pcntl_signal(SIGINT, function () {
throw new AwsSqsWorkerException('Process has been terminated with the "ctrl+c" signal.');
});
pcntl_signal(SIGTERM, function () {
throw new AwsSqsWorkerException('Process has been terminated with the "kill" signal.');
});
}
private function handleMemory(string $queue, int $limit): void
{
// 104857600 bytes = 100 megabytes
if (104857600 < memory_get_peak_usage(true)) {
throw new AwsSqsWorkerException(
sprintf('Run out of memory while watching the "%s" queue for "%d" messages.', $queue, $limit)
);
}
}
}
$ bin/console app:aws-sqs-worker --help
Description:
Watches AWS SQS queues for messages.
Usage:
app:aws-sqs-worker [options]
Options:
--queue=QUEUE The name of the queue to watch.
--limit[=LIMIT] The maximum amount of messages a worker should consume. [default: 50]
declare(strict_types=1);
namespace App\Model;
class Message
{
public $url;
public $id;
public $body;
public $receiptHandle;
}
declare(strict_types=1);
namespace App\Message\Consumer;
use App\Model\Message;
interface ConsumerInterface
{
public function consume(Message $message, string $queue): void;
}
declare(strict_types=1);
namespace App\Message\Consumer;
use App\Message\Consumer\Strategy\StrategyInterface;
use App\Model\Message;
use Traversable;
class Consumer implements ConsumerInterface
{
private $strategies;
public function __construct(Traversable $strategies)
{
$this->strategies = $strategies;
}
public function consume(Message $message, string $queue): void
{
/** @var StrategyInterface $strategy */
foreach ($this->strategies as $strategy) {
if ($strategy->canProcess($queue)) {
$strategy->process($message);
break;
}
}
}
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
interface StrategyInterface
{
public function canProcess(string $queue): bool;
public function process(Message $message): void;
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
class CommentStrategy implements StrategyInterface
{
public const QUEUE_NAME = 'comment';
private $awsSqsUtil;
private $logger;
public function __construct(
AwsSqsUtilInterface $awsSqsUtil,
LoggerInterface $logger
) {
$this->awsSqsUtil = $awsSqsUtil;
$this->logger = $logger;
}
public function canProcess(string $queue): bool
{
return self::QUEUE_NAME === $queue;
}
public function process(Message $message): void
{
$body = json_decode($message->body, true);
if ($body['is_good_message']) {
$this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));
} else {
$this->logger->alert(sprintf('The message "%s" has been deleted.', $message->id));
}
$this->awsSqsUtil->deleteMessage($message);
}
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
class FileStrategy implements StrategyInterface
{
public const QUEUE_NAME = 'file';
private $awsSqsUtil;
private $logger;
public function __construct(
AwsSqsUtilInterface $awsSqsUtil,
LoggerInterface $logger
) {
$this->awsSqsUtil = $awsSqsUtil;
$this->logger = $logger;
}
public function canProcess(string $queue): bool
{
return self::QUEUE_NAME === $queue;
}
public function process(Message $message): void
{
$body = json_decode($message->body, true);
if ($body['is_good_message']) {
$this->awsSqsUtil->deleteMessage($message);
$this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));
} else {
$this->awsSqsUtil->requeueMessage($message);
$this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
}
}
}
declare(strict_types=1);
namespace App\Message\Consumer\Strategy;
use App\Model\Message;
use App\Util\AwsSqsUtilInterface;
use Psr\Log\LoggerInterface;
class ImageStrategy implements StrategyInterface
{
public const QUEUE_NAME = 'image';
private $awsSqsUtil;
private $logger;
public function __construct(
AwsSqsUtilInterface $awsSqsUtil,
LoggerInterface $logger
) {
$this->awsSqsUtil = $awsSqsUtil;
$this->logger = $logger;
}
public function canProcess(string $queue): bool
{
return self::QUEUE_NAME === $queue;
}
public function process(Message $message): void
{
$body = json_decode($message->body, true);
if ($body['is_good_message']) {
$this->awsSqsUtil->deleteMessage($message);
$this->logger->info(sprintf('The message "%s" has been consumed.', $message->id));
} else {
$this->logger->alert(sprintf('The message "%s" has been put in the "flight" mode.', $message->id));
}
}
}
Aşağıdaki tüm sıralarda aynı mesajlarımız var.
{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":true}
{"name":"Inanzzz","is_good_message":false}
{"name":"Inanzzz","is_good_message":true}
Mesaj işlenebilsede işlenemesede sıradan silinecek.
Komutun çalışması.
$ bin/console app:aws-sqs-worker --queue=comment --limit=10
Watching the "comment" queue for "10" messages ...
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []
Uygulama kayıtları.
[2018-09-08 21:12:30] app.INFO: The message "1edd4903-edb4-4c4b-9d92-2cd5f6209e7b" has been consumed. [] []
[2018-09-08 21:12:31] app.INFO: The message "333023fe-bafa-46ae-8b14-0e01d08486e5" has been consumed. [] []
[2018-09-08 21:12:30] app.ALERT: The message "0004f333-b006-479d-bddd-fcbdc234ba81" has been deleted. [] []
[2018-09-08 21:12:31] app.INFO: The message "444c05c7-7255-4b9b-9a10-b536d859b4c5" has been consumed. [] []
Mesaj işlenebilirse sıradan silinecek aksi takdirde tekrar denemek için sıraya tekrar konulacak. Buradaki sorun şu ki, çok fazla işlenmemiş mesajlar varsa, kuyruğun tıkanması söz konucu olabilir ki, buda iyi bir şey değildir.
Komutun çalışması.
$ bin/console app:aws-sqs-worker --queue=file --limit=10
Watching the "file" queue for "10" messages ...
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Uygulama kayıtları.
[2018-09-08 21:24:57] app.INFO: The message "888a91cb-fe3c-4f42-95bb-7b98bebfd0cc" has been consumed. [] []
[2018-09-08 21:24:58] app.INFO: The message "9ceafd32-8adf-4f8b-af6d-598725203799" has been consumed. [] []
[2018-09-08 21:24:58] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:24:58] app.INFO: The message "16291e18-c618-41c4-a4db-f83be417dbf5" has been consumed. [] []
[2018-09-08 21:25:28] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:25:59] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:26:49] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:00] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:28:31] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
[2018-09-08 21:29:42] app.ALERT: The message "00028690-7cb8-4d1a-b8ad-62573be548af" has been put in the "flight" mode. [] []
Eğer mesaj işlenirse, sıradan silinir, aksi takdirde "flight" modunda 1 dakika tutulur ve mesajı tekrar işlemek için tekrar denenir. Bu işlem 3 kez tekrarlanır. Eğer denemeler başarısız olursa, mesaj ilişkili dead-letter sırasına taşınır ki buda ideal bir senaryodur.
Komutun çalışması.
$ bin/console app:aws-sqs-worker --queue=image --limit=10
Watching the "image" queue for "10" messages ...
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Sleeping for 10 seconds due to no message ...
....
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
Uygulama kayıtları.
[2018-09-08 21:47:46] app.INFO: The message "eb9b7d45-db12-472e-bc1f-c4557743592a" has been consumed. [] []
[2018-09-08 21:47:46] app.INFO: The message "3b2156ba-17bb-41ea-b0b4-a235fc8a5b83" has been consumed. [] []
[2018-09-08 21:47:46] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:47:46] app.INFO: The message "d3eb70b4-c552-4144-82cc-9fcf771686c4" has been consumed. [] []
[2018-09-08 21:49:07] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []
[2018-09-08 21:50:08] app.ALERT: The message "00053c92-4bd4-400f-bb22-7cf489767e8b" has been put in the "flight" mode. [] []