In this example we are going to use Localstack and Golang to work with AWS Simple Notification Service (SNS). We will create a new topic, list all topics, subscribe to a topic, list all topic subscriptions, publish to a topic and unsubscribe from a topic.


Structure


├── internal
│   ├── pkg
│   │   └── cloud
│   │       ├── aws
│   │       │   ├── aws.go
│   │       │   └── sns.go
│   │       ├── client.go
│   │       └── model.go
│   └── pubsub
│       └── pubsub.go
└── main.go

Files


main.go


package main

import (
"log"
"time"

"github.com/you/aws/internal/pkg/cloud/aws"
"github.com/you/aws/internal/pubsub"
)

// main builds an AWS session that points at the local Localstack endpoint
// and runs the pub/sub walkthrough against an SNS client built from it.
func main() {
	// Connection settings for the Localstack instance.
	cfg := aws.Config{
		Address: "http://localhost:4566",
		Region:  "eu-west-1",
		Profile: "localstack",
		ID:      "test",
		Secret:  "test",
	}

	ses, err := aws.New(cfg)
	if err != nil {
		log.Fatalln(err)
	}

	// Every SNS call made by the client is bounded by a five second timeout.
	client := aws.NewSNS(ses, 5*time.Second)

	pubsub.PubSub(client)
}

client.go


package cloud

import (
"context"
)

// PubSubClient is the set of publish/subscribe operations the application
// relies on: topic management, subscription management and publishing.
type PubSubClient interface {
	// Create creates a new topic and returns its ARN.
	Create(ctx context.Context, topic string) (string, error)

	// ListTopics lists all topics.
	ListTopics(ctx context.Context) ([]*Topic, error)

	// Subscribe subscribes a user endpoint (e.g. email, phone) to a topic
	// and returns the subscription ARN.
	Subscribe(ctx context.Context, endpoint, protocol, topicARN string) (string, error)

	// ListTopicSubscriptions lists all subscriptions for a topic.
	ListTopicSubscriptions(ctx context.Context, topicARN string) ([]*Subscription, error)

	// Publish publishes a message to all subscribers of a topic and returns
	// the message ID.
	Publish(ctx context.Context, message, topicARN string) (string, error)

	// Unsubscribe removes a topic subscription.
	Unsubscribe(ctx context.Context, subscriptionARN string) error
}

model.go


package cloud

// Topic identifies a pub/sub topic.
type Topic struct {
	// ARN is the resource name of the topic.
	ARN string
}

// Subscription describes a single subscription to a topic.
type Subscription struct {
	// ARN is the resource name of the subscription itself.
	ARN string
	// TopicARN is the resource name of the topic subscribed to.
	TopicARN string
	// Endpoint is the address that receives messages (e.g. an email address).
	Endpoint string
	// Protocol is the delivery protocol for the endpoint (e.g. "email").
	Protocol string
}

aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

// Config carries the settings New uses to build an AWS session.
type Config struct {
	// Address is the service endpoint URL to send requests to.
	Address string
	// Region is the AWS region name.
	Region string
	// Profile is the shared-configuration profile name.
	Profile string
	// ID is the access key ID used for static credentials.
	ID string
	// Secret is the secret access key used for static credentials.
	Secret string
}

// New builds an AWS session from config. The session uses static credentials
// taken from config.ID and config.Secret, sends requests to config.Address
// in config.Region, and is created with the config.Profile profile.
func New(config Config) (*session.Session, error) {
	staticCreds := credentials.NewStaticCredentials(config.ID, config.Secret, "")

	opts := session.Options{
		Profile: config.Profile,
		Config: aws.Config{
			Credentials:      staticCreds,
			Region:           aws.String(config.Region),
			Endpoint:         aws.String(config.Address),
			S3ForcePathStyle: aws.Bool(true),
		},
	}

	return session.NewSessionWithOptions(opts)
}

sns.go


package aws

import (
"context"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)

// Compile-time check that SNS satisfies cloud.PubSubClient.
var _ cloud.PubSubClient = SNS{}

// SNS is a cloud.PubSubClient backed by AWS Simple Notification Service.
type SNS struct {
	// timeout bounds each individual call made through client.
	timeout time.Duration
	// client is the underlying AWS SDK SNS client.
	client *sns.SNS
}

// NewSNS returns an SNS client bound to the given session. Every operation
// performed through the returned value is limited to timeout.
func NewSNS(session *session.Session, timeout time.Duration) SNS {
	snsClient := sns.New(session)

	return SNS{timeout: timeout, client: snsClient}
}

