Last active
February 14, 2024 00:17
-
-
Save noseratio/eb8f58ac483426f299bd4acf1c269583 to your computer and use it in GitHub Desktop.
Cancelling an observable with a CancellationToken
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://stackoverflow.com/q/72471152/1768303 | |
// https://dotnetfiddle.net/bLYDgw | |
/* | |
I don't think I should be seeing "Emitting: 19" after "Cancelling" | |
Emitting: 18 | |
TakeUntil passed: 18 | |
OnNext: 18 | |
Cancelling | |
OnError: System.OperationCanceledException: The operation was canceled. | |
Emitting: 19 | |
Finally | |
Unhandled exception. System.OperationCanceledException: The operation was canceled. | |
I'm not seeing it when I use a custom WithCancellation operator (below) | |
*/ | |
#nullable enable | |
using System;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
// Repro driver: publishes a 50 ms Interval sequence, cuts it off via TakeUntil
// when the token is cancelled, and logs every pipeline stage to the console.
async Task Test(CancellationToken token)
{
    // The "other" sequence for TakeUntil: it never produces a value and only
    // signals OnError (after logging "Cancelling") from the token's callback.
    // The registration itself is the subscription's disposable.
    var cancellationSignal = Observable.Create<long>(signalObserver =>
        token.Register(
            (_, cancelledToken) =>
            {
                Console.WriteLine("Cancelling");
                signalObserver.OnError(new OperationCanceledException(cancelledToken));
            },
            null));

    // Alternatives that can be swapped in for TakeUntil(cancellationSignal):
    //   .TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable())
    //   .WithCancellation(token)
    var published = Observable
        .Interval(TimeSpan.FromMilliseconds(50))
        .Do(tick => Console.WriteLine($"Emitting: {tick}"))
        .Skip(3)
        .TakeUntil(cancellationSignal)
        .Do(tick => Console.WriteLine($"TakeUntil passed: {tick}"))
        .Finally(() => Console.WriteLine("Finally"))
        .Publish();

    // Declaration order matters: the connection is disposed before the subscription.
    using var subscription = published.Subscribe(
        onNext: value => Console.WriteLine($"OnNext: {value}"),
        onError: error => Console.WriteLine($"OnError: {error}"),
        onCompleted: () => Console.WriteLine("OnCompleted"));
    using var connection = published.Connect();

    try
    {
        // Faults with the sequence's error once the token is cancelled.
        await published.ToTask();
    }
    finally
    {
        // Hold the subscription and connection open for another second so any
        // late notifications still get logged before disposal.
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}
// Cap the thread pool at 100 worker and 100 I/O completion threads.
ThreadPool.SetMaxThreads(workerThreads: 100, completionPortThreads: 100);

// The source cancels itself one second after creation, which drives the repro.
var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await Test(cancellation.Token);

Environment.Exit(0);
public static class RxExt
{
    /// <summary>
    /// A pass-through observable that also observes a cancellation token.
    /// </summary>
    /// <remarks>
    /// Notifications from the source are relayed unchanged until the token is
    /// cancelled. On cancellation the source subscription is disposed and the
    /// observer receives a single <see cref="OperationCanceledException"/>
    /// through <c>OnError</c>; nothing is relayed after that. All observer
    /// calls are serialized by a per-subscription lock.
    /// </remarks>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="this">The source sequence to relay.</param>
    /// <param name="cancelToken">The token whose cancellation terminates the relayed sequence.</param>
    /// <returns>
    /// A sequence that errors immediately if <paramref name="cancelToken"/> is
    /// already cancelled when this method is called, the source itself if the
    /// token can never be cancelled, and otherwise a cancellation-aware relay
    /// of the source.
    /// </returns>
    public static IObservable<T> WithCancellation<T>(this IObservable<T> @this, CancellationToken cancelToken)
    {
        if (cancelToken.IsCancellationRequested)
        {
            return Observable.Throw<T>(new OperationCanceledException(cancelToken));
        }

        if (!cancelToken.CanBeCanceled)
        {
            // Nothing to observe, so stay a true pass-through instead of
            // swallowing the source with a never-ending empty sequence.
            return @this;
        }

        return Observable.Create<T>(observer =>
        {
            object syncRoot = new();

            // Tolerates being disposed before the source subscription is
            // assigned, which happens when the source terminates synchronously
            // inside Subscribe.
            var subscription = new SingleAssignmentDisposable();
            CancellationTokenRegistration rego = default;
            bool subscribed = true;

            subscription.Disposable = @this.Subscribe(OnNext, OnError, OnCompleted);

            var pendingRego = cancelToken.Register(OnCancel, null);
            lock (syncRoot)
            {
                if (subscribed)
                {
                    rego = pendingRego;
                }
                else
                {
                    // The sequence already ended (or was cancelled) before the
                    // registration could be stored; release it right away so
                    // it does not linger on the token.
                    pendingRego.Unregister();
                }
            }

            return Disposable.Create(Dispose);

            void Dispose()
            {
                lock (syncRoot)
                {
                    Unsubscribe();
                }
            }

            void OnCancel(object? _, CancellationToken token)
            {
                lock (syncRoot)
                {
                    Cancel(token);
                }
            }

            // Tears down the source subscription and the token registration once.
            void Unsubscribe()
            {
                Debug.Assert(Monitor.IsEntered(syncRoot));
                if (!subscribed)
                {
                    return;
                }

                subscribed = false;
                subscription.Dispose();

                // Unregister never waits for a running callback, unlike
                // Dispose, so it cannot deadlock against a cancellation
                // callback that is blocked on syncRoot while we hold it.
                rego.Unregister();
            }

            // Ends the relayed sequence with an OperationCanceledException.
            void Cancel(CancellationToken token)
            {
                Debug.Assert(Monitor.IsEntered(syncRoot));
                if (!subscribed)
                {
                    return;
                }

                Unsubscribe();
                observer.OnError(new OperationCanceledException(token));
            }

            void OnNext(T item)
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // The token may already be cancelled even though its
                    // callback has not run yet; never relay past that point.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        observer.OnNext(item);
                    }
                }
            }

            void OnError(Exception error)
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // Cancellation takes precedence over the source's own error.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        Unsubscribe();
                        observer.OnError(error);
                    }
                }
            }

            void OnCompleted()
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // Cancellation takes precedence over normal completion.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        Unsubscribe();
                        observer.OnCompleted();
                    }
                }
            }
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment