In this example we are going to create a producer provider which manages a pool of sync producers. Each producer is used within a transaction to send multiple messages to topics. The pool has an upper limit on the number of producers. While a producer is borrowed by an operation, it is not visible to other operations, which avoids transaction errors. Once its transaction has completed, the associated producer is released back into the pool.
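
From a caller's point of view, the borrow/produce/release lifecycle looks roughly like this. This is a minimal sketch assuming prv is the *ProducerProvider and msgs is a prepared []*sarama.ProducerMessage, both defined in the files below.


pro := prv.Borrow()
defer func() {
    if errs := prv.Release(pro); len(errs) != 0 {
        log.Println(errs)
    }
}()

if errs := pro.Produce(msgs); len(errs) != 0 {
    log.Println(errs)
}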


Caution: Make sure the consumer isolation level is set to read_committed (isolation_level=1)! Otherwise consumers would also read messages from open or aborted transactions.
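
In sarama terms that corresponds to the consumer config below. A minimal sketch; the isolation level line is the only one the caution is about.


cfg := sarama.NewConfig()
// read_committed (isolation_level=1): the consumer only sees messages that
// belong to committed transactions.
cfg.Consumer.IsolationLevel = sarama.ReadCommitted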


Files


provider.go


package kafka

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/IBM/sarama"
)

type ProducerProvider struct {
    brokers   []string
    config    *sarama.Config
    producers []*Producer
    counter   *atomic.Uint64
    mux       *sync.Mutex
}

func NewProducerProvider(brokers []string, version, client string, size int) (*ProducerProvider, error) {
    if size < 1 {
        return nil, fmt.Errorf("pool size must be greater than 0")
    }

    ver, err := sarama.ParseKafkaVersion(version)
    if err != nil {
        return nil, err
    }

    cfg := sarama.NewConfig()
    cfg.Version = ver
    cfg.ClientID = client
    // Idempotent/transactional producers require a single in-flight request.
    cfg.Net.MaxOpenRequests = 1
    cfg.Metadata.AllowAutoTopicCreation = false
    cfg.Producer.Return.Successes = true
    cfg.Producer.Compression = sarama.CompressionZSTD
    cfg.Producer.Idempotent = true
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    // Base transactional ID. Each producer in the pool appends its own
    // unique suffix to it.
    cfg.Producer.Transaction.ID = fmt.Sprintf("%s-txn", client)

    prv := &ProducerProvider{
        brokers:   brokers,
        config:    cfg,
        producers: []*Producer{},
        counter:   &atomic.Uint64{},
        mux:       &sync.Mutex{},
    }

    // Best effort: a producer that fails to be created is simply skipped.
    // The check below catches the case where none could be created at all.
    for i := 0; i < size; i++ {
        _ = prv.new()
    }

    if len(prv.producers) == 0 {
        return nil, fmt.Errorf("failed to create any producers")
    }

    return prv, nil
}

// Borrow takes a producer out of the pool so it is not visible to any other
// goroutine. It busy-waits until a producer becomes available, so it never
// returns nil as long as the pool itself is alive.
func (p *ProducerProvider) Borrow() *Producer {
    for {
        p.mux.Lock()

        if len(p.producers) != 0 {
            idx := len(p.producers) - 1
            pro := p.producers[idx]
            p.producers = p.producers[:idx]
            p.mux.Unlock()

            return pro
        }

        p.mux.Unlock()
    }
}

// Release attempts to release only transaction-free (ready) producers back
// into the pool. The check is based on the status of the current transaction.
// A producer whose transaction status is not "ready" is thrown away and a new
// one is created as a replacement so the pool does not prematurely get
// drained. The status is a set of bit flags, so it is tested with a bitmask
// rather than an exact match.
func (p *ProducerProvider) Release(pro *Producer) []error {
    status := pro.pr.TxnStatus()

    switch {
    case status&sarama.ProducerTxnFlagReady != 0:
        p.mux.Lock()
        p.producers = append(p.producers, pro)
        p.mux.Unlock()

        return nil
    case status&sarama.ProducerTxnFlagFatalError != 0:
        errs := []error{
            fmt.Errorf("release producer: status: %s", status),
        }

        if err := pro.pr.Close(); err != nil {
            errs = append(errs, fmt.Errorf("release producer: close: %w", err))
        }

        if err := p.new(); err != nil {
            errs = append(errs, fmt.Errorf("release producer: replace: %w", err))
        }

        return errs
    default:
        // Abortable errors and any other unexpected statuses: abort the
        // transaction, throw the producer away and replace it.
        errs := []error{
            fmt.Errorf("release producer: status: %s", status),
        }

        if err := pro.pr.AbortTxn(); err != nil {
            errs = append(errs, fmt.Errorf("release producer: abort txn: %w", err))
        }

        if err := pro.pr.Close(); err != nil {
            errs = append(errs, fmt.Errorf("release producer: close: %w", err))
        }

        if err := p.new(); err != nil {
            errs = append(errs, fmt.Errorf("release producer: replace: %w", err))
        }

        return errs
    }
}

// Size returns the number of producers currently available in the pool.
// Borrowed producers are not counted until they are released back.
func (p *ProducerProvider) Size() int {
    p.mux.Lock()
    defer p.mux.Unlock()

    return len(p.producers)
}

// Close attempts to close all producers currently in the pool.
func (p *ProducerProvider) Close() []error {
    p.mux.Lock()
    defer p.mux.Unlock()

    var errs []error

    for _, pro := range p.producers {
        if err := pro.pr.Close(); err != nil {
            errs = append(errs, fmt.Errorf("failed to close producer %s: %w", pro.id, err))
        }
    }

    return errs
}

// new creates a new producer with a unique transactional ID and adds it to
// the pool.
func (p *ProducerProvider) new() error {
    p.mux.Lock()
    // Shallow copy so the base config is never mutated and every producer
    // ends up with its own transactional ID.
    config := *p.config
    brokers := p.brokers
    p.counter.Add(1)
    suffix := p.counter.Load()
    p.mux.Unlock()

    config.Producer.Transaction.ID = fmt.Sprintf("%s-%d", config.Producer.Transaction.ID, suffix)

    pro, err := sarama.NewSyncProducer(brokers, &config)
    if err != nil {
        return err
    }

    p.mux.Lock()
    p.producers = append(p.producers, &Producer{
        pr: pro,
        id: fmt.Sprintf("%s-pro-%d", config.ClientID, suffix),
    })
    p.mux.Unlock()

    return nil
}

producer.go


package kafka

import (
    "fmt"

    "github.com/IBM/sarama"
)

type Producer struct {
    pr sarama.SyncProducer
    id string
}

// ID returns the unique identifier of the producer.
func (p *Producer) ID() string {
    return p.id
}

// Produce sends all messages within a single transaction. If sending or
// committing fails, the transaction is aborted and all errors are returned.
func (p *Producer) Produce(msgs []*sarama.ProducerMessage) []error {
    var errs []error

    if err := p.pr.BeginTxn(); err != nil {
        errs = append(errs, fmt.Errorf("begin txn: producer: %s: %w", p.id, err))

        return errs
    }

    if err := p.pr.SendMessages(msgs); err != nil {
        errs = append(errs, fmt.Errorf("send messages: producer: %s: %w", p.id, err))

        if err := p.pr.AbortTxn(); err != nil {
            errs = append(errs, fmt.Errorf("abort txn: producer: %s: %w", p.id, err))
        }

        return errs
    }

    if err := p.pr.CommitTxn(); err != nil {
        errs = append(errs, fmt.Errorf("commit txn: producer: %s: %w", p.id, err))

        if err := p.pr.AbortTxn(); err != nil {
            errs = append(errs, fmt.Errorf("abort txn: producer: %s: %w", p.id, err))
        }

        return errs
    }

    return errs
}

main.go


Here we have a maximum of 5 producers ready to handle requests. The higher the number, the higher the potential throughput, but also the more broker connections.


package main

import (
    "fmt"
    "log"
    "net/http"

    "client/kafka"

    "github.com/IBM/sarama"
)

func main() {
    prv, err := kafka.NewProducerProvider([]string{":9093"}, "3.3.2", "inanzzz", 5)
    if err != nil {
        log.Fatalln(err)
    }
    defer func() {
        for _, err := range prv.Close() {
            log.Println(err)
        }
    }()

    api := api{prv: prv, err: "errors-topic", war: "warnings-topic"}
    rtr := http.NewServeMux()
    rtr.HandleFunc("GET /errors/{id}", api.errors)
    rtr.HandleFunc("GET /warnings/{id}", api.warning)

    // Plain log rather than a fatal exit so the deferred Close still runs.
    if err := http.ListenAndServe(":3001", rtr); err != nil {
        log.Println(err)
    }
}

type api struct {
    prv *kafka.ProducerProvider
    err string
    war string
}

func (a api) errors(w http.ResponseWriter, r *http.Request) {
    pro := a.prv.Borrow()
    if pro == nil {
        log.Println("failed to borrow")
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    defer func() {
        if errs := a.prv.Release(pro); len(errs) != 0 {
            log.Println(errs)
        }
    }()

    user := r.PathValue("id")

    msg1 := fmt.Sprintf("err: 1 - pro: %s - usr: %s", pro.ID(), user)
    msg2 := fmt.Sprintf("err: 2 - pro: %s - usr: %s", pro.ID(), user)

    msgs := []*sarama.ProducerMessage{
        {Topic: a.err, Value: sarama.StringEncoder(msg1)},
        {Topic: a.err, Value: sarama.StringEncoder(msg2)},
    }

    if errs := pro.Produce(msgs); len(errs) != 0 {
        log.Println(errs)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
}

func (a api) warning(w http.ResponseWriter, r *http.Request) {
    pro := a.prv.Borrow()
    if pro == nil {
        log.Println("failed to borrow")
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    defer func() {
        if errs := a.prv.Release(pro); len(errs) != 0 {
            log.Println(errs)
        }
    }()

    user := r.PathValue("id")

    msg1 := fmt.Sprintf("war: 1 - pro: %s - usr: %s", pro.ID(), user)
    msg2 := fmt.Sprintf("war: 2 - pro: %s - usr: %s", pro.ID(), user)

    msgs := []*sarama.ProducerMessage{
        {Topic: a.war, Value: sarama.StringEncoder(msg1)},
        {Topic: a.war, Value: sarama.StringEncoder(msg2)},
    }

    if errs := pro.Produce(msgs); len(errs) != 0 {
        log.Println(errs)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
}

Test


After running the application, you can send many concurrent requests to the endpoints below. You should not see any race or transaction errors in the terminal.


GET http://localhost:3001/errors/{user-id}
GET http://localhost:3001/warnings/{user-id}
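
If you would rather script the test than fire requests by hand, a tiny Go client like the sketch below would do. The request count of 100 and the numeric user IDs are arbitrary choices.


package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

// Fires concurrent requests at both endpoints to exercise the pool.
func main() {
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(2)

        go func(id int) {
            defer wg.Done()
            hit(fmt.Sprintf("http://localhost:3001/errors/%d", id))
        }(i)

        go func(id int) {
            defer wg.Done()
            hit(fmt.Sprintf("http://localhost:3001/warnings/%d", id))
        }(i)
    }

    wg.Wait()
}

// hit sends a single GET request and logs anything other than a 200.
func hit(url string) {
    res, err := http.Get(url)
    if err != nil {
        log.Println(err)
        return
    }
    defer res.Body.Close()

    if res.StatusCode != http.StatusOK {
        log.Println(url, res.Status)
    }
}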

Here we see the producers being initialised with their transactional IDs at application start.


kafka-broker | [2024-06-11 20:10:29,196] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-1 with producerId 13 and producer epoch 7 on partition __transaction_state-49 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,216] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-2 with producerId 14 and producer epoch 6 on partition __transaction_state-0 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,231] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-3 with producerId 15 and producer epoch 6 on partition __transaction_state-1 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,248] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-4 with producerId 16 and producer epoch 6 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator)
kafka-broker | [2024-06-11 20:10:29,265] INFO [TransactionCoordinator id=1] Initialized transactionalId inanzzz-txn-5 with producerId 17 and producer epoch 6 on partition __transaction_state-3 (kafka.coordinator.transaction.TransactionCoordinator)