Bu örnekte AWS Simple Queue Service (SQS) hizmetiyle çalışmak için Localstack ve Golang'ı kullanacağız. Burada kuyruklar oluşturup, mesajlar göndereceğiz ve mesaj alacağız. Bunla birlikte ayrıca başka küçük işler yapacağız.


Yapı


├── internal
│   ├── message
│   │   └── message.go
│   └── pkg
│   └── cloud
│   ├── aws
│   │   ├── aws.go
│   │   └── sqs.go
│   ├── client.go
│   └── model.go
└── main.go

Dosyalar


main.go


package main

import (
"log"
"time"

"github.com/you/aws/internal/message"
"github.com/you/aws/internal/pkg/cloud/aws"
)

func main() {
// Create a session instance.
ses, err := aws.New(aws.Config{
Address: "http://localhost:4566",
Region: "eu-west-1",
Profile: "localstack",
ID: "test",
Secret: "test",
})
if err != nil {
log.Fatalln(err)
}

// Test message
message.Message(aws.NewSQS(ses, time.Second*5))
}

client.go


package cloud

import (
"context"
)

type MessageClient interface {
// Creates a new long polling queue and returns its URL.
CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error)
// Get a queue ARN.
QueueARN(ctx context.Context, queueURL string) (string, error)
// Binds a DLX queue to a normal queue.
BindDLX(ctx context.Context, queueURL, dlxARN string) error
// Send a message to queue and returns its message ID.
Send(ctx context.Context, req *SendRequest) (string, error)
// Long polls given amount of messages from a queue.
Receive(ctx context.Context, queueURL string) (*Message, error)
// Deletes a message from a queue.
Delete(ctx context.Context, queueURL, rcvHandle string) error
}

model.go


package cloud

type SendRequest struct {
QueueURL string
Body string
Attributes []Attribute
}

type Attribute struct {
Key string
Value string
Type string
}

type Message struct {
ID string
ReceiptHandle string
Body string
Attributes map[string]string
}

aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

type Config struct {
Address string
Region string
Profile string
ID string
Secret string
}

func New(config Config) (*session.Session, error) {
return session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials(config.ID, config.Secret, ""),
Region: aws.String(config.Region),
Endpoint: aws.String(config.Address),
S3ForcePathStyle: aws.Bool(true),
},
Profile: config.Profile,
},
)
}

sqs.go


package aws

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

var _ cloud.MessageClient = SQS{}

type SQS struct {
timeout time.Duration
client *sqs.SQS
}

func NewSQS(session *session.Session, timeout time.Duration) SQS {
return SQS{
timeout: timeout,
client: sqs.New(session),
}
}

func (s SQS) CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

ret := "345600" // 4 days
if isDLX {
ret = "1209600" // 14 days
}

res, err := s.client.CreateQueueWithContext(ctx, &sqs.CreateQueueInput{
QueueName: aws.String(queueName),
Attributes: map[string]*string{
"MessageRetentionPeriod": aws.String(ret),
"VisibilityTimeout": aws.String("5"),
"ReceiveMessageWaitTimeSeconds": aws.String("20"), // Enable long polling
},
})
if err != nil {
return "", fmt.Errorf("create: %w", err)
}

return *res.QueueUrl, nil
}

func (s SQS) QueueARN(ctx context.Context, queueURL string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.GetQueueAttributesWithContext(ctx, &sqs.GetQueueAttributesInput{
AttributeNames: []*string{aws.String("QueueArn")},
QueueUrl: aws.String(queueURL),
})
if err != nil {
return "", fmt.Errorf("get attributes: %w", err)
}

if len(res.Attributes) != 1 {
return "", fmt.Errorf("not found")
}

return *res.Attributes["QueueArn"], nil
}

func (s SQS) BindDLX(ctx context.Context, queueURL, dlxARN string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

policy, err := json.Marshal(map[string]string{
"deadLetterTargetArn": dlxARN,
"maxReceiveCount": "3",
})
if err != nil {
return fmt.Errorf("marshal policy")
}

if _, err := s.client.SetQueueAttributesWithContext(ctx, &sqs.SetQueueAttributesInput{
QueueUrl: aws.String(queueURL),
Attributes: map[string]*string{
sqs.QueueAttributeNameRedrivePolicy: aws.String(string(policy)),
},
}); err != nil {
return fmt.Errorf("set attributes: %w", err)
}

return nil
}

func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

attrs := make(map[string]*sqs.MessageAttributeValue, len(req.Attributes))
for _, attr := range req.Attributes {
attrs[attr.Key] = &sqs.MessageAttributeValue{
StringValue: aws.String(attr.Value),
DataType: aws.String(attr.Type),
}
}

res, err := s.client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
MessageAttributes: attrs,
MessageBody: aws.String(req.Body),
QueueUrl: aws.String(req.QueueURL),
})
if err != nil {
return "", fmt.Errorf("send: %w", err)
}

return *res.MessageId, nil
}

func (s SQS) Receive(ctx context.Context, queueURL string) (*cloud.Message, error) {
// timeout = WaitTimeSeconds + 5
ctx, cancel := context.WithTimeout(ctx, time.Second*(20+5))
defer cancel()

res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: aws.Int64(1),
WaitTimeSeconds: aws.Int64(20),
MessageAttributeNames: aws.StringSlice([]string{"All"}),
})
if err != nil {
return nil, fmt.Errorf("receive: %w", err)
}

if len(res.Messages) == 0 {
return nil, nil
}

attrs := make(map[string]string)
for key, attr := range res.Messages[0].MessageAttributes {
attrs[key] = *attr.StringValue
}

return &cloud.Message{
ID: *res.Messages[0].MessageId,
ReceiptHandle: *res.Messages[0].ReceiptHandle,
Body: *res.Messages[0].Body,
Attributes: attrs,
}, nil
}

func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

if _, err := s.client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(rcvHandle),
}); err != nil {
return fmt.Errorf("delete: %w", err)
}

return nil
}

message.go


Bu sadece "kirli" bir kullanım örneğidir!


package message

import (
"context"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

func Message(client cloud.MessageClient) {
ctx := context.Background()

dlxURL := createQueueDLX(ctx, client)
queURL := createQueue(ctx, client)
dlxARN := queueARN(ctx, client, dlxURL)
bindDLX(ctx, client, queURL, dlxARN)
send(ctx, client, queURL)
rcvHnd := receive(ctx, client, queURL)
deleteMessage(ctx, client, queURL, rcvHnd)
}

func createQueueDLX(ctx context.Context, client cloud.MessageClient) string {
url, err := client.CreateQueue(ctx, "welcome-email-queue.dlx", true)
if err != nil {
log.Fatalln(err)
}
log.Println("create queue:", url)

return url
}

func createQueue(ctx context.Context, client cloud.MessageClient) string {
url, err := client.CreateQueue(ctx, "welcome-email-queue", false)
if err != nil {
log.Fatalln(err)
}
log.Println("create queue:", url)

return url
}

func queueARN(ctx context.Context, client cloud.MessageClient, url string) string {
arn, err := client.QueueARN(ctx, url)
if err != nil {
log.Fatalln(err)
}
log.Println("queue ARN:", arn)

return arn
}

func bindDLX(ctx context.Context, client cloud.MessageClient, queueURL, dlxARN string) {
if err := client.BindDLX(ctx, queueURL, dlxARN); err != nil {
log.Fatalln(err)
}
log.Println("bind DLX: ok")
}

func send(ctx context.Context, client cloud.MessageClient, queueURL string) {
id, err := client.Send(ctx, &cloud.SendRequest{
QueueURL: queueURL,
Body: "Message body!",
Attributes: []cloud.Attribute{
{
Key: "Title",
Value: "SQS send message",
Type: "String",
},
{
Key: "Year",
Value: "2021",
Type: "Number",
},
},
})
if err != nil {
log.Fatalln(err)
}
log.Println("send: message ID:", id)
}

func receive(ctx context.Context, client cloud.MessageClient, queueURL string) string {
res, err := client.Receive(ctx, queueURL)
if err != nil {
log.Fatalln(err)
}
log.Println("receive:", res)

return res.ReceiptHandle
}

func deleteMessage(ctx context.Context, client cloud.MessageClient, queueURL, rcvHnd string) {
if err := client.Delete(ctx, queueURL, rcvHnd); err != nil {
log.Fatalln(err)
}
log.Println("delete message: ok")
}

Test


$ go run --race main.go
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue.dlx
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue
2021/01/25 17:45:41 queue ARN: arn:aws:sqs:eu-west-1:000000000000:welcome-email-queue.dlx
2021/01/25 17:45:41 bind DLX: ok
2021/01/25 17:45:42 send: message ID: b9d8cb50-6233-d8ef-835e-434c6bfa9b9b
2021/01/25 17:45:42 receive: &{b9d8cb50-6233-d8ef-835e-434c6bfa9b9b agtiixwvcwdlpa Message body! map[Title:SQS send message Year:2021]}
2021/01/25 17:45:42 delete message: ok