Herkese merhaba!

Uzun yıllardır bol miktarda kişisel zaman ve enerji harcayarak bilgimizi hepinizle paylaşıyoruz. Ancak şu andan itibaren bu blogu çalışır durumda tutabilmek için yardımınıza ihtiyacımız var. Yapmanız gereken tek şey, sitedeki reklamlardan birine tıklamak olacaktır, aksi takdirde hosting vb. masraflar nedeniyle maalesef yayından kaldırılacaktır. Teşekkürler.

Bu örnekte, mesajları üretmek ve tüketmek için AWS SQS'yi kullanacağız. Kullanacağımız tüketici işçisi (worker), eşzamanlı (engelleyici) veya eşzamansız (engellemesiz) şekilde çalışabilir. Bu yapı, bir veya daha fazla işçiyi çalıştırmamızı sağlar. Ayrıca her bir işçi, aşağıdaki diyagramda gösterildiği gibi bir veya daha fazla tüketiciyi çalıştırabilir.



Yapı


├── cmd
│   ├── consumer
│   │   └── main.go
│   └── producer
│       └── main.go
└── internal
    ├── email
    │   ├── consumer.go
    │   └── producer.go
    └── pkg
        └── cloud
            ├── aws
            │   ├── aws.go
            │   └── sqs.go
            ├── client.go
            └── model.go

Dosyalar


aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

// Config carries the settings needed to build an AWS session.
type Config struct {
	// Address is the service endpoint URL and Region is the AWS region name.
	Address, Region string
	// Profile is the profile name handed to the session options.
	Profile string
	// ID and Secret form the static access key pair.
	ID, Secret string
}

// New creates an AWS session from config.
//
// The session uses static credentials built from config.ID and config.Secret,
// targets config.Region and config.Address, forces path-style addressing and
// carries config.Profile as the session profile.
func New(config Config) (*session.Session, error) {
	awsConfig := aws.Config{
		Credentials:      credentials.NewStaticCredentials(config.ID, config.Secret, ""),
		Region:           aws.String(config.Region),
		Endpoint:         aws.String(config.Address),
		S3ForcePathStyle: aws.Bool(true),
	}

	opts := session.Options{
		Config:  awsConfig,
		Profile: config.Profile,
	}

	return session.NewSessionWithOptions(opts)
}

sqs.go


package aws

import (
"context"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

// Compile-time assertion that SQS implements cloud.MessageClient.
var _ cloud.MessageClient = SQS{}

// SQS is a cloud.MessageClient implementation built on the AWS SDK SQS client.
type SQS struct {
	// timeout is the per-call deadline applied by Send and Delete.
	timeout time.Duration
	// client is the underlying SDK client used for every queue operation.
	client *sqs.SQS
}

// NewSQS builds an SQS value whose SDK client is created from session and
// whose per-call deadline is timeout.
func NewSQS(session *session.Session, timeout time.Duration) SQS {
	var s SQS

	s.client = sqs.New(session)
	s.timeout = timeout

	return s
}

// Send publishes req to its queue and returns the ID assigned to the message.
//
// The call is bounded by the client's configured timeout. A nil req yields an
// error instead of a panic, and a response without a message ID yields an
// empty string. SDK failures are wrapped with the "send" prefix.
func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
	if req == nil {
		return "", fmt.Errorf("send: nil request")
	}

	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	// Translate the provider-agnostic attributes into their SDK representation.
	attrs := make(map[string]*sqs.MessageAttributeValue, len(req.Attributes))
	for _, attr := range req.Attributes {
		attrs[attr.Key] = &sqs.MessageAttributeValue{
			StringValue: aws.String(attr.Value),
			DataType:    aws.String(attr.Type),
		}
	}

	res, err := s.client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
		MessageAttributes: attrs,
		MessageBody:       aws.String(req.Body),
		QueueUrl:          aws.String(req.QueueURL),
	})
	if err != nil {
		return "", fmt.Errorf("send: %w", err)
	}

	// StringValue is nil-safe, unlike dereferencing res.MessageId directly.
	return aws.StringValue(res.MessageId), nil
}

// Receive long polls queueURL and returns the messages handed back by SQS,
// requesting at most maxMsg of them along with all message attributes.
//
// maxMsg must be between 1 and 10 inclusive; any other value is rejected
// before the queue is contacted. The client's configured timeout is not used
// here: the deadline is derived from the long-poll wait time instead. SDK
// failures are wrapped with the "receive" prefix.
func (s SQS) Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error) {
	if maxMsg < 1 || maxMsg > 10 {
		return nil, fmt.Errorf("receive argument: maxMsg valid values: 1 to 10: given %d", maxMsg)
	}

	const (
		// waitTimeSeconds is how long a single long poll may wait for messages.
		waitTimeSeconds int64 = 10
		// timeoutSlackSeconds is the headroom added on top of the long poll.
		timeoutSlackSeconds int64 = 5
	)

	// The deadline must always exceed the long-poll wait, otherwise
	// ReceiveMessageWithContext fails with a context timeout error.
	ctx, cancel := context.WithTimeout(ctx, time.Duration(waitTimeSeconds+timeoutSlackSeconds)*time.Second)
	defer cancel()

	res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(queueURL),
		MaxNumberOfMessages:   aws.Int64(maxMsg),
		WaitTimeSeconds:       aws.Int64(waitTimeSeconds),
		MessageAttributeNames: aws.StringSlice([]string{"All"}),
	})
	if err != nil {
		return nil, fmt.Errorf("receive: %w", err)
	}

	return res.Messages, nil
}

// Delete removes the message identified by rcvHandle from queueURL.
//
// The call is bounded by the client's configured timeout. SDK failures are
// wrapped with the "delete" prefix.
func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	input := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(queueURL),
		ReceiptHandle: aws.String(rcvHandle),
	}

	_, err := s.client.DeleteMessageWithContext(ctx, input)
	if err != nil {
		return fmt.Errorf("delete: %w", err)
	}

	return nil
}

client.go


package cloud

import (
"context"

"github.com/aws/aws-sdk-go/service/sqs"
)

// MessageClient is the set of queue operations the application relies on.
type MessageClient interface {
	// Send publishes the message described by req and returns its message ID.
	Send(ctx context.Context, req *SendRequest) (string, error)
	// Receive long polls queueURL for up to maxMsg messages.
	Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error)
	// Delete removes the message identified by rcvHandle from queueURL.
	Delete(ctx context.Context, queueURL, rcvHandle string) error
}

model.go


package cloud

// SendRequest describes a single message to be published to a queue.
type SendRequest struct {
	// QueueURL is the URL of the destination queue.
	QueueURL string
	// Body is the message payload.
	Body string
	// Attributes are the message attributes sent alongside the body.
	Attributes []Attribute
}

// Attribute is one message attribute: Key names it, Value holds its string
// value and Type holds its data type (the producer uses "String").
type Attribute struct {
	Key, Value, Type string
}

producer.go


Burada 500 sahte mesaj üretiyoruz.


package email

import (
"context"
"fmt"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

// Producer publishes messages through a cloud.MessageClient.
type Producer struct {
	// client is the message client every Send call goes through.
	client cloud.MessageClient
}

// NewProducer returns a Producer that sends its messages via client.
func NewProducer(client cloud.MessageClient) Producer {
	var p Producer
	p.client = client

	return p
}

// Produce sends 500 dummy messages to queueURL, one after another.
//
// Every message has the body "body: <n>" and the same three string
// attributes. Each successful send is logged. The first send error aborts the
// run and is returned unchanged.
func (p Producer) Produce(ctx context.Context, queueURL string) error {
	const total = 500

	for n := 1; n <= total; n++ {
		// A fresh request (and attribute slice) is built for every message.
		req := &cloud.SendRequest{
			QueueURL: queueURL,
			Body:     fmt.Sprintf("body: %d", n),
			Attributes: []cloud.Attribute{
				{Key: "FromEmail", Value: "from@example.com", Type: "String"},
				{Key: "ToEmail", Value: "to@example.com", Type: "String"},
				{Key: "Subject", Value: "this is subject", Type: "String"},
			},
		}

		_, err := p.client.Send(ctx, req)
		if err != nil {
			return err
		}

		log.Printf("body: %d\n", n)
	}

	return nil
}

main.go (producer)


Bu bizim üretici uygulamamız/mikro servisimizdir.


package main

import (
"context"
"log"
"time"

"github.com/you/aws/internal/email"
"github.com/you/aws/internal/pkg/cloud/aws"
)

// main wires up the producer: it creates an AWS session, builds the SQS
// client and publishes the dummy messages to the welcome-email queue.
func main() {
	// Queue the dummy messages are published to.
	const queueURL = "http://localhost:4566/000000000000/welcome-email"

	// Root context for the whole run.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Session settings for the local endpoint.
	cfg := aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	}

	ses, err := aws.New(cfg)
	if err != nil {
		log.Fatalln(err)
	}

	// Message client with a five second per-call timeout.
	client := aws.NewSQS(ses, 5*time.Second)

	// Produce the dummy messages and bail out on the first failure.
	producer := email.NewProducer(client)
	if err := producer.Produce(ctx, queueURL); err != nil {
		log.Fatalln("produce:", err)
	}
}

consumer.go


package email

import (
"context"
"log"
"sync"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/service/sqs"
)

// ConsumerType selects how a worker processes the messages it has received.
type ConsumerType string

const (
	// SyncConsumer makes a worker consume its messages one by one.
	// A single goroutine handles all messages of a batch.
	// Progress is slower but needs fewer system resources.
	// Ideal for quiet/non-critical queues.
	SyncConsumer = ConsumerType("blocking")

	// AsyncConsumer makes a worker consume its messages at the same time.
	// Every message gets its own goroutine.
	// Progress is faster but needs more system resources.
	// Ideal for busy/critical queues.
	AsyncConsumer = ConsumerType("non-blocking")
)

// ConsumerConfig holds the settings a Consumer runs with.
type ConsumerConfig struct {
	// Type tells each worker whether to consume its received messages
	// synchronously or asynchronously.
	Type ConsumerType
	// QueueURL is the queue messages are received from.
	QueueURL string
	// MaxWorker is the number of workers that independently receive messages
	// from the queue.
	MaxWorker int
	// MaxMsg is the maximum number of messages a worker picks up in one go.
	MaxMsg int
}

// Consumer receives messages from a queue and processes them with workers.
type Consumer struct {
	// client is used to receive and delete messages.
	client cloud.MessageClient
	// config holds the queue URL, worker count, batch size and consume mode.
	config ConsumerConfig
}

// NewConsumer returns a Consumer that uses client and runs with config.
func NewConsumer(client cloud.MessageClient, config ConsumerConfig) Consumer {
	var c Consumer

	c.client = client
	c.config = config

	return c
}

// Start launches config.MaxWorker workers and blocks until all of them have
// returned. Workers are numbered from 1.
//
// A MaxWorker of zero or less starts no workers and returns immediately.
func (c Consumer) Start(ctx context.Context) {
	wg := &sync.WaitGroup{}

	for i := 1; i <= c.config.MaxWorker; i++ {
		// Register each worker as it is started. Adding MaxWorker up front
		// would panic the WaitGroup when the configured value is negative.
		wg.Add(1)

		go c.worker(ctx, wg, i)
	}

	wg.Wait()
}

// worker receives message batches from the configured queue until ctx is
// done, handing every non-empty batch to the sync or async consumer.
//
// It marks wg as done when it returns; id is only used in log output.
func (c Consumer) worker(ctx context.Context, wg *sync.WaitGroup, id int) {
	defer wg.Done()

	log.Printf("worker %d: started\n", id)

	for {
		// Stop before starting another poll once the context is done.
		select {
		case <-ctx.Done():
			log.Printf("worker %d: stopped\n", id)
			return
		default:
		}

		msgs, err := c.client.Receive(ctx, c.config.QueueURL, int64(c.config.MaxMsg))
		if err != nil {
			// A poll interrupted by cancellation is a normal shutdown, not a
			// failure: skip the error log and let the check above stop us.
			if ctx.Err() != nil {
				continue
			}

			// Critical error!
			// NOTE(review): the next poll is attempted immediately, without
			// any backoff of its own.
			log.Printf("worker %d: receive error: %s\n", id, err.Error())
			continue
		}

		if len(msgs) == 0 {
			continue
		}

		// Any type other than SyncConsumer is handled asynchronously.
		if c.config.Type == SyncConsumer {
			c.sync(ctx, msgs)
		} else {
			c.async(ctx, msgs)
		}
	}
}

// sync consumes msgs one after another on the calling goroutine.
func (c Consumer) sync(ctx context.Context, msgs []*sqs.Message) {
	for i := 0; i < len(msgs); i++ {
		c.consume(ctx, msgs[i])
	}
}

// async consumes every message of msgs in its own goroutine and returns once
// all of them have finished.
func (c Consumer) async(ctx context.Context, msgs []*sqs.Message) {
	var pending sync.WaitGroup

	for i := range msgs {
		pending.Add(1)

		go func(m *sqs.Message) {
			defer pending.Done()

			c.consume(ctx, m)
		}(msgs[i])
	}

	pending.Wait()
}

// consume logs the body of msg and then deletes msg from the configured
// queue. A failed delete is logged and not propagated.
func (c Consumer) consume(ctx context.Context, msg *sqs.Message) {
	log.Println(*msg.Body)

	err := c.client.Delete(ctx, c.config.QueueURL, *msg.ReceiptHandle)
	if err == nil {
		return
	}

	// Critical error!
	log.Printf("delete error: %s\n", err.Error())
}

main.go (consumer)


Bu bizim tüketici uygulamamız/mikro servisimizdir.


package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/you/aws/internal/email"
	"github.com/you/aws/internal/pkg/cloud/aws"
)

// main wires up the consumer: it creates an AWS session, builds the SQS
// client and runs the email consumer until the process is asked to stop.
//
// SIGINT and SIGTERM cancel the root context, which lets the workers stop and
// Start return, instead of the process being killed mid-message.
func main() {
	// Create a cancellable context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the context on an interrupt/termination signal. The channel is
	// buffered so a signal arriving before the goroutine runs is not lost.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sig)

	go func() {
		select {
		case <-sig:
			log.Println("shutdown signal received")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Create a session instance.
	ses, err := aws.New(aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Set queue URL.
	url := "http://localhost:4566/000000000000/welcome-email"

	// Instantiate client.
	client := aws.NewSQS(ses, time.Second*5)

	// Instantiate consumer and start consuming. Start blocks until every
	// worker has stopped.
	email.NewConsumer(client, email.ConsumerConfig{
		Type:      email.AsyncConsumer,
		QueueURL:  url,
		MaxWorker: 2,
		MaxMsg:    10,
	}).Start(ctx)
}

Kuyruk (queue)


$ aws --profile localstack --endpoint-url http://localhost:4566 sqs get-queue-attributes --queue-url http://localhost:4566/000000000000/welcome-email
{
"Attributes": {
"ApproximateNumberOfMessages": "0",
"ApproximateNumberOfMessagesDelayed": "0",
"ApproximateNumberOfMessagesNotVisible": "0",
"CreatedTimestamp": "1611682123.5068",
"DelaySeconds": "0",
"LastModifiedTimestamp": "1611682123.5068",
"MaximumMessageSize": "262144",
"MessageRetentionPeriod": "345600",
"QueueArn": "arn:aws:sqs:eu-west-1:000000000000:welcome-email",
"ReceiveMessageWaitTimeSeconds": "10",
"VisibilityTimeout": "20"
}
}

Test


Üretici


$ go run --race cmd/producer/main.go

2021/01/26 21:31:46 body: 1
2021/01/26 21:31:46 body: 2
2021/01/26 21:31:46 body: 3
2021/01/26 21:31:46 body: 4
...
2021/01/26 21:32:23 body: 489
2021/01/26 21:32:24 body: 490
2021/01/26 21:32:24 body: 491
2021/01/26 21:32:24 body: 492
2021/01/26 21:32:24 body: 493
2021/01/26 21:32:24 body: 494
2021/01/26 21:32:24 body: 495
2021/01/26 21:32:24 body: 496
2021/01/26 21:32:24 body: 497
2021/01/26 21:32:24 body: 498
2021/01/26 21:32:24 body: 499
2021/01/26 21:32:25 body: 500

Tüketici


$ go run --race cmd/consumer/main.go

2021/01/26 21:32:49 worker 1: started
2021/01/26 21:32:49 worker 2: started
2021/01/26 21:32:49 body: 1
2021/01/26 21:32:49 body: 2
2021/01/26 21:32:49 body: 6
2021/01/26 21:32:49 body: 10
2021/01/26 21:32:49 body: 3
2021/01/26 21:32:49 body: 5
2021/01/26 21:32:49 body: 7
2021/01/26 21:32:49 body: 9
2021/01/26 21:32:49 body: 8
2021/01/26 21:32:49 body: 4
2021/01/26 21:32:49 body: 11
2021/01/26 21:32:49 body: 19
...
2021/01/26 21:33:13 body: 495
2021/01/26 21:33:13 body: 496
2021/01/26 21:33:13 body: 497
2021/01/26 21:33:13 body: 499
2021/01/26 21:33:13 body: 500
2021/01/26 21:33:13 body: 99
2021/01/26 21:33:24 body: 320