Bu örnekte AWS Simple Notification Service (SNS) hizmetiyle çalışmak için Localstack ve Golang'ı kullanacağız. Yeni bir topik oluşturacağız, tüm topikleri listeleyeceğiz, bir topiğe abone olacağız, tüm topik aboneliklerini listeleyeceğiz, bir topiğe yayın yapacağız ve bir topikten çıkacağız.


Yapı


├── internal
│   ├── pkg
│   │   └── cloud
│   │   ├── aws
│   │   │   ├── aws.go
│   │   │   └── sns.go
│   │   ├── client.go
│   │   └── model.go
│   └── pubsub
│   └── pubsub.go
└── main.go

Dosyalar


main.go


package main

import (
"log"
"time"

"github.com/you/aws/internal/pkg/cloud/aws"
"github.com/you/aws/internal/pubsub"
)

func main() {
// Create a session instance.
ses, err := aws.New(aws.Config{
Address: "http://localhost:4566",
Region: "eu-west-1",
Profile: "localstack",
ID: "test",
Secret: "test",
})
if err != nil {
log.Fatalln(err)
}

// Test pubsub
pubsub.PubSub(aws.NewSNS(ses, time.Second*5))
}

client.go


package cloud

import (
"context"
)

type PubSubClient interface {
// Creates a new topic and returns its ARN.
Create(ctx context.Context, topic string) (string, error)
// Lists all topics.
ListTopics(ctx context.Context) ([]*Topic, error)
// Subscribes a user (e.g. email, phone) to a topic and returns subscription ARN.
Subscribe(ctx context.Context, endpoint, protocol, topicARN string) (string, error)
// Lists all subscriptions for a topic.
ListTopicSubscriptions(ctx context.Context, topicARN string) ([]*Subscription, error)
// Publishes a message to all subscribers of a topic and returns its message ID.
Publish(ctx context.Context, message, topicARN string) (string, error)
// Unsubscribes a topic subscription.
Unsubscribe(ctx context.Context, subscriptionARN string) error
}

model.go


package cloud

type Topic struct {
ARN string
}

type Subscription struct {
ARN string
TopicARN string
Endpoint string
Protocol string
}

aws.go


package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

type Config struct {
Address string
Region string
Profile string
ID string
Secret string
}

func New(config Config) (*session.Session, error) {
return session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials(config.ID, config.Secret, ""),
Region: aws.String(config.Region),
Endpoint: aws.String(config.Address),
S3ForcePathStyle: aws.Bool(true),
},
Profile: config.Profile,
},
)
}

sns.go


package aws

import (
"context"
"fmt"
"time"

"github.com/you/aws/internal/pkg/cloud"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)

var _ cloud.PubSubClient = SNS{}

type SNS struct {
timeout time.Duration
client *sns.SNS
}

func NewSNS(session *session.Session, timeout time.Duration) SNS {
return SNS{
timeout: timeout,
client: sns.New(session),
}
}

func (s SNS) Create(ctx context.Context, topic string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.CreateTopicWithContext(ctx, &sns.CreateTopicInput{
Name: aws.String(topic),
})
if err != nil {
return "", fmt.Errorf("create: %w", err)
}

return *res.TopicArn, nil
}

func (s SNS) ListTopics(ctx context.Context) ([]*cloud.Topic, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.ListTopicsWithContext(ctx, nil)
if err != nil {
return nil, fmt.Errorf("list topics: %w", err)
}

topics := make([]*cloud.Topic, len(res.Topics))

for i, topic := range res.Topics {
topics[i] = &cloud.Topic{
ARN: *topic.TopicArn,
}
}

return topics, nil
}

func (s SNS) Subscribe(ctx context.Context, endpoint, protocol, topicARN string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.SubscribeWithContext(ctx, &sns.SubscribeInput{
Endpoint: aws.String(endpoint),
Protocol: aws.String(protocol),
ReturnSubscriptionArn: aws.Bool(true),
TopicArn: aws.String(topicARN),
})
if err != nil {
return "", fmt.Errorf("subscribe: %w", err)
}

return *res.SubscriptionArn, nil
}

