diff --git a/example/Cargo.toml b/example/Cargo.toml index 5b56ee1f..1e8247d9 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -18,11 +18,11 @@ byteorder = "1.3.2" log = "0.4.6" simple-logging = "2.0.2" nix = "0.23.0" +ttrpc = { path = "../", features = ["async"] } ctrlc = { version = "3.0", features = ["termination"] } +tokio = { version = "1.0.1", features = ["signal", "time"] } +async-trait = "0.1.42" rand = "0.8.5" -tokio = { version = "1.0.1", features = ["signal", "time"]} -async-trait = {version="0.1.42"} -ttrpc = { path = "../", features = ["async"] } [[example]] name = "client" diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index ef06b3fa..c3b23c30 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -169,7 +169,6 @@ pub struct PipeConnection { // "In this situation, there is no way to know which operation caused the object's state to be signaled." impl PipeConnection { pub(crate) fn new(h: isize) -> PipeConnection { - trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32); let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; @@ -185,6 +184,7 @@ impl PipeConnection { } pub fn read(&self, buf: &mut [u8]) -> Result { + trace!("starting read for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32); let ol = Overlapped::new_with_event(self.read_event); let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; @@ -216,6 +216,7 @@ impl PipeConnection { } pub fn write(&self, buf: &[u8]) -> Result { + trace!("starting write for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32); let ol = Overlapped::new_with_event(self.write_event); let mut bytes_written= 0; let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; diff --git a/tests/sync-test.rs b/tests/sync-test.rs index 47f3ef89..87b69c74 100644 --- a/tests/sync-test.rs +++ b/tests/sync-test.rs @@ -1,24 +1,46 @@ -use std::{process::Command, time::Duration}; +use std::{process::{Command}, time::Duration, io::{BufReader, BufRead}}; #[test] fn run_sync_example() -> Result<(), Box> { // start the server and give it a moment to start. - let server = run_example("server").spawn(); + let mut server = run_example("server").spawn().unwrap(); std::thread::sleep(Duration::from_secs(2)); - let client = run_example("client").spawn(); + let mut client = run_example("client").spawn().unwrap(); let mut client_succeeded = false; - match client.unwrap().wait() { - Ok(status) => { - client_succeeded = status.success(); + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(600); + loop { + if start.elapsed() > timeout { + println!("Running the client timed out. output:"); + client.kill().unwrap_or_else(|e| { + println!("This may occur on Windows if the process has exited: {}", e); + }); + let output = client.stdout.unwrap(); + BufReader::new(output).lines().for_each(|line| { + println!("{}", line.unwrap()); + }); + break; } - Err(e) => { - println!("Error: {e}"); + + match client.try_wait() { + Ok(Some(status)) => { + client_succeeded = status.success(); + break; + } + Ok(None) => { + // still running + continue; + } + Err(e) => { + println!("Error: {e}"); + break; + } } } - + // be sure to clean up the server, the client should have run to completion - server.unwrap().kill()?; + server.kill()?; assert!(client_succeeded); Ok(()) }