23/05/2024 - GO, KAFKA
In this example we are going to check how Sarama consumer groups behave when the same consumer application runs on multiple machines/servers/instances. This test is based on a topic with 3 partitions. I will progressively run more and more application instances to see how the partition assignment changes.
What we see here is that if you run multiple instances of the same consumer group, the topic partitions are shared between them. This prevents duplicate message consumption. Also, if you stop an instance, its unattended partition is automatically assigned to one of the remaining instances. If you run more instances than the partition count, the extra instances wait on standby in case an active instance goes down. You can play with config.Consumer.Group.Rebalance.GroupStrategies
to see how the behaviour changes.
This is the overall view of how the partition assignment changes between instances, which you can confirm in the logs below.
// instance a logs
2024/05/23 13:06:11 starting ...
2024/05/23 13:06:14 consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance a logs
message session is closed
message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance a logs
message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
Starting a fourth instance (d) against a topic with only 3 partitions has put instance b's member to sleep as an idle consumer.
// instance a logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
// instance b logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
// instance d logs
consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
Stopping instance a proves the point above: the idle instance b is woken up to take the place of the stopped instance a.
// instance a logs
termination signal received
message session is closed
paused consumer
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
context canceled
exiting ...
// instance b logs
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
// instance c logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
// instance d logs
message session is closed
consumer group is being cleaned up to start new rebalancing (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
Run it as INS=[a/b/c/d...] go run -race main.go
in a terminal.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"server/consumer"
"github.com/IBM/sarama"
)
// main runs a single member of the consumer group. The label used for this
// instance in log lines is read from the INS environment variable.
//
// It keeps consuming until SIGINT/SIGTERM is received or the consume loop
// stops on an error. On a signal it pauses the consumer and gives the consume
// loop up to five seconds to finish before exiting.
func main() {
	log.Println("starting ...")

	// Set up signal notification context
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Set topic and group
	var (
		topic = "errors-topic"
		group = topic + "group"
	)

	// Create consumer config
	config := sarama.NewConfig()
	config.ClientID = "inanzzz-server"
	config.Version = sarama.V3_3_2_0
	config.Metadata.AllowAutoTopicCreation = false
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = time.Second * 3
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create consumer group
	con, err := sarama.NewConsumerGroup([]string{":9093"}, group, config)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := con.Close(); err != nil {
			log.Println(err)
		}
	}()

	// Consumer.Return.Errors is enabled above, so consumer errors are handed
	// to the Errors channel. Read it so they end up in the logs.
	go func() {
		for err := range con.Errors() {
			log.Println(err)
		}
	}()

	handler := &consumer.Consumer{Instance: os.Getenv("INS")}

	// done is closed when the consume loop below has returned.
	done := make(chan struct{})

	go func() {
		defer close(done)

		// Consume returns when a session ends, so it is called again until
		// it fails or the context is cancelled.
		for {
			err := con.Consume(ctx, []string{topic}, handler)
			switch {
			case err != nil:
				log.Println(err)
				return
			case ctx.Err() != nil:
				log.Println(ctx.Err())
				return
			}
		}
	}()

	// Wait for the close signal, or for the consume loop to give up on its own
	select {
	case <-done:
		// The loop has already logged why it stopped; there is nothing left
		// to pause or wait for.
		log.Println("exiting ...")
		return
	case <-ctx.Done():
	}

	log.Println("termination signal received")

	con.PauseAll()
	log.Println("paused consumer")

	// Shutdown after a graceful period: leave as soon as the consume loop is
	// finished, but never wait longer than five seconds.
	select {
	case <-done:
	case <-time.After(time.Second * 5):
	}

	log.Println("exiting ...")
}
package consumer
import (
"fmt"
"log"
"os"
"github.com/IBM/sarama"
)
// Consumer is the handler handed to the consumer group's Consume call. It logs
// when a session is set up or cleaned up and records every message it
// receives in a per-partition debug file.
type Consumer struct {
	// Instance is the label of the running application instance. It appears
	// in the log lines and in the debug file names.
	Instance string
}
// Setup logs the instance label together with the session's member ID.
// It always returns nil.
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	member := session.MemberID()
	log.Printf("consumer group is being rebalanced (app:%s - mem:%s)\n", c.Instance, member)

	return nil
}
// Cleanup logs the instance label together with the session's member ID.
// It always returns nil.
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	member := session.MemberID()
	log.Printf("consumer group is being cleaned up to start new rebalancing (app:%s - mem:%s)\n", c.Instance, member)

	return nil
}
// ConsumeClaim reads messages from the claim until the session context is
// done or the claim's message channel is closed. Every message is written to
// the debug file through c.log and then marked on the session. It always
// returns nil.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// i counts the messages received by this call; it is only used as the
	// first column of the debug file.
	var i int64

	for {
		select {
		case <-session.Context().Done():
			log.Println("message session is closed")
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				log.Println("message channel is closed")
				return nil
			}

			i++

			// A failed debug write is reported through the same logger as
			// everything else and does not stop consumption.
			if err := c.log(i, msg.Partition, msg.Offset); err != nil {
				log.Println(err)
			}

			session.MarkMessage(msg, "")
		}
	}
}
// This is a dirty piece for debugging purposes. Remove it as you wish.
//
// log appends one "<counter> - <offset>" line to the file named
// "<Instance>-<partition>.log", creating the file when it does not exist yet.
// The counter is left-aligned in a column of width four.
//
// It returns the error from opening, writing or closing the file. A write
// error takes precedence over a close error.
func (c *Consumer) log(counter int64, partition int32, offset int64) (err error) {
	path := fmt.Sprintf("%s-%d.log", c.Instance, partition)

	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// Report a failed close too, but never hide an earlier write error.
		if cerr := file.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = fmt.Fprintf(file, "%-4d - %d\n", counter, offset)

	return err
}