func (s SNS) ListTopicSubscriptions(ctx context.Context, topicARN string) ([]*cloud.Subscription, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.ListSubscriptionsByTopicWithContext(ctx, &sns.ListSubscriptionsByTopicInput{
NextToken: nil,
TopicArn: aws.String(topicARN),
})
if err != nil {
return nil, fmt.Errorf("list topic subscriptions: %w", err)
}

subs := make([]*cloud.Subscription, len(res.Subscriptions))

for i, sub := range res.Subscriptions {
subs[i] = &cloud.Subscription{
ARN: *sub.SubscriptionArn,
TopicARN: *sub.TopicArn,
Endpoint: *sub.Endpoint,
Protocol: *sub.Protocol,
}
}

return subs, nil
}

func (s SNS) Publish(ctx context.Context, message, topicARN string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

res, err := s.client.PublishWithContext(ctx, &sns.PublishInput{
Message: &message,
TopicArn: aws.String(topicARN),
})
if err != nil {
return "", fmt.Errorf("publish: %w", err)
}

return *res.MessageId, nil
}

func (s SNS) Unsubscribe(ctx context.Context, subscriptionARN string) error {
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

if _, err := s.client.UnsubscribeWithContext(ctx, &sns.UnsubscribeInput{
SubscriptionArn: aws.String(subscriptionARN),
}); err != nil {
return fmt.Errorf("unsubscribe: %w", err)
}

return nil
}

pubsub.go


Bu sadece "kirli" bir kullanım örneğidir!


package pubsub

import (
"context"
"fmt"
"log"

"github.com/you/aws/internal/pkg/cloud"
)

func PubSub(client cloud.PubSubClient) {
ctx := context.Background()

tARN := create(ctx, client)
listTopics(ctx, client)
sARN := subscribe(ctx, client, tARN)
listTopicSubscriptions(ctx, client, tARN)
publish(ctx, client, tARN)
unsubscribe(ctx, client, sARN)
}

func create(ctx context.Context, client cloud.PubSubClient) string {
arn, err := client.Create(ctx, "welcome-email")
if err != nil {
log.Fatalln(err)
}
log.Println("create: topic ARN:", arn)

return arn
}

func listTopics(ctx context.Context, client cloud.PubSubClient) {
topics, err := client.ListTopics(ctx)
if err != nil {
log.Fatalln(err)
}
log.Println("list topics:")
for _, topic := range topics {
fmt.Printf("%+v\n", topic)
}
}

func subscribe(ctx context.Context, client cloud.PubSubClient, topicARN string) string {
arn, err := client.Subscribe(ctx, "email@example.com", "email", topicARN)
if err != nil {
log.Fatalln(err)
}
log.Println("subscribe: subscription ARN:", arn)

return arn
}

func listTopicSubscriptions(ctx context.Context, client cloud.PubSubClient, topicARN string) {
subs, err := client.ListTopicSubscriptions(ctx, topicARN)
if err != nil {
log.Fatalln(err)
}
log.Println("list topic subscriptions:")
for _, sub := range subs {
fmt.Printf("%+v\n", sub)
}
}

func publish(ctx context.Context, client cloud.PubSubClient, topicARN string) {
id, err := client.Publish(ctx, "hello!", topicARN)
if err != nil {
log.Fatalln(err)
}
log.Println("publish: message ID:", id)
}

func unsubscribe(ctx context.Context, client cloud.PubSubClient, subARN string) {
if err := client.Unsubscribe(ctx, subARN); err != nil {
log.Fatalln(err)
}
log.Println("unsubscribe: ok")
}

Test


$ go run --race main.go
2021/01/24 17:25:33 create: topic ARN: arn:aws:sns:eu-west-1:000000000000:welcome-email
2021/01/24 17:25:33 list topics:
&{ARN:arn:aws:sns:eu-west-1:000000000000:welcome-email}
2021/01/24 17:25:33 subscribe: subscription ARN: arn:aws:sns:eu-west-1:000000000000:welcome-email:970e29a3-39d8-4d0f-9b01-bc87dc5fac19
2021/01/24 17:25:33 list topic subscriptions:
&{ARN:arn:aws:sns:eu-west-1:000000000000:welcome-email:970e29a3-39d8-4d0f-9b01-bc87dc5fac19 TopicARN:arn:aws:sns:eu-west-1:000000000000:welcome-email Endpoint:email@example.com Protocol:email}
2021/01/24 17:25:33 publish: message ID: d1ed89ae-6eb5-4d0a-8e71-31e1ac8589c6
2021/01/24 17:25:33 unsubscribe: ok