@nilsmagnus
Created February 15, 2017 08:04
Example of Go consuming from Kafka, using the Shopify/sarama library
package main

import (
	"fmt"
	"os"
	"os/signal"
	"strings"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.ClientID = "go-kafka-consumer"
	config.Consumer.Return.Errors = true

	brokers := []string{"localhost:9092"}

	// Create a new consumer
	master, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	topics, err := master.Topics()
	if err != nil {
		panic(err)
	}
	consumer, errors := consume(topics, master)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Count how many messages have been processed
	msgCount := 0

	// Get signal to finish
	doneCh := make(chan struct{})
	go func() {
		for {
			select {
			case msg := <-consumer:
				msgCount++
				fmt.Println("Received message", string(msg.Key), string(msg.Value))
			case consumerError := <-errors:
				msgCount++
				fmt.Println("Received consumerError", consumerError.Topic, consumerError.Partition, consumerError.Err)
				doneCh <- struct{}{}
			case <-signals:
				fmt.Println("Interrupt is detected")
				doneCh <- struct{}{}
			}
		}
	}()
	<-doneCh
	fmt.Println("Processed", msgCount, "messages")
}

func consume(topics []string, master sarama.Consumer) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError) {
	consumers := make(chan *sarama.ConsumerMessage)
	errors := make(chan *sarama.ConsumerError)
	for _, topic := range topics {
		if strings.Contains(topic, "__consumer_offsets") {
			continue
		}
		partitions, _ := master.Partitions(topic)
		// this only consumes the first partition; you would probably want to consume all partitions
		consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("Topic %v Partitions: %v", topic, partitions)
			panic(err)
		}
		fmt.Println("Start consuming topic", topic)
		go func(topic string, consumer sarama.PartitionConsumer) {
			for {
				select {
				case consumerError := <-consumer.Errors():
					errors <- consumerError
					fmt.Println("consumerError:", consumerError.Err)
				case msg := <-consumer.Messages():
					consumers <- msg
					fmt.Println("Got message on topic", topic, string(msg.Value))
				}
			}
		}(topic, consumer)
	}
	return consumers, errors
}
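
The comment in consume above notes that only the first partition is consumed. A minimal sketch of consuming every partition of a topic, using the same Shopify/sarama low-level API, might look like this (consumeAllPartitions is a hypothetical helper name, not part of the gist or the library):

```go
// consumeAllPartitions starts one PartitionConsumer per partition of the
// given topic and fans all messages into the msgs channel. This is only a
// sketch: per-partition error channels are not drained here, and the
// PartitionConsumers are never closed.
func consumeAllPartitions(master sarama.Consumer, topic string, msgs chan<- *sarama.ConsumerMessage) error {
	partitions, err := master.Partitions(topic)
	if err != nil {
		return err
	}
	for _, partition := range partitions {
		pc, err := master.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			return err
		}
		go func(pc sarama.PartitionConsumer) {
			// Messages() is closed when the PartitionConsumer shuts down
			for msg := range pc.Messages() {
				msgs <- msg
			}
		}(pc)
	}
	return nil
}
```

The fan-in channel keeps the select loop in main unchanged; only the per-topic setup differs from the original consume function.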
@tarekbadrsh

thanks for your help

@rcoelho6

I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all the messages over again.
Do you have any solution for that?

@nilsmagnus
Author

nilsmagnus commented Aug 22, 2019

@rcoelho6

I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all the messages over again.
Do you have any solution for that?

The code above is probably not a good example, but what you want to do is a "commit" with the consumer after reading and processing your messages, and when you restart your application, start reading from the latest committed offset. I suspect line 74 should be modified with a different option, from sarama.OffsetOldest to something like sarama.LatestCommittedOffset (if such an option exists).

@rcoelho6

@rcoelho6

I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all the messages over again.
Do you have any solution for that?

The code above is probably not a good example, but what you want to do is a "commit" with the consumer after reading and processing your messages, and when you restart your application, start reading from the latest committed offset. I suspect line 74 should be modified with a different option, from sarama.OffsetOldest to something like sarama.LatestCommittedOffset (if such an option exists).

Sarama's only offset constants are:

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1

	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)

I tried to use the constant GroupGenerationUndefined, but that did not work either.

const GroupGenerationUndefined = -1

@cikupin

cikupin commented Nov 20, 2020

Guys, still no solution for this commit case?

@d1egoaz

d1egoaz commented Feb 11, 2021

You should use a consumer group (https://github.com/Shopify/sarama/tree/master/examples/consumergroup) if you're interested in continuing to consume from the previous position.

There are some specific cases for the low-level consumer shown in this gist, which may or may not apply to you; most people should use the Sarama consumer group and let Sarama manage the offsets, claims, consumer group rebalances, etc.

The low-level consumer does not create a consumer group, so the Kafka server is unaware of the client's position.
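
A minimal consumer-group sketch along the lines of the linked example could look like the following. The broker address, group id, topic name, and the handler type are placeholders; MarkMessage is what records the offset so a restarted consumer resumes from the last committed position.

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("topic %s partition %d offset %d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		// Mark the message as processed; sarama commits the offset for us.
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // consumer groups need a recent protocol version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		// Consume blocks until a rebalance or error; loop to rejoin the group.
		if err := group.Consume(ctx, []string{"example-topic"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```

With this approach, restarting the binary with the same group id resumes from the committed offsets, which addresses the "consumes all messages over again" problem discussed above.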
