28/03/2021 - AWS, GO
Bu örnekte, bir uygulamadan uygulamaya (A2A) iletişim oluşturacağız. Her iki uygulama da HTTP API'leridir. İstemci (yayıncı) etkinliği/olayları (imaj yükleme veya indirme) gönderir ve sunucu (abone) bunları işlemek için etkinlikleri dinler. upload
ve download
olmak üzere iki etkinlik türüne sahibiz. Bunların her ikisi de image
konusuna bağlıdır.
Konu filtrelerinden faydalanıyoruz çünkü:
Daha sonra istemcide aşağıdaki ARN'yi kullanacaksınız.
$ aws --profile localstack --endpoint-url http://localhost:4566 sns create-topic --name image
{
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image"
}
Uygulama yapısı böyle. Uygulamanın çalıştığından emin olun çünkü sonraki adımda ona bir "Abonelik Onayı" isteği gönderilecektir.
├── internal
│ └── image
│ ├── image.go
│ └── request.go
└── main.go
package main
import (
"log"
"net/http"
"github.com/you/server/internal/image"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)
func main() {
// Instantiate AWS session
ses, err := session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials("test", "test", ""),
Region: aws.String("eu-west-1"),
Endpoint: aws.String("http://localhost:4566"),
S3ForcePathStyle: aws.Bool(true),
},
Profile: "localstack",
},
)
if err != nil {
log.Fatalln(err)
}
// Instantiate image controller.
img := image.Image{Session: ses}
// Instantiate HTTP router.
rtr := http.NewServeMux()
rtr.HandleFunc("/api/v1/images", img.Handle)
// Instantiate HTTP server.
log.Fatalln(http.ListenAndServe(":8080", rtr))
}
package image
import (
"encoding/json"
"net/http"
)
type RequestType string
const (
SubscriptionConfirmation RequestType = "SubscriptionConfirmation"
Notification RequestType = "Notification"
)
type EventValue string
const (
Upload EventValue = "upload"
Download EventValue = "download"
)
type SubscriptionConfirmationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
SubscribeURL string
Token string
MessageAttributes MessageAttribute
}
type NotificationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
MessageAttributes MessageAttribute
}
type MessageAttribute struct {
Event Event
}
type Event struct {
Type string
Value EventValue
}
func (s *SubscriptionConfirmationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(s)
}
func (n *NotificationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(n)
}
package image
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)
type Image struct {
Session *session.Session
}
// POST /api/v1/images
func (i Image) Handle(w http.ResponseWriter, r *http.Request) {
data, _ := httputil.DumpRequest(r, true)
fmt.Println(string(data))
var err error
switch r.Header.Get("X-Amz-Sns-Message-Type") {
case string(SubscriptionConfirmation):
err = i.confirm(r)
case string(Notification):
err = i.handle(r)
default:
log.Println("invalid message type")
w.WriteHeader(http.StatusInternalServerError)
return
}
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
// Subscription confirmation. Takes place only once. Once confirmed, dupplicated
// CLI subscription command executions will not trigger another request.
func (i Image) confirm(r *http.Request) error {
var req SubscriptionConfirmationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}
ctx, cancel := context.WithTimeout(r.Context(), time.Second*5)
defer cancel()
if _, err := sns.New(i.Session).ConfirmSubscriptionWithContext(ctx, &sns.ConfirmSubscriptionInput{
Token: aws.String(req.Token),
TopicArn: aws.String(req.TopicArn),
}); err != nil {
return fmt.Errorf("subscription confirmation: %w", err)
}
log.Println("confirming subscription ...")
return nil
}
// Consumes published events. Called as many times as the client publishes.
func (i Image) handle(r *http.Request) error {
var req NotificationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}
switch req.MessageAttributes.Event.Value {
case Upload:
log.Printf("uploading %s ...", req.Message)
case Download:
log.Printf("downloading %s ...", req.Message)
default:
return fmt.Errorf("unknwon event value")
}
return nil
}
Komut "image-topic-attributes.json" ismindeki dosyayı kullanıyor ve içeriği aşağıdaki gibidir.
{
"FilterPolicy":"{\"Event\":[\"upload\",\"download\"]}"
}
$ aws --profile localstack --endpoint-url http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:eu-west-1:000000000000:image --protocol https --notification-endpoint https://1fd011d7aa1z.ngrok.io/api/v1/images --attributes file://image-topic-attributes.json
{
"SubscriptionArn": "arn:aws:sns:eu-west-1:000000000000:images:ac2e0e62-a51a-4ee2-a859-e54018350e14"
}
Yukarıda görüldüğü gibi, uygulamamı Internet'e açmak için Ngrok kullanıyorum. Bunun için komut şudur: ./ngrok http -host-header=rewrite localhost:8080
.
Sunucu konuya abone olduktan sonra aşağıdaki gibi bir istek alacaktır. Sunucu bunu tanıyacak ve aboneliği onaylayacaktır.
POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 682
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: SubscriptionConfirmation
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io
{
"Type": "SubscriptionConfirmation",
"MessageId": "745bc6e1-214e-4070-868d-950359c0ac27",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "You have chosen to subscribe to the topic arn:aws:sns:eu-west-1:000000000000:image.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
"Timestamp": "2021-03-28T18:01:46.542Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"SubscribeURL": "http://localhost:4566/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:eu-west-1:000000000000:image&Token=7194c9aa",
"Token": "7194c9aa"
}
Loglarda ayrıca 2021/03/28 19:01:48 confirming subscription ...
girdisini göreceksiniz.
Kullanacağınız SNS yöntemi aşağıdadır.
func (s SNS) Publish(ctx context.Context, message, topicARN string, msgAttrs map[string]string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
attrs := make(map[string]*sns.MessageAttributeValue, len(msgAttrs))
for key, val := range msgAttrs {
attrs[key] = &sns.MessageAttributeValue{DataType: aws.String("string"), StringValue: aws.String(val)}
}
res, err := s.client.PublishWithContext(ctx, &sns.PublishInput{
Message: &message,
TopicArn: aws.String(topicARN),
MessageAttributes: attrs,
})
if err != nil {
return "", fmt.Errorf("publish: %w", err)
}
return *res.MessageId, nil
}
Manuel testler için aşağıdaki örnekleri kullanın.
attrs := map[string]string{"Event": "upload"}
Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)
attrs := map[string]string{"Event": "download"}
Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)
Bu mesajlardan birini yayınladığınızda sunucu çıktısı aşağıdaki gibi olacaktır.
POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 451
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: Notification
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io
{
"Type": "Notification",
"MessageId": "23309108-d583-4145-bdc8-0ad7f710a428",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "https://www......image.jpeg",
"Timestamp": "2021-03-28T18:03:03.129Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"MessageAttributes": {
"Event": {
"Type": "string",
"Value": "upload"
}
}
}
Loglarda ayrıca 2021/03/28 19:06:04 uploading https://www......image.jpeg ...
girdisini göreceksiniz. Yukarıdaki MessageId
bilgisi, mesajı yayınlarken oluşturulan ile aynıdır.