This example spins up one consumer for each partition of the given topic. Just bear in mind that if the consumers restart, the same messages will be consumed again, because each partition consumer starts reading from the oldest available offset. To avoid this issue, you can use consumer groups instead.

type Kafka struct {
config *sarama.Config
brokers []string

func (k Kafka) CreateConsumer(topic string) error {
cons, err := sarama.NewConsumer(k.brokers, k.config)
if err != nil {
return err
defer cons.Close()

parts, err := cons.Partitions(topic)
if err != nil {
return err

for _, part := range parts {
pcon, err := cons.ConsumePartition(topic, part, sarama.OffsetOldest)
if err != nil {
return err
defer pcon.Close()

go func(part int32) {
for msg := range pcon.Messages() {
fmt.Printf("PARTITION_CONSUMER:%d - PARTITION:%d - OFFSET:%d\n", part, msg.Partition, msg.Offset)


return nil