Bu Golang'daki basit bir senkronizasyon ve asenkron Kafka üreticisi örneğidir. Sarama istemcisini kullanır.


Eşzamansız (engellenmeyen) üreticiler durumunda, Kafka istemcisi mesajları uygulama (üretici) arabelleğine iter ve hemen geri döner. Bu, verimi artırır ancak daha az mesaj teslimi güvenilirliği pahasına olur. Ateş et ve unut gibi bir şey bu. Brokerdan onay beklenmiyor çünkü bu akışın bir parçası değil. Gerçekten bir tür yanıta ihtiyacınız varsa, herhangi bir şey olup olmadığını görmek için bir geri arama işlevini (kanal tabanlı) kullanmanız gerekecektir. Tüm mesajların uygulama arabelleğine yerleştirildiği göz önüne alındığında, uygulamanın düzgün bir şekilde kapatılması gerekir, aksi takdirde arabellekte bulunan tüm mesajlar kaybolacaktır. Öte yandan senkronizasyon akışı, eşzamansız akış için söylediğimiz hemen hemen her şeyin tersidir, dolayısıyla mesaj başına tam ağ gidiş-dönüş gerçekleşecektir ve bu da yavaşlamaya yol açacaktır.


Not: Yazma sırasında kişisel olarak bu iki akış arasında performans farkı görmedim, ancak teoride async'in daha hızlı olması gerekiyor. Ayrıca CPU ve RAM kullanımı arasında da pek bir fark görmedim. Sarama istemci kütüphanesi adına konuşamam bu yüzden konfigürasyonlarla oynayıp kendiniz deneyebilirsiniz.



Dosyalar


Tekrarlamalar için özür dilerim!


main.go


package main

import (
"log"
"net/http"

"client/api"
"client/producer"
)

func main() {
// TODO:
// You must implement graceful server shutdown which is vital for async
// producers!

// Create async Kafka producer.
apro, err := producer.Async()
if err != nil {
log.Fatalln(err)
}
defer apro.AsyncClose()

// Create sync Kafka producer.
spro, err := producer.Sync()
if err != nil {
log.Fatalln(err)
}
defer spro.Close()

// Create HTTP handler.
hnd := api.Event{
AyncProducer: apro,
SyncProducer: spro,
Topic: "errors-topic",
}

// Create HTTP router.
rtr := http.DefaultServeMux
rtr.HandleFunc("GET /async/{id}", hnd.Async)
rtr.HandleFunc("GET /sync/{id}", hnd.Sync)

// Start HTTP server.
http.ListenAndServe(":3001", rtr)
}

async.go


package producer

import (
"time"

"github.com/IBM/sarama"
)

func Async() (sarama.AsyncProducer, error) {
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-test-client"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Producer.Retry.Max = 30
cfg.Producer.Retry.Backoff = time.Millisecond * 10
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Net.MaxOpenRequests = 1

return sarama.NewAsyncProducer([]string{":9093"}, cfg)
}

sync.go


package producer

import (
"time"

"github.com/IBM/sarama"
)

func Sync() (sarama.SyncProducer, error) {
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-test-client"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Retry.Max = 30
cfg.Producer.Retry.Backoff = time.Millisecond * 10
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Net.MaxOpenRequests = 1

return sarama.NewSyncProducer([]string{":9093"}, cfg)
}

event.go


package api

import (
"log"
"net/http"

"github.com/IBM/sarama"
)

type Event struct {
SyncProducer sarama.SyncProducer
AyncProducer sarama.AsyncProducer
Topic string
}

// GET /async/{id}
func (e Event) Async(w http.ResponseWriter, r *http.Request) {
msg := sarama.ProducerMessage{Topic: e.Topic, Value: sarama.StringEncoder(r.PathValue("id"))}

e.AyncProducer.Input() <- &msg
// e.AyncProducer.Input() <- another
// e.AyncProducer.Input() <- and another

select {
case suc := <-e.AyncProducer.Successes():
log.Printf("PAR: %d - OFF: %d\n", suc.Partition, suc.Offset)
case err := <-e.AyncProducer.Errors():
log.Println("ERR: ", err)
}
}

// GET /sync/{id}
func (e Event) Sync(w http.ResponseWriter, r *http.Request) {
msg := sarama.ProducerMessage{Topic: e.Topic, Value: sarama.StringEncoder(r.PathValue("id"))}

par, off, err := e.SyncProducer.SendMessage(&msg)

if err == nil {
log.Printf("PAR: %d - OFF: %d\n", par, off)
} else {
log.Println("ERR: ", err)
}
}