Kafka producer interceptors allow you to intercept and optionally modify messages before they are published to the Kafka cluster - ref. In this example, we are going to check whether the message value equals 1 for the topic called errors-topic. If it matches, we will add a new key-value pair to the message headers.


Example


package interceptor

import (
"log"

"github.com/IBM/sarama"
)

// ErrorTopic is a producer interceptor configuration: it names the topic to
// watch and the set of message values that should be flagged.
type ErrorTopic struct {
	// Topic is compared against the topic of each outgoing message; messages
	// for any other topic are left untouched by OnSend.
	Topic string

	// BlockedUsers is a set keyed by the encoded message value. OnSend looks
	// the message body up in this set to decide whether to add the header.
	// NOTE(review): presumably the keys are user identifiers - confirm.
	BlockedUsers map[string]struct{}
}

// OnSend inspects an outgoing producer message and, when the message targets
// e.Topic and its encoded value is a key of e.BlockedUsers, appends the
// header "blocked" = "true" to it. Messages for other topics, messages
// without a value, and messages whose value fails to encode are left
// unmodified.
func (e ErrorTopic) OnSend(msg *sarama.ProducerMessage) {
	// Only messages destined for the configured topic are examined.
	if msg == nil || msg.Topic != e.Topic {
		return
	}

	// Value is an interface and may be nil (a message with no payload);
	// calling Encode on a nil interface would panic.
	if msg.Value == nil {
		return
	}

	body, err := msg.Value.Encode()
	if err != nil {
		log.Printf("interceptor: encoding message value for topic %q: %v", msg.Topic, err)

		return
	}

	// The map is used as a set: presence of the key is all that matters.
	if _, blocked := e.BlockedUsers[string(body)]; !blocked {
		return
	}

	msg.Headers = append(msg.Headers, sarama.RecordHeader{
		Key:   []byte("blocked"),
		Value: []byte("true"),
	})
}

package main

import (
"log"
"net/http"
"time"

"client/interceptor"

"github.com/IBM/sarama"
)

// main configures a Kafka sync producer with the ErrorTopic interceptor,
// registers a single HTTP route that publishes a message per request, and
// serves HTTP on :3001 until the server stops.
func main() {
	// Create Kafka config.
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz"
	cfg.Version = sarama.V3_3_2_0
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.Interceptors = []sarama.ProducerInterceptor{
		interceptor.ErrorTopic{
			Topic:        "errors-topic",
			BlockedUsers: map[string]struct{}{"1": {}},
		},
	}

	// Create Kafka producer.
	pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		// Report a failed shutdown instead of silently dropping the error.
		if err := pro.Close(); err != nil {
			log.Println(err)
		}
	}()

	// Create HTTP router.
	rtr := http.DefaultServeMux
	rtr.HandleFunc("GET /{topic}/{id}", (api{pro}).Handle)

	// Start HTTP server. ListenAndServe blocks and always returns a non-nil
	// error; log it and return normally (rather than log.Fatal) so the
	// deferred producer close above still runs.
	if err := http.ListenAndServe(":3001", rtr); err != nil {
		log.Println(err)
	}
}

// api is the HTTP-facing wrapper around the Kafka producer. It embeds
// sarama.SyncProducer, so the producer's methods (e.g. SendMessage) are
// promoted and callable directly on an api value.
type api struct {
sarama.SyncProducer
}

// Handle publishes one Kafka message per HTTP request. The topic is taken
// from the {topic} path segment and the message value from the {id} path
// segment. On success nothing is written, so the client receives the default
// 200 response; if publishing fails the error is logged and the client
// receives 500 Internal Server Error.
func (a api) Handle(w http.ResponseWriter, r *http.Request) {
	msg := &sarama.ProducerMessage{
		Topic:     r.PathValue("topic"),
		Value:     sarama.StringEncoder(r.PathValue("id")),
		Timestamp: time.Now(),
	}

	// SendMessage returns partition, offset and error; only the error is
	// needed here. Surface it so a failed publish is not reported as success.
	if _, _, err := a.SendMessage(msg); err != nil {
		log.Printf("sending message to topic %q: %v", msg.Topic, err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)

		return
	}
}

If you start the application and run the curl http://localhost:3001/errors-topic/1 command, the message headers in the Kafka topic should contain {"blocked":"true"}.