Bu örnekte, ağ gidiş dönüşlerini azaltmak için mesajları toplu olarak üreteceğiz. Bu uygulama hakkındaki tek not, işlemleri (transaction) kullanmaması ve dolayısıyla atomik olmamasıdır. Bir grupta bir veya daha fazla mesaj başarısız olursa, hangi mesajların başarısız olduğunu/başarısız olduğunu bilemezsiniz. Tahmin edebileceğiniz gibi mesajlar tek tek başarılı veya başarısız olabilir. Bazıları başarılı, bazıları başarısız olursa SendMessages bir hata döndürecektir. Mesaj tesliminin güvenilirliği sizi ilgilendirmiyorsa bu işlevi kullanmak isteyebilirsiniz. Tüketicinizde yinelenen mesajları kontrol etmeniz gerekecek.


Atomik toplu işlem istiyorsanız bunun yerine işlemlere (transaction) bakabilirsiniz. Ancak Sarama istemcisini kullanıyorsanız büyük olasılıkla iki yaygın sorunla karşı karşıya kalacaksınız. Ben şahsen her birini birkaç kez deneyimledim ancak bir çözüm aramak için derinlemesine araştırma yapmadım. GitHub sayılarında da onlardan bahsediliyor. Bunlardan ilki, her iki NewSyncProducer/NewAsyncProducer'ın da eşzamansız (async) biçimde çalışmasıdır. Bunun anlamı, bazı gecikmeler ve dış etkenler nedeniyle istemcinin işlemin son durumunu hiçbir zaman bilemeyeceğidir. Bu nedenle, üreticiyi hızlı bir şekilde tekrar kullanmaya çalıştığınızda kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing hatasını göreceğiniz zamanlar olacaktır. Diyelim ki bunun yerine işlem başına yeni bir üretici oluşturuyorsunuz ki bu zaten pek de iyi bir fikir değil. Bu da zaman zaman transaction_manager.go, async_producer.go vb. dosyalara işaret eden veri yarışı (data race) hataları üretecektir. Ve bazen, ERROR [ReplicaManager broker=1] Error processing append operation on partition my-topic-0 (kafka.server.ReplicaManager) ve org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 247 at offset 6263 in partition my-topic-0: 12 (incoming seq. number), 9 (current end sequence number) hatası görebilirsiniz. Dolayısıyla size önerebileceğim şey, eğer transaction kullanacaksanız kodunuzu gerçekten sıkı bir şekilde test ettiğinizden emin olun. Örneğin, bir HTTP api oluşturun ve aynı anda binlerce istek gönderin ve ne olacağını görün.


Örnek


Burada toplam 10001 mesaj gönderiyoruz ve her grupta 10 mesaj bulunuyor. Bu bize toplam 1001 toplu istek verir.


package main

import (
"log"
"strconv"
"time"

"github.com/IBM/sarama"
)

func main() {
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-client"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Retry.Max = 30
cfg.Producer.Retry.Backoff = time.Millisecond * 10
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Net.MaxOpenRequests = 1

pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer pro.Close()

var (
batch int
msgs = make([]*sarama.ProducerMessage, 0, 10)
)

for i := 1; i <= 10001; i++ {
msgs = append(msgs, &sarama.ProducerMessage{
Topic: "warnings-topic",
Value: sarama.StringEncoder(strconv.Itoa(i)),
})

if len(msgs) == cap(msgs) {
batch++

if err := pro.SendMessages(msgs); err != nil {
log.Println(err)
}

msgs = msgs[:0]
}
}

if len(msgs) != 0 {
batch++

if err := pro.SendMessages(msgs); err != nil {
log.Println(err)
}
}

log.Println(batch)
}