-
-
Save krasin-ga/1ee2749c11cb9e2f8d6e68e7c525c7c6 to your computer and use it in GitHub Desktop.
EmulatedBacklogBenchmark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using System.Runtime.InteropServices; | |
using System.Text.Json; | |
using System.Threading.Channels; | |
using BenchmarkDotNet.Attributes; | |
using BenchmarkDotNet.Running; | |
void CheckConsistency() | |
{ | |
var benchmark = new EmulatedBacklogBenchmark | |
{ | |
NumberOfConsumers = 8, | |
NumberOfProducers = 8 | |
}; | |
var channelsResult = benchmark.WithChannelsAndThreadPool().Result; | |
var threadsResult = benchmark.WithConcurrentQueueAndThreads(); | |
if (channelsResult != threadsResult) | |
throw new Exception($"Inconsistent state between benchmarks: {channelsResult} != {threadsResult} "); | |
} | |
CheckConsistency(); | |
BenchmarkRunner.Run(typeof(EmulatedBacklogBenchmark).Assembly); | |
[ThreadingDiagnoser] | |
public class EmulatedBacklogBenchmark | |
{ | |
private const int NumberOfItems = 10_000_000; | |
[Params(1, 8, 32, 128)] | |
public int NumberOfConsumers = 1; | |
[Params(1, 8, 32, 128)] | |
public int NumberOfProducers = 1; | |
[Benchmark] | |
public async Task<int> WithChannelsAndThreadPool() | |
{ | |
var hashSum = 0; | |
var consumerData = Enumerable.Range(0, NumberOfConsumers) | |
.Select(_ => Channel.CreateUnbounded<long>( | |
new UnboundedChannelOptions())) | |
.Select( | |
channel => new | |
{ | |
Channel = channel, | |
RunConsumer = | |
(Func<Task>)(() => Task.Run( | |
async () => | |
{ | |
while (await channel.Reader.WaitToReadAsync()) | |
{ | |
while (channel.Reader.TryRead(out var item)) | |
Interlocked.Add(ref hashSum, EmulateWorkload(item)); | |
} | |
})) | |
}) | |
.ToArray(); | |
var producers = Enumerable.Range(0, NumberOfProducers) | |
.Select(_ => | |
Task.Run(() => | |
{ | |
var numberOfItemsToPublish = NumberOfItems / NumberOfProducers; | |
for (var i = 0; i < numberOfItemsToPublish; i++) | |
consumerData[i % consumerData.Length] | |
.Channel.Writer.TryWrite(i); | |
})) | |
.ToArray(); | |
var consumers = consumerData.Select(c => c.RunConsumer()).ToArray(); | |
await Task.WhenAll(producers); | |
foreach (var data in consumerData) | |
data.Channel.Writer.Complete(); | |
await Task.WhenAll(consumers); | |
return hashSum; | |
} | |
[Benchmark] | |
public int WithConcurrentQueueAndThreads() | |
{ | |
var hashSum = 0; | |
var isRunning = true; | |
var consumers = Enumerable | |
.Range(0, NumberOfConsumers) | |
.Select(_ => new | |
{ | |
Queue = new ConcurrentQueue<long>(), | |
Event = new AutoResetEvent(false), | |
}) | |
.Select( | |
input => new | |
{ | |
Input = input, | |
CreateThread = (Func<Thread>)( | |
() => new Thread( | |
_ => | |
{ | |
while (true) | |
{ | |
while (input.Queue.TryDequeue(out var item)) | |
Interlocked.Add(ref hashSum, EmulateWorkload(item)); | |
if (!Volatile.Read(ref isRunning)) | |
break; | |
input.Event.WaitOne(); | |
} | |
} | |
)) | |
}) | |
.ToArray(); | |
var producers = Enumerable | |
.Range(0, NumberOfProducers) | |
.Select(_ => new Thread( | |
_ => | |
{ | |
var numberOfItemsToPublish = NumberOfItems / NumberOfProducers; | |
for (var i = 0; i < numberOfItemsToPublish; i++) | |
{ | |
var target = consumers[i % consumers.Length]; | |
target.Input.Queue.Enqueue(i); | |
target.Input.Event.Set(); | |
} | |
} | |
)) | |
.ToArray(); | |
var consumerThreads = consumers.Select(c => c.CreateThread()).ToArray(); | |
foreach (var consumerThread in consumerThreads) | |
consumerThread.Start(); | |
foreach (var producer in producers) | |
producer.Start(); | |
foreach (var producer in producers) | |
producer.Join(); | |
foreach (var data in consumers) | |
data.Input.Event.Set(); | |
isRunning = false; | |
foreach (var data in consumers) | |
data.Input.Event.Set(); | |
foreach (var consumerThread in consumerThreads) | |
consumerThread.Join(); | |
return hashSum; | |
} | |
private static int EmulateWorkload(long someNumber) | |
{ | |
var serialized = JsonSerializer.Serialize( | |
new { Test = someNumber, Date = new DateTime(2024, 03, 10) } | |
); | |
var hashCode = new HashCode(); | |
hashCode.AddBytes(MemoryMarshal.AsBytes(serialized.AsSpan())); | |
return hashCode.ToHashCode(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment