Skip to content

Instantly share code, notes, and snippets.

@Nemo157
Last active June 23, 2020 15:06
Show Gist options
  • Save Nemo157/092bc8ccd258ec07911ee2ee88ff2c47 to your computer and use it in GitHub Desktop.
Save Nemo157/092bc8ccd258ec07911ee2ee88ff2c47 to your computer and use it in GitHub Desktop.
An attempt at working out how to shim an argument accepting generator into a Sink
#![feature(generator_trait)]
#![feature(generators)]
use core::pin::Pin;
use core::task::{Context, Poll};
use core::ops::{Generator, GeneratorState};
pub trait Sink<Item> {
type Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
}
enum Arg<'a, 'b, T> {
StartSend(T),
Flush(&'a mut core::task::Context<'b>),
Close(&'a mut core::task::Context<'b>),
}
enum Res {
Idle,
NotReady,
Accepted,
}
struct SinkShim<Gen>(Gen);
impl<Item, Err, Gen> Sink<Item> for SinkShim<Gen> where Gen: for<'a, 'b> Generator<Arg<'a, 'b, Item>, Yield = Res, Return = Result<(), Err>> {
type Error = Err;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::StartSend(item)) {
GeneratorState::Yielded(Res::Idle) => panic!("sink idle after start send"),
GeneratorState::Yielded(Res::NotReady) => panic!("sink rejected send"),
GeneratorState::Yielded(Res::Accepted) => Ok(()),
GeneratorState::Complete(Ok(())) => panic!("sink unexpectedly closed"),
GeneratorState::Complete(Err(err)) => Err(err),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::Flush(cx)) {
GeneratorState::Yielded(Res::Idle) => Poll::Ready(Ok(())),
GeneratorState::Yielded(Res::NotReady) => Poll::Pending,
GeneratorState::Yielded(Res::Accepted) => panic!("sink accepted during flush"),
GeneratorState::Complete(Ok(())) => Poll::Ready(Ok(())),
GeneratorState::Complete(Err(err)) => Poll::Ready(Err(err)),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }.resume(Arg::Close(cx)) {
GeneratorState::Yielded(Res::Idle) => panic!("should have returned"),
GeneratorState::Yielded(Res::NotReady) => Poll::Pending,
GeneratorState::Yielded(Res::Accepted) => panic!("sink accepted during close"),
GeneratorState::Complete(Ok(())) => Poll::Ready(Ok(())),
GeneratorState::Complete(Err(err)) => Poll::Ready(Err(err)),
}
}
}
macro_rules! await_in_sink {
($arg:ident, $e:expr) => ({
enum MaybeDone<F, T> { NotDone(F), Done(T) }
let mut maybe_future = MaybeDone::NotDone($e);
loop {
match maybe_future {
MaybeDone::NotDone(future) => {
$arg = match $arg {
Arg::StartSend(item) => yield Res::NotReady,
Arg::Flush(cx) | Arg::Close(cx) => {
if let Poll::Ready(e) = core::future::Future::poll(unsafe { Pin::new_unchecked(&mut future) }, cx) {
maybe_future = MaybeDone::Done(e);
}
yield Res::NotReady
}
}
}
MaybeDone::Done(e) => {
break e;
}
}
}
})
}
macro_rules! await_item {
($arg:ident) => ({
let mut maybe_item = None;
loop {
match maybe_item {
Some(item) => break item,
None => {
$arg = match $arg {
Arg::StartSend(item) => {
maybe_item = Some(Some(item));
yield Res::Accepted
}
Arg::Flush(cx) => {
yield Res::Idle
}
Arg::Close(cx) => {
maybe_item = Some(None);
Arg::Close(cx)
}
}
}
}
}
})
}
async fn bar(s: String) -> Result<(), String> {
if s == "bang" { Err(s) } else { Ok(()) }
}
fn foo() -> impl Sink<String, Error = String> {
SinkShim(|arg: Arg<String>| {
while let Some(item) = await_item!(arg) {
await_in_sink!(arg, bar(item))?;
}
Ok(())
})
}
fn main() {
let sink = foo();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment