In this example we are going to create two topics first using a dedicated application. We will produce messages to both topics in HTTP client application and consume from both topic in a terminal application. The whole setup is a sync model. I will also be sending thousands of concurrent requests to each HTTP endpoint to verify producing and consuming messages. For that, I will be using K6 which is also a great tool.


I kept it short and left some duplications in the code for now. Modify it as you wish.


Bootstrap app


This sets up the environment and the topics.


docker-compose.yaml


version: "3.7"

services:

kafka-zookeeper:
container_name: kafka-zookeeper
image: "bitnami/zookeeper:3.8.4"
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka-broker:
container_name: kafka-brooker
image: "bitnami/kafka:3.3.2"
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=kafka-zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-broker:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- kafka-zookeeper

kafka-ui:
container_name: kafka-ui
image: "provectuslabs/kafka-ui:latest"
ports:
- "8080:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-broker:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=kafka-zookeeper:2181
- KAFKA_BROKERCONNECT=kafka-broker:9092
depends_on:
- kafka-zookeeper
- kafka-broker

main.go


package main

import (
"fmt"
"log"

"github.com/IBM/sarama"
)

type topic struct {
name string
partition int32
replication int16
settings map[string]*string
}

var (
messageRetentionMinute = "60000"
messageRetentionHour = "3600000"
)

var topics = []topic{{
name: "warnings-topic",
partition: 1,
replication: 1,
settings: map[string]*string{
"retention.ms": &messageRetentionMinute,
"delete.retention.ms": &messageRetentionMinute,
},
}, {
name: "errors-topic",
partition: 3,
replication: 1,
settings: map[string]*string{
"retention.ms": &messageRetentionHour,
"delete.retention.ms": &messageRetentionHour,
},
}}

func main() {
config := sarama.NewConfig()
config.Version = sarama.V3_3_2_0

clusterAdmin, err := sarama.NewClusterAdmin([]string{":9093"}, config)
if err != nil {
log.Fatalln(err)
}
defer clusterAdmin.Close()

clusterTopics, err := clusterAdmin.ListTopics()
if err != nil {
log.Fatalln(err)
}

for _, topic := range topics {
if _, ok := clusterTopics[topic.name]; ok {
continue
}

err := clusterAdmin.CreateTopic(topic.name, &sarama.TopicDetail{
NumPartitions: topic.partition,
ReplicationFactor: topic.replication,
ConfigEntries: topic.settings,
}, false)
if err != nil {
log.Println(err)
continue
}

fmt.Printf("> TOPIC: %s PARTITION: %d REPLICA: %d\n", topic.name, topic.partition, topic.replication)
}
}

Makefile


setup:
go run -race main.go

docker:
docker system prune --volumes --force
docker-compose up

error:
k6 run k6.js -e SEVERITY=errors

warning:
k6 run k6.js -e SEVERITY=warnings

k6.js


import http from "k6/http";
import { check } from "k6";

export let options = {
vus: 99,
duration: "10s"
};

export default function() {
let response = http.get("http://localhost:3001/"+__ENV.SEVERITY+"/"+__VU);

check(response, {
"200 OK": (r) => r.status === 200,
"response < 100 ms": (r) => r.timings.duration < 100
});
}

Client (HTTP) app


This produces messages to each topics.


main.go


package main

import (
"client/api"
"fmt"
"log"
"net/http"
"time"

"github.com/IBM/sarama"
)

func main() {
fmt.Println("producer ...")

// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-client"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Retry.Max = 30
cfg.Producer.Retry.Backoff = time.Millisecond * 10
cfg.Producer.Compression = sarama.CompressionZSTD
// Enable EOS (Exactly Once Semantic).
// This relates to de-duplication of messages in built-in retry mechanism.
// It has nothing to do with de-duplication of messages that you would need
// from the client to send!
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Net.MaxOpenRequests = 1
//

// Create Kafka producer.
pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer pro.Close()

// Create HTTP router.
rtr := http.DefaultServeMux
rtr.HandleFunc("GET /errors/{id}", (api.Error{Producer: pro, Topic: "errors-topic"}).Handle)
rtr.HandleFunc("GET /warnings/{id}", (api.Warning{Producer: pro, Topic: "warnings-topic"}).Handle)

// Start HTTP server.
http.ListenAndServe(":3001", rtr)
}

log.go


package model

type Log struct {
ID string `json:"id"`
Level string `json:"level"`
Time string `json:"time"`
}

error.go


package api

import (
"encoding/json"
"log"
"net/http"
"time"

"client/model"

"github.com/IBM/sarama"
)

type Error struct {
Producer sarama.SyncProducer
Topic string
}

// GET /errors/{id}
func (er Error) Handle(w http.ResponseWriter, r *http.Request) {
obj := model.Log{
ID: r.PathValue("id"),
Level: "ERR",
Time: time.Now().Format(time.RFC3339),
}

val, err := json.Marshal(obj)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := sarama.ProducerMessage{
Topic: er.Topic,
Value: sarama.StringEncoder(val),
Timestamp: time.Now(),
}

prt, off, err := er.Producer.SendMessage(&msg)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Printf("PART: %d OFFS: %d\n", prt, off)
}

warning.go


package api

import (
"encoding/json"
"log"
"net/http"
"time"

"client/model"

"github.com/IBM/sarama"
)

type Warning struct {
Producer sarama.SyncProducer
Topic string
}

// GET /warnings/{id}
func (wr Warning) Handle(w http.ResponseWriter, r *http.Request) {
obj := model.Log{
ID: r.PathValue("id"),
Level: "WAR",
Time: time.Now().Format(time.RFC3339),
}

val, err := json.Marshal(obj)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := sarama.ProducerMessage{
Topic: wr.Topic,
Value: sarama.ByteEncoder(val),
Timestamp: time.Now(),
}

prt, off, err := wr.Producer.SendMessage(&msg)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Printf("PART: %d OFFS: %d\n", prt, off)
}

Server (CLI) app


This consumes messages from each topics. It also handles termination signals for graceful producer and service shutdown.


main.go


package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"server/api"
"server/kafka"

"github.com/IBM/sarama"
)

func main() {
fmt.Println("consumer ...")

// Set up service termination channel.
close := make(chan struct{})

// Set consumer shutdown delay.
timeout := time.Second * 5

// Set up signal notification context.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-server"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Consumer.Return.Errors = true
cfg.Consumer.Retry.Backoff = time.Second * 3
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

// Set consumer groups.
groups := []kafka.ConsumerGroupArgs{{
Config: cfg,
Consumer: api.Warning{},
Group: "warnings-topic-group",
Topic: "warnings-topic",
Timeout: timeout,
}, {
Config: cfg,
Consumer: api.Error{},
Group: "errors-topic-group",
Topic: "errors-topic",
Timeout: timeout,
}}

// Run consumer groups for each topics.
for _, arg := range groups {
arg := arg

go func() {
if err := kafka.RunConsumerGroup(ctx, []string{":9093"}, arg, close); err != nil {
log.Fatalln(err)
}
}()
}

// Wait for close signal.
<-close
}

kafka.go


package kafka

import (
"context"
"fmt"
"log"
"time"

"github.com/IBM/sarama"
)

type ConsumerGroupArgs struct {
Config *sarama.Config
Consumer consumer
Group string
Topic string
Timeout time.Duration
}

func RunConsumerGroup(notify context.Context, brokers []string, args ConsumerGroupArgs, close chan<- struct{}) error {
con, err := sarama.NewConsumerGroup(brokers, args.Group, args.Config)
if err != nil {
return err
}

go func() {
for err := range con.Errors() {
log.Printf("consumer group error: %s: %s\n", args.Group, err)
}
}()

ctx, cancel := context.WithCancel(context.Background())

// Wait for termination signal. Once arrived, pause fetching from all partitions.
// Wait for a duration before cancelling consumer context and closing the
// consumer group. Finally, send to the close channel to stop the application.
go func() {
<-notify.Done()
con.PauseAll()

log.Printf("paused fetching from all partitions: %s\n", args.Group)

tick := time.NewTicker(args.Timeout)
defer tick.Stop()
<-tick.C

cancel()
if err := con.Close(); err != nil {
log.Printf("close consumer group: %s: %s\n", args.Group, err)
}

log.Printf("closed consumer group: %s\n", args.Group)

close <- struct{}{}
}()

handler := handler{
client: args.Config.ClientID,
topic: args.Topic,
group: args.Group,
consumer: args.Consumer,
}

for {
err := con.Consume(ctx, []string{args.Topic}, &handler)
switch {
case err != nil:
return fmt.Errorf("%s: %w", args.Group, err)
case ctx.Err() != nil:
return fmt.Errorf("%s: %w", args.Group, ctx.Err())
}
}
}

