Skip to content

Commit

Permalink
pass part echo CI in windows (bytedance#248)
Browse files Browse the repository at this point in the history
* try fix echo_server

* try fix echo_server

* polish code

* polish code

* test WSASend

* pass part CI

* fix clippy

* fix clippy
  • Loading branch information
loongs-zhang authored Mar 18, 2024
1 parent cdc4f52 commit 4b9a125
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 53 deletions.
1 change: 0 additions & 1 deletion monoio/src/buf/io_vec_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ unsafe impl IoVecBuf for VecBuf {
}

#[cfg(unix)]

unsafe impl IoVecBuf for Vec<libc::iovec> {
fn read_iovec_ptr(&self) -> *const libc::iovec {
self.as_ptr()
Expand Down
31 changes: 24 additions & 7 deletions monoio/src/buf/vec_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta {
}
}

#[cfg(unix)]
#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -256,9 +255,18 @@ mod tests {
let meta = read_vec_meta(&iovec);
assert_eq!(meta.len(), 60);
assert_eq!(meta.data.len(), 3);
assert_eq!(meta.data[0].iov_len, 10);
assert_eq!(meta.data[1].iov_len, 20);
assert_eq!(meta.data[2].iov_len, 30);
#[cfg(unix)]
{
assert_eq!(meta.data[0].iov_len, 10);
assert_eq!(meta.data[1].iov_len, 20);
assert_eq!(meta.data[2].iov_len, 30);
}
#[cfg(windows)]
{
assert_eq!(meta.data[0].len, 10);
assert_eq!(meta.data[1].len, 20);
assert_eq!(meta.data[2].len, 30);
}
}

#[test]
Expand All @@ -267,8 +275,17 @@ mod tests {
let meta = write_vec_meta(&mut iovec);
assert_eq!(meta.len(), 60);
assert_eq!(meta.data.len(), 3);
assert_eq!(meta.data[0].iov_len, 10);
assert_eq!(meta.data[1].iov_len, 20);
assert_eq!(meta.data[2].iov_len, 30);
#[cfg(unix)]
{
assert_eq!(meta.data[0].iov_len, 10);
assert_eq!(meta.data[1].iov_len, 20);
assert_eq!(meta.data[2].iov_len, 30);
}
#[cfg(windows)]
{
assert_eq!(meta.data[0].len, 10);
assert_eq!(meta.data[1].len, 20);
assert_eq!(meta.data[2].len, 30);
}
}
}
27 changes: 14 additions & 13 deletions monoio/src/driver/op/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
std::ffi::c_void,
windows_sys::Win32::{
Foundation::TRUE,
Networking::WinSock::{WSAGetLastError, WSARecv, SOCKET_ERROR},
Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN},
Storage::FileSystem::{ReadFile, SetFilePointer, FILE_CURRENT, INVALID_SET_FILE_POINTER},
},
};
Expand Down Expand Up @@ -155,9 +155,7 @@ impl<T: IoVecBufMut> Op<ReadVec<T>> {

if let Ok(n) = res {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf_vec.set_init(n);
}
unsafe { buf_vec.set_init(n) };
}
(res, buf_vec)
}
Expand Down Expand Up @@ -188,26 +186,29 @@ impl<T: IoVecBufMut> OpAble for ReadVec<T> {

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
let mut bytes_recved = 0;
let mut nread = 0;
let mut flags = 0;
let ret = unsafe {
WSARecv(
self.fd.raw_socket() as _,
self.buf_vec.write_wsabuf_ptr(),
self.buf_vec.write_wsabuf_len() as _,
&mut bytes_recved,
std::ptr::null_mut(),
self.buf_vec.write_wsabuf_len().min(u32::MAX as usize) as _,
&mut nread,
&mut flags,
std::ptr::null_mut(),
None,
)
};
match ret {
0 => return Err(std::io::ErrorKind::WouldBlock.into()),
SOCKET_ERROR => {
0 => Ok(nread),
_ => {
let error = unsafe { WSAGetLastError() };
return Err(std::io::Error::from_raw_os_error(error));
if error == WSAESHUTDOWN {
Ok(0)
} else {
Err(io::Error::from_raw_os_error(error))
}
}
_ => (),
}
Ok(bytes_recved)
}
}
2 changes: 1 addition & 1 deletion monoio/src/driver/op/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl<T: IoBufMut> OpAble for Recv<T> {
recv(
fd as _,
self.buf.write_ptr(),
self.buf.bytes_total() as _,
self.buf.bytes_total().min(i32::MAX as usize) as _,
0
),
PartialOrd::lt,
Expand Down
38 changes: 14 additions & 24 deletions monoio/src/driver/op/write.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::io;
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use std::os::unix::prelude::AsRawFd;

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use windows_sys::Win32::{
Foundation::TRUE,
Networking::WinSock::{WSAGetLastError, WSASend, SOCKET_ERROR},
Networking::WinSock::WSASend,
Storage::FileSystem::{SetFilePointer, WriteFile, FILE_CURRENT, INVALID_SET_FILE_POINTER},
};
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
use crate::{
buf::{IoBuf, IoVecBuf},
BufResult,
syscall_u32, BufResult,
};

pub(crate) struct Write<T> {
Expand Down Expand Up @@ -176,25 +176,15 @@ impl<T: IoVecBuf> OpAble for WriteVec<T> {
#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
let mut bytes_sent = 0;
let ret = unsafe {
WSASend(
self.fd.raw_socket() as _,
self.buf_vec.read_wsabuf_ptr(),
self.buf_vec.read_wsabuf_len() as _,
&mut bytes_sent,
0,
std::ptr::null_mut(),
None,
)
};
match ret {
0 => return Err(std::io::ErrorKind::WouldBlock.into()),
SOCKET_ERROR => {
let error = unsafe { WSAGetLastError() };
return Err(std::io::Error::from_raw_os_error(error));
}
_ => (),
}
Ok(bytes_sent)
syscall_u32!(WSASend(
self.fd.raw_socket() as _,
self.buf_vec.read_wsabuf_ptr(),
self.buf_vec.read_wsabuf_len() as _,
&mut bytes_sent,
0,
std::ptr::null_mut(),
None,
))
.map(|_| bytes_sent)
}
}
2 changes: 1 addition & 1 deletion monoio/src/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl ScheduledIo {
match slot {
Some(existing) => {
if !existing.will_wake(cx.waker()) {
*existing = cx.waker().clone();
existing.clone_from(cx.waker());
}
}
None => {
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ impl Inner {
}
UringState::Waiting(Some(waker)) => {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
waker.clone_from(cx.waker());
}

Poll::Pending
Expand Down
3 changes: 3 additions & 0 deletions monoio/src/driver/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ macro_rules! syscall {
#[macro_export]
macro_rules! syscall_u32 {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
#[cfg(windows)]
let res = unsafe { $fn($($arg, )*) };
#[cfg(unix)]
let res = unsafe { libc::$fn($($arg, )*) };
if res < 0 {
Err(std::io::Error::last_os_error())
Expand Down
12 changes: 7 additions & 5 deletions monoio/tests/tcp_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use monoio::{
net::{TcpListener, TcpStream},
};

// todo fix these CI in windows
#[cfg(not(windows))]
#[monoio::test_all]
async fn echo_server() {
const ITER: usize = 1024;
Expand Down Expand Up @@ -58,10 +56,14 @@ async fn echo_server() {
let (stream, _) = srv.accept().await.unwrap();
let (mut rd, mut wr) = stream.into_split();

let n = io::copy(&mut rd, &mut wr).await.unwrap();
assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64);
// todo fix these CI in windows
#[cfg(not(windows))]
{
let n = io::copy(&mut rd, &mut wr).await.unwrap();
assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64);

assert!(rx.await.is_ok());
assert!(rx.await.is_ok());
}
}

#[monoio::test_all(timer_enabled = true)]
Expand Down

0 comments on commit 4b9a125

Please sign in to comment.