20/05/2024 - GO, KAFKA
Using this example will spin up one consumer for each partition of the given topic. Just bear in mind that if the consumers restart, the same messages will be consumed again, because every partition is read from the oldest available offset (`sarama.OffsetOldest`). To avoid this issue, you can use consumer groups instead.
// Kafka bundles the settings needed to open sarama consumers.
type Kafka struct {
	// config is the sarama configuration handed to sarama.NewConsumer.
	config *sarama.Config
	// brokers is the broker address list handed to sarama.NewConsumer.
	brokers []string
}
// CreateConsumer opens one partition consumer for every partition of topic
// and starts a goroutine per partition that prints the partition and offset
// of each message it receives.
//
// Every partition is read from sarama.OffsetOldest, so a restart re-reads
// the messages that are still available on the topic.
//
// The method returns once all readers have been started. On success the
// consumers are deliberately left open: closing them here (for example with
// defer) would shut them down the moment this method returns, before the
// reader goroutines had a chance to process anything. This method exposes no
// handle for stopping the readers, so they run until the process exits.
//
// If any setup step fails, every consumer opened so far is closed and the
// setup error is returned. If the topic has no partitions, the consumer is
// closed and nil is returned.
func (k Kafka) CreateConsumer(topic string) error {
	cons, err := sarama.NewConsumer(k.brokers, k.config)
	if err != nil {
		return err
	}

	var pcons []sarama.PartitionConsumer

	// closeAll releases everything opened so far. Partition consumers are
	// closed before their parent consumer. Close errors are intentionally
	// dropped: on these paths the caller cares about the setup error (or
	// nothing was consumed at all), matching the original deferred closes.
	closeAll := func() {
		for _, pcon := range pcons {
			_ = pcon.Close()
		}
		_ = cons.Close()
	}

	parts, err := cons.Partitions(topic)
	if err != nil {
		closeAll()
		return err
	}
	if len(parts) == 0 {
		// Nothing to read, so do not keep an idle consumer around.
		closeAll()
		return nil
	}

	// Open every partition first so that a failure part-way through can be
	// rolled back before any reader goroutine has been started.
	for _, part := range parts {
		pcon, err := cons.ConsumePartition(topic, part, sarama.OffsetOldest)
		if err != nil {
			closeAll()
			return err
		}
		pcons = append(pcons, pcon)
	}

	// pcons[i] was opened for parts[i]; pass both explicitly so each
	// goroutine works on its own values.
	for i, pcon := range pcons {
		go func(part int32, pcon sarama.PartitionConsumer) {
			for msg := range pcon.Messages() {
				fmt.Printf("PARTITION_CONSUMER:%d - PARTITION:%d - OFFSET:%d\n", part, msg.Partition, msg.Offset)
			}
		}(parts[i], pcon)
	}

	return nil
}