Skip to content

Commit

Permalink
Implement the missing Windows part considering mio/tokio limitations.
Browse files Browse the repository at this point in the history
With tokio already being stable, but underlying mio being unstable, and none of them providing a generic interface like `AsyncFd` or `SourceFd` for Windows, we can't have nice things.
Currently, the only option is (ab)using the `NamedPipeClient` added in tokio 1.7, although that is only compatible to serial ports by accident.
As there is neither a way to convert mio's `NamedPipe` into tokio's `NamedPipeClient` (stability guarantees strike again), I can only duplicate much code instead of calling it.

I tried to keep the original author's intention of writing platform-independent code, hence I stuffed most of my changes into a new Windows compatibility layer.
I would love to see this replaced someday by a proper implementation, but this requires stable mio and fundamental changes to mio and tokio first (tokio-rs/tokio#3781).
My code is a pragmatic solution for those who need Windows support now.

Frankly, this is probably the scariest Rust code I have committed so far, and I claim no copyright on any of that :)
  • Loading branch information
ColinFinck committed Jul 13, 2021
1 parent bad8688 commit 693e7ee
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 13 deletions.
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ codec = ["tokio-util/codec", "bytes"]
version = "0.3"

[dependencies.tokio]
version = "1.2"
version = "1.8.1"
default-features = false
features = ["net"]

Expand All @@ -36,7 +36,8 @@ default-features = false
optional = true

[dependencies.mio-serial]
version = "4.0.0-beta1"
git = "https://github.com/berkowski/mio-serial"
branch="v4.0_serialstream"
default-features = false

[dependencies.bytes]
Expand All @@ -45,15 +46,16 @@ default-features = false
optional = true

[dev-dependencies.tokio]
version = "1.2"
version = "1.8.1"
features = ["macros"]
default-features = false

[target.'cfg(windows)'.dependencies]
serialport = { version = "4", default-features = false }
winapi = { version = "0.3", features = ["commapi", "fileapi", "handleapi", "winbase", "winnt"] }


[[example]]
name = "serial_println"
path = "examples/serial_println.rs"
required-features = ["rt", "codec"]

[patch.crates-io]
mio-serial = {git = "https://github.com/berkowski/mio-serial", branch="v4.0_serialstream"}
100 changes: 93 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub use mio_serial::{
SerialPortBuilder, StopBits,
};

use futures::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::io::{self, Read, Write};
Expand All @@ -24,11 +23,20 @@ use std::time::Duration;
#[cfg(feature = "codec")]
mod frame;

#[cfg(windows)]
mod windows;

#[cfg(unix)]
mod os_prelude {
pub use futures::ready;
pub use tokio::io::unix::AsyncFd;
}

#[cfg(windows)]
mod os_prelude {
pub use crate::windows::AsyncWindowsSerialStream;
}

use crate::os_prelude::*;

/// A type for results generated by interacting with serial ports.
Expand All @@ -47,17 +55,28 @@ pub type Result<T> = mio_serial::Result<T>;
pub struct SerialStream {
#[cfg(unix)]
inner: AsyncFd<mio_serial::SerialStream>,
#[cfg(windows)]
inner: AsyncWindowsSerialStream,
}

impl SerialStream {
/// Open serial port from a provided path, using the default reactor.
pub fn open(builder: &crate::SerialPortBuilder) -> crate::Result<Self> {
let port = mio_serial::SerialStream::open(builder)?;

#[cfg(unix)]
Ok(Self {
inner: AsyncFd::new(port)?,
})
{
let port = mio_serial::SerialStream::open(builder)?;

Ok(Self {
inner: AsyncFd::new(port)?,
})
}

#[cfg(windows)]
{
Ok(Self {
inner: AsyncWindowsSerialStream::open(builder)?,
})
}
}

/// Create a pair of pseudo serial terminals using the default reactor
Expand Down Expand Up @@ -151,6 +170,7 @@ impl SerialStream {
}
}

#[cfg(unix)]
impl AsyncRead for SerialStream {
/// Attempts to ready bytes on the serial port.
///
Expand Down Expand Up @@ -191,6 +211,7 @@ impl AsyncRead for SerialStream {
}
}

