-
-
Save joseluisq/e7f926d73e02fb9dd6114f4d8be6607d to your computer and use it in GitHub Desktop.
Tokio Async: Concurrent vs Parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::StreamExt; | |
use std::error::Error; | |
use tokio; | |
use tokio::macros::support::Pin; | |
use tokio::prelude::*; | |
use tokio::time::{Duration, Instant}; | |
pub fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let mut multi_threaded_runtime = tokio::runtime::Builder::new() | |
.threaded_scheduler() | |
.enable_all() | |
.core_threads(10) | |
.max_threads(10) | |
.thread_name("multi-threaded") | |
.build()?; | |
// multi_threaded_runtime.block_on(concurrent()); | |
multi_threaded_runtime.block_on(concurrentAndParallel()); | |
Ok(()) | |
} | |
// As can be seen by the output below, despite specifying parallelism of 3, we are still bound by | |
// the fact that all of the future generated by this function execute within a single task. | |
async fn concurrent() { | |
let before = Instant::now(); | |
let paths = (0..6).rev(); | |
let fetches = futures::stream::iter(paths.into_iter().map(|path| make_request(path))) | |
.buffer_unordered(3) | |
.map(|r| println!("finished request: {}", r)) | |
.collect::<Vec<_>>(); | |
fetches.await; | |
println!("elapsed time: {:.2?}", before.elapsed()); | |
} | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 5 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 4 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 3 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 2 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 1 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 0 | |
// elapsed time: 15.01s | |
// Here, we wrap every future within its own task using tokio::spawn. This allows the "requests" to | |
// execute in parallel (depending on how many threads the runtime is configured with; 10 in this | |
// case) using the multiplexing that Tokio does between different tasks and threads. You can see | |
// from the output how 3 threads with ids, 9, 10, and 11, are consistently used to execute all of | |
// the 6 "requests". | |
async fn concurrentAndParallel() { | |
let before = Instant::now(); | |
let paths = (0..6).rev(); | |
let fetches = futures::stream::iter( | |
paths | |
.into_iter() | |
.map(|path| tokio::spawn(make_request(path))), | |
) | |
.buffer_unordered(3) | |
.map(|r| { | |
println!( | |
"finished request: {}", | |
match r { | |
Ok(rr) => rr, | |
Err(_) => String::from("Bad"), | |
} | |
); | |
}) | |
.collect::<Vec<_>>(); | |
fetches.await; | |
println!("elapsed time: {:.2?}", before.elapsed()); | |
} | |
// started request | |
// started request | |
// started request | |
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 3 | |
// started request | |
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 4 | |
// started request | |
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 5 | |
// started request | |
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 0 | |
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 1 | |
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 2 | |
// elapsed time: 5.01s | |
/// Simulates a request that takes `sleep` seconds and reports where it ran.
///
/// Prints `started request`, sleeps, then returns a line containing the current thread's id,
/// its name (or `default_thread_name` when the thread is unnamed), and the requested duration.
async fn make_request(sleep: u64) -> String {
    println!("started request");

    // `std::thread::sleep` blocks the calling OS thread rather than yielding to the executor.
    // NOTE(review): this looks intentional, standing in for blocking work in the demo; confirm
    // before replacing it with an async timer.
    std::thread::sleep(Duration::from_secs(sleep));

    let worker = std::thread::current();
    let worker_name = worker.name().unwrap_or("default_thread_name");

    format!(
        "current thread {:?} | thread name {} | request_duration {:?}",
        worker.id(),
        worker_name,
        sleep
    )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
`std::thread::sleep` blocks the OS-level thread, right?