In this example we are going to use Localstack and Golang to work with AWS Simple Queue Service (SQS). We will create queues, send and receive messages, and perform a few other minor operations such as binding a dead-letter queue and deleting a message.


Structure


├── internal
│   ├── message
│   │   └── message.go
│   └── pkg
│       └── cloud
│           ├── aws
│           │   ├── aws.go
│           │   └── sqs.go
│           ├── client.go
│           └── model.go
└── main.go

Files


main.go


package main

import (
"log"
"time"

"github.com/you/aws/internal/message"
"github.com/you/aws/internal/pkg/cloud/aws"
)

// main builds an AWS session pointing at the local endpoint and runs the SQS
// message walkthrough against it.
func main() {
	cfg := aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	}

	// Nothing below can run without a session, so bail out on failure.
	sess, err := aws.New(cfg)
	if err != nil {
		log.Fatalln(err)
	}

	// Run the message example with a 5 second client timeout.
	client := aws.NewSQS(sess, 5*time.Second)
	message.Message(client)
}

client.go


package cloud

import (
"context"
)

// MessageClient describes the queue operations used by this example:
// creating queues, binding a dead-letter queue, and sending, receiving and
// deleting messages.
type MessageClient interface {
// CreateQueue creates a new long polling queue named queueName and returns
// its URL. isDLX selects the dead-letter variant of the queue.
CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error)
// QueueARN returns the ARN of the queue at queueURL.
QueueARN(ctx context.Context, queueURL string) (string, error)
// BindDLX binds the DLX queue identified by dlxARN to the normal queue at
// queueURL.
BindDLX(ctx context.Context, queueURL, dlxARN string) error
// Send sends a message to a queue and returns its message ID.
Send(ctx context.Context, req *SendRequest) (string, error)
// Receive long polls the queue at queueURL and returns a single message.
// NOTE(review): the SQS implementation returns a nil message together with
// a nil error when the poll yields nothing, so callers should check the
// message for nil before using it.
Receive(ctx context.Context, queueURL string) (*Message, error)
// Delete deletes the message identified by rcvHandle from the queue at
// queueURL.
Delete(ctx context.Context, queueURL, rcvHandle string) error
}

model.go


package cloud

// SendRequest holds what is needed to send one message to a queue.
type SendRequest struct {
// QueueURL is the URL of the destination queue.
QueueURL string
// Body is the message body.
Body string
// Attributes are the message attributes sent along with the body.
Attributes []Attribute
}

// Attribute is a single message attribute attached to a SendRequest.
type Attribute struct {
// Key is the attribute name.
Key string
// Value is the attribute value, carried as a string.
Value string
// Type is the attribute data type; the example uses "String" and "Number".
Type string
}

// Message is a message received from a queue.
type Message struct {
// ID is the message ID.
ID string
// ReceiptHandle identifies this receipt of the message; it is what Delete
// expects as rcvHandle.
ReceiptHandle string
// Body is the message body.
Body string
// Attributes maps attribute names to their string values.
Attributes map[string]string
}

aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

// Config carries the settings New uses to build an AWS session.
type Config struct {
// Address is the endpoint URL requests are sent to.
Address string
// Region is the AWS region.
Region string
// Profile is the shared configuration profile name passed to the session.
Profile string
// ID is the access key ID used for the static credentials.
ID string
// Secret is the secret access key used for the static credentials.
Secret string
}

// New builds an AWS session from config. It uses static credentials made of
// config.ID and config.Secret, and sends requests to config.Address in
// config.Region under the config.Profile profile.
func New(config Config) (*session.Session, error) {
	awsCfg := aws.Config{
		Credentials:      credentials.NewStaticCredentials(config.ID, config.Secret, ""),
		Region:           aws.String(config.Region),
		Endpoint:         aws.String(config.Address),
		S3ForcePathStyle: aws.Bool(true),
	}

	opts := session.Options{
		Config:  awsCfg,
		Profile: config.Profile,
	}

	return session.NewSessionWithOptions(opts)
}

sqs.go


package aws

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

// Compile-time check that SQS satisfies cloud.MessageClient.
var _ cloud.MessageClient = SQS{}

// SQS is a cloud.MessageClient backed by the AWS SDK SQS client.
type SQS struct {
// timeout is the per-call deadline applied by the methods that use it.
timeout time.Duration
// client is the underlying AWS SDK SQS client.
client *sqs.SQS
}

// NewSQS returns an SQS value whose client is created from the given session.
// timeout is stored and later used as the per-call deadline.
func NewSQS(session *session.Session, timeout time.Duration) SQS {
	client := sqs.New(session)

	return SQS{timeout: timeout, client: client}
}

// CreateQueue creates a long polling queue named queueName and returns its
// URL. A dead-letter queue (isDLX) retains messages for 14 days, any other
// queue for 4 days.
func (s SQS) CreateQueue(ctx context.Context, queueName string, isDLX bool) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	// Retention period in seconds.
	retention := "345600" // 4 days
	if isDLX {
		retention = "1209600" // 14 days
	}

	input := &sqs.CreateQueueInput{
		QueueName: aws.String(queueName),
		Attributes: map[string]*string{
			"MessageRetentionPeriod":        aws.String(retention),
			"VisibilityTimeout":             aws.String("5"),
			"ReceiveMessageWaitTimeSeconds": aws.String("20"), // Enable long polling
		},
	}

	out, err := s.client.CreateQueueWithContext(ctx, input)
	if err != nil {
		return "", fmt.Errorf("create: %w", err)
	}

	return *out.QueueUrl, nil
}

// QueueARN returns the ARN of the queue at queueURL.
//
// It requests only the QueueArn attribute and returns a "not found" error
// when the response does not carry a value for it.
func (s SQS) QueueARN(ctx context.Context, queueURL string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	res, err := s.client.GetQueueAttributesWithContext(ctx, &sqs.GetQueueAttributesInput{
		AttributeNames: []*string{aws.String(sqs.QueueAttributeNameQueueArn)},
		QueueUrl:       aws.String(queueURL),
	})
	if err != nil {
		return "", fmt.Errorf("get attributes: %w", err)
	}

	// Look the key up explicitly: checking only the number of attributes
	// would still dereference a nil pointer if the single entry returned is
	// not the ARN.
	arn, ok := res.Attributes[sqs.QueueAttributeNameQueueArn]
	if !ok || arn == nil {
		return "", fmt.Errorf("not found")
	}

	return *arn, nil
}

// BindDLX attaches the dead-letter queue identified by dlxARN to the queue at
// queueURL by setting the queue's redrive policy. The policy uses a
// maxReceiveCount of 3.
func (s SQS) BindDLX(ctx context.Context, queueURL, dlxARN string) error {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	// The redrive policy is passed to SQS as a JSON-encoded string.
	policy, err := json.Marshal(map[string]string{
		"deadLetterTargetArn": dlxARN,
		"maxReceiveCount":     "3",
	})
	if err != nil {
		// Wrap the cause so callers can inspect it, like every other error
		// returned by this type.
		return fmt.Errorf("marshal policy: %w", err)
	}

	if _, err := s.client.SetQueueAttributesWithContext(ctx, &sqs.SetQueueAttributesInput{
		QueueUrl: aws.String(queueURL),
		Attributes: map[string]*string{
			sqs.QueueAttributeNameRedrivePolicy: aws.String(string(policy)),
		},
	}); err != nil {
		return fmt.Errorf("set attributes: %w", err)
	}

	return nil
}

// Send publishes req.Body to the queue at req.QueueURL, attaching every entry
// of req.Attributes as a message attribute, and returns the ID of the sent
// message.
func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	input := &sqs.SendMessageInput{
		MessageAttributes: make(map[string]*sqs.MessageAttributeValue, len(req.Attributes)),
		MessageBody:       aws.String(req.Body),
		QueueUrl:          aws.String(req.QueueURL),
	}

	// Translate the project's attribute model into the SDK representation.
	for i := range req.Attributes {
		a := req.Attributes[i]
		input.MessageAttributes[a.Key] = &sqs.MessageAttributeValue{
			StringValue: aws.String(a.Value),
			DataType:    aws.String(a.Type),
		}
	}

	out, err := s.client.SendMessageWithContext(ctx, input)
	if err != nil {
		return "", fmt.Errorf("send: %w", err)
	}

	return *out.MessageId, nil
}

// Receive long polls the queue at queueURL for at most one message.
//
// It returns a nil message and a nil error when the poll completes without a
// message, so callers must check the message for nil. Attributes that carry
// no string value are reported with an empty string.
func (s SQS) Receive(ctx context.Context, queueURL string) (*cloud.Message, error) {
	// Long-poll duration in seconds, shared by the request and the deadline.
	const waitSeconds = 20

	// timeout = WaitTimeSeconds + 5, so the context outlives the long poll
	// instead of cutting it short. s.timeout is deliberately not used here.
	ctx, cancel := context.WithTimeout(ctx, (waitSeconds+5)*time.Second)
	defer cancel()

	res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(queueURL),
		MaxNumberOfMessages:   aws.Int64(1),
		WaitTimeSeconds:       aws.Int64(waitSeconds),
		MessageAttributeNames: aws.StringSlice([]string{"All"}),
	})
	if err != nil {
		return nil, fmt.Errorf("receive: %w", err)
	}

	if len(res.Messages) == 0 {
		return nil, nil
	}

	msg := res.Messages[0]

	attrs := make(map[string]string, len(msg.MessageAttributes))
	for key, attr := range msg.MessageAttributes {
		if attr == nil {
			continue
		}
		// aws.StringValue yields "" for a nil pointer, so an attribute with
		// no string value no longer panics.
		attrs[key] = aws.StringValue(attr.StringValue)
	}

	return &cloud.Message{
		ID:            aws.StringValue(msg.MessageId),
		ReceiptHandle: aws.StringValue(msg.ReceiptHandle),
		Body:          aws.StringValue(msg.Body),
		Attributes:    attrs,
	}, nil
}

// Delete removes the message identified by rcvHandle from the queue at
// queueURL.
func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	input := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(queueURL),
		ReceiptHandle: aws.String(rcvHandle),
	}

	_, err := s.client.DeleteMessageWithContext(ctx, input)
	if err != nil {
		return fmt.Errorf("delete: %w", err)
	}

	return nil
}

message.go


This is just a quick and "dirty" usage example!


package message

import (
"context"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

// Message runs the whole walkthrough against client: it creates the
// dead-letter and main queues, binds them together, sends one message,
// receives it and finally deletes it. Every step aborts the program on error.
func Message(client cloud.MessageClient) {
	ctx := context.Background()

	// Create the dead-letter queue first, then the main queue.
	deadLetterURL := createQueueDLX(ctx, client)
	mainURL := createQueue(ctx, client)

	// Binding needs the dead-letter queue's ARN rather than its URL.
	bindDLX(ctx, client, mainURL, queueARN(ctx, client, deadLetterURL))

	// Round-trip one message; the receipt handle from receive is what
	// deleteMessage needs.
	send(ctx, client, mainURL)
	deleteMessage(ctx, client, mainURL, receive(ctx, client, mainURL))
}

// createQueueDLX creates the dead-letter queue of the example, logs its URL
// and returns it. It exits the program on failure.
func createQueueDLX(ctx context.Context, client cloud.MessageClient) string {
	const name = "welcome-email-queue.dlx"

	queueURL, err := client.CreateQueue(ctx, name, true)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("create queue:", queueURL)
	return queueURL
}

// createQueue creates the main queue of the example, logs its URL and returns
// it. It exits the program on failure.
func createQueue(ctx context.Context, client cloud.MessageClient) string {
	const name = "welcome-email-queue"

	queueURL, err := client.CreateQueue(ctx, name, false)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("create queue:", queueURL)
	return queueURL
}

// queueARN looks up the ARN of the queue at url, logs it and returns it. It
// exits the program on failure.
func queueARN(ctx context.Context, client cloud.MessageClient, url string) string {
	queueArn, err := client.QueueARN(ctx, url)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("queue ARN:", queueArn)
	return queueArn
}

// bindDLX binds the dead-letter queue identified by dlxARN to the queue at
// queueURL and logs success. It exits the program on failure.
func bindDLX(ctx context.Context, client cloud.MessageClient, queueURL, dlxARN string) {
	err := client.BindDLX(ctx, queueURL, dlxARN)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("bind DLX: ok")
}

// send publishes one sample message with a "Title" and a "Year" attribute to
// the queue at queueURL and logs the resulting message ID. It exits the
// program on failure.
func send(ctx context.Context, client cloud.MessageClient, queueURL string) {
	req := &cloud.SendRequest{
		QueueURL: queueURL,
		Body:     "Message body!",
		Attributes: []cloud.Attribute{
			{Key: "Title", Value: "SQS send message", Type: "String"},
			{Key: "Year", Value: "2021", Type: "Number"},
		},
	}

	msgID, err := client.Send(ctx, req)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("send: message ID:", msgID)
}

// receive fetches one message from the queue at queueURL, logs it and returns
// its receipt handle. It exits the program when the call fails or when no
// message was returned.
func receive(ctx context.Context, client cloud.MessageClient, queueURL string) string {
	res, err := client.Receive(ctx, queueURL)
	if err != nil {
		log.Fatalln(err)
	}

	// The SQS implementation returns a nil message with a nil error when the
	// long poll yields nothing; reading the receipt handle would then panic.
	if res == nil {
		log.Fatalln("receive: no message available")
	}

	log.Println("receive:", res)

	return res.ReceiptHandle
}

// deleteMessage deletes the message identified by rcvHnd from the queue at
// queueURL and logs success. It exits the program on failure.
func deleteMessage(ctx context.Context, client cloud.MessageClient, queueURL, rcvHnd string) {
	err := client.Delete(ctx, queueURL, rcvHnd)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("delete message: ok")
}

Test


$ go run --race main.go
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue.dlx
2021/01/25 17:45:41 create queue: http://localhost:4566/000000000000/welcome-email-queue
2021/01/25 17:45:41 queue ARN: arn:aws:sqs:eu-west-1:000000000000:welcome-email-queue.dlx
2021/01/25 17:45:41 bind DLX: ok
2021/01/25 17:45:42 send: message ID: b9d8cb50-6233-d8ef-835e-434c6bfa9b9b
2021/01/25 17:45:42 receive: &{b9d8cb50-6233-d8ef-835e-434c6bfa9b9b agtiixwvcwdlpa Message body! map[Title:SQS send message Year:2021]}
2021/01/25 17:45:42 delete message: ok