use std::thread;
use std::time::{Duration, Instant};
#[macro_use]
extern crate mco;
use crate::coroutine::yield_now;
use mco::coroutine;
use mco::mco_gen as mco_gen;
use mco_gen::Gn;

/// A panic raised inside a coroutine must come back as `Err` from `join`,
/// carrying the original `&str` payload.
#[test]
fn panic_coroutine() {
    let j: coroutine::JoinHandle<()> = co!(move || {
        panic!("panic inside coroutine");
    });

    match j.join() {
        Ok(_) => panic!("test should return panic"),
        Err(panic) => match panic.downcast_ref::<&str>() {
            Some(e) => println!("Panicked inside: {:?}", e),
            None => panic!("panic type wrong"),
        },
    }
}

/// Cancelling a parked coroutine must make `join` return an `Err` whose
/// payload downcasts to `mco_gen::Error::Cancel`.
#[test]
fn cancel_coroutine() {
    let j = co!(move || {
        // suspend the coroutine to simulate an endless loop
        println!("before cancel");
        coroutine::park();
        println!("canceled, should not come here");
        // next 'sys call would cause a Cancel panic'
        coroutine::sleep(Duration::from_secs(1000000));
    });

    // let the coroutine run
    thread::sleep(Duration::from_millis(10));

    // SAFETY: NOTE(review): `cancel` is an unsafe API; this test assumes that
    // cancelling a parked coroutine which owns no resources is sound --
    // confirm against the contract documented by mco.
    unsafe { j.coroutine().cancel() };

    match j.join() {
        Ok(_) => panic!("test should return panic"),
        Err(panic) => {
            use mco_gen::Error;
            match panic.downcast_ref::<Error>() {
                Some(&Error::Cancel) => println!("coroutine cancelled"),
                _ => panic!("panic type wrong"),
            }
        }
    }
}

/// Cancelling a coroutine that is looping over `TcpListener::incoming` must
/// make `join` return an `Err` whose payload downcasts to
/// `mco_gen::Error::Cancel`.
#[test]
fn cancel_io_coroutine() {
    let j = co!(move || {
        let listener = mco::net::TcpListener::bind(("0.0.0.0", 1234)).unwrap();
        println!("listening on {:?}", listener.local_addr().unwrap());

        for stream in listener.incoming() {
            match stream {
                Ok(_s) => println!("got a connection"),
                Err(e) => println!("err = {:?}", e),
            }
        }
    });

    // let the coroutine run
    thread::sleep(Duration::from_millis(10));

    // SAFETY: NOTE(review): `cancel` is an unsafe API; this test assumes that
    // cancelling a coroutine in the accept loop is sound even though it owns
    // the listener -- confirm against the contract documented by mco.
    unsafe { j.coroutine().cancel() };

    match j.join() {
        Ok(_) => panic!("test should return panic"),
        Err(panic) => {
            use mco_gen::Error;
            match panic.downcast_ref::<Error>() {
                Some(&Error::Cancel) => println!("coroutine cancelled"),
                _ => panic!("panic type wrong"),
            }
        }
    }
}

/// Spawns a single coroutine and joins it successfully.
#[test]
fn one_coroutine() {
    let j = co!(move || {
        println!("hello, coroutine");
    });

    j.join().unwrap();
}

/// The value returned by the coroutine closure is delivered through `join`.
#[test]
fn coroutine_result() {
    let j = co!(move || {
        println!("hello, coroutine");
        100
    });

    assert_eq!(j.join().unwrap(), 100);
}

/// Spawns ten coroutines without keeping their handles, then sleeps 200 ms
/// on the test thread to give them time to run.
#[test]
fn multi_coroutine() {
    for i in 0..10 {
        co!(move || {
            println!("hi, coroutine{}", i);
        });
    }

    thread::sleep(Duration::from_millis(200));
}

/// A coroutine that calls `yield_now` once in its body can still be joined.
#[test]
fn test_yield() {
    let j = co!(move || {
        println!("hello, coroutine");
        yield_now();
        println!("goodbye, coroutine");
    });

    j.join().unwrap();
}

/// Ten coroutines each call `yield_now` once; handles are not kept, so the
/// test thread sleeps 200 ms to give them time to run.
#[test]
fn multi_yield() {
    for i in 0..10 {
        co!(move || {
            println!("hi, coroutine{}", i);
            yield_now();
            println!("bye, coroutine{}", i);
        });
    }

    thread::sleep(Duration::from_millis(200));
}

/// A named parent coroutine spawns ten children from inside its own body.
/// Only the parent is joined; the trailing sleep gives the children time to
/// run.
#[test]
fn spawn_inside() {
    co!(coroutine::Builder::new().name("parent".to_owned()), || {
        let me = coroutine::current();
        println!("hi, I'm parent: {:?}", me);

        for i in 0..10 {
            co!(move || {
                println!("hi, I'm child{:?}", i);
                yield_now();
                println!("bye from child{:?}", i);
            });
        }

        yield_now();
        println!("bye from parent: {:?}", me);
    })
    .join()
    .unwrap();

    thread::sleep(Duration::from_millis(200));
}

/// A parent coroutine spawns ten children, collects their join handles and
/// joins every one of them from inside the coroutine.
#[test]
fn wait_join() {
    let j = co!(move || {
        println!("hi, I'm parent");

        let join = (0..10)
            .map(|i| {
                co!(move || {
                    println!("hi, I'm child{:?}", i);
                    yield_now();
                    println!("bye from child{:?}", i);
                })
            })
            .collect::<Vec<_>>();

        for j in join {
            j.join().unwrap();
        }

        println!("bye from parent");
    });

    j.join().unwrap();
}

