Kafka prodüktör önleyicileri, mesajları Kafka kümesinde yayınlanmadan önce yakalamanıza ve isteğe bağlı olarak değiştirmenize olanak tanır - ref. Bu örnekte errors-topic adlı konu için mesaj değerinin 1 olup olmadığını kontrol edeceğiz. Eşleşirse mesaj başlığına yeni bir anahtar/değer çifti ekleyeceğiz.


Örnek


package interceptor

import (
"log"

"github.com/IBM/sarama"
)

type ErrorTopic struct {
Topic string
BlockedUsers map[string]struct{}
}

func (e ErrorTopic) OnSend(msg *sarama.ProducerMessage) {
if msg.Topic != e.Topic {
return
}

body, err := msg.Value.Encode()
if err != nil {
log.Println(err)

return
}

if _, ok := e.BlockedUsers[string(body)]; !ok {
return
}

msg.Headers = append(msg.Headers, sarama.RecordHeader{
Key: []byte("blocked"),
Value: []byte("true"),
})
}

package main

import (
"log"
"net/http"
"time"

"client/interceptor"

"github.com/IBM/sarama"
)

func main() {
// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Interceptors = []sarama.ProducerInterceptor{
interceptor.ErrorTopic{
Topic: "errors-topic",
BlockedUsers: map[string]struct{}{"1": {}},
},
}

// Create Kafka producer.
pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer pro.Close()

// Create HTTP router.
rtr := http.DefaultServeMux
rtr.HandleFunc("GET /{topic}/{id}", (api{pro}).Handle)

// Start HTTP server.
http.ListenAndServe(":3001", rtr)
}

type api struct {
sarama.SyncProducer
}

func (a api) Handle(w http.ResponseWriter, r *http.Request) {
a.SendMessage(&sarama.ProducerMessage{
Topic: r.PathValue("topic"),
Value: sarama.StringEncoder(r.PathValue("id")),
Timestamp: time.Now(),
})
}

Uygulamayı başlatır ve curl http://localhost:3001/errors-topic/1 komutunu çalıştırırsanız, Kafka konusundaki mesaj başlığında {"blocked":"true" bulunmalıdır.