Kafka consumer interceptors allows you to intercept and optionally modify messages before being consumed by Kafka consumers - ref. In this example we are going to check if the message has certain headers and matches criteria, we will prevent this message from being consumed by consumer.


Example


package interceptor

import (
"github.com/IBM/sarama"
)

type ErrorTopic struct {
Topic string
}

func (e ErrorTopic) OnConsume(msg *sarama.ConsumerMessage) {
if msg.Topic != e.Topic {
return
}

for _, header := range msg.Headers {
if string(header.Key) == "blocked" && string(header.Value) == "true" {
return
}
}
}

package main

import (
"log"

"server/interceptor"

"github.com/IBM/sarama"
)

func main() {
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Consumer.Return.Errors = true
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cfg.Consumer.Interceptors = []sarama.ConsumerInterceptor{
interceptor.ErrorTopic{
Topic: "errors-topic",
},
}

cons, err := sarama.NewConsumer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer cons.Close()

parts, err := cons.Partitions("errors-topic")
if err != nil {
log.Fatalln(err)
}

for _, part := range parts {
pcon, err := cons.ConsumePartition("errors-topic", part, sarama.OffsetOldest)
if err != nil {
log.Fatalln(err)
}
defer pcon.Close()

go func(part int32) {
for msg := range pcon.Messages() {
for _, header := range msg.Headers {
log.Println("consumer:", string(header.Key), string(header.Value))
}
}
}(part)
}

select {}
}