You can use this example to consume messages only from specific partitions of a topic rather than from all of them. When starting the consumer, we define a range of partitions to consume; however, you can extend it to accept explicit partition numbers instead, which should be a simple change.


Consumer


Assume there are 3 partitions. Running the `$ FROM=0 TO=1 go run -race main.go` command will consume messages from partitions 0 and 1 only.


package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"server/consumer"

"github.com/IBM/sarama"
)

// main starts a Kafka consumer group member for "errors-topic" whose handler
// only processes messages from the partition range given by the FROM and TO
// environment variables. It runs until SIGINT/SIGTERM is received or the
// consume loop fails, then pauses consumption and exits after a grace period.
func main() {
	log.Println("starting ...")

	// Get application specific consumer properties
	from, ok := os.LookupEnv("FROM")
	if !ok {
		log.Fatalln("partition from `FROM` env variable is not set")
	}
	to, ok := os.LookupEnv("TO")
	if !ok {
		log.Fatalln("partition to `TO` env variable is not set")
	}

	// Set up signal notification context
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Set topic and group
	var (
		topic = "errors-topic"
		group = topic + "-group"
	)

	// Create consumer config
	config := sarama.NewConfig()
	config.ClientID = "inanzzz-server"
	config.Version = sarama.V3_3_2_0
	config.Metadata.AllowAutoTopicCreation = false
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = time.Second * 3
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Create consumer group
	con, err := sarama.NewConsumerGroup([]string{":9093"}, group, config)
	if err != nil {
		log.Println(err)

		return
	}
	defer func() {
		if err := con.Close(); err != nil {
			log.Println(err)
		}
	}()

	// Consumer.Return.Errors is enabled above, so the errors channel has to be
	// read; otherwise errors are never surfaced and may back up inside the
	// client. The loop ends once the channel is closed.
	go func() {
		for err := range con.Errors() {
			log.Println(err)
		}
	}()

	handler := consumer.New(from, to)

	// consumeDone is closed when the consume loop exits, so that main does not
	// keep waiting for a signal after the loop has already given up.
	consumeDone := make(chan struct{})

	go func() {
		defer close(consumeDone)

		for {
			// Consume returns whenever a session ends (e.g. on rebalance), so
			// it is called again until it fails or the context is cancelled.
			if err := con.Consume(ctx, []string{topic}, handler); err != nil {
				log.Println(err)

				return
			}

			if err := ctx.Err(); err != nil {
				log.Println(err)

				return
			}
		}
	}()

	// Wait for close signal (or consume loop failure)
	select {
	case <-ctx.Done():
	case <-consumeDone:
	}

	if ctx.Err() == nil {
		// The loop stopped on its own; there is nothing left to wind down.
		log.Println("consumer stopped without a termination signal, exiting ...")

		return
	}

	// Shutdown after a graceful period
	log.Println("termination signal received")

	con.PauseAll()
	log.Println("paused consumer")

	<-time.After(time.Second * 5)
	log.Println("exiting ...")
}

package consumer

import (
"fmt"
"log"
"os"

"github.com/IBM/sarama"
)

// Consumer is a consumer group handler that only processes messages whose
// partition number lies in the inclusive range [from, to].
type Consumer struct {
	from int32 // lowest partition number that is processed
	to   int32 // highest partition number that is processed
}

// New builds a Consumer from the textual partition bounds from and to.
//
// A bound that cannot be parsed as an integer is reported through the log
// package and left at its zero value. An inverted range (from > to) is also
// reported, because no message can ever match it.
func New(from, to string) *Consumer {
	con := &Consumer{}

	if _, err := fmt.Sscan(from, &con.from); err != nil {
		log.Printf("invalid partition from %q, using %d: %v", from, con.from, err)
	}

	if _, err := fmt.Sscan(to, &con.to); err != nil {
		log.Printf("invalid partition to %q, using %d: %v", to, con.to, err)
	}

	if con.from > con.to {
		log.Printf("partition range %d-%d is empty, no message will be consumed", con.from, con.to)
	}

	return con
}

// Setup logs that a rebalance is in progress. The session argument is not
// used and the method always returns nil.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	log.Println("consumer group is being rebalanced")

	return nil
}

// Cleanup logs that the group is being cleaned up ahead of a new rebalance.
// The session argument is not used and the method always returns nil.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	log.Println("consumer group is being cleaned up to start new rebalancing")

	return nil
}

// ConsumeClaim reads the claim's messages until the session context is done
// or the message channel is closed, and always returns nil.
//
// Messages whose partition is outside [c.from, c.to] are read and dropped
// without being marked. Every other message is appended to its partition's
// debug log file and then marked as consumed, even if the write failed.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// i counts the messages accepted by this claim; it is only used as a line
	// prefix in the debug log.
	var i int64

	for {
		select {
		case <-session.Context().Done():
			log.Println("message session is closed")

			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				log.Println("message channel is closed")

				return nil
			}

			// Skip partitions outside the configured range.
			if msg.Partition < c.from || msg.Partition > c.to {
				continue
			}

			i++

			// Report through the log package, like every other diagnostic in
			// this file, and say which message could not be written.
			if err := c.log(i, msg.Partition, msg.Value); err != nil {
				log.Printf("logging message (partition %d, offset %d): %v", msg.Partition, msg.Offset, err)
			}

			session.MarkMessage(msg, "")
		}
	}
}

// log appends one line holding counter and body to the file "<partition>.log"
// in the current working directory, creating the file if it does not exist.
//
// This is a dirty piece of code for just debugging purposes! The file is
// opened and closed again on every call.
//
// It returns an error, wrapped with the file path, if the file cannot be
// opened, written to, or closed.
func (c *Consumer) log(counter int64, partition int32, body []byte) error {
	path := fmt.Sprintf("%d.log", partition)

	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("opening %s: %w", path, err)
	}

	// Close explicitly instead of deferring so that a failure to close the
	// file is reported as well; the write error takes precedence.
	_, writeErr := fmt.Fprintf(file, "%-4d - BODY:%s\n", counter, body)
	closeErr := file.Close()

	if writeErr != nil {
		return fmt.Errorf("writing %s: %w", path, writeErr)
	}

	if closeErr != nil {
		return fmt.Errorf("closing %s: %w", path, closeErr)
	}

	return nil
}

Result