Bu örnekte, senkronize üreticiler için bir havuzdan yararlanan bir üretici sağlayıcı oluşturacağız. Her üretici, topiklere birden fazla mesaj göndermek için işlemlerde kullanılacaktır. Havuzun üreticiler için bir üst limiti olacak. Bir üretici bir işlemci tarafından kullanıldığında, işlem hatalarını önlemek amacıyla bu üretici diğer işlemciler tarafından görülmeyecektir. İşlem taahhüt edilip serbest bırakıldığında, ilgili üretici havuza geri bırakılır.


Uyarı: Tüketici izolasyon düzeyinin isolation_level=1 olarak ayarlandığından emin olun!


Dosyalar


provider.go


package kafka

import (
"fmt"
"sync"
"sync/atomic"

"github.com/IBM/sarama"
)

type ProducerProvider struct {
brokers []string
config *sarama.Config
producers []*Producer
counter *atomic.Uint64
mux *sync.Mutex
}

func NewProducerProvider(brokers []string, version, client string, size int) (*ProducerProvider, error) {
if size < 1 {
return nil, fmt.Errorf("pool size must be greater than 0")
}

ver, err := sarama.ParseKafkaVersion(version)
if err != nil {
return nil, err
}

cfg := sarama.NewConfig()
cfg.Version = ver
cfg.ClientID = client
cfg.Net.MaxOpenRequests = 1
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Compression = sarama.CompressionZSTD
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
cfg.Producer.Transaction.ID = fmt.Sprintf("%s-txn", client)

prv := &ProducerProvider{
brokers: brokers,
config: cfg,
producers: []*Producer{},
counter: &atomic.Uint64{},
mux: &sync.Mutex{},
}

for i := 0; i < size; i++ {
if err := prv.new(); err != nil {
continue
}
}

if len(prv.producers) == 0 {
return nil, fmt.Errorf("failed to create any producers")
}

return prv, nil
}

// Borrow attempts to borrow a producer from the pool and takes it out of the
// pool so it is not available for any other thread.
func (p *ProducerProvider) Borrow() *Producer {
for {
p.mux.Lock()

if len(p.producers) != 0 {
idx := len(p.producers) - 1
pro := p.producers[idx]
p.producers = p.producers[:idx]
p.mux.Unlock()

return pro
}

p.mux.Unlock()
}
}

// Release attempts to release only transaction-free or reusable producers back
// into the pool. The check is based on the status of the current transaction.
// The producer whose transaction status is not "ready" is always thrown away
// and a new one is created as a replacement so the pool does not prematurely get
// drained.
func (p *ProducerProvider) Release(pro *Producer) []error {
for {
switch pro.pr.TxnStatus() {
case sarama.ProducerTxnFlagReady:
p.mux.Lock()
p.producers = append(p.producers, pro)
p.mux.Unlock()

return nil
case sarama.ProducerTxnFlagFatalError:
errs := []error{
fmt.Errorf("release producer: status: %s", pro.pr.TxnStatus()),
}

if err := pro.pr.Close(); err != nil {
errs = append(errs, fmt.Errorf("release producer: close: %w", err))
}

p.new()

return errs
case sarama.ProducerTxnFlagInError | sarama.ProducerTxnFlagAbortableError:
errs := []error{
fmt.Errorf("release producer: status: %s", pro.pr.TxnStatus()),
}

if err := pro.pr.AbortTxn(); err != nil {
errs = append(errs, fmt.Errorf("release producer: abort txn: %w", err))
}

if err := pro.pr.Close(); err != nil {
errs = append(errs, fmt.Errorf("release producer: close: %w", err))
}

p.new()

return errs
}
}
}

// Size returns total of how many producers were successfully created at the
// application start.
func (p *ProducerProvider) Size() int {
p.mux.Lock()
defer p.mux.Unlock()

return len(p.producers)
}

// Close attempts to close all producers in the pool.
func (p *ProducerProvider) Close() []error {
p.mux.Lock()
defer p.mux.Unlock()

var errs []error

for _, pro := range p.producers {
if err := pro.pr.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close producer %s: %w", pro.id, err))
}
}

return errs
}

// new creates a new producer and adds it to the pool.
func (p *ProducerProvider) new() error {
p.mux.Lock()
config := *p.config
brokers := p.brokers
p.counter.Add(1)
suffix := p.counter.Load()
p.mux.Unlock()

config.Producer.Transaction.ID = fmt.Sprintf("%s-%d", config.Producer.Transaction.ID, suffix)

pro, err := sarama.NewSyncProducer(brokers, &config)
if err != nil {
return err
}

p.mux.Lock()
p.producers = append(p.producers, &Producer{
pr: pro,
id: fmt.Sprintf("%s-pro-%d", config.ClientID, suffix),
})
p.mux.Unlock()

return nil
}

producer.go


package kafka

import (
"fmt"

"github.com/IBM/sarama"
)

type Producer struct {
pr sarama.SyncProducer
id string
}

func (p *Producer) ID() string {
return p.id
}

func (p *Producer) Produce(msgs []*sarama.ProducerMessage) []error {
var errs []error

if err := p.pr.BeginTxn(); err != nil {
errs = append(errs, fmt.Errorf("begin txn: producer: %s: %w", p.id, err))

return errs
}

if err := p.pr.SendMessages(msgs); err != nil {
errs = append(errs, fmt.Errorf("send messages: producer: %s: %w", p.id, err))

if err := p.pr.AbortTxn(); err != nil {
errs = append(errs, fmt.Errorf("arort txn: producer: %s: %w", p.id, err))
}

return errs
}

if err := p.pr.CommitTxn(); err != nil {
errs = append(errs, fmt.Errorf("commit txn: producer: %s: %w", p.id, err))

if err := p.pr.AbortTxn(); err != nil {
errs = append(errs, fmt.Errorf("arort txn: producer: %s: %w", p.id, err))
}

return errs
}

return errs
}

main.go


Burada talepleri karşılamaya hazır maksimum 5 üreticimiz var. Sayı ne kadar yüksek olursa, verim ve Kafka bağlantısı da o kadar yüksek olur.


package main

import (
"fmt"
"log"
"net/http"

"client/kafka"

"github.com/IBM/sarama"
)

func main() {
prv, err := kafka.NewProducerProvider([]string{":9093"}, "3.3.2", "inanzzz", 5)
if err != nil {
log.Fatalln(err)
}
defer func() {
for _, err := range prv.Close() {
log.Println(err)
}
}()

api := api{prv: prv, err: "errors-topic", war: "warnings-topic"}
rtr := http.NewServeMux()
rtr.HandleFunc("GET /errors/{id}", api.errors)
rtr.HandleFunc("GET /warnings/{id}", api.warning)
http.ListenAndServe(":3001", rtr)
}

type api struct {
prv *kafka.ProducerProvider
err string
war string
}

func (a api) errors(w http.ResponseWriter, r *http.Request) {
pro := a.prv.Borrow()
if pro == nil {
log.Println("failed to borrow")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() {
if errs := a.prv.Release(pro); len(errs) != 0 {
log.Println(errs)
}
}()

user := r.PathValue("id")

msg1 := fmt.Sprintf("err: 1 - pro: %s - usr: %s", pro.ID(), user)
msg2 := fmt.Sprintf("err: 2 - pro: %s - usr: %s", pro.ID(), user)

msgs := []*sarama.ProducerMessage{
{Topic: a.err, Value: sarama.StringEncoder(msg1)},
{Topic: a.err, Value: sarama.StringEncoder(msg2)},
}

if errs := pro.Produce(msgs); len(errs) != 0 {
log.Println(errs)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

func (a api) warning(w http.ResponseWriter, r *http.Request) {
pro := a.prv.Borrow()
if pro == nil {
log.Println("failed to borrow")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() {
if errs := a.prv.Release(pro); len(errs) != 0 {
log.Println(errs)
}
}()

user := r.PathValue("id")

msg1 := fmt.Sprintf("war: 1 - pro: %s - usr: %s", pro.ID(), user)
msg2 := fmt.Sprintf("war: 2 - pro: %s - usr: %s", pro.ID(), user)

msgs := []*sarama.ProducerMessage{
{Topic: a.war, Value: sarama.StringEncoder(msg1)},
{Topic: a.war, Value: sarama.StringEncoder(msg2)},
}

if errs := pro.Produce(msgs); len(errs) != 0 {
log.Println(errs)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

Test


Uygulamayı çalıştırdıktan sonra aşağıdaki uç noktalara eş zamanlı birçok istek gönderebilirsiniz. Terminalde herhangi bir işlem hatası almamalısınız.


GET http://localhost:3001/errors/{user-id}
GET http://localhost:3001/warnings/{user-id}

Burada uygulama başlangıcında işlem kimliklerine sahip üreticileri görüyoruz.


kafka-broker       | [2024-06-11 20:10:29,196] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-1 with producerId 13 and producer epoch 7 on partition __transaction_state-49 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,216] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-2 with producerId 14 and producer epoch 6 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,231] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-3 with producerId 15 and producer epoch 6 on partition __transaction_state-1 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,248] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-4 with producerId 16 and producer epoch 6 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,265] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-5 with producerId 17 and producer epoch 6 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)