Created
June 7, 2024 05:27
-
-
Save sxlijin/6967045e0b8d19b34340db43f369c257 to your computer and use it in GitHub Desktop.
ruby-tokio-demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{future, FutureExt}; | |
use magnus::{ | |
class, exception::runtime_error, function, method, prelude::*, Error, IntoValue, RModule, | |
}; | |
//#[cfg(ruby_have_ruby_fiber_scheduler_h)] | |
use rb_sys::bindings::uncategorized::{ | |
rb_fiber_current, rb_fiber_scheduler_block, rb_fiber_scheduler_current, rb_fiber_scheduler_get, | |
rb_fiber_scheduler_kernel_sleep, rb_fiber_scheduler_unblock, | |
}; | |
use std::cell::RefCell; | |
use std::ops::{Deref, DerefMut}; | |
//use rb_sys::rb_fiber_current; | |
use std::task::Poll; | |
use std::{future::Future, sync::Arc}; | |
use tokio::time::{sleep, Duration}; | |
use crate::Result; | |
async fn async_fn() -> String { | |
let duration = Duration::from_secs(2); | |
println!("async-BEGIN- sleeping for {duration:#?}"); | |
sleep(duration).await; | |
println!("async-END- slept for {duration:#?}"); | |
"async-retval".to_string() | |
} | |
fn wrap_fiber<T>(f: impl Future<Output = T>) -> impl Future<Output = T> { | |
let mut f = Box::pin(f); | |
future::poll_fn(move |cx| { | |
let result = f.as_mut().poll(cx); | |
let fiber = unsafe { rb_fiber_current() }; | |
println!("fiber={fiber}"); | |
match result { | |
Poll::Ready(_) => unsafe { | |
//rb_fiber_scheduler_unblock( | |
// rb_fiber_scheduler_current(), | |
// rb_sys::special_consts::Qnil.into(), | |
// fiber, | |
//); | |
}, | |
Poll::Pending => unsafe { | |
rb_fiber_scheduler_block( | |
rb_fiber_scheduler_current(), | |
rb_sys::special_consts::Qnil.into(), | |
rb_sys::special_consts::Qnil.into(), | |
); | |
}, | |
} | |
result | |
}) | |
} | |
#[magnus::wrap(class = "Baml::Ffi::TokioDemo", free_immediately, size)] | |
/// For testing how to implement tokio in a Ruby extension | |
pub struct TokioDemo { | |
t: RefCell<Option<tokio::runtime::Runtime>>, | |
} | |
#[allow(dead_code)] | |
impl TokioDemo { | |
fn new() -> Result<Self> { | |
let Ok(tokio_runtime) = tokio::runtime::Builder::new_multi_thread() | |
.worker_threads(1) | |
.enable_all() | |
.build() | |
else { | |
return Err(Error::new(runtime_error(), "Failed to start tokio runtime")); | |
}; | |
Ok(Self { | |
t: RefCell::new(Some(tokio_runtime)), | |
}) | |
} | |
//fn does_this_yield(&self) { | |
// let rb_qnil = rb_sys::special_consts::Qnil; | |
// println!("build2 qnil {}", Into::<u64>::into(rb_qnil)); | |
// let rb_scheduler = unsafe { rb_fiber_scheduler_get() }; | |
// println!("current scheduler {}", Into::<u64>::into(rb_scheduler)); | |
// let rb_curr_fiber = unsafe { rb_fiber_current() }; | |
// println!( | |
// " fiber={} going to sleep", | |
// Into::<u64>::into(rb_curr_fiber) | |
// ); | |
// unsafe { | |
// let rb_duration = magnus::Integer::from_i64(10).into_value(); | |
// //rb_fiber_scheduler_kernel_sleep(rb_scheduler, rb_duration.as_raw()); | |
// } | |
// let fut = self.t.spawn(async move { | |
// async_fn().await; | |
// println!(" fiber={} done sleeping, pls wake up", rb_curr_fiber); | |
// unsafe { | |
// let rb_qnil = rb_sys::special_consts::Qnil; | |
// if rb_scheduler != Into::<u64>::into(rb_qnil) { | |
// rb_fiber_scheduler_unblock(rb_scheduler, rb_qnil.into(), rb_curr_fiber); | |
// } | |
// } | |
// }); | |
// println!( | |
// " fiber={} signalling that we're going to block", | |
// Into::<u64>::into(rb_curr_fiber) | |
// ); | |
// unsafe { | |
// if rb_scheduler != Into::<u64>::into(rb_qnil) { | |
// rb_fiber_scheduler_block( | |
// rb_scheduler, | |
// rb_qnil.into(), | |
// // In theory, according to rb_fiber_scheduler_make_timeout, qnil blocks indefinitely | |
// /*timeout:*/ | |
// rb_qnil.into(), | |
// ); | |
// } | |
// } | |
// println!( | |
// " fiber={} blocking until woken up", | |
// Into::<u64>::into(rb_curr_fiber) | |
// ); | |
// self.t.block_on(fut); | |
//} | |
fn wrapped_yield(&self) { | |
let borrow = self.t.borrow(); | |
let Some(ref t) = borrow.deref() else { | |
println!("no runtime found"); | |
return; | |
}; | |
let (tx, rx) = tokio::sync::oneshot::channel(); | |
let rb_curr_fiber = unsafe { rb_fiber_current() }; | |
let rb_curr_scheduler = unsafe { rb_fiber_scheduler_current() }; | |
t.spawn(async move { | |
async_fn().await; | |
if let Err(_) = tx.send(3) { | |
println!("the receiver dropped"); | |
} | |
unsafe { | |
rb_fiber_scheduler_unblock( | |
rb_curr_scheduler, | |
rb_sys::special_consts::Qnil.into(), | |
rb_curr_fiber, | |
); | |
} | |
}); | |
drop(borrow); | |
//let mut fut = std::pin::pin!(async_fn()); | |
let mut fut = std::pin::pin!(rx); | |
let waker = unsafe { std::task::Waker::from(Arc::new(RubyWaker {})) }; | |
let mut cx = std::task::Context::from_waker(&waker); | |
loop { | |
match Future::poll(fut.as_mut(), &mut cx) { | |
Poll::Pending => { | |
println!(" fiber={rb_curr_fiber} pending"); | |
unsafe { | |
rb_fiber_scheduler_block( | |
rb_curr_scheduler, | |
rb_sys::special_consts::Qnil.into(), | |
rb_sys::special_consts::Qnil.into(), | |
); | |
} | |
} | |
Poll::Ready(_) => { | |
println!(" fiber={rb_curr_fiber} ready"); | |
break; | |
} | |
} | |
} | |
} | |
fn shutdown(&self) { | |
let mut shutdown = None; | |
std::mem::swap(self.t.borrow_mut().deref_mut(), &mut shutdown); | |
if let Some(t) = shutdown { | |
t.shutdown_background(); | |
} | |
} | |
#[allow(unused_variables)] | |
fn tokio_test(&self) { | |
//let f0 = self.t.spawn(async_fn()); | |
//let f1 = self.t.spawn(async_fn()); | |
//let f2 = self.t.spawn(async_fn()); | |
} | |
/// For usage in magnus::init | |
/// | |
/// TODO: use traits and macros to implement this | |
pub fn define_in_ruby(module: &RModule) -> Result<()> { | |
let tokio_demo = module.define_class("TokioDemo", class::object())?; | |
tokio_demo.define_singleton_method("new", function!(TokioDemo::new, 0))?; | |
//tokio_demo.define_method("does_this_yield", method!(TokioDemo::does_this_yield, 0))?; | |
tokio_demo.define_method("does_this_yield", method!(TokioDemo::wrapped_yield, 0))?; | |
tokio_demo.define_method("shutdown", method!(TokioDemo::shutdown, 0))?; | |
Ok(()) | |
} | |
} | |
struct RubyWaker {} | |
impl std::task::Wake for RubyWaker { | |
fn wake(self: Arc<Self>) { | |
println!("Waking up Ruby fiber"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment