Last active
September 24, 2020 23:42
-
-
Save isc30/03c65d1668c53c782ff2e0e420beb216 to your computer and use it in GitHub Desktop.
SelectParallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Test | |
{ | |
class Program
{
    // Shared random source used to pick the simulated duration of each Map call.
    static Random rnd = new Random();

    /// <summary>
    /// Demo entry point: streams the ids, maps them concurrently and logs every
    /// mapped value as soon as it becomes available.
    /// </summary>
    static async Task Main(string[] args)
    {
        Log("Starting");

        await foreach (var mapped in GetIds().SelectParallel(id => Map(id)))
        {
            Log(mapped);
        }

        Log("Ending");
    }

    /// <summary>
    /// Writes <paramref name="str"/> to the console, prefixed with the current local time.
    /// </summary>
    static void Log(string str)
    {
        var timestamp = DateTime.Now.ToLongTimeString();
        Console.WriteLine($"[{timestamp}] {str}");
    }

    /// <summary>
    /// Produces the ids 1 through 20 as an async stream, pausing briefly before each one.
    /// </summary>
    static async IAsyncEnumerable<int> GetIds()
    {
        for (var id = 1; id <= 20; id++)
        {
            await Task.Delay(1);
            yield return id;
        }
    }

    /// <summary>
    /// Simulates slow work: waits between 1000 and 1999 ms, then returns the id
    /// joined with the managed id of the thread the continuation ran on.
    /// </summary>
    static async Task<string> Map(int id)
    {
        var delayMs = rnd.Next(1000, 2000);
        await Task.Delay(delayMs);

        var threadId = Thread.CurrentThread.ManagedThreadId;
        return $"{id}_{threadId}";
    }
}
/// <summary>
/// Extension methods that project a sequence through a selector with all
/// projections running concurrently, yielding results as they complete.
/// </summary>
public static class AsyncEnumerableExtensions
{
    /// <summary>
    /// Wraps a synchronous sequence as an <see cref="IAsyncEnumerable{T}"/>.
    /// Elements are pulled from <paramref name="source"/> lazily, one per move.
    /// </summary>
    public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(this IEnumerable<T> source)
    {
        foreach (var e in source)
        {
            yield return e;
        }
    }

    /// <summary>
    /// Runs the selectors in parallel and yields in completion order.
    /// Each call to the synchronous <paramref name="selector"/> is offloaded with
    /// <see cref="Task.Run(Func{Task})"/> and forwarded to the task-based overload.
    /// </summary>
    public static IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
        this IEnumerable<TIn> source,
        Func<TIn, TOut> selector)
    {
        return SelectParallel(source, e => Task.Run(() => selector(e)));
    }

    /// <summary>
    /// Runs the selectors in parallel and yields in completion order.
    /// </summary>
    /// <remarks>
    /// The whole source is enumerated and every selector is started on the first
    /// move of the returned stream. A failed task surfaces to the consumer as an
    /// <see cref="AggregateException"/> when its turn comes.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="source"/> is null (raised on the first move, not at call time).
    /// </exception>
    public static async IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
        this IEnumerable<TIn> source,
        Func<TIn, Task<TOut>> selector)
    {
        if (source == null)
        {
            throw new InvalidOperationException("Source is null");
        }

        // A list, not a set: a selector may hand back the same Task instance for
        // several elements (e.g. a cached completed task) and each element must
        // still produce its own result.
        var tasks = source
            .Select(selector)
            .ToList();

        while (tasks.Count > 0)
        {
            var completedTask = await Task.WhenAny<TOut>(tasks);
            tasks.Remove(completedTask);

            // The task has already completed, so Result does not block here.
            yield return completedTask.Result;
        }
    }

    /// <summary>
    /// Runs the selectors in parallel and yields in completion order.
    /// Each call to the synchronous <paramref name="selector"/> is offloaded with
    /// <see cref="Task.Run(Func{Task})"/> and forwarded to the task-based overload.
    /// </summary>
    public static IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
        this IAsyncEnumerable<TIn> source,
        Func<TIn, Task<TOut>> selector)
    {
        return SelectParallelCore(source, selector);
    }

    /// <summary>
    /// Runs the selectors in parallel and yields in completion order.
    /// Each call to the synchronous <paramref name="selector"/> is offloaded with
    /// <see cref="Task.Run(Func{Task})"/> and forwarded to the task-based overload.
    /// </summary>
    public static IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
        this IAsyncEnumerable<TIn> source,
        Func<TIn, TOut> selector)
    {
        return SelectParallel(source, e => Task.Run(() => selector(e)));
    }

    /// <summary>
    /// Runs the selectors in parallel and yields in completion order, pulling the
    /// source and draining finished selector tasks concurrently.
    /// </summary>
    /// <remarks>
    /// Best effort: an element whose selector throws synchronously, or whose task
    /// faults or is cancelled, is skipped and produces no output. A failure of the
    /// source itself surfaces as an <see cref="AggregateException"/>. If the
    /// consumer stops early, selector tasks that are still running are abandoned,
    /// not cancelled.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="source"/> is null (raised on the first move, not at call time).
    /// </exception>
    private static async IAsyncEnumerable<TOut> SelectParallelCore<TIn, TOut>(
        IAsyncEnumerable<TIn> source,
        Func<TIn, Task<TOut>> selector)
    {
        if (source == null)
        {
            throw new InvalidOperationException("Source is null");
        }

        var enumerator = source.GetAsyncEnumerator();
        var sourceFinished = false;

        // A list, not a set: the same Task instance may legitimately be returned
        // for several elements and each one must yield its own result.
        var tasks = new List<Task<TOut>>();
        Task<bool> sourceMoveTask = null;
        Task<Task<TOut>> pipeCompletionTask = null;

        try
        {
            while (!sourceFinished || tasks.Count > 0)
            {
                if (sourceMoveTask == null && !sourceFinished)
                {
                    sourceMoveTask = enumerator.MoveNextAsync().AsTask();
                }

                if (pipeCompletionTask == null && tasks.Count > 0)
                {
                    pipeCompletionTask = Task.WhenAny<TOut>(tasks);
                }

                // The loop condition guarantees at least one of the two is set.
                var pending = new List<Task>(2);
                if (pipeCompletionTask != null)
                {
                    pending.Add(pipeCompletionTask);
                }
                if (sourceMoveTask != null)
                {
                    pending.Add(sourceMoveTask);
                }

                // WhenAny never throws for a faulted member; failures are
                // inspected explicitly below.
                await Task.WhenAny(pending);

                if (sourceMoveTask != null && sourceMoveTask.IsCompleted)
                {
                    // Clear the field first so the finally block never waits on
                    // a move that has already been consumed.
                    var finishedMove = sourceMoveTask;
                    sourceMoveTask = null;

                    sourceFinished = !finishedMove.Result;
                    if (!sourceFinished)
                    {
                        try
                        {
                            tasks.Add(selector(enumerator.Current));

                            // The existing watcher only covers the tasks that
                            // existed when it was created; drop it so the next
                            // iteration watches the new task as well. Otherwise a
                            // fast new task would stay hidden behind a slow old one.
                            pipeCompletionTask = null;
                        }
                        catch (Exception)
                        {
                            // Best effort: a selector that throws synchronously
                            // contributes no result for this element.
                        }
                    }
                }

                if (pipeCompletionTask != null && pipeCompletionTask.IsCompleted)
                {
                    var completedTask = pipeCompletionTask.Result;
                    pipeCompletionTask = null;
                    tasks.Remove(completedTask);

                    if (completedTask.IsCompletedSuccessfully)
                    {
                        yield return completedTask.Result;
                    }
                    else
                    {
                        // Best effort: skip the failed element, but mark its
                        // exception as observed so it is not reported later as
                        // an unobserved task exception.
                        _ = completedTask.Exception;
                    }
                }
            }
        }
        finally
        {
            // The enumerator must not be disposed while a MoveNextAsync call is
            // still in flight (compiler-generated iterators reject that), so let
            // a pending move settle first. Its outcome no longer matters here.
            if (sourceMoveTask != null)
            {
                try
                {
                    await sourceMoveTask;
                }
                catch (Exception)
                {
                    // The stream is being torn down; a late source failure has
                    // no consumer left to report to.
                }
            }

            await enumerator.DisposeAsync();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment