Skip to content

Commit

Permalink
Fix stuck thread on server shutdown
Browse files Browse the repository at this point in the history
The connect namedpipe thread would be in a suspended state when shutdown on the server is called.  Setting the event to a signalled state to wake the thread up so everything can shut down properly.

Signed-off-by: James Sturtevant <[email protected]>
  • Loading branch information
jsturtevant committed Mar 7, 2023
1 parent 746ddd1 commit 416d571
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 30 deletions.
23 changes: 16 additions & 7 deletions src/sync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,31 @@ impl Client {
pub fn connect(sockaddr: &str) -> Result<Client> {
let conn = ClientConnection::client_connect(sockaddr)?;

Ok(Self::new_client(conn))
Self::new_client(conn)
}

#[cfg(unix)]
/// Initialize a new [`Client`] from raw file descriptor.
pub fn new(fd: RawFd) -> Client {
let conn = ClientConnection::new(fd);

Self::new_client(conn)
// TODO: upgrade the API of Client::new and remove this panic for the major version release
Self::new_client(conn).unwrap_or_else(|e| {
panic!(
"client was not successfully initialized: {}", e
)
})
}

fn new_client(pipe_client: ClientConnection) -> Client {
fn new_client(pipe_client: ClientConnection) -> Result<Client> {
let client = Arc::new(pipe_client);

let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));


let receiver_map = recver_map_orig.clone();
let connection = Arc::new(client.get_pipe_connection());
let connection = Arc::new(client.get_pipe_connection()?);
let sender_client = connection.clone();

//Sender
Expand Down Expand Up @@ -171,10 +176,10 @@ impl Client {
trace!("Receiver quit");
});

Client {
Ok(Client {
_connection: client,
sender_tx,
}
})
}
pub fn request(&self, req: Request) -> Result<Response> {
let buf = req.encode().map_err(err_to_others_err!(e, ""))?;
Expand Down Expand Up @@ -220,7 +225,11 @@ impl Drop for ClientConnection {
#[cfg(windows)]
impl Drop for PipeConnection {
fn drop(&mut self) {
self.close().unwrap();
self.close().unwrap_or_else(|e| {
trace!(
"connection may already be closed: {}", e
)
});
trace!("pipe connection is dropped");
}
}
4 changes: 2 additions & 2 deletions src/sync/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ impl ClientConnection {
Ok(Some(()))
}

pub fn get_pipe_connection(&self) -> PipeConnection {
PipeConnection::new(self.fd)
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
Ok(PipeConnection::new(self.fd))
}

pub fn close_receiver(&self) -> Result<()> {
Expand Down
67 changes: 46 additions & 21 deletions src/sync/sys/windows/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_
use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX };
use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED };
use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS };
use windows_sys::Win32::System::Threading::CreateEventW;
use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent};

const PIPE_BUFFER_SIZE: u32 = 65536;
const WAIT_FOR_EVENT: i32 = 1;

pub struct PipeListener {
first_instance: AtomicBool,
address: String,
connection_event: isize,
}

#[repr(C)]
Expand All @@ -54,22 +55,18 @@ impl Overlapped {
ol
}

fn new() -> Overlapped {
Overlapped {
inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
}
}

fn as_mut_ptr(&self) -> *mut OVERLAPPED {
self.inner.get()
}
}

impl PipeListener {
pub(crate) fn new(sockaddr: &str) -> Result<PipeListener> {
let connection_event = create_event()?;
Ok(PipeListener {
first_instance: AtomicBool::new(true),
address: sockaddr.to_string(),
connection_event
})
}

Expand All @@ -83,30 +80,40 @@ impl PipeListener {
}

// Create a new pipe instance for every new client
let np = self.new_instance().unwrap();
let ol = Overlapped::new();
let instance = self.new_instance()?;
let np = match PipeConnection::new(instance) {
Ok(np) => np,
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("failed to create new pipe instance: {:?}", e),
));
}
};

let ol = Overlapped::new_with_event(self.connection_event);

trace!("listening for connection");
let result = unsafe { ConnectNamedPipe(np, ol.as_mut_ptr())};
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())};
if result != 0 {
return Err(io::Error::last_os_error());
}

match io::Error::last_os_error() {
e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
let mut bytes_transfered = 0;
let res = unsafe {GetOverlappedResult(np, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
match res {
0 => {
return Err(io::Error::last_os_error());
}
_ => {
Ok(Some(PipeConnection::new(np)))
Ok(Some(np))
}
}
}
e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => {
Ok(Some(PipeConnection::new(np)))
Ok(Some(np))
}
e => {
return Err(io::Error::new(
Expand Down Expand Up @@ -143,7 +150,9 @@ impl PipeListener {
}

pub fn close(&self) -> Result<()> {
Ok(())
// release the ConnectNamedPipe thread by signaling the event and clean up event handle
set_event(self.connection_event)?;
close_handle(self.connection_event)
}
}

Expand All @@ -168,15 +177,15 @@ pub struct PipeConnection {
// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device."
// "In this situation, there is no way to know which operation caused the object's state to be signaled."
impl PipeConnection {
pub(crate) fn new(h: isize) -> PipeConnection {
pub(crate) fn new(h: isize) -> Result<PipeConnection> {
trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32);
let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
PipeConnection {
let read_event = create_event()?;
let write_event = create_event()?;
Ok(PipeConnection {
named_pipe: h,
read_event: read_event,
write_event: write_event,
}
})
}

pub(crate) fn id(&self) -> i32 {
Expand Down Expand Up @@ -273,6 +282,22 @@ fn close_handle(handle: isize) -> Result<()> {
}
}

fn create_event() -> Result<isize> {
let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
match result {
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
_ => Ok(result),
}
}

fn set_event(event: isize) -> Result<()> {
let result = unsafe { SetEvent(event) };
match result {
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
_ => Ok(()),
}
}

impl ClientConnection {
pub fn client_connect(sockaddr: &str) -> Result<ClientConnection> {
Ok(ClientConnection::new(sockaddr))
Expand All @@ -289,14 +314,14 @@ impl ClientConnection {
Ok(Some(()))
}

pub fn get_pipe_connection(&self) -> PipeConnection {
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
let file = opts.open(self.address.as_str());

PipeConnection::new(file.unwrap().into_raw_handle() as isize)
return PipeConnection::new(file.unwrap().into_raw_handle() as isize)
}

pub fn close_receiver(&self) -> Result<()> {
Expand Down

0 comments on commit 416d571

Please sign in to comment.