use std::thread; use std::sync::Arc; use std::io::Result; use std::time::Duration; use futures::future::{FutureExt, LocalBoxFuture}; use httparse::{EMPTY_HEADER, Request}; use bytes::{BufMut, BytesMut}; use env_logger; use pi_async_rt::rt::serial::AsyncRuntimeBuilder; use tcp::{AsyncService, Socket, SocketHandle, SocketConfig, SocketStatus, SocketEvent, connect::TcpSocket, tls_connect::TlsSocket, server::{PortsAdapterFactory, SocketListener}, utils::{TlsConfig, Ready}}; use ws::{server::WebsocketListener, connect::WsSocket, utils::{ChildProtocol, WsSession}}; #[test] fn test_parse_http_header() { let part0 = b"GET /index.html HTTP/1.1"; let part1 = b"\r\nHost: example.domain\r\n\r\n"; let mut bytes = BytesMut::new(); bytes.put_slice(part0); bytes.put_slice(part1); let mut headers = [EMPTY_HEADER; 16]; let mut req = Request::new(&mut headers); if let Ok(status) = req.parse(bytes.as_ref()) { if status.is_partial() { match req.parse(part1) { Err(e) => panic!(e), Ok(status) => { println!("{}", status.is_partial()); }, } } else { println!("!!!!!!{:?}", req); } } } struct TestChildProtocol; impl ChildProtocol for TestChildProtocol { fn protocol_name(&self) -> &str { "echo" } fn is_strict(&self) -> bool { false } fn decode_protocol(&self, connect: WsSocket, context: &mut WsSession) -> LocalBoxFuture<'static, Result<()>> { let msg = context.pop_msg(); let msg_type = context.get_type(); println!("!!!!!!receive ok, msg: {:?}", String::from_utf8(msg.clone())); async move { // if let Some(hibernate) = connect.hibernate(Ready::Writable) { // let connect_copy = connect.clone(); // thread::spawn(move || { // thread::sleep(Duration::from_millis(1000)); // while !connect_copy.wakeup(Ok(())) { // //唤醒被阻塞,则休眠指定时间后继续尝试唤醒 // thread::sleep(Duration::from_millis(15)); // } // }); // let start = Instant::now(); // if let Err(e) = hibernate.await { // //唤醒后返回错误,则立即返回错误原因 // return Err(e); // } // println!("!!!!!!wakeup hibernate ok, time: {:?}", start.elapsed()); // } for _ in 0..1 { if let Err(e) = connect.send(msg_type.clone(), msg.clone()) { return Err(e); } } println!("reply msg ok"); Ok(()) }.boxed_local() } fn close_protocol(&self, connect: WsSocket, context: WsSession, reason: Result<()>) -> LocalBoxFuture<'static, ()> { async move { if let Err(e) = reason { return println!("websocket closed, reason: {:?}", e); } println!("websocket closed"); }.boxed_local() } fn protocol_timeout(&self, connect: WsSocket, context: &mut WsSession, event: SocketEvent) -> LocalBoxFuture<'static, Result<()>> { async move { println!("websocket timeout"); Ok(()) }.boxed_local() } } #[test] fn test_websocket_listener() { //启动日志系统 env_logger::builder().format_timestamp_millis().init(); let rt0 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt1 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt2 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt3 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt4 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt5 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt6 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt7 = AsyncRuntimeBuilder::default_local_thread(None, None); let mut factory = PortsAdapterFactory::::new(); factory.bind(38080, Box::new(WebsocketListener::with_protocol(Arc::new(TestChildProtocol)))); let mut config = SocketConfig::new("0.0.0.0", factory.ports().as_slice()); config.set_option(16384, 16384, 16384, 16); match SocketListener::bind(vec![rt0, rt1, rt2, rt3, rt4, rt5, rt6, rt7], factory, config, 1024, 1024 * 1024, 1024, 16, 4096, 4096, Some(1000)) { Err(e) => { println!("!!!> Websocket Listener Bind Error, reason: {:?}", e); }, Ok(driver) => { println!("===> Websocket Listener Bind Ok"); } } thread::sleep(Duration::from_millis(10000000)); } #[test] fn test_tls_websocket_listener() { //启动日志系统 env_logger::builder().format_timestamp_millis().init(); let rt0 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt1 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt2 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt3 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt4 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt5 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt6 = AsyncRuntimeBuilder::default_local_thread(None, None); let rt7 = AsyncRuntimeBuilder::default_local_thread(None, None); let mut factory = PortsAdapterFactory::::new(); factory.bind(38080, Box::new(WebsocketListener::with_protocol(Arc::new(TestChildProtocol)))); let tls_config = TlsConfig::new_server("", false, "./tests/7285407__17youx.cn.pem", "./tests/7285407__17youx.cn.key", "", "", "", 512, false, "").unwrap(); let mut config = SocketConfig::with_tls("0.0.0.0", &[(38080, tls_config)]); config.set_option(16384, 16384, 16384, 16); match SocketListener::bind(vec![rt0, rt1, rt2, rt3, rt4, rt5, rt6, rt7], factory, config, 1024, 1024 * 1024, 1024, 16, 4096, 4096, Some(1000)) { Err(e) => { println!("!!!> Websocket Listener Bind Error, reason: {:?}", e); }, Ok(driver) => { println!("===> Websocket Listener Bind Ok"); } } thread::sleep(Duration::from_millis(10000000)); }