#[cfg(unix)]
impl AsyncWrite for SerialStream {
/// Attempts to send data on the serial port
///
Expand Down Expand Up @@ -240,6 +261,36 @@ impl AsyncWrite for SerialStream {
}
}

#[cfg(windows)]
impl AsyncRead for SerialStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
}
}

#[cfg(windows)]
impl AsyncWrite for SerialStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
}
}

impl crate::SerialPort for SerialStream {
#[inline(always)]
fn name(&self) -> Option<String> {
Expand Down Expand Up @@ -370,12 +421,14 @@ impl crate::SerialPort for SerialStream {
}
}

#[cfg(unix)]
impl Read for SerialStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.get_mut().read(buf)
}
}

#[cfg(unix)]
impl Write for SerialStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.get_mut().write(buf)
Expand All @@ -386,6 +439,40 @@ impl Write for SerialStream {
}
}

// [CF] Most tokio crates don't implement synchronous `Read` and `Write` next to `AsyncRead`/`AsyncWrite`,
// and mixing them is usually discouraged.
// But tokio-serial implements the `SerialPort` trait on `SerialStream`, and this one requires `Read` and `Write`.
//
// We can't just use the `Read` and `Write` implementations of `serialport::COMPort` here.
// They expect Win32 handles opened without FILE_FLAG_OVERLAPPED, but our handles are opened this way.
// Every ReadFile/WriteFile call of `serialport::COMPort` will error immediately due to the missing OVERLAPPED structure.
//
// The upcoming move to Overlapped I/O in https://gitlab.com/susurrus/serialport-rs/-/merge_requests/91 won't change that either.
// After that change and trying to use `serialport::COMPort::{read, write}` from here, we will get an access violation.
// This happens because every Overlapped I/O operation fires an event and every event is reported to the I/O Completion Port of
// our `NamedPipeClient`. The IOCP handler in `mio` expects a callback to be associated to each event, and if that callback
// doesn't exist, things go bust with a STATUS_ACCESS_VIOLATION exception.
//
// Tokio's architecture also doesn't let us use our async functions in a synchronous context.
// Hence, `Read` and `Write` are simply unimplemented under Windows.
#[cfg(windows)]
impl Read for SerialStream {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
unimplemented!("Use AsyncRead instead");
}
}

#[cfg(windows)]
impl Write for SerialStream {
fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
unimplemented!("Use AsyncWrite instead");
}

fn flush(&mut self) -> io::Result<()> {
unimplemented!("Use AsyncWrite instead");
}
}

#[cfg(unix)]
mod sys {
use super::SerialStream;
Expand Down Expand Up @@ -413,7 +500,6 @@ pub trait SerialPortBuilderExt {
// /// Open a cross-platform interface to the port with the specified settings
// fn open_async(self) -> Result<Box<dyn MioSerialPort>>;

#[cfg(unix)]
/// Open a platform-specific interface to the port with the specified settings
fn open_async(self) -> Result<SerialStream>;
}
Expand Down
167 changes: 167 additions & 0 deletions src/windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use serialport::SerialPort;
use std::ffi::OsStr;
use std::io;
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::os::windows::io::{FromRawHandle, RawHandle};
use std::path::Path;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::windows::named_pipe::NamedPipeClient;
use winapi::um::commapi::SetCommTimeouts;
use winapi::um::fileapi::{CreateFileW, OPEN_EXISTING};
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::winbase::{COMMTIMEOUTS, FILE_FLAG_OVERLAPPED};
use winapi::um::winnt::{FILE_ATTRIBUTE_NORMAL, GENERIC_READ, GENERIC_WRITE, HANDLE};

#[derive(Debug)]
pub struct AsyncWindowsSerialStream {
inner: serialport::COMPort,

// [CF] Named pipes and COM ports are actually two entirely different things that hardly have anything in common.
// The only thing they share is the opaque `HANDLE` type that can be fed into `CreateFileW`, `ReadFile`, `WriteFile`, etc.
//
// Both `mio` and `tokio` don't yet have any code to work on arbitrary HANDLEs.
// But they have code for dealing with named pipes, and we (ab)use that here to work on COM ports.
pipe: NamedPipeClient,
}

impl AsyncWindowsSerialStream {
/// Opens a COM port at the specified path
// [CF] Copied from https://github.com/berkowski/mio-serial/blob/38a3778324da5e312cfb31402bd89e52d0548a4c/src/lib.rs#L113-L166
// See remarks in the code for important changes!
pub fn open(builder: &crate::SerialPortBuilder) -> crate::Result<Self> {
let (path, baud, parity, data_bits, stop_bits, flow_control) = {
let com_port = serialport::COMPort::open(builder)?;
let name = com_port.name().ok_or(crate::Error::new(
crate::ErrorKind::NoDevice,
"Empty device name",
))?;
let baud = com_port.baud_rate()?;
let parity = com_port.parity()?;
let data_bits = com_port.data_bits()?;
let stop_bits = com_port.stop_bits()?;
let flow_control = com_port.flow_control()?;

let mut path = Vec::<u16>::new();
path.extend(OsStr::new("\\\\.\\").encode_wide());
path.extend(Path::new(&name).as_os_str().encode_wide());
path.push(0);

(path, baud, parity, data_bits, stop_bits, flow_control)
};

let handle = unsafe {
CreateFileW(
path.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
0,
ptr::null_mut(),
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
0 as HANDLE,
)
};

if handle == INVALID_HANDLE_VALUE {
return Err(crate::Error::from(io::Error::last_os_error()));
}
let handle = unsafe { mem::transmute(handle) };

// Construct NamedPipe and COMPort from Handle
//
// [CF] ATTENTION: First set the COM Port parameters, THEN create the NamedPipeClient.
// If I do it the other way round (as mio-serial does), the runtime hangs
// indefinitely when querying for read readiness!
//
let mut com_port = unsafe { serialport::COMPort::from_raw_handle(handle) };
com_port.set_baud_rate(baud)?;
com_port.set_parity(parity)?;
com_port.set_data_bits(data_bits)?;
com_port.set_stop_bits(stop_bits)?;
com_port.set_flow_control(flow_control)?;
Self::override_comm_timeouts(handle)?;

let pipe = unsafe { NamedPipeClient::from_raw_handle(handle)? };

Ok(Self {
inner: com_port,
pipe,
})
}

// [CF] Get a reference to the underlying `COMPort`.
// The Unix implementation gets a reference to the underlying `mio_serial::SerialStream`,
// but we don't use `mio_serial::SerialStream` for tokio-serial's Windows implementation.
// As long as tokio-serial only calls `SerialPort` trait methods on the returned reference,
// this is compatible.
pub fn get_ref(&self) -> &serialport::COMPort {
&self.inner
}

// [CF] Get a mutable reference to the underlying `COMPort`.
// Notes from above apply.
pub fn get_mut(&mut self) -> &mut serialport::COMPort {
&mut self.inner
}

pub async fn readable(&self) -> io::Result<()> {
self.pipe.readable().await
}

pub async fn writable(&self) -> io::Result<()> {
self.pipe.writable().await
}

/// Overrides timeout value set by serialport-rs so that the read end will
/// never wake up with 0-byte payload.
// [CF] Copied from https://github.com/berkowski/mio-serial/blob/38a3778324da5e312cfb31402bd89e52d0548a4c/src/lib.rs#L685-L702
fn override_comm_timeouts(handle: RawHandle) -> io::Result<()> {
let mut timeouts = COMMTIMEOUTS {
// wait at most 1ms between two bytes (0 means no timeout)
ReadIntervalTimeout: 1,
// disable "total" timeout to wait at least 1 byte forever
ReadTotalTimeoutMultiplier: 0,
ReadTotalTimeoutConstant: 0,
// write timeouts are just copied from serialport-rs
WriteTotalTimeoutMultiplier: 0,
WriteTotalTimeoutConstant: 0,
};

let r = unsafe { SetCommTimeouts(handle, &mut timeouts) };
if r == 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}

impl AsyncRead for AsyncWindowsSerialStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().pipe).poll_read(cx, buf)
}
}

impl AsyncWrite for AsyncWindowsSerialStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().pipe).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().pipe).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().pipe).poll_shutdown(cx)
}
}

0 comments on commit 693e7ee

Please sign in to comment.