// You can run this example from the root of the mio repo: // cargo run --example tcp_listenfd_server --features="os-poll net" // or with wasi: // cargo +nightly build --target wasm32-wasi --example tcp_listenfd_server --features="os-poll net" // wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/debug/examples/tcp_listenfd_server.wasm use mio::event::Event; use mio::net::{TcpListener, TcpStream}; use mio::{Events, Interest, Poll, Registry, Token}; use std::collections::HashMap; use std::io::{self, Read, Write}; use std::str::from_utf8; // Setup some tokens to allow us to identify which event is for which socket. const SERVER: Token = Token(0); // Some data we'll send over the connection. const DATA: &[u8] = b"Hello world!\n"; #[cfg(not(windows))] fn get_first_listen_fd_listener() -> Option { #[cfg(any(unix, target_os = "hermit", target_os = "wasi"))] use std::os::fd::FromRawFd; let stdlistener = unsafe { std::net::TcpListener::from_raw_fd(3) }; stdlistener.set_nonblocking(true).unwrap(); Some(stdlistener) } #[cfg(windows)] fn get_first_listen_fd_listener() -> Option { // Windows does not support `LISTEN_FDS` None } fn main() -> io::Result<()> { env_logger::init(); std::env::var("LISTEN_FDS").expect("LISTEN_FDS environment variable unset"); // Create a poll instance. let mut poll = Poll::new()?; // Create storage for events. let mut events = Events::with_capacity(128); // Setup the TCP server socket. let mut server = { let stdlistener = get_first_listen_fd_listener().unwrap(); println!("Using preopened socket FD 3"); println!("You can connect to the server using `nc`:"); match stdlistener.local_addr() { Ok(a) => println!(" $ nc {} {}", a.ip(), a.port()), Err(_) => println!(" $ nc "), } println!("You'll see our welcome message and anything you type will be printed here."); TcpListener::from_std(stdlistener) }; // Register the server with poll we can receive events for it. poll.registry() .register(&mut server, SERVER, Interest::READABLE)?; // Map of `Token` -> `TcpStream`. let mut connections = HashMap::new(); // Unique token for each incoming connection. let mut unique_token = Token(SERVER.0 + 1); loop { poll.poll(&mut events, None)?; for event in events.iter() { match event.token() { SERVER => loop { // Received an event for the TCP server socket, which // indicates we can accept an connection. let (mut connection, address) = match server.accept() { Ok((connection, address)) => (connection, address), Err(ref e) if would_block(e) => { // If we get a `WouldBlock` error we know our // listener has no more incoming connections queued, // so we can return to polling and wait for some // more. break; } Err(e) => { // If it was any other kind of error, something went // wrong and we terminate with an error. return Err(e); } }; println!("Accepted connection from: {}", address); let token = next(&mut unique_token); poll.registry() .register(&mut connection, token, Interest::WRITABLE)?; connections.insert(token, connection); }, token => { // Maybe received an event for a TCP connection. let done = if let Some(connection) = connections.get_mut(&token) { handle_connection_event(poll.registry(), connection, event)? } else { // Sporadic events happen, we can safely ignore them. false }; if done { if let Some(mut connection) = connections.remove(&token) { poll.registry().deregister(&mut connection)?; } } } } } } } fn next(current: &mut Token) -> Token { let next = current.0; current.0 += 1; Token(next) } /// Returns `true` if the connection is done. fn handle_connection_event( registry: &Registry, connection: &mut TcpStream, event: &Event, ) -> io::Result { if event.is_writable() { // We can (maybe) write to the connection. match connection.write(DATA) { // We want to write the entire `DATA` buffer in a single go. If we // write less we'll return a short write error (same as // `io::Write::write_all` does). Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()), Ok(_) => { // After we've written something we'll reregister the connection // to only respond to readable events. registry.reregister(connection, event.token(), Interest::READABLE)? } // Would block "errors" are the OS's way of saying that the // connection is not actually ready to perform this I/O operation. Err(ref err) if would_block(err) => {} // Got interrupted (how rude!), we'll try again. Err(ref err) if interrupted(err) => { return handle_connection_event(registry, connection, event) } // Other errors we'll consider fatal. Err(err) => return Err(err), } } if event.is_readable() { let mut connection_closed = false; let mut received_data = vec![0; 4096]; let mut bytes_read = 0; // We can (maybe) read from the connection. loop { match connection.read(&mut received_data[bytes_read..]) { Ok(0) => { // Reading 0 bytes means the other side has closed the // connection or is done writing, then so are we. connection_closed = true; break; } Ok(n) => { bytes_read += n; if bytes_read == received_data.len() { received_data.resize(received_data.len() + 1024, 0); } } // Would block "errors" are the OS's way of saying that the // connection is not actually ready to perform this I/O operation. Err(ref err) if would_block(err) => break, Err(ref err) if interrupted(err) => continue, // Other errors we'll consider fatal. Err(err) => return Err(err), } } if bytes_read != 0 { let received_data = &received_data[..bytes_read]; if let Ok(str_buf) = from_utf8(received_data) { println!("Received data: {}", str_buf.trim_end()); } else { println!("Received (none UTF-8) data: {:?}", received_data); } } if connection_closed { println!("Connection closed"); return Ok(true); } } Ok(false) } fn would_block(err: &io::Error) -> bool { err.kind() == io::ErrorKind::WouldBlock } fn interrupted(err: &io::Error) -> bool { err.kind() == io::ErrorKind::Interrupted }