-
-
Save nilsmagnus/4b582f9a36279bff5f8f9d453f8fb9c4 to your computer and use it in GitHub Desktop.
package main | |
import ( | |
"fmt" | |
"github.com/Shopify/sarama" | |
"os" | |
"os/signal" | |
"strings" | |
) | |
func main() { | |
config := sarama.NewConfig() | |
config.ClientID = "go-kafka-consumer" | |
config.Consumer.Return.Errors = true | |
brokers := []string{"localhost:9092"} | |
// Create new consumer | |
master, err := sarama.NewConsumer(brokers, config) | |
if err != nil { | |
panic(err) | |
} | |
defer func() { | |
if err := master.Close(); err != nil { | |
panic(err) | |
} | |
}() | |
topics, _ := master.Topics() | |
consumer, errors := consume(topics, master) | |
signals := make(chan os.Signal, 1) | |
signal.Notify(signals, os.Interrupt) | |
// Count how many message processed | |
msgCount := 0 | |
// Get signnal for finish | |
doneCh := make(chan struct{}) | |
go func() { | |
for { | |
select { | |
case msg := <-consumer: | |
msgCount++ | |
fmt.Println("Received messages", string(msg.Key), string(msg.Value)) | |
case consumerError := <-errors: | |
msgCount++ | |
fmt.Println("Received consumerError ", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err) | |
doneCh <- struct{}{} | |
case <-signals: | |
fmt.Println("Interrupt is detected") | |
doneCh <- struct{}{} | |
} | |
} | |
}() | |
<-doneCh | |
fmt.Println("Processed", msgCount, "messages") | |
} | |
func consume(topics []string, master sarama.Consumer) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError) { | |
consumers := make(chan *sarama.ConsumerMessage) | |
errors := make(chan *sarama.ConsumerError) | |
for _, topic := range topics { | |
if strings.Contains(topic, "__consumer_offsets") { | |
continue | |
} | |
partitions, _ := master.Partitions(topic) | |
// this only consumes partition no 1, you would probably want to consume all partitions | |
consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest) | |
if nil != err { | |
fmt.Printf("Topic %v Partitions: %v", topic, partitions) | |
panic(err) | |
} | |
fmt.Println(" Start consuming topic ", topic) | |
go func(topic string, consumer sarama.PartitionConsumer) { | |
for { | |
select { | |
case consumerError := <-consumer.Errors(): | |
errors <- consumerError | |
fmt.Println("consumerError: ", consumerError.Err) | |
case msg := <-consumer.Messages(): | |
consumers <- msg | |
fmt.Println("Got message on topic ", topic, msg.Value) | |
} | |
} | |
}(topic, consumer) | |
} | |
return consumers, errors | |
} |
I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all messages over again.
Do you have any solution for that?
I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all messages over again.
Do you have any solution for that?
The code above is probably not a good example, but what you want to do is "commit" with the consumer after reading and processing your messages, and, when you restart your application, start reading from the latest committed offset. I suspect line 74 (the ConsumePartition call) should be modified to use a different option: replace sarama.OffsetOldest with something like sarama.LatestCommittedOffset (if such an option exists).
I am using a similar solution, but it doesn't remove the messages from Kafka, so every time it is rebooted, it consumes all messages over again.
Do you have any solution for that?
The code above is probably not a good example, but what you want to do is "commit" with the consumer after reading and processing your messages, and, when you restart your application, start reading from the latest committed offset. I suspect line 74 (the ConsumePartition call) should be modified to use a different option: replace sarama.OffsetOldest with something like sarama.LatestCommittedOffset (if such an option exists).
The only offset constants Sarama provides are:
const (
// OffsetNewest stands for the log head offset, i.e. the offset that will be
// assigned to the next message that will be produced to the partition. You
// can send this to a client's GetOffset method to get this offset, or when
// calling ConsumePartition to start consuming new messages.
OffsetNewest int64 = -1
// OffsetOldest stands for the oldest offset available on the broker for a
// partition. You can send this to a client's GetOffset method to get this
// offset, or when calling ConsumePartition to start consuming from the
// oldest offset that is still available on the broker.
OffsetOldest int64 = -2
)
I tried to use the constant GroupGenerationUndefined, but that did not work either.
const GroupGenerationUndefined = -1
Guys, still no solution for this commit case?
You should use a consumer group (https://github.com/Shopify/sarama/tree/master/examples/consumergroup) if you're interested in continuing to consume from the previous position.
There are some specific use cases for the low-level consumer shown in this gist, which might or might not work for you. Most people should use the sarama consumer group and let sarama manage the offsets, claims, consumer group rebalances, etc.
The low-level consumer does not create a consumer group, hence the Kafka server is unaware of the client position.
thanks for your help