Crates.io | reactio |
lib.rs | reactio |
version | 0.1.8 |
source | src |
created_at | 2024-10-20 04:03:18.642884 |
updated_at | 2024-11-01 03:37:35.70552 |
description | Low-Latency Event Driven Nonblocking Reactor Pattern |
homepage | |
repository | https://github.com/zjgoggle/reactio-rs |
max_upload_size | |
id | 1415904 |
size | 159,399 |
Low-latency Event-driven Non-blocking Reactor pattern in Rust.
Supported platforms: Linux and Windows, on x86_64 and arm64. (Other platforms are not tested.)
Only 64-bit platforms are supported.
ReactIO implements a low-latency event-driven Reactor pattern in both non-threaded and multi-threaded environments.
Users implement a `Reactor` (at least `on_inbound_message`) and add it to a `ReactRuntime`.
Each `ReactRuntime` instance runs in a dedicated thread. It polls all events for its managed Reactors. There are 2 kinds of events:
There is another set of structs - `SimpleIoReactor`, `SimpleIoListener`, `CommandReactor` - for users who only want to supply event handler functions. See the examples below.
Key technologies:
`ReactRuntime`. When processing events, a Reactor doesn't need any mutex to protect resources.
More tests are in tests and examples.
// PingpongReactor is a Reactor that sends back any received message, which can be used to
// measure round-trip TCP time.
//
// This example drives a listening server and a connecting client from a single
// ReactRuntime in the calling thread, then checks that no reactors or deferred
// commands are left once event processing finishes.
//
// Panics if the exchange has not completed when the 1000 ms timer expires, or if
// reactors / deferred commands remain afterwards.
pub fn test_ping_pong_reactor() {
    let addr = "127.0.0.1:12355";
    let recv_buffer_min_size = 1024;
    let mut runtime = ReactRuntime::new();
    let cmd_sender = runtime.get_cmd_sender();

    // Server side: queue a listen command; each accepted connection is served by a PingpongReactor.
    cmd_sender
        .send_listen(
            addr,
            DefaultTcpListenerHandler::<PingpongReactor>::new(
                recv_buffer_min_size,
                ServerParam {
                    // Parent/listener reactor name. Child names get a count number appended,
                    // e.g. "Server-1" for the first connection.
                    name: "server".to_owned(),
                    // Report round-trip time once per latency_batch samples.
                    latency_batch: 1000,
                },
            ),
            Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();

    // Client side: queue a connect command to the same address.
    cmd_sender
        .send_connect(
            addr,
            recv_buffer_min_size,
            // The client PingpongReactor initiates the exchange. It echoes back 2 messages
            // before closing, with latency_batch=1000.
            PingpongReactor::new_client("client".to_owned(), 2, 1000),
            Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();

    // In a non-threaded environment, call process_events until there are no reactors,
    // no events and no deferred events left.
    let timer = utils::Timer::new_millis(1000);
    while runtime.process_events() {
        if timer.expired() {
            // panic! diverges, so no `break` is needed after it.
            panic!("ERROR: timeout waiting for tests to complete!");
        }
    }
    assert_eq!(runtime.count_reactors(), 0);
    assert_eq!(runtime.count_deferred_queue(), 0);
}
/// `SimpleIoReactor` implements `Reactor` and forwards events to user-supplied handlers.
///
/// This example wires a listener and a client into one `SimpleIoRuntime` using only
/// closures: the client sends an initial message, each side echoes at most once, and the
/// runtime is polled until nothing is left to process.
pub fn test_io_reactor() {
    let addr = "127.0.0.1:12355";
    let recv_buffer_min_size = 1024;
    let mut runtime = reactio::SimpleIoRuntime::new();

    // Message handler used by both the server-side and the client-side reactor.
    let echo_handler = {
        let max_echos = 1;
        let mut echoed = 0;
        move |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>| {
            if echoed >= max_echos {
                // Returning an error closes the socket.
                return Err(format!("Reached max echo: {max_echos}"));
            }
            // Echo the message back.
            ctx.send_or_que(buf)?;
            echoed += 1;
            // Report the number of bytes that have been consumed.
            Ok(buf.len())
        }
    };

    // Server-side on_connected handler: close the parent listener, then accept this connection.
    let server_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, listener_id| {
        ctx.cmd_sender
            .send_close(listener_id, reactio::Deferred::Immediate, |_| {})?;
        Ok(())
    };

    // Listener callback: build a new Reactor for every new connection.
    let make_child_reactor = move |_child_id| {
        Some(reactio::SimpleIoReactor::new_boxed(
            Some(Box::new(server_connected)), // on_connected
            None,                             // on_closed
            echo_handler,                     // on_sock_msg
        ))
    };

    // Client-side on_connected handler: send the initial message.
    let client_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, _| {
        let mut auto_sender = ctx.acquire_send(); // sends on drop
        write!(auto_sender, "test ").unwrap();
        write!(auto_sender, "msgsend").unwrap();
        assert_eq!(auto_sender.count_written(), 12);
        assert_eq!(auto_sender.get_written(), b"test msgsend");
        // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
        // ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
        Ok(()) // accept the connection
    };

    //-- server
    runtime
        .get_cmd_sender()
        .send_listen(
            addr,
            reactio::SimpleIoListener::new(recv_buffer_min_size, make_child_reactor),
            reactio::Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();

    // Poll until the listener reactor shows up, or give up after the timer expires.
    let listen_timer = reactio::utils::Timer::new_millis(1000);
    loop {
        if runtime.count_reactors() >= 1 {
            break;
        }
        if listen_timer.expired() {
            logerr!("ERROR: timeout waiting for listener start!");
            break;
        }
        runtime.process_events();
    }

    //-- client
    runtime
        .get_cmd_sender()
        .send_connect(
            addr,
            recv_buffer_min_size,
            reactio::SimpleIoReactor::new(
                Some(Box::new(client_connected)), // on_connected
                None,                             // on_closed
                echo_handler,                     // on_sock_msg
            ),
            reactio::Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();

    // In a non-threaded environment, keep processing events until there are no reactors,
    // no events and no deferred events left.
    let finish_timer = reactio::utils::Timer::new_millis(1000);
    loop {
        if !runtime.process_events() {
            break;
        }
        if finish_timer.expired() {
            logerr!("ERROR: timeout waiting for tests to complete!");
            break;
        }
    }
    assert_eq!(runtime.count_reactors(), 0);
    assert_eq!(runtime.count_deferred_queue(), 0);
}
See the example in `threaded_pingpong.rs`.
/// Runs a ping-pong exchange across the two runtime threads of a `ThreadedReactorMgr`.
///
/// A listen command is sent to thread 0. When that command completes successfully, its
/// completion callback sends a connect command to thread 1. The calling thread then waits
/// until `stopcounter` reaches 2 (each Reactor increases it when exiting).
///
/// # Panics
///
/// Panics if sending a command fails, or if the counter has not reached 2 when the
/// 1000 ms timer expires.
pub fn test_threaded_pingpong() {
    let addr = "127.0.0.1:12355";
    let recv_buffer_min_size = 1024;
    let stopcounter = Arc::new(AtomicI32::new(0)); // each Reactor increases it when exiting.
    let mgr = ThreadedReactorMgr::<String>::new(2); // 2 threads
    let (threadid0, threadid1) = (0, 1);

    // Cloned Arcs are moved into the completion callback, which is passed to the threads.
    let (amgr, astopcounter) = (Arc::clone(&mgr), Arc::clone(&stopcounter));

    // Send a command to mgr to create a listener in threadid0.
    // When the listen socket is ready (command is completed), send another command to
    // connect from threadid1.
    mgr.get_cmd_sender(threadid0)
        .unwrap()
        .send_listen(
            addr,
            create_tcp_listener(
                recv_buffer_min_size,
                ThreadedServerParam {
                    runtimeid: threadid0,
                    reactormgr: Arc::clone(&mgr),
                    stopcounter: Arc::clone(&stopcounter),
                    name: "server".to_owned(),
                    latency_batch: 1000,
                },
            ),
            Deferred::Immediate,
            // OnCommandCompletion: when the listen socket is ready, send another command
            // to connect from the other thread.
            move |res| {
                if let Err(err) = res {
                    logerr!("[ERROR] Failed to listen. {err}");
                    return;
                }
                amgr.get_cmd_sender(threadid1)
                    .unwrap()
                    .send_connect(
                        addr,
                        recv_buffer_min_size,
                        ThreadedPingpongReactor::new_client(
                            "myclient".to_owned(),
                            threadid1,
                            Arc::clone(&amgr),
                            5,
                            1000,
                            Arc::clone(&astopcounter),
                        ),
                        Deferred::Immediate,
                        |res| {
                            if let Err(err) = res {
                                logerr!("Failed connect! {err}");
                            }
                        },
                    )
                    .unwrap();
            },
        )
        .unwrap();

    // Wait for the 2 reactors to exit.
    let timer = utils::Timer::new_millis(1000);
    while stopcounter.load(atomic::Ordering::Relaxed) != 2 {
        timer.sleep_or_expire(10);
        std::thread::yield_now();
        if timer.expired() {
            // panic! diverges, so no `break` is needed after it.
            panic!("ERROR: timeout waiting for reactors to complete");
        }
    }
}
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.