Skip to content

Instantly share code, notes, and snippets.

@mhowlett
Created January 28, 2018 01:39
Show Gist options
  • Save mhowlett/e9491aad29817aeda6003c3404874b35 to your computer and use it in GitHub Desktop.
Save mhowlett/e9491aad29817aeda6003c3404874b35 to your computer and use it in GitHub Desktop.
kafka go client performance testing
package main
import (
"context"
"flag"
"math/rand"
crand "crypto/rand"
"os"
"time"
"log"
"strings"
"fmt"
"io"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/Shopify/sarama"
kafkago "github.com/segmentio/kafka-go"
)
var (
client string
mode string
brokers string
topic string
partition int
msgSize int
numMessages int
value []byte
)
func newUUID() (string, error) {
uuid := make([]byte, 16)
n, err := io.ReadFull(crand.Reader, uuid)
if n != len(uuid) || err != nil {
return "", err
}
uuid[8] = uuid[8]&^0xc0 | 0x80
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}
func consumeConfluentKafkaGo() {
// !poll -> ~150k/s
// poll -> ~170k/s
var poll = true
group, _ := newUUID()
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": group,
"session.timeout.ms": 6000,
"go.events.channel.enable": !poll,
"go.application.rebalance.enable": false,
"enable.auto.commit": false,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
})
if err != nil {
log.Printf("could not set up kafka consumer: %s", err.Error())
os.Exit(1)
}
c.Assign([]kafka.TopicPartition{ kafka.TopicPartition{Topic: &topic, Partition: int32(partition), Offset: 0} } )
var start = time.Now()
if poll {
var msgCount = 0
for msgCount < numMessages {
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
msgCount++
break
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
os.Exit(1)
default:
fmt.Printf("Ignored %v\n", e)
os.Exit(1)
}
}
} else {
var msgCount = 0;
for msgCount < numMessages {
select {
case ev := <-c.Events():
switch e := ev.(type) {
case *kafka.Message:
msgCount++
break
case kafka.PartitionEOF:
log.Printf("%% Reached %v\n", e)
os.Exit(1)
case kafka.Error:
log.Printf("%% Error: %v\n", e)
os.Exit(1)
}
}
}
}
elapsed := time.Since(start)
log.Printf("[conflunet-kafka-go consumer] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}
func produceConfluentKafkaGo() {
// ~380k/s
var p, err = kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": brokers, "linger.ms": 100})
if err != nil {
log.Printf("could not set up kafka producer: %s", err.Error())
os.Exit(1)
}
done := make(chan bool)
go func() {
var msgCount int
for e := range p.Events() {
msg := e.(*kafka.Message);
if msg.TopicPartition.Error != nil {
log.Printf("delivery report error: %v", msg.TopicPartition.Error)
os.Exit(1)
}
msgCount++
if msgCount >= numMessages {
done <- true
}
}
}()
defer p.Close()
var start = time.Now()
for j := 0; j < numMessages; j++ {
p.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: int32(partition)}, Value: value}
}
<-done
elapsed := time.Since(start)
log.Printf("[confluent-kafka-go producer] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}
func consumeSarama() {
// ~1.1M/s
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Return.Errors = true
brokers := []string{brokers}
// Create new consumer
master, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer master.Close();
consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
msgCount := 0
var start = time.Now()
doneCh := make(chan bool)
go func() {
for {
select {
case err := <-consumer.Errors():
fmt.Println(err)
case <-consumer.Messages():
// per-partition errors?
msgCount++
if (msgCount >= numMessages) {
doneCh <- true
}
}
}
}()
<-doneCh
elapsed := time.Since(start)
log.Printf("[sarama consumer] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}
func produceSarama() {
// ~410k/s
conf := sarama.NewConfig()
conf.Version = sarama.V1_0_0_0
conf.Producer.Return.Successes = true
conf.Producer.Flush.Frequency = time.Duration(100)*time.Millisecond
// the default 1000000 results in request-too-large errors.
sarama.MaxRequestSize = 999000
var p, err = sarama.NewAsyncProducer(strings.Split(brokers, ","), conf)
if err != nil {
log.Printf("could not set up kafka producer: %s", err.Error())
os.Exit(1)
}
done := make(chan bool)
go func() {
var nomessages int
for _ = range p.Successes() {
nomessages++
if nomessages%numMessages == 0 {
done <- true
}
}
}()
go func() {
for err := range p.Errors() {
log.Printf("failed to deliver message: %s", err.Error())
os.Exit(1)
}
}()
defer func() {
err := p.Close()
if err != nil {
log.Printf("failed to close producer: %s", err.Error())
os.Exit(1)
}
}()
var start = time.Now()
for j := 0; j < numMessages; j++ {
p.Input() <- &sarama.ProducerMessage{Topic: topic, Partition: int32(partition), Value: sarama.ByteEncoder(value)}
}
<-done
elapsed := time.Since(start)
log.Printf("[sarama producer] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}
func produceKafkaGo() {
// this apparently produces synchronously and is very slow - is there a way to
// produce synchronously.
w := kafkago.NewWriter(kafkago.WriterConfig{
Brokers: []string{brokers},
Topic: topic,
// match other clients:
Balancer: &kafkago.Hash{},
BatchTimeout: time.Duration(100) * time.Millisecond,
// these are low by default on this client
QueueCapacity: 10000,
BatchSize: 1000000,
})
var start = time.Now()
for j := 0; j < numMessages; j++ {
err := w.WriteMessages(context.Background(), kafkago.Message{Value: value,})
if err != nil {
log.Printf("failed to write message: %s", err);
os.Exit(1)
}
}
elapsed := time.Since(start)
w.Close()
log.Printf("[kafka-go] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}
func main() {
flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses")
flag.StringVar(&topic, "topic", "topic", "topic")
flag.IntVar(&partition, "partition", 0, "partition")
flag.IntVar(&msgSize, "msgsize", 64, "message size")
flag.IntVar(&numMessages, "numMessages", 10000000, "number of messages")
flag.StringVar(&client, "client", "confluent-kafka-go", "confluent-kafka-go / sarama / kafka-go")
flag.StringVar(&mode, "mode", "producer", "producer / consumer")
flag.Parse()
value = make([]byte, msgSize)
rand.Read(value)
switch client {
case "confluent-kafka-go":
if (mode == "producer") {
produceConfluentKafkaGo()
} else {
consumeConfluentKafkaGo()
}
break
case "sarama":
if (mode == "producer") {
produceSarama()
} else {
consumeSarama()
}
break
case "kafka-go":
if (mode == "producer") {
produceKafkaGo()
} else {
log.Printf("not implemented")
os.Exit(1)
}
break
default:
log.Printf("unknown client: %s", client)
}
}
@jeffwidman
Copy link

@mhowlett Come on Matt, you're leaving us hanging... can you post the numbers you observed? 😄 I'm curious particularly in light of your update to confluentinc/confluent-kafka-go#214 (comment).

@billygout
Copy link

@mhowlett yes please post the numbers! I'm an avid user of confluent-kafka-go but it looks like I've run in to a scenario where I need the smallest cpu util% achievable for a fixed consumer ingest rate, and I might just need to try sarama for this case.

@alichamouda
Copy link

Actually he did, in each produce method, he put a comment stating the number of message sent per second for each kafka client.
But, i suggest a fix to this code, actually i changed kafka go produce function so it can now handle async messages:

func produceKafkaGo() {

	// this apparently produces synchronously and is very slow - is there a way to
	// produce synchronously.
	w := &kafkago.Writer{
		Addr:     kafkago.TCP([]string{brokers}[0]),
		Balancer: &kafkago.Hash{},
		Topic:   topic,
		BatchTimeout: time.Duration(100) * time.Millisecond,
		RequiredAcks: kafkago.RequiredAcks(0),
		BatchSize: 1000000,
		Async: true,
	}



	var start = time.Now()
	for j := 0; j < numMessages; j++ {
		err := w.WriteMessages(context.Background(), kafkago.Message{Value: value,})
		if err != nil {
			log.Printf("failed to write message: %s", err);
			os.Exit(1)
		}
	}
	elapsed := time.Since(start)

	w.Close()

	log.Printf("[kafka-go] msg/s: %f", (float64(numMessages) / elapsed.Seconds()))
}

still sarama is the best on my machine

@3AceShowHand
Copy link

VfpHCc2FT5

For kafka-go, set BatchSize to 100 ~ 2000.

Apple M1 Pro / 32GB / kafka 2.13-3.0.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment