In this example we are going to use AWS SQS to produce and consume messages. The consumer worker can operate in either a synchronous (blocking) or an asynchronous (non-blocking) manner. It allows us to run one or many workers, and each worker can run one or many consumers within it, roughly as sketched below.
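
Application
├── Worker 1
│   ├── Consumer (message 1)
│   ├── ...
│   └── Consumer (message N)
└── Worker M
    ├── Consumer (message 1)
    ├── ...
    └── Consumer (message N)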



Structure


├── cmd
│   ├── consumer
│   │   └── main.go
│   └── producer
│       └── main.go
└── internal
    ├── email
    │   ├── consumer.go
    │   └── producer.go
    └── pkg
        └── cloud
            ├── aws
            │   ├── aws.go
            │   └── sqs.go
            ├── client.go
            └── model.go

Files


aws.go


package aws

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
)

type Config struct {
	Address string
	Region  string
	Profile string
	ID      string
	Secret  string
}

func New(config Config) (*session.Session, error) {
	return session.NewSessionWithOptions(
		session.Options{
			Config: aws.Config{
				Credentials:      credentials.NewStaticCredentials(config.ID, config.Secret, ""),
				Region:           aws.String(config.Region),
				Endpoint:         aws.String(config.Address),
				S3ForcePathStyle: aws.Bool(true),
			},
			Profile: config.Profile,
		},
	)
}

sqs.go


package aws

import (
	"context"
	"fmt"
	"time"

	"github.com/you/aws/internal/pkg/cloud"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

var _ cloud.MessageClient = SQS{}

type SQS struct {
	timeout time.Duration
	client  *sqs.SQS
}

func NewSQS(session *session.Session, timeout time.Duration) SQS {
	return SQS{
		timeout: timeout,
		client:  sqs.New(session),
	}
}

func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	attrs := make(map[string]*sqs.MessageAttributeValue, len(req.Attributes))
	for _, attr := range req.Attributes {
		attrs[attr.Key] = &sqs.MessageAttributeValue{
			StringValue: aws.String(attr.Value),
			DataType:    aws.String(attr.Type),
		}
	}

	res, err := s.client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
		MessageAttributes: attrs,
		MessageBody:       aws.String(req.Body),
		QueueUrl:          aws.String(req.QueueURL),
	})
	if err != nil {
		return "", fmt.Errorf("send: %w", err)
	}

	return *res.MessageId, nil
}

func (s SQS) Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error) {
	if maxMsg < 1 || maxMsg > 10 {
		return nil, fmt.Errorf("receive argument: maxMsg valid values: 1 to 10: given %d", maxMsg)
	}

	var waitTimeSeconds int64 = 10

	// The context timeout must always be above `WaitTimeSeconds`, otherwise
	// `ReceiveMessageWithContext` triggers a context timeout error.
	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(waitTimeSeconds+5))
	defer cancel()

	res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(queueURL),
		MaxNumberOfMessages:   aws.Int64(maxMsg),
		WaitTimeSeconds:       aws.Int64(waitTimeSeconds),
		MessageAttributeNames: aws.StringSlice([]string{"All"}),
	})
	if err != nil {
		return nil, fmt.Errorf("receive: %w", err)
	}

	return res.Messages, nil
}

func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	if _, err := s.client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(queueURL),
		ReceiptHandle: aws.String(rcvHandle),
	}); err != nil {
		return fmt.Errorf("delete: %w", err)
	}

	return nil
}

client.go


package cloud

import (
	"context"

	"github.com/aws/aws-sdk-go/service/sqs"
)

type MessageClient interface {
	// Sends a message to a queue and returns its message ID.
	Send(ctx context.Context, req *SendRequest) (string, error)
	// Long polls a given amount of messages from a queue.
	Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error)
	// Deletes a message from a queue.
	Delete(ctx context.Context, queueURL, rcvHandle string) error
}

model.go


package cloud

type SendRequest struct {
	QueueURL   string
	Body       string
	Attributes []Attribute
}

type Attribute struct {
	Key   string
	Value string
	Type  string
}

producer.go


We are producing 500 dummy messages here.


package email

import (
	"context"
	"fmt"
	"log"

	"github.com/you/aws/internal/pkg/cloud"
)

type Producer struct {
	client cloud.MessageClient
}

func NewProducer(client cloud.MessageClient) Producer {
	return Producer{client: client}
}

func (p Producer) Produce(ctx context.Context, queueURL string) error {
	for i := 1; i <= 500; i++ {
		if _, err := p.client.Send(ctx, &cloud.SendRequest{
			QueueURL: queueURL,
			Body:     fmt.Sprintf("body: %d", i),
			Attributes: []cloud.Attribute{
				{
					Key:   "FromEmail",
					Value: "from@example.com",
					Type:  "String",
				},
				{
					Key:   "ToEmail",
					Value: "to@example.com",
					Type:  "String",
				},
				{
					Key:   "Subject",
					Value: "this is subject",
					Type:  "String",
				},
			},
		}); err != nil {
			return err
		}

		log.Printf("body: %d\n", i)
	}

	return nil
}

main.go (producer)


This is our producer application/microservice.


package main

import (
	"context"
	"log"
	"time"

	"github.com/you/aws/internal/email"
	"github.com/you/aws/internal/pkg/cloud/aws"
)

func main() {
	// Create a cancellable context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a session instance.
	ses, err := aws.New(aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Set queue URL.
	url := "http://localhost:4566/000000000000/welcome-email"

	// Instantiate client.
	client := aws.NewSQS(ses, time.Second*5)

	// Instantiate email producer and start producing dummy messages.
	if err := email.NewProducer(client).Produce(ctx, url); err != nil {
		log.Fatalln("produce:", err)
	}
}

consumer.go


package email

import (
	"context"
	"log"
	"sync"

	"github.com/you/aws/internal/pkg/cloud"
	"github.com/aws/aws-sdk-go/service/sqs"
)

type ConsumerType string

const (
	// SyncConsumer consumes messages one by one.
	// A single goroutine handles all messages.
	// Progression is slower and requires fewer system resources.
	// Ideal for quiet/non-critical queues.
	SyncConsumer ConsumerType = "blocking"
	// AsyncConsumer consumes messages at the same time.
	// Runs an individual goroutine per message.
	// Progression is faster and requires more system resources.
	// Ideal for busy/critical queues.
	AsyncConsumer ConsumerType = "non-blocking"
)

type ConsumerConfig struct {
	// Instructs whether a worker consumes its messages synchronously or asynchronously.
	Type ConsumerType
	// Queue URL to receive messages from.
	QueueURL string
	// Maximum number of workers that will independently receive messages from a queue.
	MaxWorker int
	// Maximum number of messages that will be picked up by a worker in one go.
	MaxMsg int
}

type Consumer struct {
	client cloud.MessageClient
	config ConsumerConfig
}

func NewConsumer(client cloud.MessageClient, config ConsumerConfig) Consumer {
	return Consumer{
		client: client,
		config: config,
	}
}

func (c Consumer) Start(ctx context.Context) {
	wg := &sync.WaitGroup{}
	wg.Add(c.config.MaxWorker)

	for i := 1; i <= c.config.MaxWorker; i++ {
		go c.worker(ctx, wg, i)
	}

	wg.Wait()
}

func (c Consumer) worker(ctx context.Context, wg *sync.WaitGroup, id int) {
	defer wg.Done()

	log.Printf("worker %d: started\n", id)

	for {
		select {
		case <-ctx.Done():
			log.Printf("worker %d: stopped\n", id)
			return
		default:
		}

		msgs, err := c.client.Receive(ctx, c.config.QueueURL, int64(c.config.MaxMsg))
		if err != nil {
			// Critical error!
			log.Printf("worker %d: receive error: %s\n", id, err.Error())
			continue
		}

		if len(msgs) == 0 {
			continue
		}

		if c.config.Type == SyncConsumer {
			c.sync(ctx, msgs)
		} else {
			c.async(ctx, msgs)
		}
	}
}

func (c Consumer) sync(ctx context.Context, msgs []*sqs.Message) {
	for _, msg := range msgs {
		c.consume(ctx, msg)
	}
}

func (c Consumer) async(ctx context.Context, msgs []*sqs.Message) {
	wg := &sync.WaitGroup{}
	wg.Add(len(msgs))

	for _, msg := range msgs {
		go func(msg *sqs.Message) {
			defer wg.Done()

			c.consume(ctx, msg)
		}(msg)
	}

	wg.Wait()
}

func (c Consumer) consume(ctx context.Context, msg *sqs.Message) {
	log.Println(*msg.Body)

	if err := c.client.Delete(ctx, c.config.QueueURL, *msg.ReceiptHandle); err != nil {
		// Critical error!
		log.Printf("delete error: %s\n", err.Error())
	}
}
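
The consume method above only logs the message body before deleting the message. In a real worker you would presumably read the message attributes the producer sets (FromEmail, ToEmail, Subject) and hand them to whatever actually sends the email. A minimal sketch of such a variation, living in the same email package; the consumeAndSend name and the log call standing in for a real mailer are made up for illustration.


func (c Consumer) consumeAndSend(ctx context.Context, msg *sqs.Message) {
	// Helper to read an optional string attribute from the message.
	attr := func(key string) string {
		if v, ok := msg.MessageAttributes[key]; ok && v.StringValue != nil {
			return *v.StringValue
		}
		return ""
	}

	// These keys match the attributes set by the producer above.
	from := attr("FromEmail")
	to := attr("ToEmail")
	subject := attr("Subject")

	// Stand-in for the actual email sending logic.
	log.Printf("sending %q from %s to %s: %s", subject, from, to, *msg.Body)

	// Delete the message only after it has been handled, otherwise it becomes
	// visible again once the visibility timeout expires and gets redelivered.
	if err := c.client.Delete(ctx, c.config.QueueURL, *msg.ReceiptHandle); err != nil {
		log.Printf("delete error: %s\n", err.Error())
	}
}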

main.go (consumer)


This is our consumer application/microservice.


package main

import (
	"context"
	"log"
	"time"

	"github.com/you/aws/internal/email"
	"github.com/you/aws/internal/pkg/cloud/aws"
)

func main() {
	// Create a cancellable context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a session instance.
	ses, err := aws.New(aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Set queue URL.
	url := "http://localhost:4566/000000000000/welcome-email"

	// Instantiate client.
	client := aws.NewSQS(ses, time.Second*5)

	// Instantiate consumer and start consuming.
	email.NewConsumer(client, email.ConsumerConfig{
		Type:      email.AsyncConsumer,
		QueueURL:  url,
		MaxWorker: 2,
		MaxMsg:    10,
	}).Start(ctx)
}
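
Note that nothing in main ever cancels the context, so the ctx.Done() branch in the worker never fires and the consumer only stops when the process is killed. If you want the workers to shut down cleanly on Ctrl+C, one option is to cancel the context on OS signals. A rough sketch, which would replace the context setup in main and needs the additional os, os/signal and syscall imports:


	// Cancel the context when SIGINT or SIGTERM is received so that each
	// worker logs "stopped" and returns instead of being cut off mid-loop.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
		<-sig
		cancel()
	}()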

Queue


$ aws --profile localstack --endpoint-url http://localhost:4566 sqs get-queue-attributes --queue-url http://localhost:4566/000000000000/welcome-email
{
    "Attributes": {
        "ApproximateNumberOfMessages": "0",
        "ApproximateNumberOfMessagesDelayed": "0",
        "ApproximateNumberOfMessagesNotVisible": "0",
        "CreatedTimestamp": "1611682123.5068",
        "DelaySeconds": "0",
        "LastModifiedTimestamp": "1611682123.5068",
        "MaximumMessageSize": "262144",
        "MessageRetentionPeriod": "345600",
        "QueueArn": "arn:aws:sqs:eu-west-1:000000000000:welcome-email",
        "ReceiveMessageWaitTimeSeconds": "10",
        "VisibilityTimeout": "20"
    }
}
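
The queue above was created beforehand. If you need to create it yourself, a command along these lines should work against LocalStack; the attribute values simply mirror the output above, so adjust them to your needs.


$ aws --profile localstack --endpoint-url http://localhost:4566 sqs create-queue --queue-name welcome-email --attributes ReceiveMessageWaitTimeSeconds=10,VisibilityTimeout=20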

Test


Producer


$ go run --race cmd/producer/main.go

2021/01/26 21:31:46 body: 1
2021/01/26 21:31:46 body: 2
2021/01/26 21:31:46 body: 3
2021/01/26 21:31:46 body: 4
...
2021/01/26 21:32:23 body: 489
2021/01/26 21:32:24 body: 490
2021/01/26 21:32:24 body: 491
2021/01/26 21:32:24 body: 492
2021/01/26 21:32:24 body: 493
2021/01/26 21:32:24 body: 494
2021/01/26 21:32:24 body: 495
2021/01/26 21:32:24 body: 496
2021/01/26 21:32:24 body: 497
2021/01/26 21:32:24 body: 498
2021/01/26 21:32:24 body: 499
2021/01/26 21:32:25 body: 500

Consumer


$ go run --race cmd/consumer/main.go

2021/01/26 21:32:49 worker 1: started
2021/01/26 21:32:49 worker 2: started
2021/01/26 21:32:49 body: 1
2021/01/26 21:32:49 body: 2
2021/01/26 21:32:49 body: 6
2021/01/26 21:32:49 body: 10
2021/01/26 21:32:49 body: 3
2021/01/26 21:32:49 body: 5
2021/01/26 21:32:49 body: 7
2021/01/26 21:32:49 body: 9
2021/01/26 21:32:49 body: 8
2021/01/26 21:32:49 body: 4
2021/01/26 21:32:49 body: 11
2021/01/26 21:32:49 body: 19
...
2021/01/26 21:33:13 body: 495
2021/01/26 21:33:13 body: 496
2021/01/26 21:33:13 body: 497
2021/01/26 21:33:13 body: 499
2021/01/26 21:33:13 body: 500
2021/01/26 21:33:13 body: 99
2021/01/26 21:33:24 body: 320