26/01/2021 - AWS, GO
In this example we are going to use AWS SQS to produce and consume messages. The consumer worker we will be using works in either a synchronous (blocking) or an asynchronous (non-blocking) manner. It allows us to run one or many workers, and each worker can run one or many consumers, as shown in the diagram below.
├── cmd
│ ├── consumer
│ │ └── main.go
│ └── producer
│ └── main.go
└── internal
│ ├── consumer.go
│ └── producer.go
└── pkg
└── cloud
├── aws
│ ├── aws.go
│ └── sqs.go
├── client.go
└── model.go
package aws
import (
// Config holds the settings New uses to build an AWS session.
type Config struct {
	// Address is the endpoint URL requests are sent to.
	Address string
	// Region is the AWS region name.
	Region string
	// Profile is the shared configuration profile name.
	Profile string
	// ID is the access key ID used for the static credentials.
	ID string
	// Secret is the secret access key used for the static credentials.
	Secret string
}
func New(config Config) (*session.Session, error) {
return session.NewSessionWithOptions(
Config: aws.Config{
Credentials: credentials.NewStaticCredentials(config.ID, config.Secret, ""),
Region: aws.String(config.Region),
Endpoint: aws.String(config.Address),
S3ForcePathStyle: aws.Bool(true),
Profile: config.Profile,
package aws
import (
// Compile-time check that SQS satisfies cloud.MessageClient.
var _ cloud.MessageClient = SQS{}
type SQS struct {
timeout time.Duration
client *sqs.SQS
func NewSQS(session *session.Session, timeout time.Duration) SQS {
return SQS{
timeout: timeout,
client: sqs.New(session),
func (s SQS) Send(ctx context.Context, req *cloud.SendRequest) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
attrs := make(map[string]*sqs.MessageAttributeValue, len(req.Attributes))
for _, attr := range req.Attributes {
attrs[attr.Key] = &sqs.MessageAttributeValue{
StringValue: aws.String(attr.Value),
DataType: aws.String(attr.Type),
res, err := s.client.SendMessageWithContext(ctx, &sqs.SendMessageInput{
MessageAttributes: attrs,
MessageBody: aws.String(req.Body),
QueueUrl: aws.String(req.QueueURL),
if err != nil {
return "", fmt.Errorf("send: %w", err)
return *res.MessageId, nil
func (s SQS) Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error) {
if maxMsg < 1 || maxMsg > 10 {
return nil, fmt.Errorf("receive argument: msgMax valid values: 1 to 10: given %d", maxMsg)
var waitTimeSeconds int64 = 10
// Must always be above `WaitTimeSeconds` otherwise `ReceiveMessageWithContext`
// trigger context timeout error.
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(waitTimeSeconds+5))
defer cancel()
res, err := s.client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: aws.Int64(maxMsg),
WaitTimeSeconds: aws.Int64(waitTimeSeconds),
MessageAttributeNames: aws.StringSlice([]string{"All"}),
if err != nil {
return nil, fmt.Errorf("receive: %w", err)
return res.Messages, nil
func (s SQS) Delete(ctx context.Context, queueURL, rcvHandle string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
if _, err := s.client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(rcvHandle),
}); err != nil {
return fmt.Errorf("delete: %w", err)
return nil
package cloud
import (
type MessageClient interface {
// Send a message to queue and returns its message ID.
Send(ctx context.Context, req *SendRequest) (string, error)
// Long polls given amount of messages from a queue.
Receive(ctx context.Context, queueURL string, maxMsg int64) ([]*sqs.Message, error)
// Deletes a message from a queue.
Delete(ctx context.Context, queueURL, rcvHandle string) error
package cloud
type SendRequest struct {
QueueURL string
Body string
Attributes []Attribute
// Attribute is a single key/value message attribute with its data type.
type Attribute struct {
	// Key is the attribute name.
	Key string
	// Value is the attribute value in string form.
	Value string
	// Type is the attribute data type, e.g. "String".
	Type string
}
We are producing 500 dummy messages here.
package email
import (
type Producer struct {
client cloud.MessageClient
func NewProducer(client cloud.MessageClient) Producer {
return Producer{client: client}
func (p Producer) Produce(ctx context.Context, queueURL string) error {
for i := 1; i <= 500; i++ {
if _, err := p.client.Send(ctx, &cloud.SendRequest{
QueueURL: queueURL,
Body: fmt.Sprintf("body: %d", i),
Attributes: []cloud.Attribute{
Key: "FromEmail",
Value: "from@example.com",
Type: "String",
Key: "ToEmail",
Value: "to@example.com",
Type: "String",
Key: "Subject",
Value: "this is subject",
Type: "String",
}); err != nil {
return err
log.Printf("body: %d\n", i)
return nil
This is our producer application/microservice.
package main
import (
func main() {
// Create a cancellable context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a session instance.
ses, err := aws.New(aws.Config{
Address: "http://localhost:4566",
Region: "eu-west-1",
Profile: "localstack",
ID: "test",
Secret: "test",
if err != nil {
// Set queue URL.
url := "http://localhost:4566/000000000000/welcome-email"
// Instantiate client.
client := aws.NewSQS(ses, time.Second*5)
// Instantiate email producer and start producing dummy messages.
if err := email.NewProducer(client).Produce(ctx, url); err != nil {
log.Fatalln("produce:", err)
package email
import (
// ConsumerType selects how a worker processes the messages it receives.
type ConsumerType string

const (
	// SyncConsumer consumes messages one by one.
	// A single goroutine handles all messages.
	// Progression is slower and requires less system resource.
	// Ideal for quiet/non-critical queues.
	SyncConsumer ConsumerType = "blocking"
	// AsyncConsumer consumes messages at the same time.
	// Runs an individual goroutine per message.
	// Progression is faster and requires more system resource.
	// Ideal for busy/critical queues.
	AsyncConsumer ConsumerType = "non-blocking"
)
type ConsumerConfig struct {
// Instructs whether to consume messages come from a worker synchronously or asynchronous.
Type ConsumerType
// Queue URL to receive messages from.
QueueURL string
// Maximum workers that will independently receive messages from a queue.
MaxWorker int
// Maximum messages that will be picked up by a worker in one-go.
MaxMsg int
type Consumer struct {
client cloud.MessageClient
config ConsumerConfig
func NewConsumer(client cloud.MessageClient, config ConsumerConfig) Consumer {
return Consumer{
client: client,
config: config,
func (c Consumer) Start(ctx context.Context) {
wg := &sync.WaitGroup{}
for i := 1; i <= c.config.MaxWorker; i++ {
go c.worker(ctx, wg, i)
func (c Consumer) worker(ctx context.Context, wg *sync.WaitGroup, id int) {
defer wg.Done()
log.Printf("worker %d: started\n", id)
for {
select {
case <-ctx.Done():
log.Printf("worker %d: stopped\n", id)
msgs, err := c.client.Receive(ctx, c.config.QueueURL, int64(c.config.MaxMsg))
if err != nil {
// Critical error!
log.Printf("worker %d: receive error: %s\n", id, err.Error())
if len(msgs) == 0 {
if c.config.Type == SyncConsumer {
c.sync(ctx, msgs)
} else {
c.async(ctx, msgs)
func (c Consumer) sync(ctx context.Context, msgs []*sqs.Message) {
for _, msg := range msgs {
c.consume(ctx, msg)
func (c Consumer) async(ctx context.Context, msgs []*sqs.Message) {
wg := &sync.WaitGroup{}
for _, msg := range msgs {
go func(msg *sqs.Message) {
defer wg.Done()
c.consume(ctx, msg)
func (c Consumer) consume(ctx context.Context, msg *sqs.Message) {
if err := c.client.Delete(ctx, c.config.QueueURL, *msg.ReceiptHandle); err != nil {
// Critical error!
log.Printf("delete error: %s\n", err.Error())
This is our consumer application/microservice.
package main
import (
func main() {
// Create a cancellable context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a session instance.
ses, err := aws.New(aws.Config{
Address: "http://localhost:4566",
Region: "eu-west-1",
Profile: "localstack",
ID: "test",
Secret: "test",
if err != nil {
// Set queue URL.
url := "http://localhost:4566/000000000000/welcome-email"
// Instantiate client.
client := aws.NewSQS(ses, time.Second*5)
// Instantiate consumer and start consuming.
email.NewConsumer(client, email.ConsumerConfig{
Type: email.AsyncConsumer,
QueueURL: url,
MaxWorker: 2,
MaxMsg: 10,
$ aws --profile localstack --endpoint-url http://localhost:4566 sqs get-queue-attributes --queue-url http://localhost:4566/000000000000/welcome-email
"Attributes": {
"ApproximateNumberOfMessages": "0",
"ApproximateNumberOfMessagesDelayed": "0",
"ApproximateNumberOfMessagesNotVisible": "0",
"CreatedTimestamp": "1611682123.5068",
"DelaySeconds": "0",
"LastModifiedTimestamp": "1611682123.5068",
"MaximumMessageSize": "262144",
"MessageRetentionPeriod": "345600",
"QueueArn": "arn:aws:sqs:eu-west-1:000000000000:welcome-email",
"ReceiveMessageWaitTimeSeconds": "10",
"VisibilityTimeout": "20"
$ go run --race cmd/producer/main.go
2021/01/26 21:31:46 body: 1
2021/01/26 21:31:46 body: 2
2021/01/26 21:31:46 body: 3
2021/01/26 21:31:46 body: 4
2021/01/26 21:32:23 body: 489
2021/01/26 21:32:24 body: 490
2021/01/26 21:32:24 body: 491
2021/01/26 21:32:24 body: 492
2021/01/26 21:32:24 body: 493
2021/01/26 21:32:24 body: 494
2021/01/26 21:32:24 body: 495
2021/01/26 21:32:24 body: 496
2021/01/26 21:32:24 body: 497
2021/01/26 21:32:24 body: 498
2021/01/26 21:32:24 body: 499
2021/01/26 21:32:25 body: 500
$ go run --race cmd/consumer/main.go
2021/01/26 21:32:49 worker 1: started
2021/01/26 21:32:49 worker 2: started
2021/01/26 21:32:49 body: 1
2021/01/26 21:32:49 body: 2
2021/01/26 21:32:49 body: 6
2021/01/26 21:32:49 body: 10
2021/01/26 21:32:49 body: 3
2021/01/26 21:32:49 body: 5
2021/01/26 21:32:49 body: 7
2021/01/26 21:32:49 body: 9
2021/01/26 21:32:49 body: 8
2021/01/26 21:32:49 body: 4
2021/01/26 21:32:49 body: 11
2021/01/26 21:32:49 body: 19
2021/01/26 21:33:13 body: 495
2021/01/26 21:33:13 body: 496
2021/01/26 21:33:13 body: 497
2021/01/26 21:33:13 body: 499
2021/01/26 21:33:13 body: 500
2021/01/26 21:33:13 body: 99
2021/01/26 21:33:24 body: 320