Last active
June 23, 2020 15:06
-
-
Save Nemo157/092bc8ccd258ec07911ee2ee88ff2c47 to your computer and use it in GitHub Desktop.
An attempt at working out how to shim an argument-accepting generator into a Sink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(generator_trait)] | |
#![feature(generators)] | |
use core::pin::Pin; | |
use core::task::{Context, Poll}; | |
use core::ops::{Generator, GeneratorState}; | |
/// A poll-based destination for values of type `Item`.
///
/// Every method receives the sink pinned. The three `poll_*` methods also
/// receive the task [`Context`] and report progress through [`Poll`].
///
/// NOTE(review): the shape appears modelled on the `futures` crate's `Sink`
/// trait — confirm before relying on that crate's finer contract rules.
pub trait Sink<Item> {
    /// Error value that any of the operations below may produce.
    type Error;

    /// Asks whether the sink can take an item right now.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Hands `item` to the sink; takes no `Context`, so it cannot report `Pending`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    /// Polls a flush of the sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Polls a close of the sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
/// Resume argument handed to the sink generator; `SinkShim` builds one per
/// `Sink` operation it forwards.
///
/// `'a` is the lifetime of the borrow of the task context and `'b` is the
/// context's own lifetime parameter; `T` is the item type.
enum Arg<'a, 'b, T> {
    /// An item passed in by `start_send`.
    StartSend(T),
    /// A flush request carrying the caller's task context.
    Flush(&'a mut Context<'b>),
    /// A close request carrying the caller's task context.
    Close(&'a mut Context<'b>),
}
/// Value the sink generator yields back to `SinkShim` after each resume.
enum Res {
    /// The generator is waiting for its next item; the shim reports a flush
    /// that observes this as complete.
    Idle,
    /// The generator is still busy; the shim reports `Poll::Pending` for
    /// flush/close and panics if it sees this in answer to `start_send`.
    NotReady,
    /// The generator took ownership of the item it was just sent.
    Accepted,
}
/// Newtype around a generator `Gen`; the `Sink` implementation for this type
/// forwards every sink operation to the generator as a resume argument.
struct SinkShim<Gen>(Gen);
impl<Item, Err, Gen> Sink<Item> for SinkShim<Gen> where Gen: for<'a, 'b> Generator<Arg<'a, 'b, Item>, Yield = Res, Return = Result<(), Err>> { | |
type Error = Err; | |
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
self.poll_flush(cx) | |
} | |
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { | |
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::StartSend(item)) { | |
GeneratorState::Yielded(Res::Idle) => panic!("sink idle after start send"), | |
GeneratorState::Yielded(Res::NotReady) => panic!("sink rejected send"), | |
GeneratorState::Yielded(Res::Accepted) => Ok(()), | |
GeneratorState::Complete(Ok(())) => panic!("sink unexpectedly closed"), | |
GeneratorState::Complete(Err(err)) => Err(err), | |
} | |
} | |
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::Flush(cx)) { | |
GeneratorState::Yielded(Res::Idle) => Poll::Ready(Ok(())), | |
GeneratorState::Yielded(Res::NotReady) => Poll::Pending, | |
GeneratorState::Yielded(Res::Accepted) => panic!("sink accepted during flush"), | |
GeneratorState::Complete(Ok(())) => Poll::Ready(Ok(())), | |
GeneratorState::Complete(Err(err)) => Poll::Ready(Err(err)), | |
} | |
} | |
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::Close(cx)) { | |
GeneratorState::Yielded(Res::Idle) => panic!("should have returned"), | |
GeneratorState::Yielded(Res::NotReady) => Poll::Pending, | |
GeneratorState::Yielded(Res::Accepted) => panic!("sink accepted during close"), | |
GeneratorState::Complete(Ok(())) => Poll::Ready(Ok(())), | |
GeneratorState::Complete(Err(err)) => Poll::Ready(Err(err)), | |
} | |
} | |
} | |
/// Drives the future `$e` to completion from inside a sink generator and
/// evaluates to the future's output.
///
/// `$arg` names the generator's mutable resume-argument binding. While the
/// future is pending the macro yields `Res::NotReady` and stores the next
/// resume argument back into `$arg`. A `StartSend` received while the future
/// is pending is answered with `Res::NotReady` and its item is dropped.
///
/// When the future finishes, the macro breaks out immediately and leaves
/// `$arg` untouched, so the code that follows still sees the `Flush`/`Close`
/// request that drove the final poll. Yielding `Res::NotReady` at that point
/// would report "pending" with no wake-up registered.
///
/// The future lives in a single local for the whole loop and is never moved
/// after its first poll.
macro_rules! await_in_sink {
    ($arg:ident, $e:expr) => ({
        let mut future = $e;
        loop {
            // Match through `&mut` so the task context stays inside `$arg`.
            let polled = match &mut $arg {
                Arg::StartSend(_) => Poll::Pending,
                Arg::Flush(cx) | Arg::Close(cx) => {
                    // SAFETY: `future` is never moved after this point as long
                    // as the enclosing generator is not moved between resumes,
                    // i.e. the generator must be declared `static`.
                    let pinned = unsafe { Pin::new_unchecked(&mut future) };
                    core::future::Future::poll(pinned, &mut **cx)
                }
            };
            match polled {
                Poll::Ready(output) => break output,
                Poll::Pending => {
                    $arg = yield Res::NotReady;
                }
            }
        }
    })
}
/// Waits inside a sink generator for the next item and evaluates to
/// `Some(item)`, or to `None` once a close request arrives.
///
/// `$arg` names the generator's mutable resume-argument binding:
/// - `StartSend(item)`: yields `Res::Accepted`, stores the next resume
///   argument in `$arg`, then produces `Some(item)`.
/// - `Flush`: yields `Res::Idle`, stores the next resume argument in `$arg`
///   and keeps waiting.
/// - `Close`: produces `None` without yielding and leaves the close request
///   in `$arg`.
macro_rules! await_item {
    ($arg:ident) => ({
        loop {
            match $arg {
                Arg::StartSend(received) => {
                    $arg = yield Res::Accepted;
                    break Some(received);
                }
                Arg::Flush(_) => {
                    $arg = yield Res::Idle;
                }
                Arg::Close(cx) => {
                    // Put the request back so the caller still observes it.
                    $arg = Arg::Close(cx);
                    break None;
                }
            }
        }
    })
}
/// Example per-item operation: fails with the input itself when it equals
/// `"bang"`, and succeeds for every other string.
async fn bar(s: String) -> Result<(), String> {
    if s != "bang" {
        return Ok(());
    }
    Err(s)
}
fn foo() -> impl Sink<String, Error = String> { | |
SinkShim(|arg: Arg<String>| { | |
while let Some(item) = await_item!(arg) { | |
await_in_sink!(arg, bar(item))?; | |
} | |
Ok(()) | |
}) | |
} | |
fn main() { | |
let sink = foo(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment