Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save dahlsailrunner/217d10a5bcc3f0adb631062038547366 to your computer and use it in GitHub Desktop.
Save dahlsailrunner/217d10a5bcc3f0adb631062038547366 to your computer and use it in GitHub Desktop.
Kafka consumer for log entries
class Program
{
static void Main(string[] args)
{
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
SubscribeToEvents(cts.Token);
}
private static void SubscribeToEvents(CancellationToken cancellationToken)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "csharp-consumer",
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true
};
const int commitPeriod = 5;
using var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
consumer.Subscribe("log-entry-topic");
try
{
while (true)
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine(
$"Received message at {consumeResult.TopicPartitionOffset}: \n{consumeResult.Message.Value}");
if (consumeResult.Offset % commitPeriod == 0)
{
consumer.Commit(consumeResult);
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
}
catch (Exception e)
{
Console.WriteLine($"Message: {e.Message}");
if (e is KafkaException ke)
Console.WriteLine($"Reason: {ke.Error.Reason}");
if (e is ConsumeException ce)
Console.WriteLine($"Reason: {ce.Error.Reason}");
}
finally
{
consumer.Close();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment