14/06/2024 - GO, KAFKA
Kafka producer interceptors allow you to intercept and optionally modify messages before they are published to the Kafka cluster - ref. In this example we are going to check whether the message value equals 1 for the topic called errors-topic. If it matches, we will add a new key-value pair to the message headers.
package interceptor
import (
"log"
"github.com/IBM/sarama"
)
// ErrorTopic is a producer interceptor that inspects messages sent to one
// topic and marks those whose encoded value is listed in BlockedUsers.
type ErrorTopic struct {
// Topic is the name of the topic to inspect; messages for any other topic
// are left untouched by OnSend.
Topic string
// BlockedUsers is the set of message values that cause OnSend to add the
// "blocked" header. It is used as a set, so only the keys matter.
BlockedUsers map[string]struct{}
}
// OnSend is the producer interceptor hook. For messages addressed to e.Topic
// whose encoded value is a key of e.BlockedUsers, it appends the header
// "blocked": "true". Every other message is left unmodified.
//
// Messages without a value are skipped, and an encoding failure is logged and
// the message is passed through unchanged.
func (e ErrorTopic) OnSend(msg *sarama.ProducerMessage) {
	// A nil Value (e.g. a tombstone record) has nothing to match against, and
	// calling Encode on a nil Encoder would panic.
	if msg.Topic != e.Topic || msg.Value == nil {
		return
	}

	body, err := msg.Value.Encode()
	if err != nil {
		log.Printf("interceptor: encoding value for topic %q: %v", msg.Topic, err)
		return
	}

	if _, blocked := e.BlockedUsers[string(body)]; !blocked {
		return
	}

	msg.Headers = append(msg.Headers, sarama.RecordHeader{
		Key:   []byte("blocked"),
		Value: []byte("true"),
	})
}
package main
import (
"log"
"net/http"
"time"
"client/interceptor"
"github.com/IBM/sarama"
)
// main configures a Kafka sync producer with the ErrorTopic interceptor,
// registers the HTTP route that publishes messages, and serves HTTP on :3001
// until the server stops.
func main() {
	// Create Kafka config.
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz"
	cfg.Version = sarama.V3_3_2_0
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.Interceptors = []sarama.ProducerInterceptor{
		interceptor.ErrorTopic{
			Topic:        "errors-topic",
			BlockedUsers: map[string]struct{}{"1": {}},
		},
	}

	// Create Kafka producer.
	pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := pro.Close(); err != nil {
			log.Println(err)
		}
	}()

	// Create HTTP router.
	rtr := http.DefaultServeMux
	rtr.HandleFunc("GET /{topic}/{id}", (api{pro}).Handle)

	// Start HTTP server. ListenAndServe only returns on failure; log the
	// error and return normally (rather than log.Fatal) so the deferred
	// producer Close still runs.
	if err := http.ListenAndServe(":3001", rtr); err != nil {
		log.Println(err)
	}
}
// api groups the HTTP handlers. It embeds the Kafka sync producer so that
// handlers can call SendMessage directly on the receiver.
type api struct {
sarama.SyncProducer
}
// Handle publishes one Kafka message per request: the {topic} path value is
// used as the topic and the {id} path value as the message value.
//
// On success the response is an empty 200 OK. If the producer reports an
// error, it is logged and the client receives 500 Internal Server Error.
func (a api) Handle(w http.ResponseWriter, r *http.Request) {
	msg := &sarama.ProducerMessage{
		Topic:     r.PathValue("topic"),
		Value:     sarama.StringEncoder(r.PathValue("id")),
		Timestamp: time.Now(),
	}

	// Partition and offset are not needed here; only the outcome matters.
	if _, _, err := a.SendMessage(msg); err != nil {
		log.Printf("sending message to topic %q: %v", msg.Topic, err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
	}
}
If you start the application and run the curl http://localhost:3001/errors-topic/1 command, the message headers in the Kafka topic should contain {"blocked":"true"}.