28/03/2021 - AWS, GO
In this example we are going to create an application-to-application (A2A) communication. Both applications are HTTP APIs. The client (publisher) dispatches events (image upload or download) and the server (subscriber) listens for these events and handles them. We have two event types, `upload` and `download`. Both of these are linked to the `image` topic.
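We are benefiting from topic filters (subscription filter policies) because the subscriber only receives the messages whose `Event` attribute is `upload` or `download`; any other message published to the topic is not delivered to it.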
First create the topic. You are going to use the ARN below in the client later on.
$ aws --profile localstack --endpoint-url http://localhost:4566 sns create-topic --name image
{
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image"
}
This is what the application structure is like. Make sure the application is running because the next step will send a "Subscription Confirmation" request to it.
├── internal
│ └── image
│ ├── image.go
│ └── request.go
└── main.go
package main
import (
	"log"
	"net/http"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/you/server/internal/image"
)
// main builds the AWS session, wires the image controller into an HTTP router
// and serves the SNS subscriber endpoint on :8080. It only returns by exiting
// the process: either session creation or the HTTP server failed.
func main() {
	// Instantiate AWS session. The static credentials, region and endpoint
	// point the SDK at the local endpoint on localhost:4566.
	ses, err := session.NewSessionWithOptions(
		session.Options{
			Config: aws.Config{
				Credentials:      credentials.NewStaticCredentials("test", "test", ""),
				Region:           aws.String("eu-west-1"),
				Endpoint:         aws.String("http://localhost:4566"),
				S3ForcePathStyle: aws.Bool(true),
			},
			Profile: "localstack",
		},
	)
	if err != nil {
		log.Fatalln(err)
	}

	// Instantiate image controller.
	img := image.Image{Session: ses}

	// Instantiate HTTP router.
	rtr := http.NewServeMux()
	rtr.HandleFunc("/api/v1/images", img.Handle)

	// Instantiate HTTP server. Explicit timeouts are set because the endpoint
	// is reachable from the Internet; a server without them can be tied up
	// indefinitely by slow or stalled connections. WriteTimeout is kept above
	// the 5 second budget the handler gives the subscription confirmation call.
	srv := &http.Server{
		Addr:              ":8080",
		Handler:           rtr,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      15 * time.Second,
		IdleTimeout:       60 * time.Second,
	}
	log.Fatalln(srv.ListenAndServe())
}
package image
import (
"encoding/json"
"net/http"
)
// RequestType names the kind of message delivered to the endpoint. Its values
// are compared against the X-Amz-Sns-Message-Type request header.
type RequestType string

// Message kinds the server knows how to process.
const (
	// SubscriptionConfirmation marks a request asking to confirm a subscription.
	SubscriptionConfirmation = RequestType("SubscriptionConfirmation")
	// Notification marks a request carrying a published message.
	Notification = RequestType("Notification")
)
// EventValue is the value of the "Event" message attribute attached to a
// published message.
type EventValue string

// Event values the notification handler recognises.
const (
	// Upload marks an image upload event.
	Upload = EventValue("upload")
	// Download marks an image download event.
	Download = EventValue("download")
)
// SubscriptionConfirmationRequest is the JSON body posted to the endpoint when
// a subscription awaits confirmation. Field names equal the JSON keys of the
// payload, so encoding/json matches them without struct tags.
//
// Token and TopicArn are the values needed to confirm the subscription.
// Signature, SignatureVersion and SigningCertURL are what a receiver needs to
// verify the message's authenticity; Signature is decoded so that it is not
// silently dropped from the payload.
type SubscriptionConfirmationRequest struct {
	Type              RequestType
	MessageId         string
	TopicArn          string
	Message           string
	Timestamp         string
	SignatureVersion  string
	Signature         string
	SigningCertURL    string
	SubscribeURL      string
	Token             string
	MessageAttributes MessageAttribute
}
// NotificationRequest is the JSON body posted to the endpoint for every
// message published to the topic. Field names equal the JSON keys of the
// payload, so encoding/json matches them without struct tags.
//
// Message is the published payload and MessageAttributes carries the event
// attribute used to dispatch it. Signature, SignatureVersion and
// SigningCertURL are what a receiver needs to verify the message's
// authenticity; Signature is decoded so that it is not silently dropped from
// the payload.
type NotificationRequest struct {
	Type              RequestType
	MessageId         string
	TopicArn          string
	Message           string
	Timestamp         string
	SignatureVersion  string
	Signature         string
	SigningCertURL    string
	MessageAttributes MessageAttribute
}
// MessageAttribute holds the message attributes decoded from a request body.
// Only the attribute named "Event" is decoded.
type MessageAttribute struct {
	// Event is the attribute that carries the event type of the message.
	Event Event
}
// Event is a single message attribute: its declared data type and its value.
type Event struct {
	// Type is the attribute's data type as it appears in the payload.
	Type string
	// Value is the event name, e.g. Upload or Download.
	Value EventValue
}
// Bind fills s from the JSON document in the request body. The decoding error,
// if any, is returned as is.
func (s *SubscriptionConfirmationRequest) Bind(r *http.Request) error {
	decoder := json.NewDecoder(r.Body)

	return decoder.Decode(s)
}
// Bind fills n from the JSON document in the request body. The decoding error,
// if any, is returned as is.
func (n *NotificationRequest) Bind(r *http.Request) error {
	decoder := json.NewDecoder(r.Body)

	return decoder.Decode(n)
}
package image
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)
// Image is the controller behind the /api/v1/images endpoint.
type Image struct {
	// Session is the AWS session from which the SNS client is created.
	Session *session.Session
}
// Handle serves POST /api/v1/images.
//
// It dispatches on the X-Amz-Sns-Message-Type header: subscription
// confirmations go to confirm and notifications go to handle. It responds with
// 200 on success, 400 when the message type is missing or unsupported, and 500
// when processing the message fails.
func (i Image) Handle(w http.ResponseWriter, r *http.Request) {
	// Print the raw request for debugging. A failed dump is logged rather than
	// ignored, and processing still continues as before.
	if data, err := httputil.DumpRequest(r, true); err != nil {
		log.Printf("dumping request: %v", err)
	} else {
		fmt.Println(string(data))
	}

	var err error
	switch r.Header.Get("X-Amz-Sns-Message-Type") {
	case string(SubscriptionConfirmation):
		err = i.confirm(r)
	case string(Notification):
		err = i.handle(r)
	default:
		// The sender supplied a message type this server does not support;
		// that is a client error, not a server failure.
		log.Println("invalid message type")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
}
// confirm completes a pending subscription. It decodes the confirmation
// request and calls ConfirmSubscription with the token and topic ARN taken
// from it, allowing the call at most five seconds. Per the tutorial this
// happens only once: after confirmation, repeating the CLI subscribe command
// does not trigger another confirmation request.
func (i Image) confirm(r *http.Request) error {
	var body SubscriptionConfirmationRequest
	if bindErr := body.Bind(r); bindErr != nil {
		return fmt.Errorf("request binding: %w", bindErr)
	}

	// Bound the AWS call so a stalled endpoint cannot hold the request open.
	ctx, cancel := context.WithTimeout(r.Context(), time.Second*5)
	defer cancel()

	input := &sns.ConfirmSubscriptionInput{
		Token:    aws.String(body.Token),
		TopicArn: aws.String(body.TopicArn),
	}
	_, err := sns.New(i.Session).ConfirmSubscriptionWithContext(ctx, input)
	if err != nil {
		return fmt.Errorf("subscription confirmation: %w", err)
	}

	log.Println("confirming subscription ...")
	return nil
}
// handle consumes a published event. It is called once per notification
// request, i.e. as many times as the client publishes.
//
// It decodes the notification and dispatches on the value of the "Event"
// message attribute, logging the action together with the message body. It
// returns an error when the body cannot be decoded or when the event value is
// neither Upload nor Download.
func (i Image) handle(r *http.Request) error {
	var req NotificationRequest
	if err := req.Bind(r); err != nil {
		return fmt.Errorf("request binding: %w", err)
	}

	switch req.MessageAttributes.Event.Value {
	case Upload:
		log.Printf("uploading %s ...", req.Message)
	case Download:
		log.Printf("downloading %s ...", req.Message)
	default:
		// Include the offending value (quoted, so an empty or missing
		// attribute is visible) to make the failure diagnosable from logs.
		return fmt.Errorf("unknown event value %q", req.MessageAttributes.Event.Value)
	}

	return nil
}
The subscribe command below uses the "image-topic-attributes.json" file, whose content is shown here.
{
"FilterPolicy":"{\"Event\":[\"upload\",\"download\"]}"
}
$ aws --profile localstack --endpoint-url http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:eu-west-1:000000000000:image --protocol https --notification-endpoint https://1fd011d7aa1z.ngrok.io/api/v1/images --attributes file://image-topic-attributes.json
{
"SubscriptionArn": "arn:aws:sns:eu-west-1:000000000000:images:ac2e0e62-a51a-4ee2-a859-e54018350e14"
}
As seen above, I am using Ngrok to expose my application to the Internet. The command for that is `./ngrok http -host-header=rewrite localhost:8080`.
Once the server is subscribed to the topic, it should receive a request like below. The server will recognise this and confirm subscription.
POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 682
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: SubscriptionConfirmation
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io
{
"Type": "SubscriptionConfirmation",
"MessageId": "745bc6e1-214e-4070-868d-950359c0ac27",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "You have chosen to subscribe to the topic arn:aws:sns:eu-west-1:000000000000:image.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
"Timestamp": "2021-03-28T18:01:46.542Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"SubscribeURL": "http://localhost:4566/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:eu-west-1:000000000000:image&Token=7194c9aa",
"Token": "7194c9aa"
}
You should also see `2021/03/28 19:01:48 confirming subscription ...` in the logs.
The SNS method you will use is below.
// Publish sends message to the topic identified by topicARN and returns the
// ID assigned to the published message.
//
// Every entry of msgAttrs is attached to the message as a string message
// attribute keyed by the map key. The call is given at most five seconds on
// top of whatever deadline ctx already carries. On failure it returns an empty
// ID and the wrapped error.
func (s SNS) Publish(ctx context.Context, message, topicARN string, msgAttrs map[string]string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Second*5)
	defer cancel()

	attrs := make(map[string]*sns.MessageAttributeValue, len(msgAttrs))
	for key, val := range msgAttrs {
		attrs[key] = &sns.MessageAttributeValue{
			// SNS data type names are case-sensitive; the string type is
			// spelled "String".
			DataType:    aws.String("String"),
			StringValue: aws.String(val),
		}
	}

	res, err := s.client.PublishWithContext(ctx, &sns.PublishInput{
		Message:           aws.String(message),
		TopicArn:          aws.String(topicARN),
		MessageAttributes: attrs,
	})
	if err != nil {
		return "", fmt.Errorf("publish: %w", err)
	}

	// StringValue yields "" for a nil pointer instead of panicking.
	return aws.StringValue(res.MessageId), nil
}
For manual tests, use the examples below.
// Example 1: publish an "upload" event for the image URL.
attrs := map[string]string{"Event": "upload"}
Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)
// Example 2: publish a "download" event for the same image URL.
// NOTE(review): attrs is declared again with :=, so the two examples are
// presumably meant to be used one at a time rather than in the same scope.
attrs := map[string]string{"Event": "download"}
Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)
When you publish one of these messages, the server output will be like below.
POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 451
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: Notification
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io
{
"Type": "Notification",
"MessageId": "23309108-d583-4145-bdc8-0ad7f710a428",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "https://www......image.jpeg",
"Timestamp": "2021-03-28T18:03:03.129Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"MessageAttributes": {
"Event": {
"Type": "string",
"Value": "upload"
}
}
}
The server will also log `2021/03/28 19:06:04 uploading https://www......image.jpeg ...`. The `MessageId` above will match the one returned to the client when it published the message.