23/05/2024 - GO, KAFKA
Bu örnekte, aynı tüketici uygulamasını birden fazla makinede/sunucuda/örnekte çalıştırırken Sarama tüketici gruplarının nasıl davrandığını kontrol edeceğiz. Bu test 3 bölümden oluşan bir konuya dayanmaktadır. Bölüm atamasının nasıl değiştiğini görmek için giderek daha fazla uygulama örneği çalıştıracağım.
Burada gördüğümüz şey, aynı tüketici grubunun birden fazla örneğini çalıştırırsanız konu bölümlerinin birbirleri arasında paylaşılacağıdır. Bu, yinelenen ileti tüketimini önleyecektir. Ayrıca, bir bulut sunucusunu durdurursanız gözetimsiz bölüm, kalan bulut sunucularından birine otomatik olarak atanacaktır. Bölüm sayısından daha fazla örnek çalıştırırsanız, etkin bir örneğin kapanması durumunda örnekler beklemede bekleyecektir. Davranışın nasıl değiştiğini görmek için config.Consumer.Group.Rebalance.GroupStrategies
ile oynayabilirsiniz.
Bu, aşağıdaki günlüklerde onaylayabileceğiniz örnekler arasındaki bölüm ataması değişikliğinin genel görünümüdür.
// instance a logs
2024/05/23 13:06:11 starting ...
2024/05/23 13:06:14 consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance a logs
message session is closed
message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance a logs
message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
Bu durum b örneğinin üyesini boşta kalan bir tüketici olarak uykuya yatırdı.
// instance a logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
// instance d logs
consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
Bu, boşta olan b örneğini, durdurulmuş a örneğinin konumunu almak üzere uyandırarak yukarıdaki noktayı kanıtlar.
// instance a logs
termination signal received
message session is closed
paused consumer
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
context canceled
exiting ...
// instance b logs
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
// instance d logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
Terminalde INS=[a/b/c/d....] go run -race main.go
olarak çalıştırın.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"server/consumer"
"github.com/IBM/sarama"
)
func main() {
log.Println("starting ...")
// Set up signal notification context
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Set topic and group
var (
topic = "errors-topic"
group = topic + "group"
)
// Create consumer config
config := sarama.NewConfig()
config.ClientID = "inanzzz-server"
config.Version = sarama.V3_3_2_0
config.Metadata.AllowAutoTopicCreation = false
config.Consumer.Return.Errors = true
config.Consumer.Retry.Backoff = time.Second * 3
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create consumer group
con, err := sarama.NewConsumerGroup([]string{":9093"}, group, config)
if err != nil {
log.Println(err)
return
}
defer con.Close()
handler := &consumer.Consumer{Instance: os.Getenv("INS")}
go func() {
for {
err := con.Consume(ctx, []string{topic}, handler)
switch {
case err != nil:
log.Println(err)
return
case ctx.Err() != nil:
log.Println(ctx.Err())
return
}
}
}()
// Wait for close signal and shutdown after a graceful period
<-ctx.Done()
log.Println("termination signal received")
con.PauseAll()
log.Println("paused consumer")
<-time.After(time.Second * 5)
log.Println("exiting ...")
}
package consumer
import (
"fmt"
"log"
"os"
"github.com/IBM/sarama"
)
type Consumer struct {
Instance string
}
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being rebalanced (app:%s - mem:%s)\n",
c.Instance,
session.MemberID(),
)
return nil
}
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
log.Printf("consumer group is being cleaned up to start new rebalancing (app:%s - mem:%s)\n",
c.Instance,
session.MemberID(),
)
return nil
}
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
var i int64
for {
select {
case <-session.Context().Done():
log.Println("message session is closed")
return nil
case msg, ok := <-claim.Messages():
if !ok {
log.Println("message channel is closed")
return nil
}
i++
if err := c.log(i, msg.Partition, msg.Offset); err != nil {
fmt.Println(err)
}
session.MarkMessage(msg, "")
}
}
}
// This is a dirty piece for debugging purposes. Remove it as you wish.
func (c *Consumer) log(counter int64, partition int32, offset int64) error {
path := fmt.Sprintf("%s-%d.log", c.Instance, partition)
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
if _, err := file.WriteString(fmt.Sprintf("%-4d - %d\n", counter, offset)); err != nil {
return err
}
return nil
}