Skip to content

Instantly share code, notes, and snippets.

@krasin-ga
Created March 10, 2024 11:43
Show Gist options
  • Save krasin-ga/1ee2749c11cb9e2f8d6e68e7c525c7c6 to your computer and use it in GitHub Desktop.
Save krasin-ga/1ee2749c11cb9e2f8d6e68e7c525c7c6 to your computer and use it in GitHub Desktop.
EmulatedBacklogBenchmark
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Threading.Channels;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
// Runs both benchmark variants once with the same producer/consumer counts and
// verifies that they return the same hash sum before any measurement starts.
// Throws InvalidOperationException when the two results differ.
async Task CheckConsistencyAsync()
{
    var benchmark = new EmulatedBacklogBenchmark
    {
        NumberOfConsumers = 8,
        NumberOfProducers = 8
    };

    // Awaited rather than blocked on with .Result so that a failure inside the
    // benchmark surfaces as the original exception instead of an AggregateException.
    var channelsResult = await benchmark.WithChannelsAndThreadPool();
    var threadsResult = benchmark.WithConcurrentQueueAndThreads();

    if (channelsResult != threadsResult)
    {
        throw new InvalidOperationException(
            $"Inconsistent state between benchmarks: {channelsResult} != {threadsResult} ");
    }
}

await CheckConsistencyAsync();
BenchmarkRunner.Run(typeof(EmulatedBacklogBenchmark).Assembly);
/// <summary>
/// Benchmarks two ways of distributing <see cref="NumberOfItems"/> work items from
/// several producers to several consumers, each consumer owning its own queue:
/// unbounded channels drained by thread-pool tasks versus
/// <see cref="ConcurrentQueue{T}"/> instances drained by dedicated threads that are
/// woken through an <see cref="AutoResetEvent"/>.
/// </summary>
/// <remarks>
/// Both benchmarks return the wrapping sum of <see cref="EmulateWorkload"/> over all
/// consumed items, so their results can be compared for a given configuration.
/// </remarks>
[ThreadingDiagnoser]
public class EmulatedBacklogBenchmark
{
    // Total number of items published per invocation; each producer publishes
    // NumberOfItems / NumberOfProducers of them (integer division).
    private const int NumberOfItems = 10_000_000;

    /// <summary>Number of consumers, each with its own queue or channel.</summary>
    [Params(1, 8, 32, 128)]
    public int NumberOfConsumers = 1;

    /// <summary>Number of producers publishing concurrently.</summary>
    [Params(1, 8, 32, 128)]
    public int NumberOfProducers = 1;

    /// <summary>
    /// Publishes the items into one unbounded channel per consumer and drains the
    /// channels with tasks scheduled through <see cref="Task.Run(Func{Task})"/>.
    /// </summary>
    /// <returns>The wrapping sum of the workload hashes of all consumed items.</returns>
    [Benchmark]
    public async Task<int> WithChannelsAndThreadPool()
    {
        var hashSum = 0;

        var channels = Enumerable.Range(0, NumberOfConsumers)
            .Select(_ => Channel.CreateUnbounded<long>(new UnboundedChannelOptions()))
            .ToArray();

        // Producers are started before the consumers, exactly as in the original
        // benchmark, so items may already be queued when consumption begins.
        var producers = Enumerable.Range(0, NumberOfProducers)
            .Select(_ => Task.Run(() =>
            {
                var numberOfItemsToPublish = NumberOfItems / NumberOfProducers;
                for (var i = 0; i < numberOfItemsToPublish; i++)
                {
                    // Items are spread round-robin over the consumers. The channels are
                    // unbounded and are completed only after all producers have finished,
                    // so the TryWrite result is intentionally ignored.
                    channels[i % channels.Length].Writer.TryWrite(i);
                }
            }))
            .ToArray();

        var consumers = channels
            .Select(channel => Task.Run(async () =>
            {
                var reader = channel.Reader;

                // WaitToReadAsync yields false once the writer is completed and the
                // channel is empty, which ends the consumer.
                while (await reader.WaitToReadAsync())
                {
                    while (reader.TryRead(out var item))
                    {
                        Interlocked.Add(ref hashSum, EmulateWorkload(item));
                    }
                }
            }))
            .ToArray();

        await Task.WhenAll(producers);

        foreach (var channel in channels)
        {
            channel.Writer.Complete();
        }

        await Task.WhenAll(consumers);

        return hashSum;
    }

