Hello everyone!

We have been investing plenty of personal time and energy for many years to share our knowledge with you all. However, we now need your help to keep this blog running. All you have to do is just click one of the adverts on the site, otherwise it will sadly be taken down due to hosting etc. costs. Thank you.

In this example we are going to create a application-to-application (A2A) communication. Both applications are HTTP APIs. The client (publisher) dispatches events (image upload or download) and server (subscriber) listens on the events to handle them. We have two event types which are upload and download. Both of these are linked to image topic.


We are benefiting from topic filters because:



Steps


  1. Create a topic.

  2. Run server.

  3. Subscribe server to the topic.

  4. Client produces events.

Create a topic


You are going to use ARN below in the client later on.


$ aws --profile localstack --endpoint-url http://localhost:4566 sns create-topic --name image
{
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image"
}

Run server


This is what the application structure is like. Make sure the application is running because the next step will send a "Subscription Confirmation" request to it.


├── internal
│   └── image
│   ├── image.go
│   └── request.go
└── main.go

main.go

package main

import (
"log"
"net/http"

"github.com/you/server/internal/image"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

func main() {
// Instantiate AWS session
ses, err := session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials("test", "test", ""),
Region: aws.String("eu-west-1"),
Endpoint: aws.String("http://localhost:4566"),
S3ForcePathStyle: aws.Bool(true),
},
Profile: "localstack",
},
)
if err != nil {
log.Fatalln(err)
}

// Instantiate image controller.
img := image.Image{Session: ses}

// Instantiate HTTP router.
rtr := http.NewServeMux()
rtr.HandleFunc("/api/v1/images", img.Handle)

// Instantiate HTTP server.
log.Fatalln(http.ListenAndServe(":8080", rtr))
}

request.go

package image

import (
"encoding/json"
"net/http"
)

type RequestType string

const (
SubscriptionConfirmation RequestType = "SubscriptionConfirmation"
Notification RequestType = "Notification"
)

type EventValue string

const (
Upload EventValue = "upload"
Download EventValue = "download"
)

type SubscriptionConfirmationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
SubscribeURL string
Token string
MessageAttributes MessageAttribute
}

type NotificationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
MessageAttributes MessageAttribute
}

type MessageAttribute struct {
Event Event
}

type Event struct {
Type string
Value EventValue
}

func (s *SubscriptionConfirmationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(s)
}

func (n *NotificationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(n)
}

image.go

package image

import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)

type Image struct {
Session *session.Session
}

// POST /api/v1/images
func (i Image) Handle(w http.ResponseWriter, r *http.Request) {
data, _ := httputil.DumpRequest(r, true)
fmt.Println(string(data))

var err error
switch r.Header.Get("X-Amz-Sns-Message-Type") {
case string(SubscriptionConfirmation):
err = i.confirm(r)
case string(Notification):
err = i.handle(r)
default:
log.Println("invalid message type")
w.WriteHeader(http.StatusInternalServerError)
return
}

if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

// Subscription confirmation. Takes place only once. Once confirmed, dupplicated
// CLI subscription command executions will not trigger another request.
func (i Image) confirm(r *http.Request) error {
var req SubscriptionConfirmationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}

ctx, cancel := context.WithTimeout(r.Context(), time.Second*5)
defer cancel()

if _, err := sns.New(i.Session).ConfirmSubscriptionWithContext(ctx, &sns.ConfirmSubscriptionInput{
Token: aws.String(req.Token),
TopicArn: aws.String(req.TopicArn),
}); err != nil {
return fmt.Errorf("subscription confirmation: %w", err)
}

log.Println("confirming subscription ...")

return nil
}

// Consumes published events. Called as many times as the client publishes.
func (i Image) handle(r *http.Request) error {
var req NotificationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}

switch req.MessageAttributes.Event.Value {
case Upload:
log.Printf("uploading %s ...", req.Message)
case Download:
log.Printf("downloading %s ...", req.Message)
default:
return fmt.Errorf("unknwon event value")
}

return nil
}

Subscribe server to the topic


The command uses "image-topic-attributes.json" file and its content is like below.


{
"FilterPolicy":"{\"Event\":[\"upload\",\"download\"]}"
}

$ aws --profile localstack --endpoint-url http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:eu-west-1:000000000000:image --protocol https --notification-endpoint https://1fd011d7aa1z.ngrok.io/api/v1/images --attributes file://image-topic-attributes.json
{
"SubscriptionArn": "arn:aws:sns:eu-west-1:000000000000:images:ac2e0e62-a51a-4ee2-a859-e54018350e14"
}

As seen above, I am using Ngrok to expose my application to Internet. The command for that is ./ngrok http -host-header=rewrite localhost:8080.


Once the server is subscribed to the topic, it should receive a request like below. The server will recognise this and confirm subscription.


POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 682
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: SubscriptionConfirmation
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io

{
"Type": "SubscriptionConfirmation",
"MessageId": "745bc6e1-214e-4070-868d-950359c0ac27",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "You have chosen to subscribe to the topic arn:aws:sns:eu-west-1:000000000000:image.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
"Timestamp": "2021-03-28T18:01:46.542Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"SubscribeURL": "http://localhost:4566/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:eu-west-1:000000000000:image&Token=7194c9aa",
"Token": "7194c9aa"
}

You should also see 2021/03/28 19:01:48 confirming subscription ... in logs.


Client produces events


The SNS method you will use is below.


func (s SNS) Publish(ctx context.Context, message, topicARN string, msgAttrs map[string]string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

attrs := make(map[string]*sns.MessageAttributeValue, len(msgAttrs))
for key, val := range msgAttrs {
attrs[key] = &sns.MessageAttributeValue{DataType: aws.String("string"), StringValue: aws.String(val)}
}

res, err := s.client.PublishWithContext(ctx, &sns.PublishInput{
Message: &message,
TopicArn: aws.String(topicARN),
MessageAttributes: attrs,
})
if err != nil {
return "", fmt.Errorf("publish: %w", err)
}

return *res.MessageId, nil
}

For the manual tests, use examples below.


attrs := map[string]string{"Event": "upload"}

Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)

attrs := map[string]string{"Event": "download"}

Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)

When you publish one of these messages, the server output will be like below.


POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 451
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: Notification
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io

{
"Type": "Notification",
"MessageId": "23309108-d583-4145-bdc8-0ad7f710a428",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "https://www......image.jpeg",
"Timestamp": "2021-03-28T18:03:03.129Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"MessageAttributes": {
"Event": {
"Type": "string",
"Value": "upload"
}
}
}

The server will also output 2021/03/28 19:06:04 uploading https://www......image.jpeg ... log. The MessageId above will match the one client generated when publishing the message.