handler.go


package kafka

import (
"context"
"log"
"strings"
"time"

"github.com/IBM/sarama"
)

type Message struct {
// Message iteration counter.
Counter int64
// Consumer ID.
Consumer string
// Topic name.
Topic string
// Consumer group name.
Group string
// Topic partition.
Partition int32
// Partition offset of the topic.
Offset int64
// Partition key of the message.
Key []byte
// Message body.
Body []byte
// Message timestamp.
Timestamp time.Time
}

// consumer is implemented by the custom consumers.
type consumer interface {
Handle(ctx context.Context, message Message) error
}

// handler is the Kafka handler for consumer groups to handle message consumption.
type handler struct {
client string
topic string
group string
consumer consumer
}

// Setup is run when the consumer group is being rebalanced to mark the beginning
// of a new session.
func (h *handler) Setup(sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being rebalanced: %s\n", h.group)

return nil
}

// Cleanup is run just before the next rebalance takes place to mark the end of
// the current session.
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being cleaned up to start new rebalancing: %s\n", h.group)

return nil
}

// ConsumeClaim starts consuming messages. To avoid re-consuming a message again
// the message is marked as consumed (a.k.a. commiting the offset) on the group.
// Note: If an the consumer handler returns an error, the message will not be
// re-consumed hence, ideally, the message should be sent to a sort of topic for
// investigation or further processing otherwise it will be lost.
// - In case the session context is closed, consumption is terminated to avoid
// `ErrRebalanceInProgress` or `read tcp :: i/o timeout` errors when
// Kafka rebalance. REF: https://github.com/IBM/sarama/issues/1192
// - In case the message channel is closed, consumption is terminated to allow
// Kafka rebalance.
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
var (
i int64
id = strings.TrimPrefix(session.MemberID(), h.client)
)

for {
select {
case <-session.Context().Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
log.Printf("consumer group message channel: %s\n", h.group)

return nil
}

i++

message := Message{
Counter: i,
Consumer: id,
Topic: h.topic,
Group: h.group,
Partition: msg.Partition,
Offset: msg.Offset,
Key: msg.Key,
Body: msg.Value,
Timestamp: msg.Timestamp,
}

if err := h.consumer.Handle(session.Context(), message); err != nil {
log.Printf("consumer group message consumption: %s: %s\n", h.group, err)

continue
}

session.MarkMessage(msg, "")
}
}
}

error.go


package api

import (
"context"
"fmt"
"server/kafka"
)

type Error struct{}

func (Error) Handle(ctx context.Context, message kafka.Message) error {
fmt.Printf("ERR - CTR:%-3d | CID:%-36s | PAR:%d | OFF:%-3d | TOP:%-14s | KEY:%s > %s\n",
message.Counter,
message.Consumer,
message.Partition,
message.Offset,
message.Topic,
message.Key,
message.Body,
)

return nil
}

warning.go


package api

import (
"context"
"fmt"
"server/kafka"
)

type Warning struct{}

func (Warning) Handle(ctx context.Context, message kafka.Message) error {
fmt.Printf("WAR - CTR:%-3d | CID:%-36s | PAR:%d | OFF:%-3d | TOP:%-14s | KEY:%s > %s\n",
message.Counter,
message.Consumer,
message.Partition,
message.Offset,
message.Topic,
message.Key,
message.Body,
)

return nil
}

Test


Setup and run


boot$ make setup
go run -race main.go
> TOPIC: warnings-topic PARTITION: 1 REPLICA: 1
> TOPIC: errors-topic PARTITION: 3 REPLICA: 1

client$ go run -race main.go
producer ...

server$ go run -race main.go
consumer ...

Send requests


boot$ make warning
k6 run k6.js -e SEVERITY=warnings

/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io

execution: local
script: k6.js
output: -

scenarios: (100.00%) 1 scenario, 99 max VUs, 40s max duration (incl. graceful stop):
* default: 99 looping VUs for 10s (gracefulStop: 30s)


running (10.0s), 00/99 VUs, 25005 complete and 0 interrupted iterations
default ✓ [======================================] 99 VUs 10s

✓ 200 OK
✗ response < 100 ms
↳ 99% — ✓ 24826 / ✗ 179

checks.........................: 99.64% ✓ 49831 ✗ 179
data_received..................: 1.9 MB 187 kB/s
data_sent......................: 2.3 MB 227 kB/s
http_req_blocked...............: avg=14.63µs min=1µs med=3µs max=14.44ms p(90)=5µs p(95)=7µs
http_req_connecting............: avg=3.96µs min=0s med=0s max=5.32ms p(90)=0s p(95)=0s
http_req_duration..............: avg=39.41ms min=14.15ms med=36.76ms max=181.17ms p(90)=53.3ms p(95)=61.14ms
{ expected_response:true }...: avg=39.41ms min=14.15ms med=36.76ms max=181.17ms p(90)=53.3ms p(95)=61.14ms
http_req_failed................: 0.00% ✓ 0 ✗ 25005
http_req_receiving.............: avg=47.98µs min=11µs med=23µs max=16.03ms p(90)=64µs p(95)=102µs
http_req_sending...............: avg=23.77µs min=6µs med=12µs max=19.15ms p(90)=26µs p(95)=44.79µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=39.34ms min=14.12ms med=36.7ms max=181.13ms p(90)=53.23ms p(95)=61.03ms
http_reqs......................: 25005 2494.229258/s
iteration_duration.............: avg=39.62ms min=14.46ms med=36.98ms max=181.45ms p(90)=53.52ms p(95)=61.4ms
iterations.....................: 25005 2494.229258/s
vus............................: 99 min=99 max=99
vus_max........................: 99 min=99 max=99

boot$ make error
k6 run k6.js -e SEVERITY=errors

/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io

execution: local
script: k6.js
output: -

scenarios: (100.00%) 1 scenario, 99 max VUs, 40s max duration (incl. graceful stop):
* default: 99 looping VUs for 10s (gracefulStop: 30s)


running (10.0s), 00/99 VUs, 23371 complete and 0 interrupted iterations
default ✓ [======================================] 99 VUs 10s

✓ 200 OK
✗ response < 100 ms
↳ 99% — ✓ 23181 / ✗ 190

checks.........................: 99.59% ✓ 46552 ✗ 190
data_received..................: 1.8 MB 175 kB/s
data_sent......................: 2.1 MB 207 kB/s
http_req_blocked...............: avg=68.37µs min=1µs med=3µs max=34.43ms p(90)=5µs p(95)=7µs
http_req_connecting............: avg=13.89µs min=0s med=0s max=14.51ms p(90)=0s p(95)=0s
http_req_duration..............: avg=42.12ms min=16.81ms med=39.39ms max=174.13ms p(90)=58.05ms p(95)=66.14ms
{ expected_response:true }...: avg=42.12ms min=16.81ms med=39.39ms max=174.13ms p(90)=58.05ms p(95)=66.14ms
http_req_failed................: 0.00% ✓ 0 ✗ 23371
http_req_receiving.............: avg=45.33µs min=11µs med=24µs max=8.96ms p(90)=64µs p(95)=97µs
http_req_sending...............: avg=27.32µs min=6µs med=12µs max=8.93ms p(90)=26µs p(95)=43µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=42.05ms min=16.7ms med=39.33ms max=174.02ms p(90)=57.94ms p(95)=65.96ms
http_reqs......................: 23371 2330.345029/s
iteration_duration.............: avg=42.36ms min=17.18ms med=39.56ms max=174.53ms p(90)=58.45ms p(95)=66.67ms
iterations.....................: 23371 2330.345029/s
vus............................: 99 min=99 max=99
vus_max........................: 99 min=99 max=99