@tuxfight3r
Last active December 4, 2023 22:10
KafkaCat configuration for AWS MSK

Set the following environment variable so that kafkacat can find its configuration file.

NOTE: kafkacat has since been renamed to kcat; from version 1.7 onwards the config variable is KCAT_CONFIG.

# Export the variable, or pass the config file to kafkacat with the -F parameter
export KAFKACAT_CONFIG=/home/tools/persistent/kcat/kafkacat_config
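Because the variable name depends on the installed version, a shell profile could simply set both names to the same file. This is only a minimal sketch: `KCAT_CONFIG_FILE` is a hypothetical helper variable that kcat itself does not read, and the default path is the example path used above.

```shell
# KCAT_CONFIG_FILE is a hypothetical helper variable (not read by kcat itself);
# it defaults to the example path used in this gist.
KCAT_CONFIG_FILE="${KCAT_CONFIG_FILE:-/home/tools/persistent/kcat/kafkacat_config}"

# Older kafkacat builds read KAFKACAT_CONFIG; kcat 1.7 onwards reads KCAT_CONFIG.
export KAFKACAT_CONFIG="$KCAT_CONFIG_FILE"
export KCAT_CONFIG="$KCAT_CONFIG_FILE"
```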

Contents of kafkacat configuration

# cat kafkacat_config

security.protocol=ssl
ssl.key.password=SecretPassw0rd123
ssl.certificate.location=/home/tools/persistent/kcat/ssl/cert1.pem
ssl.key.location=/home/tools/persistent/kcat/ssl/cert1.key.pem
ssl.ca.location=/home/tools/persistent/kcat/ssl/cacerts.pem
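A wrong path in one of the `*.location` entries is an easy mistake to make, so it may help to check the file before running kafkacat. The sketch below assumes plain `key=value` lines as shown above; `check_kcat_config_paths` is a hypothetical helper name, not part of kafkacat.

```shell
# Hypothetical helper: report every *.location entry whose file is not readable.
# Returns 0 when all referenced files are readable, 1 otherwise.
check_kcat_config_paths() {
    local config_file="$1"
    local key value missing=0
    # Split each line at the first '='; the "|| [ -n ... ]" part also handles
    # a last line that has no trailing newline.
    while IFS='=' read -r key value || [ -n "$key" ]; do
        case $key in
            *.location)
                if [ ! -r "$value" ]; then
                    echo "unreadable: $key=$value" >&2
                    missing=1
                fi
                ;;
        esac
    done < "$config_file"
    return "$missing"
}
```

Usage would look like `check_kcat_config_paths "$KAFKACAT_CONFIG" && echo "config paths look fine"`.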

Commands for creating PEM files from JKS Keystore

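# Convert the JKS keystore to PKCS12
keytool -importkeystore -srckeystore keystore.jks -srcstoretype JKS -deststoretype PKCS12 -destkeystore keystore.p12
# Extract the certificates only (-nokeys keeps the private key out of cert.pem)
openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
# Extract the private key only; -nodes writes it unencrypted, so omit -nodes to keep a passphrase for ssl.key.password
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out cert.key.pem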
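Before pointing ssl.certificate.location and ssl.key.location at the extracted files, it may be worth checking that the certificate and the key belong together. A minimal sketch, assuming the key file is unencrypted (as written with -nodes) and that the first certificate in the PEM file is the client certificate; `cert_matches_key` is a hypothetical helper name.

```shell
# Hypothetical helper: compare the public key embedded in a certificate ($1)
# with the public key derived from a private key file ($2).
# Returns 0 on match, 1 on mismatch, 2 if openssl could not read a file.
cert_matches_key() {
    local cert_pub key_pub
    cert_pub=$(openssl x509 -in "$1" -noout -pubkey) || return 2
    key_pub=$(openssl pkey -in "$2" -pubout) || return 2
    [ "$cert_pub" = "$key_pub" ]
}
```

For example: `cert_matches_key cert.pem cert.key.pem && echo "certificate and key match"`.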

Testing kafkacat

# List metadata for a topic (the first form reads the config from $KAFKACAT_CONFIG, the second passes it with -F)
./kafkacat -b $KAFKA_BROKER -L -t test_topic
./kafkacat -b $KAFKA_BROKER -F $KAFKACAT_CONFIG -L -t test_topic

# Consume 2 messages from the beginning
./kafkacat -b $KAFKA_BROKER -C -c2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 messages from the end.
./kafkacat -b $KAFKA_BROKER -C -c2 -o-2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 avro messages from the beginning
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o beginning -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

# Consume 2 avro messages from the end.
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o-2 -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

Kafkacat basic functions

export KCAT_CONFIG=/usr/local/kafkacat/kcat_config.properties
function kcat() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} "$@" ;}
function kcat_query_topic() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -L -t $1 ;}
function kcat_read_between_timestamps() {
    # expects $1 = topic, $2 = s@<start timestamp in ms>, $3 = e@<end timestamp in ms>
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -o $3 ;}
function kcat_read_between_offsets() {
    # expects $1 = topic, $2 = start offset, $3 = count of messages
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -c $3 ;}
function kcat_query_offset_for_timestamp() {
    # expects $1 in the form topic:partition:timestamp
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -Q -t $1 ;}
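The s@ and e@ offsets used by kcat_read_between_timestamps are timestamps in milliseconds, so a small conversion helper can be handy. This is a sketch under assumptions: `to_epoch_ms` is a hypothetical helper name, it relies on GNU date (the -d option), and the dates are arbitrary examples. It only prints the resulting command instead of running it.

```shell
# Hypothetical helper: convert a UTC date string to epoch milliseconds.
# Assumes GNU date (the -d option); BSD/macOS date uses different flags.
to_epoch_ms() {
    echo "$(( $(date -u -d "$1" +%s) * 1000 ))"
}

# Build the s@/e@ arguments for a one-hour window (example dates are arbitrary).
start="s@$(to_epoch_ms '2023-12-01 00:00:00')"
end="e@$(to_epoch_ms '2023-12-01 01:00:00')"

# Print the call rather than executing it, so the values can be inspected first.
echo "kcat_read_between_timestamps test_topic $start $end"
```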

Read avro messages value

# Read all avro value messages from beginning
function kcat_read_avro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e ;}
# Read n avro value messages from beginning
function kcat_read_avro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e ;}
# Read n avro value messages from end
function kcat_read_avro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e ;}
# Read n avro value messages from beginning on specific partition
function kcat_read_avro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} ;}
# Read n avro value messages from end on specific partition
function kcat_read_avro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read text messages value

# Read all text messages from beginning
function kcat_read_txt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e ;}
# Read n text value messages from beginning
function kcat_read_txt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e ;}
# Read n text value messages from end
function kcat_read_txt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e ;}
# Read n text value messages from beginning on specific partition
function kcat_read_txt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} ;}
# Read n text value messages from end on specific partition
function kcat_read_txt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read avro messages with key=avro and value=avro

# Read all messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro  -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n';}
# Read n messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_avro_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_avro_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_avro_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read text messages with key=text and value=text

# Read all messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_txt_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_txt_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_txt_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read messages with key=avro and value=text

# Read all messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from end with key and value
function kcat_read_keyavro_valtxt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keyavro_valtxt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keyavro_valtxt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}

Read messages with key=txt and value=avro

# Read all messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_keytxt_valavro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keytxt_valavro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keytxt_valavro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
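The text-message helpers above differ only in count, start position and partition, so they could be folded into one function that builds the argument list step by step. The following is a sketch, not a replacement for the functions above: `kcat_read` and the `KCAT_BIN` override are hypothetical names, and it covers plain text values only (the avro variants would add the -s and -r options).

```shell
# Hypothetical consolidated reader for text messages.
#   $1 = topic, $2 = count (optional), $3 = begin|end (default begin), $4 = partition (optional)
# KCAT_BIN is an assumed override for the binary path used in this gist.
kcat_read() {
    local topic count from partition
    topic="$1"; count="${2:-}"; from="${3:-begin}"; partition="${4:-}"

    # Rebuild the positional parameters as the kcat argument list.
    set -- -F "${KCAT_CONFIG}" -b "${KAFKA_BROKER}" -C -t "$topic" -e
    if [ -n "$count" ]; then
        set -- "$@" -c "$count"
        # Reading from the end uses a relative offset equal to the count.
        if [ "$from" = end ]; then
            set -- "$@" "-o-$count"
        fi
    fi
    if [ -n "$partition" ]; then
        set -- "$@" -p "$partition"
    fi
    "${KCAT_BIN:-/usr/local/kafkacat/kcat}" "$@"
}
```

With this, `kcat_read test_topic 5 end 0` would correspond to `kcat_read_txt_from_end_on_part test_topic 5 0`.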
@nelisSpotOn

First, thank you for sharing this. I have been trying to get Kcat working from a Fargate instance that I have set up as a bastion to consume messages from MSK to ensure that an MSK Connect job is actually working. I would like to set up Kcat in a secure way instead of just allowing unauthenticated clients. I did have the following questions though.

Questions about this example:

  • What type of Client Authentication was set up on the MSK side? Sasl/IAM?
  • Where do the certificates come from?
    • Did you download the CA cert (ssl.ca.location) from AWS Certificate Manager?
    • The certificates you have listed in your kcat config seem to have different names than the ones you are generating.
  • What is the ssl.key.password password for? Is this a shared secret with the MSK side?

@tuxfight3r
Author

@nelisSpotOn I hope the answers below help.

  1. MSK is set up with TLS authentication using an AWS Private CA (https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)
  2. You need to create a CSR and get it signed by the AWS Private CA so that it issues a certificate (follow the document above)
  3. The certificate names are just examples. You need the certificate in PEM format from step 2; cacerts.pem can be downloaded from https://curl.se/docs/caextract.html
  4. ssl.key.password is the password for your certificate key, which is generated in step 2
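The key and CSR part of step 2 could look roughly like the sketch below. It assumes openssl is available; the helper name `make_client_csr`, the `KEY_PASSWORD` variable and the file names are hypothetical. Submitting the CSR to the private CA is not shown here and should follow the linked AWS document.

```shell
# Hypothetical helper: create an encrypted private key and a CSR in directory $1
# with common name $2. The passphrase is read from the exported KEY_PASSWORD
# variable (an assumed name); it is the value that later goes into ssl.key.password.
make_client_csr() {
    local out_dir="$1" common_name="$2"
    openssl req -new -newkey rsa:2048 \
        -keyout "$out_dir/cert1.key.pem" \
        -out "$out_dir/cert1.csr" \
        -subj "/CN=$common_name" \
        -passout env:KEY_PASSWORD
}
```

Example call: `export KEY_PASSWORD='...'; make_client_csr ./ssl kcat-client`.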

@yermulnik

Is kcat capable of AWS MSK IAM Auth?

@tuxfight3r
Author

tuxfight3r commented Dec 4, 2023

I have never tried it myself, as all our MSK clusters use a private CA, but you can try the config file mentioned here:
https://github.com/UrbanCompass/librdkafka/blob/master/README_FORK.md

Things to keep in mind:
kcat supports only the librdkafka configuration options documented here: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
However, this has been a wanted feature, and someone has created a fork of librdkafka, as discussed below:
confluentinc/librdkafka#3496
https://github.com/UrbanCompass/librdkafka

This means you need to rebuild librdkafka from the fork and rebuild kcat using that librdkafka to get the MSK IAM auth working for kcat.

@yermulnik

@tuxfight3r Thanks for the pointer, though that probably won't work, as kcat is written in C while AWS provides helper libs only for Java/JS/Python/Go/.NET: https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#configure-clients-for-iam-access-control-sasl-oauthbearer (https://github.com/orgs/aws/repositories?q=msk&type=source&language=&sort=) 😢

The odd thing is that kcat is cool, and I couldn't find any other Kafka CLI that is this convenient.
I also couldn't find any 3rd-party CLIs (non-Apache Kafka and non-Java) that support AWS MSK IAM Auth at the moment =( https://github.com/birdayz/kaf is the only one, though it is still WIP (birdayz/kaf#198)
