Herkese merhaba!

Uzun yıllardır bol miktarda kişisel zaman ve enerji harcayarak bilgimizi hepinizle paylaşıyoruz. Ancak şu andan itibaren bu blogu çalışır durumda tutabilmek için yardımınıza ihtiyacımız var. Yapmanız gereken tek şey, sitedeki reklamlardan birine tıklamak olacaktır, aksi takdirde hosting vb. masraflar nedeniyle maalesef yayından kaldırılacaktır. Teşekkürler.

Bu örnekte, bir uygulamadan uygulamaya (A2A) iletişim oluşturacağız. Her iki uygulama da HTTP API'leridir. İstemci (yayıncı) etkinliği/olayları (imaj yükleme veya indirme) gönderir ve sunucu (abone) bunları işlemek için etkinlikleri dinler. upload ve download olmak üzere iki etkinlik türüne sahibiz. Bunların her ikisi de image konusuna bağlıdır.


Konu filtrelerinden faydalanıyoruz çünkü:



Adımlar


  1. Konu (topic) yaratma.

  2. Sunucuyu çalıştma.

  3. Sunucuyu konuya abone etme.

  4. Client etkinlikleri üretir.

Konu (topic) yaratma


Daha sonra istemcide aşağıdaki ARN'yi kullanacaksınız.


$ aws --profile localstack --endpoint-url http://localhost:4566 sns create-topic --name image
{
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image"
}

Sunucuyu çalıştma


Uygulama yapısı böyle. Uygulamanın çalıştığından emin olun çünkü sonraki adımda ona bir "Abonelik Onayı" isteği gönderilecektir.


├── internal
│   └── image
│   ├── image.go
│   └── request.go
└── main.go

main.go

package main

import (
"log"
"net/http"

"github.com/you/server/internal/image"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

func main() {
// Instantiate AWS session
ses, err := session.NewSessionWithOptions(
session.Options{
Config: aws.Config{
Credentials: credentials.NewStaticCredentials("test", "test", ""),
Region: aws.String("eu-west-1"),
Endpoint: aws.String("http://localhost:4566"),
S3ForcePathStyle: aws.Bool(true),
},
Profile: "localstack",
},
)
if err != nil {
log.Fatalln(err)
}

// Instantiate image controller.
img := image.Image{Session: ses}

// Instantiate HTTP router.
rtr := http.NewServeMux()
rtr.HandleFunc("/api/v1/images", img.Handle)

// Instantiate HTTP server.
log.Fatalln(http.ListenAndServe(":8080", rtr))
}

request.go

package image

import (
"encoding/json"
"net/http"
)

type RequestType string

const (
SubscriptionConfirmation RequestType = "SubscriptionConfirmation"
Notification RequestType = "Notification"
)

type EventValue string

const (
Upload EventValue = "upload"
Download EventValue = "download"
)

type SubscriptionConfirmationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
SubscribeURL string
Token string
MessageAttributes MessageAttribute
}

type NotificationRequest struct {
Type RequestType
MessageId string
TopicArn string
Message string
Timestamp string
SignatureVersion string
SigningCertURL string
MessageAttributes MessageAttribute
}

type MessageAttribute struct {
Event Event
}

type Event struct {
Type string
Value EventValue
}

func (s *SubscriptionConfirmationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(s)
}

func (n *NotificationRequest) Bind(r *http.Request) error {
return json.NewDecoder(r.Body).Decode(n)
}

image.go

package image

import (
"context"
"fmt"
"log"
"net/http"
"net/http/httputil"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
)

type Image struct {
Session *session.Session
}

// POST /api/v1/images
func (i Image) Handle(w http.ResponseWriter, r *http.Request) {
data, _ := httputil.DumpRequest(r, true)
fmt.Println(string(data))

var err error
switch r.Header.Get("X-Amz-Sns-Message-Type") {
case string(SubscriptionConfirmation):
err = i.confirm(r)
case string(Notification):
err = i.handle(r)
default:
log.Println("invalid message type")
w.WriteHeader(http.StatusInternalServerError)
return
}

if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

// Subscription confirmation. Takes place only once. Once confirmed, dupplicated
// CLI subscription command executions will not trigger another request.
func (i Image) confirm(r *http.Request) error {
var req SubscriptionConfirmationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}

ctx, cancel := context.WithTimeout(r.Context(), time.Second*5)
defer cancel()

if _, err := sns.New(i.Session).ConfirmSubscriptionWithContext(ctx, &sns.ConfirmSubscriptionInput{
Token: aws.String(req.Token),
TopicArn: aws.String(req.TopicArn),
}); err != nil {
return fmt.Errorf("subscription confirmation: %w", err)
}

log.Println("confirming subscription ...")

return nil
}

