// Copyright 2020, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
mod stress; use std::{env, net::Ipv4Addr, path::Path, process, sync::Arc, time::Duration}; use futures::{future, future::Either}; use stress::{error::Error, prompt::user_prompt}; use tari_shutdown::Shutdown; use tari_utilities::message_format::MessageFormat; use tempfile::Builder; use tokio::{sync::oneshot, time}; use crate::stress::{node, prompt::parse_from_short_str, service, service::StressTestServiceRequest}; #[tokio::main] async fn main() { env_logger::init(); match run().await { Ok(_) | Err(Error::UserQuit) => {}, Err(err) => { println!("{error:?}: {error}", error = err); process::exit(1); }, } } fn remove_arg(args: &mut Vec, item: &str) -> Option { if let Some(pos) = args.iter().position(|s| s == item) { args.remove(pos); Some(pos) } else { None } } async fn run() -> Result<(), Error> { let mut args = env::args().skip(1).collect::>(); let is_tcp = remove_arg(&mut args, "--tcp").is_some(); let mut public_ip = None; if let Some(pos) = remove_arg(&mut args, "--public-ip") { public_ip = Some(args.remove(pos).parse::().unwrap()); } let mut tor_identity_path = None; if let Some(pos) = remove_arg(&mut args, "--tor-identity") { tor_identity_path = Some(args.remove(pos)); } let mut node_identity_path = None; if let Some(pos) = remove_arg(&mut args, "--identity") { node_identity_path = Some(args.remove(pos)); } let mut peer = None; if let Some(pos) = remove_arg(&mut args, "--peer") { peer = Some(args.remove(pos)); } let mut port = 9098; if let Some(pos) = remove_arg(&mut args, "--port") { port = args.remove(pos).parse().expect("Unable to parse port"); } println!("Initializing...",); let tor_identity = tor_identity_path.as_ref().and_then(load_json); let node_identity = node_identity_path.as_ref().and_then(load_json).map(Arc::new); let shutdown = Shutdown::new(); let temp_dir = Builder::new().prefix("stress-test").tempdir().unwrap(); let (comms_node, protocol_notif, inbound_rx, outbound_tx) = node::create( node_identity, temp_dir.as_ref(), public_ip, port, 
tor_identity.clone(), is_tcp, shutdown.to_signal(), ) .await?; if let Some(node_identity_path) = node_identity_path.as_ref() { save_json(comms_node.node_identity_ref(), node_identity_path)?; } if !is_tcp { if let Some(tor_identity_path) = tor_identity_path.as_ref() { save_json(&tor_identity.unwrap(), tor_identity_path)?; } } println!("Stress test service started!"); let (handle, requester) = service::start_service(comms_node, protocol_notif, inbound_rx, outbound_tx, shutdown); let mut last_peer = peer.as_ref().and_then(parse_from_short_str); loop { let (peer, selected_protocol) = user_prompt(last_peer.take())?; last_peer = Some(peer.clone()); let (reply_tx, reply_rx) = oneshot::channel(); requester .send(StressTestServiceRequest::BeginProtocol( peer, selected_protocol, reply_tx, )) .await .unwrap(); let ctrl_c = tokio::signal::ctrl_c(); futures::pin_mut!(ctrl_c); match future::select(reply_rx, ctrl_c).await { Either::Left((result, _)) => { println!("Stress test complete: {:?}", result); }, Either::Right((_, _)) => { println!("SIGINT caught. Waiting for service to exit"); break; }, } } println!("Stress test service is shutting down..."); requester.send(StressTestServiceRequest::Shutdown).await.unwrap(); time::timeout(Duration::from_secs(2), handle).await???; Ok(()) } fn save_json, T: MessageFormat>(obj: &T, path: P) -> Result<(), Error> { let json = obj.to_json()?; std::fs::write(path, json).map_err(Into::into) } fn load_json, T: MessageFormat>(path: P) -> Option { if path.as_ref().exists() { let contents = std::fs::read_to_string(path).unwrap(); Some(T::from_json(&contents).unwrap()) } else { None } }