Crates.io | open-coroutine-queue |
lib.rs | open-coroutine-queue |
version | 0.5.0 |
source | src |
created_at | 2023-07-23 01:26:38.40823 |
updated_at | 2024-01-21 12:35:27.284472 |
description | Concurrent work-stealing queue, implemented using st3 and crossbeam-deque. |
homepage | |
repository | https://github.com/acl-dev/open-coroutine/tree/dev/open-coroutine-queue |
max_upload_size | |
id | 923501 |
size | 26,767 |
open-coroutine
is a simple, efficient and generic stackful-coroutine library.
It is still under development; please do not
use this library in a production
environment!
[dependencies]
# check https://crates.io/crates/open-coroutine
open-coroutine = "x.y.z"
// Minimal usage sketch: annotate the program's `main` with `#[open_coroutine::main]`
// and put the application code in its body.
// NOTE(review): presumably the attribute initializes the coroutine runtime before
// the body runs — confirm against the open-coroutine documentation.
#[open_coroutine::main]
fn main() {
//......
}
Note: not supported on Windows
// Preemptive-scheduling example.
//
// Three coroutines are submitted to one scheduler on a dedicated thread:
//   * coroutine1 loops until `TEST_FLAG1` is cleared,
//   * coroutine2 loops until `TEST_FLAG2` is cleared, then clears `TEST_FLAG1`,
//   * coroutine3 clears `TEST_FLAG2`.
// The flags are only ever cleared by a later coroutine, so the example can finish
// only if the looping coroutines are interrupted and the later ones get to run
// (presumably by the `preemptive-schedule` feature — confirm against the crate docs).
//
// Returns `Ok(())` when the worker thread reports completion within 3 seconds,
// or an `std::io::Error` ("preemptive schedule failed") on timeout. Without the
// feature (or off unix) it only prints a hint and returns `Ok(())`.
#[open_coroutine::main]
fn main() -> std::io::Result<()> {
    cfg_if::cfg_if! {
        if #[cfg(all(unix, feature = "preemptive-schedule"))] {
            use open_coroutine_core::scheduler::Scheduler;
            use std::sync::{Arc, Condvar, Mutex};
            use std::time::Duration;

            // Loop conditions for coroutine1 and coroutine2 respectively.
            static mut TEST_FLAG1: bool = true;
            static mut TEST_FLAG2: bool = true;

            // `true` means "worker still pending"; the worker flips it to `false`
            // and notifies once scheduling has returned.
            let completion = Arc::new((Mutex::new(true), Condvar::new()));
            let worker_completion = Arc::clone(&completion);

            let worker = std::thread::Builder::new()
                .name(String::from("preemptive"))
                .spawn(move || {
                    let scheduler = Scheduler::new();

                    // coroutine1: spins until coroutine2 clears TEST_FLAG1.
                    let _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine1 launched");
                            while unsafe { TEST_FLAG1 } {
                                println!("loop1");
                                let _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop1 end");
                            1
                        },
                        None,
                    );

                    // coroutine2: spins until coroutine3 clears TEST_FLAG2,
                    // then releases coroutine1.
                    let _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine2 launched");
                            while unsafe { TEST_FLAG2 } {
                                println!("loop2");
                                let _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop2 end");
                            unsafe { TEST_FLAG1 = false };
                            2
                        },
                        None,
                    );

                    // coroutine3: releases coroutine2 and finishes immediately.
                    let _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine3 launched");
                            unsafe { TEST_FLAG2 = false };
                            3
                        },
                        None,
                    );

                    scheduler.try_schedule();

                    // Report completion to the waiting main thread; the guard is
                    // held until the closure ends, as in the original.
                    let (worker_mutex, worker_condvar) = &*worker_completion;
                    let mut still_pending = worker_mutex.lock().unwrap();
                    *still_pending = false;
                    worker_condvar.notify_one();
                })
                .expect("failed to spawn thread");

            // Block until the worker clears the pending flag, or give up after 3000 ms.
            let (mutex, condvar) = &*completion;
            let outcome = condvar
                .wait_timeout_while(
                    mutex.lock().unwrap(),
                    Duration::from_millis(3000),
                    |pending| *pending,
                )
                .unwrap();

            if !outcome.1.timed_out() {
                unsafe {
                    worker.join().unwrap();
                    assert!(!TEST_FLAG1);
                }
                Ok(())
            } else {
                Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "preemptive schedule failed",
                ))
            }
        } else {
            println!("please enable preemptive-schedule feature");
            Ok(())
        }
    }
}
outputs
coroutine1 launched
loop1
coroutine2 launched
loop2
coroutine3 launched
loop1
loop2 end
loop1 end
// Sleeps for one second inside an `#[open_coroutine::main]` program.
// The sample output that follows this example in the README shows
// "nanosleep hooked" for this call.
#[open_coroutine::main]
fn main() {
    let one_second = std::time::Duration::from_secs(1);
    std::thread::sleep(one_second);
}
outputs
nanosleep hooked
support scalable stack
support and compatibility for AF_XDP socket
hook other syscalls that may be interrupted by a signal
support #[open_coroutine::join]
macro to wait for coroutines
#[open_coroutine::main] macro
Monitor follows the thread-per-core guideline
EventLoop follows the thread-per-core guideline
genawaiter as low_level stackless coroutine (can't support it due to hook)
corosensei as low_level coroutine
#[open_coroutine::co] macro
WorkStealQueue
use correct epoll_event
struct
use rayon
for parallel computing
support #[open_coroutine::main]
macro
hook almost all read
syscall
hook almost all write
syscall
hook other syscall