22/05/2024 - GO, KAFKA
In this example we are going to produce messages in batches to reduce network round trips. The only thing to note is that this implementation does not use transactions, so it is not atomic. If one or more messages in a batch fail, you won't know which message(s) failed. As you can guess, messages can succeed and fail individually; if some succeed and some fail, SendMessages will return an error. If message delivery reliability is not your concern, you may wish to use this function, but you will have to handle duplicate messages in your consumer, as in the sketch below.
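As mentioned, duplicate handling falls on the consumer with this approach. Below is a minimal sketch of what that could look like with Sarama's consumer group API. The group ID, the topic name and the in-memory seen map are my own assumptions for illustration; a real consumer would track processed keys in a persistent store so that duplicates survive a restart.

package main

import (
	"context"
	"log"
	"sync"

	"github.com/IBM/sarama"
)

// dedupHandler skips messages whose value has already been seen. The map is
// purely illustrative; it grows forever and is lost on restart.
type dedupHandler struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func (h *dedupHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *dedupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *dedupHandler) ConsumeClaim(ses sarama.ConsumerGroupSession, cla sarama.ConsumerGroupClaim) error {
	for msg := range cla.Messages() {
		// Our producer writes a unique counter as the value, so it doubles as a dedup key.
		key := string(msg.Value)

		h.mu.Lock()
		_, dup := h.seen[key]
		if !dup {
			h.seen[key] = struct{}{}
		}
		h.mu.Unlock()

		if dup {
			log.Printf("skipping duplicate: %s", key)
		} else {
			log.Printf("processing: %s", key)
		}

		ses.MarkMessage(msg, "")
	}

	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz-client"
	cfg.Version = sarama.V3_3_2_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	grp, err := sarama.NewConsumerGroup([]string{":9093"}, "warnings-group", cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer grp.Close()

	handler := &dedupHandler{seen: make(map[string]struct{})}

	for {
		if err := grp.Consume(context.Background(), []string{"warnings-topic"}, handler); err != nil {
			log.Fatalln(err)
		}
	}
}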
If you want an atomic batch operation, you might look into transactions instead. However, if you use the Sarama client, you will very likely come face to face with two common problems. I have personally experienced each a few times already but didn't really dig deep to look for a solution; they are mentioned in the GitHub issues too. The first one is that, with both NewSyncProducer and NewAsyncProducer, transactions operate in an async fashion. What this means is that the client will never know the final status of the transaction because of delays and external factors. Hence there will be times you will see the kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing error when you attempt to use the producer again too quickly. Let's say you create a new producer per transaction instead, which is not a great idea anyway. This will, again from time to time, produce data race errors that point to transaction_manager.go, async_producer.go and similar files. And sometimes you may see ERROR [ReplicaManager broker=1] Error processing append operation on partition my-topic-0 (kafka.server.ReplicaManager) and org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 247 at offset 6263 in partition my-topic-0: 12 (incoming seq. number), 9 (current end sequence number) errors. So what I can suggest is that, if you are going to use transactions, test your code really hard. For instance, create an HTTP API and send thousands of requests concurrently and see what happens. A rough sketch of a transactional producer follows below.
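For reference, this is roughly what a transactional attempt with Sarama's SyncProducer could look like. It is only a minimal sketch under my own assumptions (the transactional ID, topic and batch size are made up for illustration), not a battle tested implementation, and it is exactly the kind of code that deserves the heavy testing described above.

package main

import (
	"log"
	"strconv"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz-client"
	cfg.Version = sarama.V3_3_2_0
	cfg.Producer.Return.Successes = true
	// Transactions require an idempotent producer, which in turn requires
	// acks from all replicas and a single in-flight request.
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Net.MaxOpenRequests = 1
	cfg.Producer.Retry.Backoff = time.Millisecond * 10
	cfg.Producer.Transaction.ID = "inanzzz-tx" // assumption: any stable transactional id

	pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer pro.Close()

	msgs := make([]*sarama.ProducerMessage, 0, 10)
	for i := 1; i <= 10; i++ {
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: "warnings-topic",
			Value: sarama.StringEncoder(strconv.Itoa(i)),
		})
	}

	// The whole batch is either committed or aborted as one unit.
	if err := pro.BeginTxn(); err != nil {
		log.Fatalln(err)
	}

	if err := pro.SendMessages(msgs); err != nil {
		log.Println(err)

		if err := pro.AbortTxn(); err != nil {
			log.Fatalln(err)
		}

		return
	}

	if err := pro.CommitTxn(); err != nil {
		log.Fatalln(err)
	}
}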
Here we send a total of 10001 messages and each batch contains 10 messages. This gives us a total of 1001 batch requests: 1000 full batches of 10 plus one final batch carrying the single leftover message.
package main

import (
	"log"
	"strconv"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz-client"
	cfg.Version = sarama.V3_3_2_0
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Max = 30
	cfg.Producer.Retry.Backoff = time.Millisecond * 10
	cfg.Producer.Compression = sarama.CompressionZSTD
	// Idempotent producer requires acks from all replicas and a single in-flight request.
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Net.MaxOpenRequests = 1

	pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer pro.Close()

	var (
		batch int
		msgs  = make([]*sarama.ProducerMessage, 0, 10)
	)

	for i := 1; i <= 10001; i++ {
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: "warnings-topic",
			Value: sarama.StringEncoder(strconv.Itoa(i)),
		})

		// Flush the batch once it is full, then reuse the slice.
		if len(msgs) == cap(msgs) {
			batch++

			if err := pro.SendMessages(msgs); err != nil {
				log.Println(err)
			}

			msgs = msgs[:0]
		}
	}

	// Flush the leftover messages that did not fill a whole batch.
	if len(msgs) != 0 {
		batch++

		if err := pro.SendMessages(msgs); err != nil {
			log.Println(err)
		}
	}

	log.Println(batch)
}
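When run against a local broker listening on :9093, with the warnings-topic already created since auto topic creation is disabled above, the final log line should print 1001, matching the batch count calculated earlier.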