// Create creates a topic named topic and returns its ARN. The call is
// bounded by the client's timeout.
func (s SNS) Create(ctx context.Context, topic string) (string, error) {
	callCtx, stop := context.WithTimeout(ctx, s.timeout)
	defer stop()

	input := &sns.CreateTopicInput{Name: aws.String(topic)}

	out, err := s.client.CreateTopicWithContext(callCtx, input)
	if err != nil {
		return "", fmt.Errorf("create: %w", err)
	}

	return *out.TopicArn, nil
}

// ListTopics returns every topic visible to the client.
//
// SNS hands topics back in pages, so the listing follows the pagination
// token until the last page instead of stopping after the first response.
// The client's timeout applies to the whole listing, not to each page.
// On failure the error is wrapped with "list topics" and no topics are
// returned.
func (s SNS) ListTopics(ctx context.Context) ([]*cloud.Topic, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	// Start from an empty, non-nil slice so callers get the same result
	// shape as before when there are no topics.
	topics := make([]*cloud.Topic, 0)

	err := s.client.ListTopicsPagesWithContext(ctx, &sns.ListTopicsInput{},
		func(page *sns.ListTopicsOutput, _ bool) bool {
			for _, topic := range page.Topics {
				topics = append(topics, &cloud.Topic{
					// StringValue tolerates a nil pointer instead of panicking.
					ARN: aws.StringValue(topic.TopicArn),
				})
			}

			// Returning true asks the SDK to fetch the next page, if any.
			return true
		},
	)
	if err != nil {
		return nil, fmt.Errorf("list topics: %w", err)
	}

	return topics, nil
}

// Subscribe subscribes endpoint to the topic identified by topicARN using
// the given protocol and returns the subscription ARN. The call is bounded
// by the client's timeout.
func (s SNS) Subscribe(ctx context.Context, endpoint, protocol, topicARN string) (string, error) {
	callCtx, stop := context.WithTimeout(ctx, s.timeout)
	defer stop()

	input := &sns.SubscribeInput{
		TopicArn: aws.String(topicARN),
		Protocol: aws.String(protocol),
		Endpoint: aws.String(endpoint),
		// Ask for the subscription ARN in the response.
		ReturnSubscriptionArn: aws.Bool(true),
	}

	out, err := s.client.SubscribeWithContext(callCtx, input)
	if err != nil {
		return "", fmt.Errorf("subscribe: %w", err)
	}

	return *out.SubscriptionArn, nil
}

// ListTopicSubscriptions returns every subscription of the topic identified
// by topicARN.
//
// SNS hands subscriptions back in pages, so the listing follows the
// pagination token until the last page instead of stopping after the first
// response. The client's timeout applies to the whole listing, not to each
// page. On failure the error is wrapped with "list topic subscriptions" and
// no subscriptions are returned.
func (s SNS) ListTopicSubscriptions(ctx context.Context, topicARN string) ([]*cloud.Subscription, error) {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	// Start from an empty, non-nil slice so callers get the same result
	// shape as before when the topic has no subscriptions.
	subs := make([]*cloud.Subscription, 0)

	input := &sns.ListSubscriptionsByTopicInput{
		TopicArn: aws.String(topicARN),
	}

	err := s.client.ListSubscriptionsByTopicPagesWithContext(ctx, input,
		func(page *sns.ListSubscriptionsByTopicOutput, _ bool) bool {
			for _, sub := range page.Subscriptions {
				// StringValue tolerates nil pointers instead of panicking.
				subs = append(subs, &cloud.Subscription{
					ARN:      aws.StringValue(sub.SubscriptionArn),
					TopicARN: aws.StringValue(sub.TopicArn),
					Endpoint: aws.StringValue(sub.Endpoint),
					Protocol: aws.StringValue(sub.Protocol),
				})
			}

			// Returning true asks the SDK to fetch the next page, if any.
			return true
		},
	)
	if err != nil {
		return nil, fmt.Errorf("list topic subscriptions: %w", err)
	}

	return subs, nil
}

// Publish sends message to the topic identified by topicARN and returns the
// ID assigned to the published message. The call is bounded by the client's
// timeout.
func (s SNS) Publish(ctx context.Context, message, topicARN string) (string, error) {
	callCtx, stop := context.WithTimeout(ctx, s.timeout)
	defer stop()

	input := &sns.PublishInput{
		TopicArn: aws.String(topicARN),
		Message:  aws.String(message),
	}

	out, err := s.client.PublishWithContext(callCtx, input)
	if err != nil {
		return "", fmt.Errorf("publish: %w", err)
	}

	return *out.MessageId, nil
}

// Unsubscribe removes the subscription identified by subscriptionARN. The
// call is bounded by the client's timeout.
func (s SNS) Unsubscribe(ctx context.Context, subscriptionARN string) error {
	callCtx, stop := context.WithTimeout(ctx, s.timeout)
	defer stop()

	input := &sns.UnsubscribeInput{SubscriptionArn: aws.String(subscriptionARN)}

	// The response carries nothing the caller needs; only the error matters.
	_, err := s.client.UnsubscribeWithContext(callCtx, input)
	if err != nil {
		return fmt.Errorf("unsubscribe: %w", err)
	}

	return nil
}

pubsub.go


This is just a quick-and-dirty usage example!


package pubsub

import (
"context"
"fmt"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

// PubSub walks through the whole client API in order: create a topic, list
// topics, subscribe to the new topic, list its subscriptions, publish a
// message to it and finally remove the subscription.
func PubSub(client cloud.PubSubClient) {
	ctx := context.Background()

	// Topic lifecycle.
	topicARN := create(ctx, client)
	listTopics(ctx, client)

	// Subscription lifecycle on the freshly created topic.
	subscriptionARN := subscribe(ctx, client, topicARN)
	listTopicSubscriptions(ctx, client, topicARN)
	publish(ctx, client, topicARN)
	unsubscribe(ctx, client, subscriptionARN)
}

// create creates the "welcome-email" topic, logs its ARN and returns it.
// Any error terminates the program.
func create(ctx context.Context, client cloud.PubSubClient) string {
	topicARN, err := client.Create(ctx, "welcome-email")
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("create: topic ARN:", topicARN)

	return topicARN
}

// listTopics fetches the topics from client and prints each one on its own
// line. Any error terminates the program.
func listTopics(ctx context.Context, client cloud.PubSubClient) {
	all, err := client.ListTopics(ctx)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("list topics:")

	for i := range all {
		fmt.Printf("%+v\n", all[i])
	}
}

// subscribe subscribes the sample email address to the topic identified by
// topicARN, logs the subscription ARN and returns it. Any error terminates
// the program.
func subscribe(ctx context.Context, client cloud.PubSubClient, topicARN string) string {
	subscriptionARN, err := client.Subscribe(ctx, "email@example.com", "email", topicARN)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("subscribe: subscription ARN:", subscriptionARN)

	return subscriptionARN
}

// listTopicSubscriptions fetches the subscriptions of the topic identified
// by topicARN and prints each one on its own line. Any error terminates the
// program.
func listTopicSubscriptions(ctx context.Context, client cloud.PubSubClient, topicARN string) {
	all, err := client.ListTopicSubscriptions(ctx, topicARN)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("list topic subscriptions:")

	for i := range all {
		fmt.Printf("%+v\n", all[i])
	}
}

// publish sends the sample message to the topic identified by topicARN and
// logs the resulting message ID. Any error terminates the program.
func publish(ctx context.Context, client cloud.PubSubClient, topicARN string) {
	messageID, err := client.Publish(ctx, "hello!", topicARN)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("publish: message ID:", messageID)
}

// unsubscribe removes the subscription identified by subARN and logs
// success. Any error terminates the program.
func unsubscribe(ctx context.Context, client cloud.PubSubClient, subARN string) {
	err := client.Unsubscribe(ctx, subARN)
	if err != nil {
		log.Fatalln(err)
	}

	log.Println("unsubscribe: ok")
}

Test


$ go run --race main.go
2021/01/24 17:25:33 create: topic ARN: arn:aws:sns:eu-west-1:000000000000:welcome-email
2021/01/24 17:25:33 list topics:
&{ARN:arn:aws:sns:eu-west-1:000000000000:welcome-email}
2021/01/24 17:25:33 subscribe: subscription ARN: arn:aws:sns:eu-west-1:000000000000:welcome-email:970e29a3-39d8-4d0f-9b01-bc87dc5fac19
2021/01/24 17:25:33 list topic subscriptions:
&{ARN:arn:aws:sns:eu-west-1:000000000000:welcome-email:970e29a3-39d8-4d0f-9b01-bc87dc5fac19 TopicARN:arn:aws:sns:eu-west-1:000000000000:welcome-email Endpoint:email@example.com Protocol:email}
2021/01/24 17:25:33 publish: message ID: d1ed89ae-6eb5-4d0a-8e71-31e1ac8589c6
2021/01/24 17:25:33 unsubscribe: ok