17/05/2024 - DOCKER, GO, KAFKA
Bu örnek, uygulamalarınızın kullanması için bir Kafka ortamını nasıl kurabileceğinizi gösterir. Zookeeper ve Kafka kullanıcı arayüzü de olacak. Ayrıca bölümlü test konuları oluşturmak için Golang uygulamasını kullanacağım.
# Local Kafka stack: one ZooKeeper node, one Kafka broker and a web UI.
version: "3.7"

services:
  # ZooKeeper instance the broker connects to (see KAFKA_CFG_ZOOKEEPER_CONNECT).
  kafka-zookeeper:
    container_name: kafka-zookeeper
    image: "bitnami/zookeeper:3.8.4"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  # Single Kafka broker exposing two plaintext listeners:
  #   CLIENT   (9092) is advertised as kafka-broker:9092, i.e. the service name
  #            that other containers of this compose project can resolve.
  #   EXTERNAL (9093) is advertised as localhost:9093, i.e. for applications
  #            running on the Docker host.
  kafka-broker:
    # NOTE(review): the container name is spelled "kafka-brooker" while the
    # service is "kafka-broker"; left unchanged because it is the name that
    # shows up in `docker compose ps`.
    container_name: kafka-brooker
    image: "bitnami/kafka:3.3.2"
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=kafka-zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-broker:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - kafka-zookeeper

  # Web UI, published on host port 8080; it reaches the broker through the
  # CLIENT listener (kafka-broker:9092).
  kafka-ui:
    container_name: kafka-ui
    image: "provectuslabs/kafka-ui:latest"
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-broker:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=kafka-zookeeper:2181
      # NOTE(review): KAFKA_BROKERCONNECT looks like a Kafdrop setting rather
      # than a kafka-ui one - confirm whether it is still needed.
      - KAFKA_BROKERCONNECT=kafka-broker:9092
    depends_on:
      - kafka-zookeeper
      - kafka-broker
package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
// topic describes one Kafka topic that main creates when the cluster does
// not already have it.
type topic struct {
	// name is the topic name; it is also the key used to look the topic up
	// among the topics the cluster already has.
	name string
	// partition is passed as the topic's NumPartitions.
	partition int32
	// replication is passed as the topic's ReplicationFactor.
	replication int16
	// settings is passed as the topic's ConfigEntries (config key -> value).
	settings map[string]*string
}
// Retention periods used as values for the "*.ms" topic settings, written as
// decimal strings. They are variables rather than constants because the
// topic settings store pointers to them.
var (
	// messageRetentionMinute is 60,000 ms, i.e. one minute.
	messageRetentionMinute = "60000"
	// messageRetentionHour is 3,600,000 ms, i.e. one hour.
	messageRetentionHour = "3600000"
)
// topics lists the topics that main makes sure exist on the cluster. Both
// entries use a replication factor of 1.
var topics = []topic{
	// Single-partition topic whose settings use the one-minute retention value.
	{
		name:        "warnings-topic",
		partition:   1,
		replication: 1,
		settings: map[string]*string{
			"retention.ms":        &messageRetentionMinute,
			"delete.retention.ms": &messageRetentionMinute,
		},
	},
	// Three-partition topic whose settings use the one-hour retention value.
	{
		name:        "errors-topic",
		partition:   3,
		replication: 1,
		settings: map[string]*string{
			"retention.ms":        &messageRetentionHour,
			"delete.retention.ms": &messageRetentionHour,
		},
	},
}
// main creates every entry of topics that the cluster does not have yet and
// exits with a non-zero status when the cluster cannot be reached or its
// topics cannot be listed.
func main() {
	if err := createMissingTopics(); err != nil {
		log.Fatalln(err)
	}
}

// createMissingTopics connects to the broker at ":9093", lists the topics the
// cluster already has and creates each entry of topics that is missing.
//
// A failure to create one topic is logged and the remaining topics are still
// attempted; only connection and listing failures are returned.
//
// The work lives in its own function so that the deferred Close runs before
// main calls log.Fatalln, which exits the process without running deferred
// calls.
func createMissingTopics() error {
	config := sarama.NewConfig()
	config.Version = sarama.V3_3_2_0

	admin, err := sarama.NewClusterAdmin([]string{":9093"}, config)
	if err != nil {
		return fmt.Errorf("connecting to cluster admin: %w", err)
	}
	defer func() {
		if err := admin.Close(); err != nil {
			log.Printf("closing cluster admin: %v", err)
		}
	}()

	existing, err := admin.ListTopics()
	if err != nil {
		return fmt.Errorf("listing topics: %w", err)
	}

	// The loop variable is named tp so it does not shadow the topic type.
	for _, tp := range topics {
		if _, ok := existing[tp.name]; ok {
			continue
		}

		detail := &sarama.TopicDetail{
			NumPartitions:     tp.partition,
			ReplicationFactor: tp.replication,
			ConfigEntries:     tp.settings,
		}
		// The final argument is validateOnly; false means the topic is
		// really created.
		if err := admin.CreateTopic(tp.name, detail, false); err != nil {
			log.Printf("creating topic %q: %v", tp.name, err)
			continue
		}

		fmt.Printf("> TOPIC: %s PARTITION: %d REPLICA: %d\n", tp.name, tp.partition, tp.replication)
	}

	return nil
}
# setup runs main.go with the race detector enabled.
# Declared phony so a file or directory named "setup" cannot mask the target.
.PHONY: setup
setup:
	go run -race main.go
# docker prunes unused Docker data and then starts the compose stack in the
# foreground.
# WARNING: `docker system prune --volumes --force` is not limited to this
# project; it removes unused Docker resources (including volumes) on the whole
# host without asking for confirmation.
# Declared phony so a file or directory named "docker" cannot mask the target.
.PHONY: docker
docker:
	docker system prune --volumes --force
	docker-compose up
Her şeyin kurulumunu tamamlamak için önce `$ make docker` komutunu, ardından (bu komut ön planda çalışmaya devam ettiği için) ayrı bir terminalde `$ make setup` komutunu çalıştırın. İşiniz bittiğinde kullanıcı arayüzü için http://localhost:8080/ adresini ziyaret edin.
$ docker compose ps
NAME COMMAND SERVICE STATUS PORTS
kafka-brooker "/opt/bitnami/script…" kafka-broker running 0.0.0.0:9092-9093->9092-9093/tcp
kafka-ui "/bin/sh -c 'java --…" kafka-ui running 0.0.0.0:8080->8080/tcp
kafka-zookeeper "/opt/bitnami/script…" kafka-zookeeper running 0.0.0.0:2181->2181/tcp