// Copyright (c) 2019 Parity Technologies (UK) Ltd. // // Licensed under the Apache License, Version 2.0 // or the MIT // license , at your // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. // Example to be used with the autobahn test suite, a fully automated test // suite to verify client and server implementations of websocket // implementation. // // Once started, the tests can be executed with: wstest -m fuzzingclient // // See https://github.com/crossbario/autobahn-testsuite for details. use futures::io::{BufReader, BufWriter}; use soket::{BoxedError, connection, handshake}; use tokio::{net::{TcpListener, TcpStream}, stream::StreamExt}; use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt}; #[tokio::main] async fn main() -> Result<(), BoxedError> { let mut listener = TcpListener::bind("127.0.0.1:9001").await?; let mut incoming = listener.incoming(); while let Some(socket) = incoming.next().await { let mut server = new_server(socket?); let key = { let req = server.receive_request().await?; req.into_key() }; let accept = handshake::server::Response::Accept { key: &key, protocol: None }; server.send_response(&accept).await?; let (mut sender, mut receiver) = server.into_builder().finish(); let mut message = Vec::new(); loop { message.clear(); match receiver.receive_data(&mut message).await { Ok(soket::Data::Binary(n)) => { assert_eq!(n, message.len()); sender.send_binary_mut(&mut message).await?; sender.flush().await? } Ok(soket::Data::Text(n)) => { assert_eq!(n, message.len()); if let Ok(txt) = std::str::from_utf8(&message) { sender.send_text(txt).await?; sender.flush().await? } else { break } } Err(connection::Error::Closed) => break, Err(e) => { log::error!("connection error: {}", e); break } } } } Ok(()) } #[cfg(not(feature = "deflate"))] fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader>>> { handshake::Server::new(BufReader::new(BufWriter::new(socket.compat()))) } #[cfg(feature = "deflate")] fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader>>> { let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(16 * 1024, socket.compat())); let mut server = handshake::Server::new(socket); let deflate = soket::extension::deflate::Deflate::new(soket::Mode::Server); server.add_extension(Box::new(deflate)); server }