/// Scoped coroutines mutate elements of a stack-allocated array through
/// `&mut` borrows; the increments are checked after `scope` returns.
#[test]
fn scoped_coroutine() {
    let mut array = [1, 2, 3];

    coroutine::scope(|scope| {
        for i in &mut array {
            co!(scope, move || {
                *i += 1;
            });
        }
    });

    assert_eq!(array[0], 2);
    assert_eq!(array[1], 3);
    assert_eq!(array[2], 4);
}

/// Calls `yield_now` from inside a scoped generator that itself runs inside
/// a scoped coroutine, and checks the generator loop ran until `a == 10`.
#[test]
fn yield_from_gen() {
    let mut a = 0;

    coroutine::scope(|scope| {
        co!(scope, || {
            let g = Gn::<()>::new_scoped(|mut scope| {
                while a < 10 {
                    scope.yield_(a);
                    // this is yield from the generator context!
                    yield_now();
                    a += 1;
                }
                a
            });

            g.fold((), |_, i| {
                println!("got {:?}", i);
            });
        });
    });

    assert_eq!(a, 10);
}

/// A scoped coroutine calls `unpark` on itself and then parks twice; the
/// test thread issues one more `unpark` after 100 ms. The final value of `a`
/// shows the coroutine got past its first `park`.
#[test]
// `a = 5` is overwritten by `a = 10` before it is ever read, which would
// otherwise trigger the `unused_assignments` lint.
#[allow(unused_assignments)]
fn unpark() {
    let mut a = 0;

    coroutine::scope(|scope| {
        let h = co!(scope, || {
            let co = coroutine::current();
            println!("child coroutine name:{:?}", co);
            co.unpark();
            a = 5;
            coroutine::park();
            a = 10;
            coroutine::park();
        });

        // seems this could happen before the inner unpark
        // it depends on how many coroutines are running
        // if the test_sleep spawns too many within the same time span
        // this could fail due to schedule latency. default 1 worker
        thread::sleep(Duration::from_millis(100));
        h.coroutine().unpark();
    });

    assert_eq!(a, 10);
}

/// Exercises `park_timeout` three times inside a scoped coroutine: once
/// right after the coroutine calls `unpark` on itself, once with a 10 ms
/// timeout whose minimum elapsed time is asserted, and once with a 100 ms
/// timeout while the test thread calls `unpark` after 50 ms.
#[test]
fn park_timeout() {
    let mut a = 0;

    coroutine::scope(|scope| {
        let h = co!(scope, || {
            let co = coroutine::current();
            co.unpark();
            a = 5;
            coroutine::park_timeout(Duration::from_millis(100));

            let now = Instant::now();
            coroutine::park_timeout(Duration::from_millis(10));
            assert!(now.elapsed() >= Duration::from_millis(10));

            coroutine::park_timeout(Duration::from_millis(100));
            // The upper bound on this last wait (elapsed < 100 ms) is
            // deliberately not asserted: this test may fail if the scheduler
            // is a little bit slow.
            a = 10;
        });

        thread::sleep(Duration::from_millis(50));
        h.coroutine().unpark();
    });

    assert_eq!(a, 10);
}

/// `coroutine::sleep` must wait at least the requested duration, both when
/// called directly from the test function and when called from each of 1000
/// scoped coroutines.
#[test]
fn test_sleep() {
    let now = Instant::now();
    coroutine::sleep(Duration::from_millis(500));
    assert!(now.elapsed() >= Duration::from_millis(500));

    coroutine::scope(|scope| {
        for _ in 0..1000 {
            co!(scope, || {
                let now = Instant::now();
                coroutine::sleep(Duration::from_millis(100));
                assert!(now.elapsed() >= Duration::from_millis(100));
            });
        }
    });
}

/// `join!` is given a one-second sleep and two blocking channel receives;
/// after it returns, both channels must have nothing left to receive.
#[test]
fn join_macro() {
    use mco::std::sync::channel::channel;

    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();

    co!(move || {
        tx2.send("hello").unwrap();
        coroutine::sleep(Duration::from_millis(100));
        tx1.send(42).unwrap();
    });

    join!(
        coroutine::sleep(Duration::from_millis(1000)),
        {
            let x = rx1.recv().unwrap();
            assert_eq!(x, 42)
        },
        {
            let a = rx2.recv().unwrap();
            assert_eq!(a, "hello")
        }
    );

    assert!(rx1.try_recv().is_err());
    assert!(rx2.try_recv().is_err());
}

/// `spawn_with!` accepts either a stack size alone or a name plus a stack
/// size; the spawned coroutine must report exactly those values through
/// `coroutine::current()`.
#[test]
fn co_with_macro() {
    use mco::std::sync::channel::channel;

    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();

    spawn_with!(8192, move || {
        tx1.send(coroutine::current().stack_size()).unwrap();
    })
    .join()
    .unwrap();

    spawn_with!("test_task", 10240, move || {
        let task = coroutine::current();
        let msg = (task.name().map(ToOwned::to_owned), task.stack_size());
        tx2.send(msg).unwrap();
    })
    .join()
    .unwrap();

    {
        let x = rx1.recv().unwrap();
        assert_eq!(x, 8192);
    }

    {
        let (name, stack_size) = rx2.recv().unwrap();
        assert_eq!(name, Some("test_task".to_owned()));
        assert_eq!(stack_size, 10240);
    }
}