Created January 28, 2018 01:39
kafka go client performance testing
package main

import (
    "context"
    crand "crypto/rand"
    "flag"
    "fmt"
    "io"
    "log"
    "math/rand"
    "os"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    kafkago "github.com/segmentio/kafka-go"
)
var (
    client      string
    mode        string
    brokers     string
    topic       string
    partition   int
    msgSize     int
    numMessages int
    value       []byte
)
// newUUID generates a random (version 4) UUID string.
func newUUID() (string, error) {
    uuid := make([]byte, 16)
    n, err := io.ReadFull(crand.Reader, uuid)
    if n != len(uuid) || err != nil {
        return "", err
    }
    uuid[8] = uuid[8]&^0xc0 | 0x80 // variant bits
    uuid[6] = uuid[6]&^0xf0 | 0x40 // version 4
    return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}
func consumeConfluentKafkaGo() {
    // !poll (channel-based consumer) -> ~150k msgs/s
    // poll                           -> ~170k msgs/s
    var poll = true
    group, _ := newUUID()
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               brokers,
        "group.id":                        group,
        "session.timeout.ms":              6000,
        "go.events.channel.enable":        !poll,
        "go.application.rebalance.enable": false,
        "enable.auto.commit":              false,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
    })
    if err != nil {
        log.Printf("could not set up kafka consumer: %s", err.Error())
        os.Exit(1)
    }
    if err := c.Assign([]kafka.TopicPartition{{Topic: &topic, Partition: int32(partition), Offset: 0}}); err != nil {
        log.Printf("could not assign partition: %s", err.Error())
        os.Exit(1)
    }
    var start = time.Now()
    if poll {
        var msgCount = 0
        for msgCount < numMessages {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                msgCount++
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                os.Exit(1)
            default:
                fmt.Printf("Ignored %v\n", e)
                os.Exit(1)
            }
        }
    } else {
        var msgCount = 0
        for msgCount < numMessages {
            select {
            case ev := <-c.Events():
                switch e := ev.(type) {
                case *kafka.Message:
                    msgCount++
                case kafka.PartitionEOF:
                    log.Printf("%% Reached %v\n", e)
                    os.Exit(1)
                case kafka.Error:
                    log.Printf("%% Error: %v\n", e)
                    os.Exit(1)
                }
            }
        }
    }
    elapsed := time.Since(start)
    log.Printf("[confluent-kafka-go consumer] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
func produceConfluentKafkaGo() {
    // ~380k msgs/s
    var p, err = kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": brokers, "linger.ms": 100})
    if err != nil {
        log.Printf("could not set up kafka producer: %s", err.Error())
        os.Exit(1)
    }
    done := make(chan bool)
    // Drain delivery reports and signal once all messages are acknowledged.
    go func() {
        var msgCount int
        for e := range p.Events() {
            msg := e.(*kafka.Message)
            if msg.TopicPartition.Error != nil {
                log.Printf("delivery report error: %v", msg.TopicPartition.Error)
                os.Exit(1)
            }
            msgCount++
            if msgCount >= numMessages {
                done <- true
            }
        }
    }()
    defer p.Close()
    var start = time.Now()
    for j := 0; j < numMessages; j++ {
        p.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: int32(partition)}, Value: value}
    }
    <-done
    elapsed := time.Since(start)
    log.Printf("[confluent-kafka-go producer] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
func consumeSarama() {
    // ~1.1M msgs/s
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Consumer.Return.Errors = true
    // Create a new consumer.
    master, err := sarama.NewConsumer(strings.Split(brokers, ","), config)
    if err != nil {
        panic(err)
    }
    defer master.Close()
    consumer, err := master.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }
    msgCount := 0
    var start = time.Now()
    doneCh := make(chan bool)
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case <-consumer.Messages():
                // per-partition errors?
                msgCount++
                if msgCount >= numMessages {
                    doneCh <- true
                }
            }
        }
    }()
    <-doneCh
    elapsed := time.Since(start)
    log.Printf("[sarama consumer] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
func produceSarama() {
    // ~410k msgs/s
    conf := sarama.NewConfig()
    conf.Version = sarama.V1_0_0_0
    conf.Producer.Return.Successes = true
    conf.Producer.Flush.Frequency = time.Duration(100) * time.Millisecond
    // The default of 1000000 results in request-too-large errors.
    sarama.MaxRequestSize = 999000
    var p, err = sarama.NewAsyncProducer(strings.Split(brokers, ","), conf)
    if err != nil {
        log.Printf("could not set up kafka producer: %s", err.Error())
        os.Exit(1)
    }
    done := make(chan bool)
    // Count successful deliveries and signal once all messages are acknowledged.
    go func() {
        var msgCount int
        for range p.Successes() {
            msgCount++
            if msgCount%numMessages == 0 {
                done <- true
            }
        }
    }()
    go func() {
        for err := range p.Errors() {
            log.Printf("failed to deliver message: %s", err.Error())
            os.Exit(1)
        }
    }()
    defer func() {
        err := p.Close()
        if err != nil {
            log.Printf("failed to close producer: %s", err.Error())
            os.Exit(1)
        }
    }()
    var start = time.Now()
    for j := 0; j < numMessages; j++ {
        p.Input() <- &sarama.ProducerMessage{Topic: topic, Partition: int32(partition), Value: sarama.ByteEncoder(value)}
    }
    <-done
    elapsed := time.Since(start)
    log.Printf("[sarama producer] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
func produceKafkaGo() {
    // This apparently produces synchronously and is very slow - is there a
    // way to produce asynchronously?
    w := kafkago.NewWriter(kafkago.WriterConfig{
        Brokers: []string{brokers},
        Topic:   topic,
        // match other clients:
        Balancer:     &kafkago.Hash{},
        BatchTimeout: time.Duration(100) * time.Millisecond,
        // these are low by default on this client:
        QueueCapacity: 10000,
        BatchSize:     1000000,
    })
    var start = time.Now()
    for j := 0; j < numMessages; j++ {
        err := w.WriteMessages(context.Background(), kafkago.Message{Value: value})
        if err != nil {
            log.Printf("failed to write message: %s", err)
            os.Exit(1)
        }
    }
    elapsed := time.Since(start)
    w.Close()
    log.Printf("[kafka-go] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
func main() {
    flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses")
    flag.StringVar(&topic, "topic", "topic", "topic")
    flag.IntVar(&partition, "partition", 0, "partition")
    flag.IntVar(&msgSize, "msgsize", 64, "message size")
    flag.IntVar(&numMessages, "numMessages", 10000000, "number of messages")
    flag.StringVar(&client, "client", "confluent-kafka-go", "confluent-kafka-go / sarama / kafka-go")
    flag.StringVar(&mode, "mode", "producer", "producer / consumer")
    flag.Parse()
    value = make([]byte, msgSize)
    rand.Read(value)
    switch client {
    case "confluent-kafka-go":
        if mode == "producer" {
            produceConfluentKafkaGo()
        } else {
            consumeConfluentKafkaGo()
        }
    case "sarama":
        if mode == "producer" {
            produceSarama()
        } else {
            consumeSarama()
        }
    case "kafka-go":
        if mode == "producer" {
            produceKafkaGo()
        } else {
            log.Printf("not implemented")
            os.Exit(1)
        }
    default:
        log.Printf("unknown client: %s", client)
    }
}
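Example invocations (hypothetical; assumes the file is saved as main.go and a broker is reachable at localhost:9092 - the flag names come from main above):

    go run main.go -client confluent-kafka-go -mode producer -topic test -msgsize 64 -numMessages 1000000
    go run main.go -client sarama -mode consumer -topic test -numMessages 1000000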
@mhowlett yes please post the numbers! I'm an avid user of confluent-kafka-go, but it looks like I've run into a scenario where I need the smallest CPU util% achievable for a fixed consumer ingest rate, and I might just need to try sarama for this case.
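For that fixed-ingest-rate scenario, fewer, larger fetch requests generally mean less CPU per message. A minimal sketch of the knobs to try first (these are sarama's standard Config fields; the values are illustrative, not tuned):

    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    // Wait for more data per fetch so each broker round trip carries
    // more messages (illustrative values, not tuned):
    config.Consumer.Fetch.Min = 64 * 1024       // at least 64 KiB per fetch
    config.Consumer.Fetch.Default = 1024 * 1024 // ask for up to 1 MiB
    config.Consumer.MaxWaitTime = 500 * time.Millisecond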
Actually he did: each produce method has a comment stating the number of messages sent per second for that Kafka client.
But I'd suggest a fix to this code: I changed the kafka-go produce function so it now produces asynchronously:
func produceKafkaGo() {
    // The original version produced synchronously and was very slow;
    // this one produces asynchronously via the Writer's Async mode.
    w := &kafkago.Writer{
        Addr:         kafkago.TCP(brokers),
        Balancer:     &kafkago.Hash{},
        Topic:        topic,
        BatchTimeout: time.Duration(100) * time.Millisecond,
        RequiredAcks: kafkago.RequiredAcks(0),
        BatchSize:    1000000,
        Async:        true,
    }
    var start = time.Now()
    for j := 0; j < numMessages; j++ {
        err := w.WriteMessages(context.Background(), kafkago.Message{Value: value})
        if err != nil {
            log.Printf("failed to write message: %s", err)
            os.Exit(1)
        }
    }
    elapsed := time.Since(start)
    w.Close()
    log.Printf("[kafka-go] msg/s: %f", float64(numMessages)/elapsed.Seconds())
}
Still, sarama is the best on my machine.
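One caveat with the Async version: WriteMessages returns before delivery, so write errors won't surface on the call itself (and with RequiredAcks(0) the broker never acks at all). If you still want to see failures, recent kafka-go versions expose a Completion callback on the Writer; a sketch, assuming that field is available in your version:

    // Called per batch after the write attempt completes (set before writing).
    w.Completion = func(messages []kafkago.Message, err error) {
        if err != nil {
            log.Printf("failed to deliver %d messages: %v", len(messages), err)
        }
    }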
@mhowlett Come on Matt, you're leaving us hanging... can you post the numbers you observed? 😄 I'm particularly curious in light of your update to confluentinc/confluent-kafka-go#214 (comment).