mod io; mod config; mod que; mod map; mod disk; mod workers; mod locator; mod response; mod benchmark; pub use config::Config; pub use que::Que; use tokio::sync::Notify; use std::sync::Arc; use std::time::Instant; use futures::future::join_all; #[tokio::main] async fn main() { let cmd_line = std::env::args(); for cmd in cmd_line{ if cmd.contains("--simple"){ simple().await; return; } else if cmd.contains("--bechmark"){ bechmarks().await; return; } } simple().await; } async fn bechmarks(){ let mut build = benchmark::BenchmarkBuilder::new( "D://workstation/expo/rust/rust_store/test/rustque/bechmark_8.txt".to_string() ); //5000 if false { build.add(benchmark::Benchmark{ no_of_writers:10, no_of_writes:5000, map_files:vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string(), ], write_size:256, min_que_size:10000000, expansion_size:5000000, no_of_disk_workers:10 }); } //50,000 if false { build.add(benchmark::Benchmark{ no_of_writers:10, no_of_writes:50000, map_files:vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string(), ], write_size:256, min_que_size:50000000, expansion_size:25000000, no_of_disk_workers:10 }); } //100,000 if true { build.add(benchmark::Benchmark{ no_of_writers:5, no_of_writes:200000, map_files:vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string() ], write_size:512, min_que_size:50_000_000, expansion_size:50_000_000, no_of_disk_workers:10 }); build.add(benchmark::Benchmark{ no_of_writers:5, no_of_writes:200000, map_files:vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string() ], write_size:512, min_que_size:50_000_000, expansion_size:50_000_000, no_of_disk_workers:25 }); build.add(benchmark::Benchmark{ no_of_writers:5, no_of_writes:200000, map_files:vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string() ], write_size:512, min_que_size:50_000_000, expansion_size:50_000_000, no_of_disk_workers:50 }); } build.run().await; } async fn simple(){ println!(">>> simple"); let hold = Instant::now(); //--------------------------- //initiate que //--------------------------- let mut que:Que; match Que::new(Config::new( vec![ "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(), "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string(), // "D://workstation/expo/rust/rust_store/test/rustque/que4.rustque".to_string(), ], 5_000_000, 5_000_000, 5 )).await{ Ok(v)=>{ que = v; println!("que initiated : {:?}",hold.elapsed()); }, Err(e)=>{ println!("!!! failed-que::new => {:?}",e); return; } } let mut big_value = vec![]; let mut last_put = 0; loop { if big_value.len() == 512{break;} if last_put == 200{last_put = 1;} else {last_put += 1;} big_value.push(last_put); } // println!("{:?}",big_value); //1_000_000 //--------------------------- //write items to the que //--------------------------- if true{ for _ in 0..1{ let write_time_final = Instant::now(); let sleeper = Arc::new(Notify::new()); let waker = sleeper.clone(); let no_of_spawns = 1; for _nsp in 0..no_of_spawns{ let que_to_move = que.clone(); let waker_to_move = waker.clone(); let hold_big_value = big_value.clone(); tokio::spawn(async move { let write_spawn_time = Instant::now(); let mut que = que_to_move; let mut collect = Vec::new(); for _n in 0..1000{ match que.add(hold_big_value.clone()).await{ Ok(que_response)=>{ collect.push(async move{ que_response.check().await }); // let _quer_resp = que_response.check().await; // println!("{:?} add resp : {:?}",_n,_quer_resp); // println!(">>> success-que-add {:?}",_n); }, Err(_e)=>{ println!("!!! failed-que-add : {:?}",_e); } } } let mut failed = 0; let mut success = 0; for r in join_all(collect).await.iter(){ if !r{failed += 1;} else {success += 1;} } if false { println!("{:?} write_spawn_time : {:?} {:?} {:?}",_nsp,write_spawn_time.elapsed(),failed,success); } waker_to_move.notify_one(); }); } for _ in 0..no_of_spawns{ sleeper.notified().await } println!("write_time_final : {:?}",write_time_final.elapsed()); } } //--------------------------- //get and remove items from que //--------------------------- if false{ let remove_time_final = Instant::now(); loop{ match que.next().await{ Ok(next_response)=>{ let _quer_resp = next_response.check().await; if !_quer_resp {break;} // println!("next resp : {:?}",_quer_resp); match next_response.data().await{ Some((_value,pointer))=>{ // println!("pointer : {:?}",pointer); if true{ match que.remove(pointer).await{ Ok(remove_response)=>{ let remove_resp = remove_response.check().await; // println!("remove resp : {:?}",remove_resp); }, Err(_e)=>{ println!("!!! failed-que-remove : {:?}",_e); } } } if false{ match que.reset(pointer).await{ Ok(reset_response)=>{ let reset_resp = reset_response.check().await; println!("reset resp : {:?}",reset_resp); }, Err(_e)=>{ println!("!!! failed-que-reset : {:?}",_e); } } } }, None=>{} } // break; }, Err(_e)=>{ println!("!!! failed-que-get : {:?}",_e); break; } } } println!("remove_time_final : {:?}",remove_time_final.elapsed()); } // if false{ // for _ in 0..5{ // match que.get().await{ // Ok(_v)=>{ // println!(">>> success-que-get {:?}",_v); // match que.reset(_v.1).await{ // Ok(_v)=>{ // println!(">>> success-que-reset"); // }, // Err(_e)=>{ // println!("!!! failed-que-reset : {:?}",_e); // } // } // }, // Err(_e)=>{ // println!("!!! failed-que-get : {:?}",_e); // } // } // } // } println!("final in : {:?}",hold.elapsed()); } //que(message)(await confirm)->map(message)->disk(message)(submit confirm)