23/05/2024 - GO, KAFKA
Bu örneği, bir konunun tamamı yerine yalnızca belirli bölümlerinden gelen iletileri tüketmek için kullanabilirsiniz. Tüketiciyi başlattığımızda, tüketim için bir dizi bölüm tanımlarız ancak bunu, basit bir değişiklik olması gereken bölüm numaralarını kabul edecek şekilde geliştirebilirsiniz.
3 bölüm olduğunu varsayalım. $ FROM=0 TO=1 go run -race main.go
komutunu çalıştırmak yalnızca 0 ve 1 numaralı bölümdeki mesajları tüketir.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"server/consumer"
"github.com/IBM/sarama"
)
func main() {
log.Println("starting ...")
// Get applicatio specific consumer properties
from, ok := os.LookupEnv("FROM")
if !ok {
log.Fatalln("partition from `FROM` env variable is not set")
}
to, ok := os.LookupEnv("TO")
if !ok {
log.Fatalln("partition to `TO` env variable is not set")
}
// Set up signal notification context
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Set topic and group
var (
topic = "errors-topic"
group = topic + "-group"
)
// Create consumer config
config := sarama.NewConfig()
config.ClientID = "inanzzz-server"
config.Version = sarama.V3_3_2_0
config.Metadata.AllowAutoTopicCreation = false
config.Consumer.Return.Errors = true
config.Consumer.Retry.Backoff = time.Second * 3
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create consumer group
con, err := sarama.NewConsumerGroup([]string{":9093"}, group, config)
if err != nil {
log.Println(err)
return
}
defer con.Close()
handler := consumer.New(from, to)
go func() {
for {
err := con.Consume(ctx, []string{topic}, handler)
switch {
case err != nil:
log.Println(err)
return
case ctx.Err() != nil:
log.Println(ctx.Err())
return
}
}
}()
// Wait for close signal and shutdown after a graceful period
<-ctx.Done()
log.Println("termination signal received")
con.PauseAll()
log.Println("paused consumer")
<-time.After(time.Second * 5)
log.Println("exiting ...")
}
package consumer
import (
"fmt"
"log"
"os"
"github.com/IBM/sarama"
)
type Consumer struct {
from int32
to int32
}
func New(from, to string) *Consumer {
con := &Consumer{}
fmt.Sscan(from, &con.from)
fmt.Sscan(to, &con.to)
return con
}
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
log.Println("consumer group is being rebalanced")
return nil
}
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
log.Println("consumer group is being cleaned up to start new rebalancing")
return nil
}
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
var i int64
for {
select {
case <-session.Context().Done():
log.Println("message session is closed")
return nil
case msg, ok := <-claim.Messages():
if !ok {
log.Println("message channel is closed")
return nil
}
if msg.Partition < c.from || msg.Partition > c.to {
continue
}
i++
if err := c.log(i, msg.Partition, msg.Value); err != nil {
fmt.Println(err)
}
session.MarkMessage(msg, "")
}
}
}
// This is a dirty piece of code for just debugging purposes!
func (c *Consumer) log(counter int64, partition int32, body []byte) error {
path := fmt.Sprintf("%d.log", partition)
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
if _, err := file.WriteString(fmt.Sprintf("%-4d - BODY:%s\n", counter, body)); err != nil {
return err
}
return nil
}