In this example we will first create two topics with a dedicated bootstrap application. We will then produce messages to both topics from an HTTP client application and consume from both topics in a terminal application. The whole setup follows the synchronous model. To verify that messages are actually being produced and consumed, I will also send thousands of concurrent requests to each HTTP endpoint. For that I will once again use K6, which is a great tool.


I kept the code short and left some duplication in it for now. Feel free to change it as you wish.


Bootstrap application


This sets up the environment and creates the topics.


docker-compose.yaml


version: "3.7"

services:

  kafka-zookeeper:
    container_name: kafka-zookeeper
    image: "bitnami/zookeeper:3.8.4"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka-broker:
    container_name: kafka-broker
    image: "bitnami/kafka:3.3.2"
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=kafka-zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-broker:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - kafka-zookeeper

  kafka-ui:
    container_name: kafka-ui
    image: "provectuslabs/kafka-ui:latest"
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-broker:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=kafka-zookeeper:2181
      - KAFKA_BROKERCONNECT=kafka-broker:9092
    depends_on:
      - kafka-zookeeper
      - kafka-broker

main.go


package main

import (
"fmt"
"log"

"github.com/IBM/sarama"
)

type topic struct {
name string
partition int32
replication int16
settings map[string]*string
}

var (
// Retention values are in milliseconds: one minute and one hour respectively.
messageRetentionMinute = "60000"
messageRetentionHour = "3600000"
)

var topics = []topic{{
name: "warnings-topic",
partition: 1,
replication: 1,
settings: map[string]*string{
"retention.ms": &messageRetentionMinute,
"delete.retention.ms": &messageRetentionMinute,
},
}, {
name: "errors-topic",
partition: 3,
replication: 1,
settings: map[string]*string{
"retention.ms": &messageRetentionHour,
"delete.retention.ms": &messageRetentionHour,
},
}}

func main() {
config := sarama.NewConfig()
config.Version = sarama.V3_3_2_0

clusterAdmin, err := sarama.NewClusterAdmin([]string{":9093"}, config)
if err != nil {
log.Fatalln(err)
}
defer clusterAdmin.Close()

clusterTopics, err := clusterAdmin.ListTopics()
if err != nil {
log.Fatalln(err)
}

for _, topic := range topics {
if _, ok := clusterTopics[topic.name]; ok {
continue
}

err := clusterAdmin.CreateTopic(topic.name, &sarama.TopicDetail{
NumPartitions: topic.partition,
ReplicationFactor: topic.replication,
ConfigEntries: topic.settings,
}, false)
if err != nil {
log.Println(err)
continue
}

fmt.Printf("> TOPIC: %s PARTITION: %d REPLICA: %d\n", topic.name, topic.partition, topic.replication)
}
}
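
If you want to double check what the broker has actually registered for these topics, the little sketch below re-reads the cluster metadata over the same external listener and prints the partition and replica counts. It is not part of the example, just a convenience.

package main

import (
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V3_3_2_0

	clusterAdmin, err := sarama.NewClusterAdmin([]string{":9093"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer clusterAdmin.Close()

	// ListTopics returns every topic the broker knows about, together with
	// its partition count, replication factor and config entries.
	clusterTopics, err := clusterAdmin.ListTopics()
	if err != nil {
		log.Fatalln(err)
	}

	for _, name := range []string{"warnings-topic", "errors-topic"} {
		detail, ok := clusterTopics[name]
		if !ok {
			fmt.Printf("> TOPIC: %s is missing\n", name)
			continue
		}

		fmt.Printf("> TOPIC: %s PARTITION: %d REPLICA: %d\n", name, detail.NumPartitions, detail.ReplicationFactor)
	}
}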

Makefile


setup:
	go run -race main.go

docker:
	docker system prune --volumes --force
	docker-compose up

error:
	k6 run k6.js -e SEVERITY=errors

warning:
	k6 run k6.js -e SEVERITY=warnings

k6.js


import http from "k6/http";
import { check } from "k6";

export let options = {
vus: 99,
duration: "10s"
};

export default function() {
// __VU is the unique virtual user number; it becomes the {id} path value.
let response = http.get("http://localhost:3001/"+__ENV.SEVERITY+"/"+__VU);

check(response, {
"200 OK": (r) => r.status === 200,
"response < 100 ms": (r) => r.timings.duration < 100
});
}

Client (HTTP) application


This produces messages to each topic.


main.go


package main

import (
"client/api"
"fmt"
"log"
"net/http"
"time"

"github.com/IBM/sarama"
)

func main() {
fmt.Println("producer ...")

// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-client"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Producer.Return.Successes = true
cfg.Producer.Retry.Max = 30
cfg.Producer.Retry.Backoff = time.Millisecond * 10
cfg.Producer.Compression = sarama.CompressionZSTD
// Enable EOS (Exactly Once Semantics).
// This de-duplicates messages that the built-in retry mechanism may resend.
// It has nothing to do with de-duplicating messages that the client itself
// sends more than once!
cfg.Producer.Idempotent = true
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Net.MaxOpenRequests = 1
//

// Create Kafka producer.
pro, err := sarama.NewSyncProducer([]string{":9093"}, cfg)
if err != nil {
log.Fatalln(err)
}
defer pro.Close()

// Create HTTP router.
rtr := http.DefaultServeMux
rtr.HandleFunc("GET /errors/{id}", (api.Error{Producer: pro, Topic: "errors-topic"}).Handle)
rtr.HandleFunc("GET /warnings/{id}", (api.Warning{Producer: pro, Topic: "warnings-topic"}).Handle)

// Start HTTP server.
log.Fatalln(http.ListenAndServe(":3001", rtr))
}
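
Note that the handlers below send messages without a partition key, so Sarama's default hash partitioner picks a random partition for each keyless message, which is why the consumer output later shows errors-topic messages spread across its three partitions. If you wanted every message for a given {id} to land on the same partition (and therefore be consumed in order per id), you could set a key. A minimal sketch under that assumption; produceKeyed is a made-up helper and not part of the example.

package api

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

// produceKeyed is a hypothetical helper. Messages sharing a key are hashed to
// the same partition by Sarama's default partitioner, which preserves per-key
// ordering on topics with more than one partition.
func produceKeyed(producer sarama.SyncProducer, topic, key string, value []byte) error {
	msg := sarama.ProducerMessage{
		Topic:     topic,
		Key:       sarama.StringEncoder(key),
		Value:     sarama.ByteEncoder(value),
		Timestamp: time.Now(),
	}

	prt, off, err := producer.SendMessage(&msg)
	if err != nil {
		return err
	}

	log.Printf("PART: %d OFFS: %d KEY: %s\n", prt, off, key)

	return nil
}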

log.go


package model

type Log struct {
ID string `json:"id"`
Level string `json:"level"`
Time string `json:"time"`
}

error.go


package api

import (
"encoding/json"
"log"
"net/http"
"time"

"client/model"

"github.com/IBM/sarama"
)

type Error struct {
Producer sarama.SyncProducer
Topic string
}

// GET /errors/{id}
func (er Error) Handle(w http.ResponseWriter, r *http.Request) {
obj := model.Log{
ID: r.PathValue("id"),
Level: "ERR",
Time: time.Now().Format(time.RFC3339),
}

val, err := json.Marshal(obj)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := sarama.ProducerMessage{
Topic: er.Topic,
Value: sarama.StringEncoder(val),
Timestamp: time.Now(),
}

prt, off, err := er.Producer.SendMessage(&msg)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Printf("PART: %d OFFS: %d\n", prt, off)
}

warning.go


package api

import (
"encoding/json"
"log"
"net/http"
"time"

"client/model"

"github.com/IBM/sarama"
)

type Warning struct {
Producer sarama.SyncProducer
Topic string
}

// GET /warnings/{id}
func (wr Warning) Handle(w http.ResponseWriter, r *http.Request) {
obj := model.Log{
ID: r.PathValue("id"),
Level: "WAR",
Time: time.Now().Format(time.RFC3339),
}

val, err := json.Marshal(obj)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := sarama.ProducerMessage{
Topic: wr.Topic,
Value: sarama.ByteEncoder(val),
Timestamp: time.Now(),
}

prt, off, err := wr.Producer.SendMessage(&msg)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Printf("PART: %d OFFS: %d\n", prt, off)
}
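
As mentioned at the top, error.go and warning.go above are nearly identical on purpose. If you wanted to remove that duplication, one option is a single handler that carries the level and the topic as fields. A rough sketch; LogHandler is a made-up name and not part of the example.

package api

import (
	"encoding/json"
	"log"
	"net/http"
	"time"

	"client/model"

	"github.com/IBM/sarama"
)

// LogHandler is a hypothetical replacement for both the Error and Warning handlers.
type LogHandler struct {
	Producer sarama.SyncProducer
	Topic    string
	Level    string
}

func (lh LogHandler) Handle(w http.ResponseWriter, r *http.Request) {
	obj := model.Log{
		ID:    r.PathValue("id"),
		Level: lh.Level,
		Time:  time.Now().Format(time.RFC3339),
	}

	val, err := json.Marshal(obj)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := sarama.ProducerMessage{
		Topic:     lh.Topic,
		Value:     sarama.ByteEncoder(val),
		Timestamp: time.Now(),
	}

	prt, off, err := lh.Producer.SendMessage(&msg)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Printf("PART: %d OFFS: %d\n", prt, off)
}

In main.go it would then be registered as, for example, (api.LogHandler{Producer: pro, Topic: "errors-topic", Level: "ERR"}).Handle.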

Server (CLI) application


This consumes messages from each topic. It also handles termination signals so that the consumers and the service shut down gracefully.


main.go


package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"server/api"
"server/kafka"

"github.com/IBM/sarama"
)

func main() {
fmt.Println("consumer ...")

// Set up service termination channel.
close := make(chan struct{})

// Set consumer shutdown delay.
timeout := time.Second * 5

// Set up signal notification context.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

// Create Kafka config.
cfg := sarama.NewConfig()
cfg.ClientID = "inanzzz-server"
cfg.Version = sarama.V3_3_2_0
cfg.Metadata.AllowAutoTopicCreation = false
cfg.Consumer.Return.Errors = true
cfg.Consumer.Retry.Backoff = time.Second * 3
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

// Set consumer groups.
groups := []kafka.ConsumerGroupArgs{{
Config: cfg,
Consumer: api.Warning{},
Group: "warnings-topic-group",
Topic: "warnings-topic",
Timeout: timeout,
}, {
Config: cfg,
Consumer: api.Error{},
Group: "errors-topic-group",
Topic: "errors-topic",
Timeout: timeout,
}}

// Run a consumer group for each topic.
for _, arg := range groups {
arg := arg

go func() {
if err := kafka.RunConsumerGroup(ctx, []string{":9093"}, arg, close); err != nil {
log.Fatalln(err)
}
}()
}

// Wait for the close signal. Both consumer groups react to the same
// termination signal, so the first one to finish closing unblocks main.
<-close
}

kafka.go


package kafka

import (
"context"
"fmt"
"log"
"time"

"github.com/IBM/sarama"
)

type ConsumerGroupArgs struct {
Config *sarama.Config
Consumer consumer
Group string
Topic string
Timeout time.Duration
}

func RunConsumerGroup(notify context.Context, brokers []string, args ConsumerGroupArgs, close chan<- struct{}) error {
con, err := sarama.NewConsumerGroup(brokers, args.Group, args.Config)
if err != nil {
return err
}

go func() {
for err := range con.Errors() {
log.Printf("consumer group error: %s: %s\n", args.Group, err)
}
}()

ctx, cancel := context.WithCancel(context.Background())

// Wait for the termination signal. Once it arrives, pause fetching from all
// partitions, wait for a grace period, then cancel the consumer context and
// close the consumer group. Finally, send to the close channel to stop the
// application.
go func() {
<-notify.Done()
con.PauseAll()

log.Printf("paused fetching from all partitions: %s\n", args.Group)

tick := time.NewTicker(args.Timeout)
defer tick.Stop()
<-tick.C

cancel()
if err := con.Close(); err != nil {
log.Printf("close consumer group: %s: %s\n", args.Group, err)
}

log.Printf("closed consumer group: %s\n", args.Group)

close <- struct{}{}
}()

handler := handler{
client: args.Config.ClientID,
topic: args.Topic,
group: args.Group,
consumer: args.Consumer,
}

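// Consume must be called in a loop: it returns whenever a server-side
// rebalance happens and should simply be called again to start a new session,
// as long as the consumer context has not been cancelled.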
for {
err := con.Consume(ctx, []string{args.Topic}, &handler)
switch {
case err != nil:
return fmt.Errorf("%s: %w", args.Group, err)
case ctx.Err() != nil:
return fmt.Errorf("%s: %w", args.Group, ctx.Err())
}
}
}

handler.go


package kafka

import (
"context"
"log"
"strings"
"time"

"github.com/IBM/sarama"
)

type Message struct {
// Message iteration counter.
Counter int64
// Consumer ID.
Consumer string
// Topic name.
Topic string
// Consumer group name.
Group string
// Topic partition.
Partition int32
// Partition offset of the topic.
Offset int64
// Partition key of the message.
Key []byte
// Message body.
Body []byte
// Message timestamp.
Timestamp time.Time
}

// consumer is implemented by the custom consumers.
type consumer interface {
Handle(ctx context.Context, message Message) error
}

// handler is the Kafka handler for consumer groups to handle message consumption.
type handler struct {
client string
topic string
group string
consumer consumer
}

// Setup is run when the consumer group is being rebalanced to mark the beginning
// of a new session.
func (h *handler) Setup(sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being rebalanced: %s\n", h.group)

return nil
}

// Cleanup is run just before the next rebalance takes place to mark the end of
// the current session.
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being cleaned up to start new rebalancing: %s\n", h.group)

return nil
}

// ConsumeClaim starts consuming messages. To avoid re-consuming a message,
// it is marked as consumed (a.k.a. committing the offset) on the group.
// Note: If the consumer handler returns an error, the message will not be
// re-consumed, hence, ideally, it should be sent to some sort of topic for
// investigation or further processing, otherwise it will be lost.
// - In case the session context is closed, consumption is terminated to avoid
// `ErrRebalanceInProgress` or `read tcp :: i/o timeout` errors when
// Kafka rebalances. REF: https://github.com/IBM/sarama/issues/1192
// - In case the message channel is closed, consumption is terminated to allow
// the Kafka rebalance to proceed.
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
var (
i int64
id = strings.TrimPrefix(session.MemberID(), h.client)
)

for {
select {
case <-session.Context().Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
log.Printf("consumer group message channel: %s\n", h.group)

return nil
}

i++

message := Message{
Counter: i,
Consumer: id,
Topic: h.topic,
Group: h.group,
Partition: msg.Partition,
Offset: msg.Offset,
Key: msg.Key,
Body: msg.Value,
Timestamp: msg.Timestamp,
}

if err := h.consumer.Handle(session.Context(), message); err != nil {
log.Printf("consumer group message consumption: %s: %s\n", h.group, err)

continue
}

session.MarkMessage(msg, "")
}
}
}
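
The ConsumeClaim comment above suggests that a message whose handler fails should ideally be sent to some sort of topic for investigation rather than being dropped. One way to do that without touching the handlers is a small decorator around the consumer interface. A rough sketch, assuming you also create a SyncProducer and a dead-letter topic on the server side (neither exists in this example); deadLetterConsumer is a made-up name.

package kafka

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// deadLetterConsumer is a hypothetical decorator. It wraps any consumer and
// forwards messages whose handling failed to a separate topic instead of
// silently dropping them.
type deadLetterConsumer struct {
	next     consumer
	producer sarama.SyncProducer
	topic    string
}

func (d deadLetterConsumer) Handle(ctx context.Context, message Message) error {
	err := d.next.Handle(ctx, message)
	if err == nil {
		return nil
	}

	log.Printf("forwarding failed message to %s: %s\n", d.topic, err)

	// Keep the original key and body so the message can be replayed later.
	_, _, sendErr := d.producer.SendMessage(&sarama.ProducerMessage{
		Topic: d.topic,
		Key:   sarama.ByteEncoder(message.Key),
		Value: sarama.ByteEncoder(message.Body),
	})

	return sendErr
}

How you wire it in (for example by wrapping args.Consumer inside RunConsumerGroup) is up to you.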

error.go


package api

import (
"context"
"fmt"
"server/kafka"
)

type Error struct{}

func (Error) Handle(ctx context.Context, message kafka.Message) error {
fmt.Printf("ERR - CTR:%-3d | CID:%-36s | PAR:%d | OFF:%-3d | TOP:%-14s | KEY:%s > %s\n",
message.Counter,
message.Consumer,
message.Partition,
message.Offset,
message.Topic,
message.Key,
message.Body,
)

return nil
}

warning.go


package api

import (
"context"
"fmt"
"server/kafka"
)

type Warning struct{}

func (Warning) Handle(ctx context.Context, message kafka.Message) error {
fmt.Printf("WAR - CTR:%-3d | CID:%-36s | PAR:%d | OFF:%-3d | TOP:%-14s | KEY:%s > %s\n",
message.Counter,
message.Consumer,
message.Partition,
message.Offset,
message.Topic,
message.Key,
message.Body,
)

return nil
}

Test


Setup and run


boot$ make setup
go run -race main.go
> TOPIC: warnings-topic PARTITION: 1 REPLICA: 1
> TOPIC: errors-topic PARTITION: 3 REPLICA: 1

client$ go run -race main.go
producer ...

server$ go run -race main.go
consumer ...

Sending the requests


boot$ make warning
k6 run k6.js -e SEVERITY=warnings

/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io

execution: local
script: k6.js
output: -

scenarios: (100.00%) 1 scenario, 99 max VUs, 40s max duration (incl. graceful stop):
* default: 99 looping VUs for 10s (gracefulStop: 30s)


running (10.0s), 00/99 VUs, 25005 complete and 0 interrupted iterations
default ✓ [======================================] 99 VUs 10s

✓ 200 OK
✗ response < 100 ms
↳ 99% — ✓ 24826 / ✗ 179

checks.........................: 99.64% ✓ 49831 ✗ 179
data_received..................: 1.9 MB 187 kB/s
data_sent......................: 2.3 MB 227 kB/s
http_req_blocked...............: avg=14.63µs min=1µs med=3µs max=14.44ms p(90)=5µs p(95)=7µs
http_req_connecting............: avg=3.96µs min=0s med=0s max=5.32ms p(90)=0s p(95)=0s
http_req_duration..............: avg=39.41ms min=14.15ms med=36.76ms max=181.17ms p(90)=53.3ms p(95)=61.14ms
{ expected_response:true }...: avg=39.41ms min=14.15ms med=36.76ms max=181.17ms p(90)=53.3ms p(95)=61.14ms
http_req_failed................: 0.00% ✓ 0 ✗ 25005
http_req_receiving.............: avg=47.98µs min=11µs med=23µs max=16.03ms p(90)=64µs p(95)=102µs
http_req_sending...............: avg=23.77µs min=6µs med=12µs max=19.15ms p(90)=26µs p(95)=44.79µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=39.34ms min=14.12ms med=36.7ms max=181.13ms p(90)=53.23ms p(95)=61.03ms
http_reqs......................: 25005 2494.229258/s
iteration_duration.............: avg=39.62ms min=14.46ms med=36.98ms max=181.45ms p(90)=53.52ms p(95)=61.4ms
iterations.....................: 25005 2494.229258/s
vus............................: 99 min=99 max=99
vus_max........................: 99 min=99 max=99

boot$ make error
k6 run k6.js -e SEVERITY=errors

/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io

execution: local
script: k6.js
output: -

scenarios: (100.00%) 1 scenario, 99 max VUs, 40s max duration (incl. graceful stop):
* default: 99 looping VUs for 10s (gracefulStop: 30s)


running (10.0s), 00/99 VUs, 23371 complete and 0 interrupted iterations
default ✓ [======================================] 99 VUs 10s

✓ 200 OK
✗ response < 100 ms
↳ 99% — ✓ 23181 / ✗ 190

checks.........................: 99.59% ✓ 46552 ✗ 190
data_received..................: 1.8 MB 175 kB/s
data_sent......................: 2.1 MB 207 kB/s
http_req_blocked...............: avg=68.37µs min=1µs med=3µs max=34.43ms p(90)=5µs p(95)=7µs
http_req_connecting............: avg=13.89µs min=0s med=0s max=14.51ms p(90)=0s p(95)=0s
http_req_duration..............: avg=42.12ms min=16.81ms med=39.39ms max=174.13ms p(90)=58.05ms p(95)=66.14ms
{ expected_response:true }...: avg=42.12ms min=16.81ms med=39.39ms max=174.13ms p(90)=58.05ms p(95)=66.14ms
http_req_failed................: 0.00% ✓ 0 ✗ 23371
http_req_receiving.............: avg=45.33µs min=11µs med=24µs max=8.96ms p(90)=64µs p(95)=97µs
http_req_sending...............: avg=27.32µs min=6µs med=12µs max=8.93ms p(90)=26µs p(95)=43µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=42.05ms min=16.7ms med=39.33ms max=174.02ms p(90)=57.94ms p(95)=65.96ms
http_reqs......................: 23371 2330.345029/s
iteration_duration.............: avg=42.36ms min=17.18ms med=39.56ms max=174.53ms p(90)=58.45ms p(95)=66.67ms
iterations.....................: 23371 2330.345029/s
vus............................: 99 min=99 max=99
vus_max........................: 99 min=99 max=99