Skip to content

Instantly share code, notes, and snippets.

@p-karanthaker
Created August 2, 2021 10:01
Show Gist options
  • Save p-karanthaker/f54f12bfa3f91cff25cfd8d7f24b645f to your computer and use it in GitHub Desktop.
Save p-karanthaker/f54f12bfa3f91cff25cfd8d7f24b645f to your computer and use it in GitHub Desktop.
kafkajs consumer
const ip = require('ip');
const { KafkaJSError } = require('kafkajs/src/errors');
const { Kafka, logLevel } = require('./node_modules/kafkajs/index')
// Topic consumed by this example.
const topic = 'test';

// Broker host: HOST_IP from the environment when it is set to a non-empty
// value, otherwise whatever address the `ip` module reports.
const host = process.env.HOST_IP || ip.address();

// Kafka client pointed at a single broker on port 9092 of `host`.
const kafka = new Kafka({
  clientId: 'example-consumer',
  brokers: [`${host}:9092`],
  logLevel: logLevel.INFO,
});

// Consumer created from the client above, joining group 'test-group'.
const consumer = kafka.consumer({ groupId: 'test-group' });
/**
 * Connects the module-level `consumer`, subscribes it to the module-level
 * `topic` (with `fromBeginning: true`) and starts it with a per-message
 * handler that prints one line per message to stdout.
 *
 * @returns {Promise<void>} Settles once `consumer.run` has settled; rejects
 *   if connecting, subscribing or starting the consumer rejects.
 */
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });

  // Prints "- <topic>[<partition> | <offset>] / <timestamp> <key>#<value>".
  // Key and value are interpolated as-is, so they are rendered through their
  // default string conversion.
  const logMessage = async ({ topic: sourceTopic, partition, message }) => {
    const { offset, timestamp, key, value } = message;
    const prefix = `${sourceTopic}[${partition} | ${offset}] / ${timestamp}`;
    console.log(`- ${prefix} ${key}#${value}`);
  };

  await consumer.run({ eachMessage: logMessage });
};
// Name of the consumer's crash event, as exposed on `consumer.events`.
const CRASH = consumer.events.CRASH;

// Log the timestamp carried by every crash event the consumer emits.
const reportCrash = (event) => console.log(`crash at ${event.timestamp}`);
consumer.on(CRASH, reportCrash);

// Start consuming; a rejection from run() is logged rather than left unhandled.
run().catch((err) => {
  console.error(`[example/consumer] ${err.message}`, err);
});
// Process-level events that indicate a fatal, unhandled error.
const errorTypes = ['unhandledRejection', 'uncaughtException'];
// Termination signals that should trigger a consumer disconnect.
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

// On a fatal error: log it, try to disconnect the consumer, then exit.
// The exit status is always 1, because the process is terminating due to an
// unhandled error and a supervisor must not see that as a clean shutdown.
errorTypes.forEach((type) => {
  process.on(type, async (error) => {
    console.log(`process.on ${type}`);
    console.error(error);
    try {
      await consumer.disconnect();
    } catch (disconnectError) {
      // Report the cleanup failure instead of swallowing it; we exit anyway.
      console.error(disconnectError);
    } finally {
      process.exit(1);
    }
  });
});
// For each trapped signal, install a one-shot listener that disconnects the
// consumer and then sends the same signal to this process again, whether or
// not the disconnect succeeded. The listener is registered with `once`, so
// it is no longer attached when the signal is re-sent.
for (const signal of signalTraps) {
  const disconnectThenResend = async () => {
    try {
      await consumer.disconnect();
    } finally {
      process.kill(process.pid, signal);
    }
  };
  process.once(signal, disconnectThenResend);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment