#![cfg(unix)] extern crate conch_runtime; extern crate futures; extern crate tokio_core; extern crate tokio_io; #[macro_use] mod support; pub use self::support::*; use conch_runtime::env::AsyncIoEnvironment; use conch_runtime::io::{FileDesc, Pipe}; use conch_runtime::os::unix::env::EventedAsyncIoEnv; use conch_runtime::os::unix::io::{FileDescExt, MaybeEventedFd}; use std::fs::File; use std::io::{ErrorKind, Read, Result, Write}; use std::time::Duration; use std::thread; use tokio_io::AsyncRead; use tokio_io::io::read_to_end; use tokio_core::reactor::Core; struct TimesRead { times_read: usize, times_would_block: usize, reader: R, } impl TimesRead { fn new(reader: R) -> Self { TimesRead { times_read: 0, times_would_block: 0, reader: reader, } } } impl AsyncRead for TimesRead {} impl Read for TimesRead { fn read(&mut self, buf: &mut [u8]) -> Result { match self.reader.read(buf) { ret@Ok(0) => ret, ret@Ok(_) => { self.times_read += 1; ret }, Err(e) => { if e.kind() == ErrorKind::WouldBlock { self.times_would_block += 1; } Err(e) }, } } } #[test] fn evented_is_async() { let msg = "hello world"; let Pipe { reader, mut writer } = Pipe::new().expect("failed to create pipe"); let mut lp = Core::new().expect("failed to create event loop"); let reader = reader.into_evented2(&lp.handle()) .expect("failed to register reader with event loop"); let reader = if let MaybeEventedFd::Registered(fd) = reader { fd } else { panic!("unexpected result: {:#?}", reader); }; let join_handle = thread::spawn(move || { // Give the future a chance to block for the first time thread::sleep(Duration::from_millis(10)); for c in msg.as_bytes() { writer.write(&[*c]).expect("failed to write byte"); // Give event loop a chance to settle and read data one byte at a time thread::sleep(Duration::from_millis(10)); } }); let (tr, data) = lp.run(read_to_end(TimesRead::new(reader), vec!())) .map(|(tr, data)| (tr, String::from_utf8(data).expect("invaild utf8"))) .expect("future did not exit successfully"); join_handle.join().expect("thread did not exit successfully"); assert_eq!(data, msg); // NB: we used to assert the number of times read equals the number of bytes // in the message, but due to seeing some sporadic failures here in the CI, // it's probably good enough to ensure we didn't run in a single read. assert!(tr.times_read > 1); assert!(tr.times_would_block > 1); } #[test] fn evented_supports_regular_files() { let tempdir = mktmp!(); let path = tempdir.path().join("sample_file"); let msg = "hello\nworld\n"; let mut lp = Core::new().expect("failed to create event loop"); let mut env = EventedAsyncIoEnv::new(lp.remote()); // Test spawning directly within the event loop lp.run(futures::lazy(|| { let fd = File::create(&path) .map(FileDesc::from) .expect("failed to create file"); env.write_all(fd, msg.to_owned().into_bytes()) })).expect("failed to write data"); // Test spawning outside of the event loop let fd = File::open(path) .map(FileDesc::from) .expect("failed to open file"); let data = lp.run(read_to_end(env.read_async(fd), vec!())) .map(|(_, data)| String::from_utf8(data).expect("invaild utf8")) .expect("future did not exit successfully"); assert_eq!(data, msg); }