Skip to content

Instantly share code, notes, and snippets.

@noseratio
Last active February 14, 2024 00:17
Show Gist options
  • Save noseratio/eb8f58ac483426f299bd4acf1c269583 to your computer and use it in GitHub Desktop.
Save noseratio/eb8f58ac483426f299bd4acf1c269583 to your computer and use it in GitHub Desktop.
Cancelling an observable with a CancellationToken
// https://stackoverflow.com/q/72471152/1768303
// https://dotnetfiddle.net/bLYDgw
/*
I don't think I should be seeing "Emitting: 19" after "Cancelling"
Emitting: 18
TakeUntil passed: 18
OnNext: 18
Cancelling
OnError: System.OperationCanceledException: The operation was canceled.
Emitting: 19
Finally
Unhandled exception. System.OperationCanceledException: The operation was canceled.
I'm not seeing it when I use a custom WithCancellation operator (below)
*/
#nullable enable
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
// Repro: builds a published 50 ms interval pipeline that is cut short by a
// cancellation-driven TakeUntil signal, logging every pipeline stage to the console.
async Task Test(CancellationToken token)
{
    // Signal sequence for TakeUntil: it never emits a value; when the token's
    // cancellation callback runs it logs and faults with OperationCanceledException.
    // The registration itself is the subscription handle.
    var cancellationSignal = Observable.Create<long>(
        signalObserver => token.Register(
            (_, cancelledToken) =>
            {
                Console.WriteLine($"Cancelling");
                signalObserver.OnError(new OperationCanceledException(cancelledToken));
            },
            null));

    var published = Observable
        .Interval(TimeSpan.FromMilliseconds(50))
        .Do(tick => Console.WriteLine($"Emitting: {tick}"))
        .Skip(3)
        //.TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable())
        //.WithCancellation(token)
        .TakeUntil(cancellationSignal)
        .Do(tick => Console.WriteLine($"TakeUntil passed: {tick}"))
        .Finally(() => Console.WriteLine($"Finally"))
        .Publish();

    // Attach the logging observer first, then connect the published sequence.
    using var observerHandle = published.Subscribe(
        onNext: value => Console.WriteLine($"OnNext: {value}"),
        onError: error => Console.WriteLine($"OnError: {error}"),
        onCompleted: () => Console.WriteLine("OnCompleted"));

    using var connectionHandle = published.Connect();

    try
    {
        // Awaits termination of the sequence; an OnError surfaces here as an exception.
        await published.ToTask();
    }
    finally
    {
        // don't dispose anything just yet: keep both handles alive for another
        // second so anything emitted after termination can still be observed
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}
// Entry point: cap the thread pool at 100 worker and 100 I/O completion threads.
ThreadPool.SetMaxThreads(100, 100);

// The token source cancels itself one second after creation.
var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await Test(timeoutSource.Token);

// Terminate the process explicitly with a success exit code.
Environment.Exit(0);
public static class RxExt
{
    /// <summary>
    /// A pass-through observable that also observes a cancellation token.
    /// </summary>
    /// <typeparam name="T">The element type of the source sequence.</typeparam>
    /// <param name="this">The source sequence to relay.</param>
    /// <param name="cancelToken">The token whose cancellation terminates the resulting sequence.</param>
    /// <returns>
    /// A sequence that relays notifications from <paramref name="this"/> until
    /// <paramref name="cancelToken"/> is cancelled, at which point the observer receives
    /// <see cref="OperationCanceledException"/> via OnError and the source subscription is
    /// disposed. If the token is already cancelled when this method is called, the result
    /// fails immediately on subscription. If the token can never be cancelled, the source
    /// itself is returned.
    /// </returns>
    public static IObservable<T> WithCancellation<T>(this IObservable<T> @this, CancellationToken cancelToken)
    {
        if (cancelToken.IsCancellationRequested)
        {
            return Observable.Throw<T>(new OperationCanceledException(cancelToken));
        }

        if (!cancelToken.CanBeCanceled)
        {
            // Nothing to observe: behave as a pure pass-through instead of
            // swallowing the whole source with Observable.Never.
            return @this;
        }

        return Observable.Create<T>(observer =>
        {
            object syncRoot = new();

            // Single-assignment holders: if the source terminates (or the token fires)
            // before Subscribe/Register have returned, the holder is already disposed and
            // disposes the late-assigned handle immediately, so nothing leaks.
            var subscription = new SingleAssignmentDisposable();
            var rego = new SingleAssignmentDisposable();

            bool subscribed = true;
            bool cancelled = false;

            subscription.Disposable = @this.Subscribe(OnNext, OnError, OnCompleted);

            // Unregister() removes the callback without waiting for a running callback to
            // finish (unlike Dispose()), so it is safe to call while holding syncRoot even
            // if the cancellation callback is concurrently blocked on the same lock.
            var registration = cancelToken.Register(OnCancel, null);
            rego.Disposable = Disposable.Create(() => registration.Unregister());

            return Disposable.Create(Dispose);

            // Invoked when the downstream subscription is disposed.
            void Dispose()
            {
                lock (syncRoot)
                {
                    Unsubscribe();
                }
            }

            // Cancellation callback registered on the token.
            void OnCancel(object? _, CancellationToken token)
            {
                lock (syncRoot)
                {
                    Cancel(token);
                }
            }

            // Tears down the source subscription and the token registration exactly once.
            // Must be called with syncRoot held.
            void Unsubscribe()
            {
                System.Diagnostics.Debug.Assert(Monitor.IsEntered(syncRoot));
                if (!subscribed)
                {
                    return;
                }

                subscribed = false;
                subscription.Dispose();
                rego.Dispose();
            }

            // Terminates the sequence with OperationCanceledException, at most once.
            // Must be called with syncRoot held.
            void Cancel(CancellationToken token)
            {
                System.Diagnostics.Debug.Assert(Monitor.IsEntered(syncRoot));
                if (!subscribed || cancelled)
                {
                    return;
                }

                cancelled = true;
                Unsubscribe();
                observer.OnError(new OperationCanceledException(token));
            }

            void OnNext(T item)
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // Cancellation wins over a value that races with it.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        observer.OnNext(item);
                    }
                }
            }

            void OnError(Exception error)
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // Cancellation wins over a source error that races with it.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        Unsubscribe();
                        observer.OnError(error);
                    }
                }
            }

            void OnCompleted()
            {
                lock (syncRoot)
                {
                    if (!subscribed)
                    {
                        return;
                    }

                    // Cancellation wins over a completion that races with it.
                    if (cancelToken.IsCancellationRequested)
                    {
                        Cancel(cancelToken);
                    }
                    else
                    {
                        Unsubscribe();
                        observer.OnCompleted();
                    }
                }
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment