21/05/2024 - GO, KAFKA
This is a simple sync and async Kafka producer example in Golang. It uses the Sarama client.
With an asynchronous (non-blocking) producer, the Kafka client pushes messages into the application's (producer's) buffer and returns immediately. This increases throughput at the expense of delivery reliability; it is essentially fire-and-forget. The flow does not wait for an acknowledgment from the broker, so if you really need some sort of confirmation you have to read the producer's channel-based callbacks (the success and error channels). Because all messages are placed in the application buffer, the application must be shut down gracefully, otherwise any messages still sitting in the buffer are lost. The sync flow, on the other hand, is the opposite of nearly everything said above: a complete network round trip takes place per message, which makes it slower.
Note: At the time of writing I personally haven't seen a performance difference between these two flows, but in theory async is meant to be faster. I haven't seen much difference in CPU and RAM usage either. I cannot speak for the internals of the Sarama client library, so you might as well play with the configurations and try it yourself (see the rough benchmark sketch after the producer code).
Sorry about the duplications!
package main

import (
	"log"
	"net/http"

	"client/api"
	"client/producer"
)

func main() {
	// TODO:
	// You must implement graceful server shutdown which is vital for async
	// producers! See the sketch below.

	// Create async Kafka producer.
	apro, err := producer.Async()
	if err != nil {
		log.Fatalln(err)
	}
	defer apro.AsyncClose()

	// Create sync Kafka producer.
	spro, err := producer.Sync()
	if err != nil {
		log.Fatalln(err)
	}
	defer spro.Close()

	// Create HTTP handler.
	hnd := api.Event{
		AsyncProducer: apro,
		SyncProducer:  spro,
		Topic:         "errors-topic",
	}

	// Create HTTP router.
	rtr := http.DefaultServeMux
	rtr.HandleFunc("GET /async/{id}", hnd.Async)
	rtr.HandleFunc("GET /sync/{id}", hnd.Sync)

	// Start HTTP server.
	log.Fatalln(http.ListenAndServe(":3001", rtr))
}
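The TODO above is worth taking seriously: if the process exits while the async producer still has messages in its buffer, those messages are gone. Below is a minimal sketch of how main could be adapted to drain everything on SIGINT/SIGTERM. The http.Server, the signal handling and the 10 second timeout are my own illustrative choices, not part of the original example; the important bit is that Close() on the async producer flushes the buffer before returning, whereas a hard exit would not.

package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"client/api"
	"client/producer"
)

func main() {
	apro, err := producer.Async()
	if err != nil {
		log.Fatalln(err)
	}

	spro, err := producer.Sync()
	if err != nil {
		log.Fatalln(err)
	}

	hnd := api.Event{
		AsyncProducer: apro,
		SyncProducer:  spro,
		Topic:         "errors-topic",
	}

	rtr := http.NewServeMux()
	rtr.HandleFunc("GET /async/{id}", hnd.Async)
	rtr.HandleFunc("GET /sync/{id}", hnd.Sync)

	srv := &http.Server{Addr: ":3001", Handler: rtr}

	// Run the server in the background so main can wait for a signal.
	go func() {
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalln(err)
		}
	}()

	// Block until SIGINT/SIGTERM arrives.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig

	// Stop accepting new requests and let in-flight handlers finish.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Println("ERR: ", err)
	}

	// Close flushes any buffered async messages before returning.
	if err := apro.Close(); err != nil {
		log.Println("ERR: ", err)
	}
	if err := spro.Close(); err != nil {
		log.Println("ERR: ", err)
	}
}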
package producer

import (
	"time"

	"github.com/IBM/sarama"
)

func Async() (sarama.AsyncProducer, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz-test-client"
	cfg.Version = sarama.V3_3_2_0
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	cfg.Producer.Retry.Max = 30
	cfg.Producer.Retry.Backoff = time.Millisecond * 10
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Net.MaxOpenRequests = 1

	return sarama.NewAsyncProducer([]string{":9093"}, cfg)
}
package producer

import (
	"time"

	"github.com/IBM/sarama"
)

func Sync() (sarama.SyncProducer, error) {
	cfg := sarama.NewConfig()
	cfg.ClientID = "inanzzz-test-client"
	cfg.Version = sarama.V3_3_2_0
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Max = 30
	cfg.Producer.Retry.Backoff = time.Millisecond * 10
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Net.MaxOpenRequests = 1

	return sarama.NewSyncProducer([]string{":9093"}, cfg)
}
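If you want to measure the two flows yourself, a rough harness along the lines of the sketch below is enough. The message count, the topic name and the timing approach are mine and purely illustrative; the point is that the async loop only waits for delivery reports after everything has been buffered, while the sync loop pays a full round trip per message.

package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"

	"client/producer"
)

func main() {
	const total = 10000

	apro, err := producer.Async()
	if err != nil {
		log.Fatalln(err)
	}

	spro, err := producer.Sync()
	if err != nil {
		log.Fatalln(err)
	}
	defer spro.Close()

	// Async: buffer everything, then wait for one delivery report per message.
	start := time.Now()
	go func() {
		for i := 0; i < total; i++ {
			apro.Input() <- &sarama.ProducerMessage{Topic: "errors-topic", Value: sarama.StringEncoder("async")}
		}
		apro.AsyncClose()
	}()
	for i := 0; i < total; i++ {
		select {
		case <-apro.Successes():
		case err := <-apro.Errors():
			log.Println("ERR: ", err)
		}
	}
	log.Println("async took:", time.Since(start))

	// Sync: one blocking round trip per message.
	start = time.Now()
	for i := 0; i < total; i++ {
		if _, _, err := spro.SendMessage(&sarama.ProducerMessage{Topic: "errors-topic", Value: sarama.StringEncoder("sync")}); err != nil {
			log.Println("ERR: ", err)
		}
	}
	log.Println("sync took:", time.Since(start))
}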
package api

import (
	"log"
	"net/http"

	"github.com/IBM/sarama"
)

type Event struct {
	SyncProducer  sarama.SyncProducer
	AsyncProducer sarama.AsyncProducer
	Topic         string
}

// GET /async/{id}
func (e Event) Async(w http.ResponseWriter, r *http.Request) {
	msg := sarama.ProducerMessage{Topic: e.Topic, Value: sarama.StringEncoder(r.PathValue("id"))}

	// Push the message into the producer buffer; this returns immediately.
	e.AsyncProducer.Input() <- &msg
	// e.AsyncProducer.Input() <- another
	// e.AsyncProducer.Input() <- and another

	// Wait for the first delivery report on the channels. This is only here
	// to demonstrate the channels; a true fire-and-forget handler would not
	// block on them.
	select {
	case suc := <-e.AsyncProducer.Successes():
		log.Printf("PAR: %d - OFF: %d\n", suc.Partition, suc.Offset)
	case err := <-e.AsyncProducer.Errors():
		log.Println("ERR: ", err)
	}
}

// GET /sync/{id}
func (e Event) Sync(w http.ResponseWriter, r *http.Request) {
	msg := sarama.ProducerMessage{Topic: e.Topic, Value: sarama.StringEncoder(r.PathValue("id"))}

	// SendMessage blocks until the broker acknowledges the message (or the
	// retries are exhausted), so we get the partition and offset back here.
	par, off, err := e.SyncProducer.SendMessage(&msg)
	if err == nil {
		log.Printf("PAR: %d - OFF: %d\n", par, off)
	} else {
		log.Println("ERR: ", err)
	}
}
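One caveat with the Async handler above: because every request does its own select on the shared Successes() and Errors() channels, a report belonging to one request can be picked up by another. If you only care about logging the outcome, a common alternative is to drain both channels in a single goroutine started at boot and keep the handler truly fire-and-forget. The sketch below is my addition, not part of the original example; it would be started with go api.Drain(apro) right after creating the producer, and the select block would then be removed from the handler.

package api

import (
	"log"

	"github.com/IBM/sarama"
)

// Drain consumes delivery reports from the async producer in the background
// so the HTTP handlers never block on the channels. It returns once the
// producer has been closed and both channels are drained.
func Drain(p sarama.AsyncProducer) {
	successes, errors := p.Successes(), p.Errors()

	for successes != nil || errors != nil {
		select {
		case suc, ok := <-successes:
			if !ok {
				successes = nil
				continue
			}
			log.Printf("PAR: %d - OFF: %d\n", suc.Partition, suc.Offset)
		case err, ok := <-errors:
			if !ok {
				errors = nil
				continue
			}
			log.Println("ERR: ", err)
		}
	}
}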