14/06/2024 - GO, KAFKA
Kafka prodüktör önleyicileri, mesajları Kafka kümesinde yayınlanmadan önce yakalamanıza ve isteğe bağlı olarak değiştirmenize olanak tanır - ref. Bu örnekte errors-topic
adlı konu için mesaj değerinin 1
olup olmadığını kontrol edeceğiz. Eşleşirse mesaj başlığına yeni bir anahtar/değer çifti ekleyeceğiz.
package interceptor
import (
"log"
"github.com/IBM/sarama"
)
type ErrorTopic struct {
Topic string
BlockedUsers map[string]struct{}
}
func (e ErrorTopic) OnSend(msg *sarama.ProducerMessage) {
if msg.Topic != e.Topic {
return
}
body, err := msg.Value.Encode()
if err != nil {
log.Println(err)
return
}
if _, ok := e.BlockedUsers[string(body)]; !ok {
return
}
msg.Headers = append(msg.Headers, sarama.RecordHeader{
Key: []byte("blocked"),
Value: []byte("true"),
})
}
package main
import (
"log"
"net/http"
"time"
"client/interceptor"
"github.com/IBM/sarama"
)
func main() {
// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Interceptors = []sarama.ProducerInterceptor{
interceptor.ErrorTopic{
Topic: "errors-topic",
BlockedUsers: map[string]struct{}{"1": {}},
},
}
// Create Kafka producer.
pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer pro.Close()
// Create HTTP router.
rtr := http.DefaultServeMux
rtr.HandleFunc("GET /{topic}/{id}", (api{pro}).Handle)
// Start HTTP server.
http.ListenAndServe(":3001", rtr)
}
type api struct {
sarama.SyncProducer
}
func (a api) Handle(w http.ResponseWriter, r *http.Request) {
a.SendMessage(&sarama.ProducerMessage{
Topic: r.PathValue("topic"),
Value: sarama.StringEncoder(r.PathValue("id")),
Timestamp: time.Now(),
})
}
Uygulamayı başlatır ve curl http://localhost:3001/errors-topic/1
komutunu çalıştırırsanız, Kafka konusundaki mesaj başlığında {"blocked":"true"
bulunmalıdır.