20/05/2024 - GO, KAFKA
Bu örneği kullanmak, verilen konuya ilişkin her bölüm (topic) için tüketicileri hızlandıracaktır. Tüketiciler yeniden başlatılırsa aynı mesajın tekrar tüketileceğini unutmayın. Bu sorunu önlemek için bunun yerine tüketici gruplarını kullanabilirsiniz.
type Kafka struct {
config *sarama.Config
brokers []string
}
func (k Kafka) CreateConsumer(topic string) error {
cons, err := sarama.NewConsumer(k.brokers, k.config)
if err != nil {
return err
}
defer cons.Close()
parts, err := cons.Partitions(topic)
if err != nil {
return err
}
for _, part := range parts {
pcon, err := cons.ConsumePartition(topic, part, sarama.OffsetOldest)
if err != nil {
return err
}
defer pcon.Close()
go func(part int32) {
for msg := range pcon.Messages() {
fmt.Printf("PARTITION_CONSUMER:%d - PARTITION:%d - OFFSET:%d\n", part, msg.Partition, msg.Offset)
}
}(part)
}
return nil
}