    /// <summary>
    /// Publishes the items into one <see cref="ConcurrentQueue{T}"/> per consumer and
    /// drains each queue on a dedicated thread that sleeps on an
    /// <see cref="AutoResetEvent"/> while its queue is empty.
    /// </summary>
    /// <returns>The wrapping sum of the workload hashes of all consumed items.</returns>
    [Benchmark]
    public int WithConcurrentQueueAndThreads()
    {
        var hashSum = 0;
        var isRunning = true;

        var inputs = Enumerable.Range(0, NumberOfConsumers)
            .Select(_ => (Queue: new ConcurrentQueue<long>(), Signal: new AutoResetEvent(false)))
            .ToArray();

        var consumerThreads = inputs
            .Select(input => new Thread(() =>
            {
                while (true)
                {
                    while (input.Queue.TryDequeue(out var item))
                    {
                        Interlocked.Add(ref hashSum, EmulateWorkload(item));
                    }

                    // The flag is checked only after the queue has been drained, so
                    // nothing enqueued before shutdown is left unprocessed.
                    if (!Volatile.Read(ref isRunning))
                    {
                        break;
                    }

                    input.Signal.WaitOne();
                }
            }))
            .ToArray();

        var producerThreads = Enumerable.Range(0, NumberOfProducers)
            .Select(_ => new Thread(() =>
            {
                var numberOfItemsToPublish = NumberOfItems / NumberOfProducers;
                for (var i = 0; i < numberOfItemsToPublish; i++)
                {
                    var target = inputs[i % inputs.Length];
                    target.Queue.Enqueue(i);
                    target.Signal.Set();
                }
            }))
            .ToArray();

        foreach (var consumerThread in consumerThreads)
        {
            consumerThread.Start();
        }

        foreach (var producerThread in producerThreads)
        {
            producerThread.Start();
        }

        foreach (var producerThread in producerThreads)
        {
            producerThread.Join();
        }

        // Publish the stop flag first, then wake every consumer. An AutoResetEvent
        // stays signalled until a waiter consumes it, so a consumer that has not yet
        // reached WaitOne still observes this signal and re-checks the flag.
        Volatile.Write(ref isRunning, false);
        foreach (var input in inputs)
        {
            input.Signal.Set();
        }

        foreach (var consumerThread in consumerThreads)
        {
            consumerThread.Join();
        }

        // All consumer threads have exited, so the wait handles can be released
        // deterministically instead of being left to the finalizer.
        foreach (var input in inputs)
        {
            input.Signal.Dispose();
        }

        return hashSum;
    }

    /// <summary>
    /// Emulates per-item work by serializing a small object containing
    /// <paramref name="someNumber"/> to JSON and hashing the UTF-16 bytes of the result.
    /// </summary>
    /// <param name="someNumber">The item value embedded in the serialized object.</param>
    /// <returns>
    /// The hash code of the serialized text. Per the .NET documentation
    /// <see cref="HashCode"/> is seeded randomly per process, so the value is only
    /// comparable within a single process.
    /// </returns>
    private static int EmulateWorkload(long someNumber)
    {
        var serialized = JsonSerializer.Serialize(
            new { Test = someNumber, Date = new DateTime(2024, 03, 10) }
        );

        var hashCode = new HashCode();
        hashCode.AddBytes(MemoryMarshal.AsBytes(serialized.AsSpan()));
        return hashCode.ToHashCode();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment