Herkese merhaba!

Uzun yıllardır bol miktarda kişisel zaman ve enerji harcayarak bilgimizi hepinizle paylaşıyoruz. Ancak şu andan itibaren bu blogu çalışır durumda tutabilmek için yardımınıza ihtiyacımız var. Yapmanız gereken tek şey, sitedeki reklamlardan birine tıklamak olacaktır, aksi takdirde hosting vb. masraflar nedeniyle maalesef yayından kaldırılacaktır. Teşekkürler.

Bu örnekte AWS Simple Queue Service (SQS) hizmetiyle çalışmak için Localstack ve Golang'ı kullanacağız. Burada kuyruklar oluşturup, mesajlar göndereceğiz ve mesaj alacağız. Bunla birlikte ayrıca başka küçük işler yapacağız.


Yapı


├── internal
│   ├── message
│   │   └── message.go
│   └── pkg
│   └── cloud
│   ├── aws
│   │   ├── aws.go
│   │   └── sqs.go
│   ├── client.go
│   └── model.go
└── main.go

Dosyalar


main.go


package main

import (
"log"
"time"

"github.com/you/aws/internal/message"
"github.com/you/aws/internal/pkg/cloud/aws"
)

func main() {
// Create a session instance.
ses, err := aws.New(aws.Config{
Address: "http://localhost:4566",
Region: "eu-west-1",
Profile: "localstack",
ID: "test",
Secret: "test",
})
if err != nil {
log.Fatalln(err)
}

// Test message
message.Message(aws.NewSQS(ses, time.Second*5))
}

client.go


package cloud

import (
"context"
)

type MessageClient interface {
// Creates a new long polling queue and returns its URL.
CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error)
// Get a queue ARN.
QueueARN(ctx context.Context, queueURL string) (string, error)
// Binds a DLX queue to a normal queue.
BindDLX(ctx context.Context, queueURL, dlxARN string) error
// Send a message to queue and returns its message ID.
Send(ctx context.Context, req *SendRequest) (string, error)
// Long polls given amount of messages from a queue.
Receive(ctx context.Context, queueURL string) (*Message, error)
// Deletes a message from a queue.
Delete(ctx context.Context, queueURL, rcvHandle string) error
}

model.go


package cloud

type SendRequest struct {
QueueURL string
Body string
Attributes []Attribute
}

type Attribute struct {
Key string
Value string
Type string
}

type Message struct {
ID string
ReceiptHandle string
Body string
Attributes map[string]string
}

aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

type Config struct {
Address string
Region string
Profile string
ID string
Secret string
}

func New(config Config) (*session.Session, error) {
return session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials(config.ID, config.Secret, ""),
Region: aws.String(config.Region),
Endpoint: aws.String(config.Address),
S3ForcePathStyle: aws.Bool(true),
},
Profile: config.Profile,
},
)
}

sqs.go


package aws

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

var _ cloud.MessageClient = SQS{}

type SQS struct {
timeout time.Duration
client *sqs.SQS
}

func NewSQS(session *session.Session, timeout time.Duration) SQS {
return SQS{
timeout: timeout,
client: sqs.New(session),
}
}

func (s SQS) CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

ret := "345600" // 4 days
if isDLX {
ret = "1209600" // 14 days
}

res, err := s.client.CreateQueueWithContext(ctx, &sqs.CreateQueueInput{
QueueName: aws.String(queueName),
Attributes: map[string]*string{
"MessageRetentionPeriod": aws.String(ret),
"VisibilityTimeout": aws.String("5"),
"ReceiveMessageWaitTimeSeconds": aws.String("20"), // Enable long polling
},
})
if err != nil {
return "", fmt.Errorf("create: %w", err)
}

return *res.QueueUrl, nil
}

func (s SQS) QueueARN(ctx context.Context, queueURL string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.GetQueueAttributesWithContext(ctx, &sqs.GetQueueAttributesInput{
AttributeNames: []*string{aws.String("QueueArn")},
QueueUrl: aws.String(queueURL),
})
if err != nil {
return "", fmt.Errorf("get attributes: %w", err)
}

if len(res.Attributes) != 1 {
return "", fmt.Errorf("not found")
}

return *res.Attributes["QueueArn"], nil
}

func (s SQS) BindDLX(ctx context.Context, queueURL, dlxARN string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

policy, err := json.Marshal(map[string]string{
"deadLetterTargetArn": dlxARN,
"maxReceiveCount": "3",
})
if err != nil {
return fmt.Errorf("marshal policy")
}

if _, err := s.client.SetQueueAttributesWithContext(ctx, &sqs.SetQueueAttributesInput{
QueueUrl: aws.String(queueURL),
Attributes: map[string]*string{
sqs.QueueAttributeNameRedrivePolicy: aws.String(string(policy)),
},
}); err != nil {
return fmt.Errorf("set attributes: %w", err)
}

return nil
}

func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

attrs := make(map[string]*sqs.MessageAttributeValue, len(req.Attributes))
for _, attr := range req.Attributes {
attrs[attr.Key] = &sqs.MessageAttributeValue{
StringValue: aws.String(attr.Value),
DataType: aws.String(attr.Type),
}
}

res, err := s.client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
MessageAttributes: attrs,
MessageBody: aws.String(req.Body),
QueueUrl: aws.String(req.QueueURL),
})
if err != nil {
return "", fmt.Errorf("send: %w", err)
}

return *res.MessageId, nil
}

func (s SQS) Receive(ctx context.Context, queueURL string) (*cloud.Message, error) {
// timeout = WaitTimeSeconds + 5
ctx, cancel := context.WithTimeout(ctx, time.Second*(20+5))
defer cancel()

res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: aws.Int64(1),
WaitTimeSeconds: aws.Int64(20),
MessageAttributeNames: aws.StringSlice([]string{"All"}),
})
if err != nil {
return nil, fmt.Errorf("receive: %w", err)
}

if len(res.Messages) == 0 {
return nil, nil
}

attrs := make(map[string]string)
for key, attr := range res.Messages[0].MessageAttributes {
attrs[key] = *attr.StringValue
}

return &cloud.Message{
ID: *res.Messages[0].MessageId,
ReceiptHandle: *res.Messages[0].ReceiptHandle,
Body: *res.Messages[0].Body,
Attributes: attrs,
}, nil
}

func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

if _, err := s.client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(rcvHandle),
}); err != nil {
return fmt.Errorf("delete: %w", err)
}

return nil
}

message.go


Bu sadece "kirli" bir kullanım örneğidir!


package message

import (
"context"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

func Message(client cloud.MessageClient) {
ctx := context.Background()

dlxURL := createQueueDLX(ctx, client)
queURL := createQueue(ctx, client)
dlxARN := queueARN(ctx, client, dlxURL)
bindDLX(ctx, client, queURL, dlxARN)
send(ctx, client, queURL)
rcvHnd := receive(ctx, client, queURL)
deleteMessage(ctx, client, queURL, rcvHnd)
}

func createQueueDLX(ctx context.Context, client cloud.MessageClient) string {
url, err := client.CreateQueue(ctx, "welcome-email-queue.dlx", true)
if err != nil {
log.Fatalln(err)
}
log.Println("create queue:", url)

return url
}

func createQueue(ctx context.Context, client cloud.MessageClient) string {
url, err := client.CreateQueue(ctx, "welcome-email-queue", false)
if err != nil {
log.Fatalln(err)
}
log.Println("create queue:", url)

return url
}

func queueARN(ctx context.Context, client cloud.MessageClient, url string) string {
arn, err := client.QueueARN(ctx, url)
if err != nil {
log.Fatalln(err)
}
log.Println("queue ARN:", arn)

return arn
}

func bindDLX(ctx context.Context, client cloud.MessageClient, queueURL, dlxARN string) {
if err := client.BindDLX(ctx, queueURL, dlxARN); err != nil {
log.Fatalln(err)
}
log.Println("bind DLX: ok")
}

func send(ctx context.Context, client cloud.MessageClient, queueURL string) {
id, err := client.Send(ctx, &cloud.SendRequest{
QueueURL: queueURL,
Body: "Message body!",
Attributes: []cloud.Attribute{
{
Key: "Title",
Value: "SQS send message",
Type: "String",
},
{
Key: "Year",
Value: "2021",
Type: "Number",
},
},
})
if err != nil {
log.Fatalln(err)
}
log.Println("send: message ID:", id)
}

func receive(ctx context.Context, client cloud.MessageClient, queueURL string) string {
res, err := client.Receive(ctx, queueURL)
if err != nil {
log.Fatalln(err)
}
log.Println("receive:", res)

return res.ReceiptHandle
}

func deleteMessage(ctx context.Context, client cloud.MessageClient, queueURL, rcvHnd string) {
if err := client.Delete(ctx, queueURL, rcvHnd); err != nil {
log.Fatalln(err)
}
log.Println("delete message: ok")
}

Test


$ go run --race main.go
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue.dlx
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue
2021/01/25 17:45:41 queue ARN: arn:aws:sqs:eu-west-1:000000000000:welcome-email-queue.dlx
2021/01/25 17:45:41 bind DLX: ok
2021/01/25 17:45:42 send: message ID: b9d8cb50-6233-d8ef-835e-434c6bfa9b9b
2021/01/25 17:45:42 receive: &{b9d8cb50-6233-d8ef-835e-434c6bfa9b9b agtiixwvcwdlpa Message body! map[Title:SQS send message Year:2021]}
2021/01/25 17:45:42 delete message: ok