extern crate rand; extern crate sesh; use rand::{Rng, thread_rng}; use sesh::*; use std::boxed::Box; use std::error::Error; use std::marker; use std::mem; use std::sync::mpsc; use std::thread::sleep; use std::time::Duration; // Test sending a ping across threads. #[test] fn ping_works() { assert!(|| -> Result<(), Box> { let s = fork(move |s: Send<(), End>| { let s = send((), s); close(s)?; Ok(()) }); let ((), s) = recv(s)?; close(s)?; Ok(()) }().is_ok()); } /// Test writing a program which duplicates a session. /// /// ```compile_fail /// assert!(|| -> Result<(), Box> { /// /// let r1 = fork(move |s1: Send<(), End>| { /// let s2 = send((), s1); /// close(s2)?; /// let s3 = send((), s1); /// close(s3)?; /// Ok(()) /// }); /// let ((), r2) = recv(r1)?; /// close(r2)?; /// Ok(()) /// /// }().is_ok()); /// ``` // Test a simple calculator server, implemented using binary choice. type NegServer = Recv>; type NegClient = as Session>::Dual; type AddServer = Recv>>; type AddClient = as Session>::Dual; type SimpleCalcServer = Offer, AddServer>; type SimpleCalcClient = as Session>::Dual; fn simple_calc_server(s: SimpleCalcServer) -> Result<(), Box> { offer_either(s, |s: NegServer| { let (x, s) = recv(s)?; let s = send(-x, s); close(s)?; Ok(()) }, |s: AddServer| { let (x, s) = recv(s)?; let (y, s) = recv(s)?; let s = send(x.wrapping_add(y), s); close(s)?; Ok(()) }) } #[test] fn simple_calc_works() { assert!(|| -> Result<(), Box> { let mut rng = thread_rng(); // Test the negation function. { let s: SimpleCalcClient = fork(simple_calc_server); let x: i32 = rng.gen(); let s = choose_left::<_, AddClient>(s); let s = send(x, s); let (y, s) = recv(s)?; close(s)?; assert_eq!(-x, y); } // Test the addition function. { let s: SimpleCalcClient = fork(simple_calc_server); let x: i32 = rng.gen(); let y: i32 = rng.gen(); let s = choose_right::, _>(s); let s = send(x, s); let s = send(y, s); let (z, s) = recv(s)?; close(s)?; assert_eq!(x.wrapping_add(y), z); } Ok(()) }().is_ok()); } // Test a nice calculator server, implemented using variant types. enum CalcOp { Neg(NegServer), Add(AddServer), } type NiceCalcServer = Recv, End>; type NiceCalcClient = as Session>::Dual; fn nice_calc_server(s: NiceCalcServer) -> Result<(), Box> { offer!(s, { CalcOp::Neg(s) => { let (x, s) = recv(s)?; let s = send(-x, s); close(s)?; Ok(()) }, CalcOp::Add(s) => { let (x, s) = recv(s)?; let (y, s) = recv(s)?; let s = send(x.wrapping_add(y), s); close(s)?; Ok(()) }, }) } #[test] fn nice_calc_works() { assert!(|| -> Result<(), Box> { // Pick some random numbers. let mut rng = thread_rng(); // Test the negation function. { let s: NiceCalcClient = fork(nice_calc_server); let x: i32 = rng.gen(); let s = choose!(CalcOp::Neg, s); let s = send(x, s); let (y, s) = recv(s)?; close(s)?; assert_eq!(-x, y); } // Test the addition function. { let s: NiceCalcClient = fork(nice_calc_server); let x: i32 = rng.gen(); let y: i32 = rng.gen(); let s = choose!(CalcOp::Add, s); let s = send(x, s); let s = send(y, s); let (z, s) = recv(s)?; close(s)?; assert_eq!(x.wrapping_add(y), z); } Ok(()) }().is_ok()); } // Test cancellation. #[test] fn cancel_recv_works() { let (other_thread, s) = fork_with_thread_id(nice_calc_server); assert!(|| -> Result<(), Box> { cancel(s); Ok(()) }().is_ok()); assert!(other_thread.join().is_err()); } #[test] fn cancel_send_works() { let (other_thread, s) = fork_with_thread_id( move |s: Recv<(), End>| { cancel(s); Ok(()) }); assert!(|| -> Result<(), Box> { let s = send((), s); close(s)?; Ok(()) }().is_err()); assert!(other_thread.join().is_ok()); } // Test cancellation of delegation. #[test] fn delegation_works() { let (other_thread1, s) = fork_with_thread_id(nice_calc_server); let (other_thread2, u) = fork_with_thread_id( move |u: Recv, End>| { cancel(u); Ok(()) }); assert!(|| -> Result<(), Box> { let u = send(s, u); close(u)?; Ok(()) }().is_err()); assert!(other_thread1.join().is_err()); assert!(other_thread2.join().is_ok()); } // Test cancellation of closures. #[test] fn closure_works() { let (other_thread, s) = fork_with_thread_id(nice_calc_server); assert!(|| -> Result> { // Create a closure which uses the session. let f = move |x: i32| -> Result> { let s = choose!(CalcOp::Neg, s); let s = send(x, s); let (y, s) = recv(s)?; close(s)?; Ok(y) }; // Let the closure go out of scope. Err(Box::new(mpsc::RecvError))?; f(5) }().is_err()); assert!(other_thread.join().is_err()); } // Test recursive sessions. enum SumOp { More(Recv>), Done(Send), } type NiceSumServer = Recv, End>; type NiceSumClient = as Session>::Dual; fn nice_sum_server(s: NiceSumServer) -> Result<(), Box> { nice_sum_server_accum(s, 0) } fn nice_sum_server_accum(s: NiceSumServer, x: i32) -> Result<(), Box> { offer!(s, { SumOp::More(s) => { let (y, s) = recv(s)?; nice_sum_server_accum(s, x.wrapping_add(y)) }, SumOp::Done(s) => { let s = send(x, s); close(s)?; Ok(()) }, })?; Ok(()) } fn nice_sum_client_accum(s: NiceSumClient, mut xs: Vec) -> Result> { match xs.pop() { Option::Some(x) => { let s = choose!(SumOp::More, s); let s = send(x, s); nice_sum_client_accum(s, xs) }, Option::None => { let s = choose!(SumOp::Done, s); let (sum, s) = recv(s)?; close(s)?; Ok(sum) }, } } #[test] fn recursion_works() { // Pick some random numbers. let mut rng = thread_rng(); let xs: Vec = (1..100).map(|_| rng.gen()).collect(); let sum1: i32 = xs.iter().fold(0, |sum, &x| sum.wrapping_add(x)); let (other_thread, s) = fork_with_thread_id(nice_sum_server); assert!(|| -> Result<(), Box> { let sum2 = nice_sum_client_accum(s, xs)?; assert_eq!(sum1, sum2); Ok(()) }().is_ok()); assert!(other_thread.join().is_ok()); } // Test selection. #[test] fn selection_works() { let mut other_threads = Vec::new(); let mut rs = Vec::new(); for i in 0..10 { let (other_thread, s) = fork_with_thread_id(move |s: Send| { sleep(Duration::from_millis(i * 100)); let s = send(9 - i, s); close(s) }); other_threads.push(other_thread); rs.push(s); } assert!(|| -> Result<(), Box> { let mut current_index = 9; loop { if rs.is_empty() { break Ok(()); } else { let (i, r) = select_mut(&mut rs)?; close(r)?; assert_eq!(current_index, i, "Messages were received out of order."); current_index = current_index.overflowing_sub(1).0; // decrement } } }().is_ok(), "Main thread crashed."); for other_thread in other_threads { let msg = format!("Thread {:?} crashed.", other_thread); assert!(other_thread.join().is_ok(), msg); } } #[allow(dead_code)] fn deadlock_loop() { let s = fork(move |s: Send<(), End>| { loop { // Let's trick the reachability checker if false { break; } } let s = send((), s); close(s)?; Ok(()) }); || -> Result<(), Box> { let ((), s) = recv(s)?; close(s)?; Ok(()) }().unwrap(); } #[allow(dead_code)] fn deadlock_forget() { let s = fork(move |s: Send<(), End>| { mem::forget(s); Ok(()) }); || -> Result<(), Box> { let ((), s) = recv(s)?; close(s)?; Ok(()) }().unwrap(); } #[allow(dead_code)] fn deadlock_new() { let (s1, r1) = >::new(); let r2 = fork(move |s2: Send<(), End>| { let (x, r1) = recv(r1)?; let s2 = send(x, s2); close(r1)?; close(s2)?; Ok(()) }); || -> Result<(), Box> { let (x, r2) = recv(r2)?; let s1 = send(x, s1); close(r2)?; close(s1)?; Ok(()) }().unwrap(); } // Bug with the constraint checker. #[allow(dead_code)] enum CalcOp2 { More(Send>>), Stop(Send), } #[allow(dead_code)] type NiceCalcServer2 = Recv, End>;