// Consumes published events. Called as many times as the client publishes.
func (i Image) handle(r *http.Request) error {
var req NotificationRequest
if err := req.Bind(r); err != nil {
return fmt.Errorf("request binding: %w", err)
}

switch req.MessageAttributes.Event.Value {
case Upload:
log.Printf("uploading %s ...", req.Message)
case Download:
log.Printf("downloading %s ...", req.Message)
default:
return fmt.Errorf("unknwon event value")
}

return nil
}

Sunucuyu konuya abone etme


Komut "image-topic-attributes.json" ismindeki dosyayı kullanıyor ve içeriği aşağıdaki gibidir.


{
"FilterPolicy":"{\"Event\":[\"upload\",\"download\"]}"
}

$ aws --profile localstack --endpoint-url http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:eu-west-1:000000000000:image --protocol https --notification-endpoint https://1fd011d7aa1z.ngrok.io/api/v1/images --attributes file://image-topic-attributes.json
{
"SubscriptionArn": "arn:aws:sns:eu-west-1:000000000000:images:ac2e0e62-a51a-4ee2-a859-e54018350e14"
}

Yukarıda görüldüğü gibi, uygulamamı Internet'e açmak için Ngrok kullanıyorum. Bunun için komut şudur: ./ngrok http -host-header=rewrite localhost:8080.


Sunucu konuya abone olduktan sonra aşağıdaki gibi bir istek alacaktır. Sunucu bunu tanıyacak ve aboneliği onaylayacaktır.


POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 682
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: SubscriptionConfirmation
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io

{
"Type": "SubscriptionConfirmation",
"MessageId": "745bc6e1-214e-4070-868d-950359c0ac27",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "You have chosen to subscribe to the topic arn:aws:sns:eu-west-1:000000000000:image.\nTo confirm the subscription, visit the SubscribeURL included in this message.",
"Timestamp": "2021-03-28T18:01:46.542Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"SubscribeURL": "http://localhost:4566/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:eu-west-1:000000000000:image&Token=7194c9aa",
"Token": "7194c9aa"
}

Loglarda ayrıca 2021/03/28 19:01:48 confirming subscription ... girdisini göreceksiniz.


Client etkinlikleri üretir


Kullanacağınız SNS yöntemi aşağıdadır.


func (s SNS) Publish(ctx context.Context, message, topicARN string, msgAttrs map[string]string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

attrs := make(map[string]*sns.MessageAttributeValue, len(msgAttrs))
for key, val := range msgAttrs {
attrs[key] = &sns.MessageAttributeValue{DataType: aws.String("string"), StringValue: aws.String(val)}
}

res, err := s.client.PublishWithContext(ctx, &sns.PublishInput{
Message: &message,
TopicArn: aws.String(topicARN),
MessageAttributes: attrs,
})
if err != nil {
return "", fmt.Errorf("publish: %w", err)
}

return *res.MessageId, nil
}

Manuel testler için aşağıdaki örnekleri kullanın.


attrs := map[string]string{"Event": "upload"}

Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)

attrs := map[string]string{"Event": "download"}

Publish(r.Context(), "https://www......image.jpeg", "arn:aws:sns:eu-west-1:000000000000:image", attrs)

Bu mesajlardan birini yayınladığınızda sunucu çıktısı aşağıdaki gibi olacaktır.


POST /api/v1/images HTTP/1.1
Host: localhost:8080
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 451
Content-Type: text/plain
User-Agent: Amazon Simple Notification Service Agent
X-Amz-Sns-Message-Type: Notification
X-Amz-Sns-Subscription-Arn: arn:aws:sns:eu-west-1:000000000000:image:5ff05082-f52f-4319-bc97-dedb6831baf0
X-Amz-Sns-Topic-Arn: arn:aws:sns:eu-west-1:000000000000:image
X-Forwarded-For: 2.28.157.27
X-Forwarded-Proto: https
X-Original-Host: 1fd011d7aa1z.ngrok.io

{
"Type": "Notification",
"MessageId": "23309108-d583-4145-bdc8-0ad7f710a428",
"TopicArn": "arn:aws:sns:eu-west-1:000000000000:image",
"Message": "https://www......image.jpeg",
"Timestamp": "2021-03-28T18:03:03.129Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"MessageAttributes": {
"Event": {
"Type": "string",
"Value": "upload"
}
}
}

Loglarda ayrıca 2021/03/28 19:06:04 uploading https://www......image.jpeg ... girdisini göreceksiniz. Yukarıdaki MessageId bilgisi, mesajı yayınlarken oluşturulan ile aynıdır.