Created
September 16, 2020 11:29
-
-
Save noseratio/5d2d5f2a0cbb71b7880ce731c3958e62 to your computer and use it in GitHub Desktop.
Continue on the specified task scheduler, which becomes the current one
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// by @noseratio | |
#nullable enable | |
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Noseratio.Experimental | |
{ | |
/// <summary>
/// Extension methods and awaiter types that let an <c>async</c> method
/// continue its execution on a specific <see cref="TaskScheduler"/>.
/// </summary>
public static class TaskSchedulerExtensions
{
    /// <summary>
    /// Continue on the specified task scheduler, which becomes the current one.
    /// Inspired by <see href="https://github.com/dotnet/runtime/issues/20025">this GitHub issue</see>.
    /// </summary>
    /// <param name="@this">A task scheduler instance, e.g., <c>TaskScheduler.Default</c></param>
    /// <param name="alwaysSchedule">Always use the task scheduler to queue the continuations,
    /// even if it can be executed synchronously.
    /// </param>
    /// <example>
    /// <code>
    /// await TaskScheduler.Default.SwitchTo(alwaysSchedule: true);
    /// </code>
    /// </example>
    /// <returns>An awaitable whose continuation is queued to <paramref name="@this"/>.</returns>
    /// <exception cref="ArgumentNullException">The scheduler is <c>null</c>.</exception>
    public static TaskSchedulerAwaitable SwitchTo(this TaskScheduler @this, bool alwaysSchedule = false)
    {
        return new TaskSchedulerAwaitable(@this, alwaysSchedule);
    }

    /// <summary>
    /// Awaiter that queues the continuation to the scheduler it was created with.
    /// </summary>
    public readonly struct TaskSchedulerAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion
    {
        private readonly TaskScheduler _scheduler;
        private readonly bool _alwaysSchedule;

        /// <summary>Creates an awaiter bound to <paramref name="scheduler"/>.</summary>
        /// <param name="scheduler">The scheduler continuations are queued to.</param>
        /// <param name="alwaysSchedule">When <c>true</c>, <see cref="IsCompleted"/> is always
        /// <c>false</c>, so the continuation is always queued.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
        public TaskSchedulerAwaiter(TaskScheduler scheduler, bool alwaysSchedule = false)
        {
            _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
            _alwaysSchedule = alwaysSchedule;
        }

        // Queues the continuation as a task on the target scheduler.
        private void Schedule(Action continuation)
        {
            Task.Factory.StartNew(
                continuation,
                CancellationToken.None,
                TaskCreationOptions.None,
                _scheduler);
        }

        /// <summary>
        /// <c>true</c> only when scheduling can be skipped: the target is the default
        /// scheduler and the caller is already running on it, on a thread pool thread
        /// without a synchronization context.
        /// </summary>
        public bool IsCompleted =>
            !_alwaysSchedule &&
            _scheduler == TaskScheduler.Default &&
            TaskScheduler.Current == TaskScheduler.Default &&
            Thread.CurrentThread.IsThreadPoolThread &&
            SynchronizationContext.Current == null;

        /// <summary>Nothing to return; switching schedulers produces no value.</summary>
        public void GetResult()
        {
        }

        /// <summary>
        /// The safe version: queues <paramref name="continuation"/> using APIs that
        /// flow the execution context.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation is null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            if (_scheduler == TaskScheduler.Default)
            {
                // QueueUserWorkItem (unlike UnsafeQueueUserWorkItem) flows the execution context
                ThreadPool.QueueUserWorkItem(c => c(), continuation, preferLocal: true);
                return;
            }

            Schedule(continuation);
        }

        /// <summary>
        /// The unsafe version: queues <paramref name="continuation"/> without having
        /// to flow the execution context.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
        public void UnsafeOnCompleted(Action continuation)
        {
            if (continuation is null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            // use ThreadPool.UnsafeQueueUserWorkItem to optimize for TaskScheduler.Default
            if (_scheduler == TaskScheduler.Default)
            {
                ThreadPool.UnsafeQueueUserWorkItem(c => c(), continuation, preferLocal: true);
                return;
            }

            // use Task.Factory.StartNew for all non-default task schedulers;
            // SuppressFlow throws if the flow is already suppressed, so check first
            if (ExecutionContext.IsFlowSuppressed())
            {
                Schedule(continuation);
                return;
            }

            // suppress execution context flow for the duration of the scheduling call;
            // disposing the AsyncFlowControl restores it (the documented alternative
            // to calling ExecutionContext.RestoreFlow directly)
            using (ExecutionContext.SuppressFlow())
            {
                Schedule(continuation);
            }
        }
    }

    /// <summary>
    /// Awaitable returned by <see cref="SwitchTo"/>; hands out a <see cref="TaskSchedulerAwaiter"/>.
    /// </summary>
    public readonly struct TaskSchedulerAwaitable
    {
        private readonly TaskSchedulerAwaiter _awaiter;

        /// <summary>Creates an awaitable bound to <paramref name="scheduler"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
        public TaskSchedulerAwaitable(TaskScheduler scheduler, bool alwaysSchedule = false)
        {
            _awaiter = new TaskSchedulerAwaiter(scheduler, alwaysSchedule);
        }

        /// <summary>Returns the awaiter used by the <c>await</c> expression.</summary>
        public TaskSchedulerAwaiter GetAwaiter()
        {
            return _awaiter;
        }
    }
}
/// <summary>
/// Testing TaskScheduler.Default.SwitchTo: each step awaits a scheduler switch
/// and then asserts which scheduler is current.
/// </summary>
class Program
{
    /// <summary>
    /// Minimal scheduler for the test: every task is queued to the thread pool
    /// and executed there under this scheduler; nothing is ever inlined.
    /// </summary>
    public class CustomTaskScheduler : TaskScheduler
    {
        // This scheduler keeps no list of queued tasks; NotSupportedException is
        // the exception documented for GetScheduledTasks in that situation.
        protected override IEnumerable<Task>? GetScheduledTasks()
        {
            throw new NotSupportedException();
        }

        // Hand the task to a thread pool thread and execute it there.
        protected override void QueueTask(Task task)
        {
            ThreadPool.QueueUserWorkItem(t =>
                TryExecuteTask((Task)t!), task);
        }

        // Always refuse inline execution.
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
    }

    static async Task Main()
    {
        // forced switch to the default scheduler
        await TaskScheduler.Default.SwitchTo(alwaysSchedule: true);
        Trace.Assert(TaskScheduler.Current == TaskScheduler.Default);

        // switch to a custom scheduler; it becomes the current one
        var scheduler = new CustomTaskScheduler();
        await scheduler.SwitchTo();
        Trace.Assert(TaskScheduler.Current == scheduler);

        // complete a task from a plain thread pool work item...
        var tcs = new TaskCompletionSource<bool>();
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // Trace.Assert (not Debug.Assert) so the check also runs in Release
            // builds, like every other check in this method
            Trace.Assert(TaskScheduler.Current == TaskScheduler.Default);
            tcs.SetResult(true);
        });

        // ...and verify that awaiting it resumes on the custom scheduler
        await tcs.Task;
        Trace.Assert(TaskScheduler.Current == scheduler);

        // switch back to the default scheduler
        await TaskScheduler.Default.SwitchTo();
        Trace.Assert(TaskScheduler.Current == TaskScheduler.Default);
    }
}
} |
Per the docs you shouldn't call `RestoreFlow`.
Hi @AArnott, sorry I've only just noticed your comment. I actually borrowed this idea of calling `RestoreFlow` directly from .NET Core sources, particularly from here. I think the idea was to avoid creating `AsyncFlowControl` for a short synchronous scope.
But now I believe we shouldn't even be suppressing it for .NET Core, in line with Stephen's notes?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Per the docs you shouldn't call `RestoreFlow`.