In this example we are going to check how Sarama consumer groups behave when running the same consumer application on multiple machines/servers/instances. The test is based on a topic with 3 partitions. I will progressively run more and more application instances to see how the partition assignment changes.


What we see here is that if you run multiple instances of the same consumer group, the topic partitions are shared between them, which prevents duplicate message consumption. Also, if you stop an instance, its unattended partitions are automatically assigned to one of the remaining instances. If you run more instances than there are partitions, the surplus instances wait on standby in case an active instance goes down. You can play with config.Consumer.Group.Rebalance.GroupStrategies to see how the behaviour changes, as sketched below.
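

For example, here is a minimal sketch that swaps the default range strategy for round robin. It is a fragment meant to slot into the config in the example further down, and it assumes a recent IBM/sarama version where the NewBalanceStrategyRoundRobin constructor is available:


// Assign partitions one by one across members instead of in contiguous
// ranges. Sticky assignment is another option via NewBalanceStrategySticky.
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
	sarama.NewBalanceStrategyRoundRobin(),
}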


The logs below give the overall view of how the partition assignment changes between instances.



Logs


Only instance a is running


// instance a logs

2024/05/23 13:06:11 starting ...
2024/05/23 13:06:14 consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)


Run instance b as well


// instance a logs

message session is closed
message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)

// instance b logs

consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)


Run instance c as well


// instance a logs

message session is closed
message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)

// instance b logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)

// instance c logs

consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)


Run instance d as well


This has put instance b's member to sleep as an idle consumer, because there are now more members than partitions.


// instance a logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
consumer group is being rebalanced (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)

// instance b logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)

// instance c logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)

// instance d logs

consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)


Stop instance a


This proves the point above: idle instance b wakes up and takes over the partition left behind by the stopped instance a.


// instance a logs

termination signal received
message session is closed
paused consumer
consumer group is being cleaned up to start new rebalancing (app:a - mem:inanzzz-server-969a9641-32b5-4644-b0fa-f953f4b89e56)
context canceled
exiting ...

// instance b logs

consumer group is being cleaned up to start new rebalancing (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)
consumer group is being rebalanced (app:b - mem:inanzzz-server-a88f19a9-7008-43aa-9519-55bb6d9d4153)

// instance c logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)
consumer group is being rebalanced (app:c - mem:inanzzz-server-1c9a3fa4-6f7b-413b-afe4-7585c88e311e)

// instance d logs

message session is closed
consumer group is being cleaned up to start new rebalancing (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)
consumer group is being rebalanced (app:d - mem:inanzzz-server-0986aabd-e299-461b-8423-70ca8dbeac3c)


Example


Run it in a terminal as INS=[a/b/c/d...] go run -race main.go, using a different INS value for each instance.
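

Note that config.Metadata.AllowAutoTopicCreation is disabled below, so the topic must exist before the consumers start. Here is a minimal one-off sketch, assuming a local broker on :9093 as in the example, that creates it with Sarama's cluster admin API:


package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V3_3_2_0

	// Connect to the same broker the consumer example uses.
	admin, err := sarama.NewClusterAdmin([]string{":9093"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer admin.Close()

	// Create the topic with 3 partitions to match the test setup.
	if err := admin.CreateTopic("errors-topic", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 1,
	}, false); err != nil {
		log.Fatalln(err)
	}
}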


package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"server/consumer"

	"github.com/IBM/sarama"
)

func main() {
	log.Println("starting ...")

	// Set up signal notification context.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Set topic and group.
	var (
		topic = "errors-topic"
		group = topic + "group"
	)

	// Create consumer config.
	config := sarama.NewConfig()
	config.ClientID = "inanzzz-server"
	config.Version = sarama.V3_3_2_0
	config.Metadata.AllowAutoTopicCreation = false
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = time.Second * 3
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create consumer group.
	con, err := sarama.NewConsumerGroup([]string{":9093"}, group, config)
	if err != nil {
		log.Println(err)

		return
	}
	defer con.Close()

	// Consumer.Return.Errors is enabled so the Errors channel must be
	// drained, otherwise the consumer can block.
	go func() {
		for err := range con.Errors() {
			log.Println(err)
		}
	}()

	handler := &consumer.Consumer{Instance: os.Getenv("INS")}

	// Keep calling Consume in a loop because it returns whenever a
	// rebalancing happens and the session needs to be recreated.
	go func() {
		for {
			err := con.Consume(ctx, []string{topic}, handler)
			switch {
			case err != nil:
				log.Println(err)

				return
			case ctx.Err() != nil:
				log.Println(ctx.Err())

				return
			}
		}
	}()

	// Wait for close signal and shutdown after a graceful period.
	<-ctx.Done()
	log.Println("termination signal received")

	con.PauseAll()
	log.Println("paused consumer")

	<-time.After(time.Second * 5)
	log.Println("exiting ...")
}

package consumer

import (
	"fmt"
	"log"
	"os"

	"github.com/IBM/sarama"
)

type Consumer struct {
	Instance string
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	log.Printf("consumer group is being rebalanced (app:%s - mem:%s)\n",
		c.Instance,
		session.MemberID(),
	)

	return nil
}

// Cleanup runs at the end of a session, once all ConsumeClaim goroutines
// have exited.
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Printf("consumer group is being cleaned up to start new rebalancing (app:%s - mem:%s)\n",
		c.Instance,
		session.MemberID(),
	)

	return nil
}

// ConsumeClaim consumes messages from a single partition claim until the
// session ends or the message channel is closed.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	var i int64

	for {
		select {
		case <-session.Context().Done():
			log.Println("message session is closed")

			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				log.Println("message channel is closed")

				return nil
			}

			i++

			if err := c.log(i, msg.Partition, msg.Offset); err != nil {
				fmt.Println(err)
			}

			session.MarkMessage(msg, "")
		}
	}
}

// This is a dirty piece for debugging purposes. Remove it as you wish. It
// appends each consumed offset to a per-instance, per-partition file.
func (c *Consumer) log(counter int64, partition int32, offset int64) error {
	path := fmt.Sprintf("%s-%d.log", c.Instance, partition)

	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer file.Close()

	if _, err := file.WriteString(fmt.Sprintf("%-4d - %d\n", counter, offset)); err != nil {
		return err
	}

	return nil
}