From 1f48654663b2cf63c6b7526da06c118d4dac7c6d Mon Sep 17 00:00:00 2001 From: csh <458761603@qq.com> Date: Tue, 2 Apr 2024 00:03:31 +0800 Subject: [PATCH] Support wasmedge Signed-off-by: csh <458761603@qq.com> --- tokio/Cargo.toml | 25 ++++++++--- tokio/src/macros/cfg.rs | 14 +++++- tokio/src/net/addr.rs | 67 ++++++++++++++++++++++++++++ tokio/src/net/tcp/socket.rs | 60 +++++++++++++++++++++---- tokio/src/net/udp.rs | 56 +++++++++++++++++++---- tokio/src/runtime/builder.rs | 11 +++-- tokio/src/runtime/io/registration.rs | 2 +- tokio/src/runtime/runtime.rs | 7 ++- 8 files changed, 210 insertions(+), 32 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ac789e242f9..4a1dcf4c2a0 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -114,7 +114,11 @@ signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] libc = { version = "0.2.149" } -nix = { version = "0.29.0", default-features = false, features = ["aio", "fs", "socket"] } +nix = { version = "0.29.0", default-features = false, features = [ + "aio", + "fs", + "socket", +] } [target.'cfg(windows)'.dependencies.windows-sys] version = "0.52" @@ -122,10 +126,12 @@ optional = true [target.'cfg(windows)'.dev-dependencies.windows-sys] version = "0.52" -features = [ - "Win32_Foundation", - "Win32_Security_Authorization", -] +features = ["Win32_Foundation", "Win32_Security_Authorization"] + +[target.'cfg(all(target_os = "wasi", tokio_unstable))'.dependencies] +libc = { version = "0.2.149", optional = true } +wasmedge_wasi_socket = "0.5" +socket2 = { version = "0.5.5", optional = true, features = ["all"] } [dev-dependencies] tokio-test = { version = "0.4.0", path = "../tokio-test" } @@ -154,7 +160,14 @@ loom = { version = "0.7", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true # enable unstable features in the documentation -rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"] +rustdoc-args = [ + "--cfg", + "docsrs", + "--cfg", + "tokio_unstable", + "--cfg", + "tokio_taskdump", +] # it's necessary to _also_ pass `--cfg tokio_unstable` and `--cfg tokio_taskdump` # to rustc, or else dependencies will not be enabled, and the docs build will fail. rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"] diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3242d3ce2ea..90ea835028c 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -37,6 +37,18 @@ macro_rules! cfg_unix { } } +/// Enables Wasi-specific code. +/// Use this macro instead of `cfg(target_os="wasi")` to generate docs properly. +macro_rules! cfg_wasi { + ($($item:item)*) => { + $( + #[cfg(any(all(doc, docsrs), target_os="wasi"))] + #[cfg_attr(docsrs, doc(cfg(target_os="wasi")))] + $item + )* + } +} + /// Enables unstable Windows-specific code. /// Use this macro instead of `cfg(windows)` to generate docs properly. macro_rules! cfg_unstable_windows { @@ -588,7 +600,7 @@ macro_rules! cfg_not_has_const_mutex_new { macro_rules! cfg_not_wasi { ($($item:item)*) => { $( - #[cfg(not(target_os = "wasi"))] + #[cfg(any(not(target_os = "wasi"), tokio_unstable))] $item )* } diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index a098990bb79..a9fa80c55d1 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -161,6 +161,7 @@ cfg_net! { impl ToSocketAddrs for str {} + #[cfg(not(target_os = "wasi"))] impl sealed::ToSocketAddrsPriv for str { type Iter = sealed::OneOrMore; type Future = sealed::MaybeReady; @@ -185,10 +186,39 @@ cfg_net! { } } + + #[cfg(all(target_os = "wasi", tokio_unstable))] + impl sealed::ToSocketAddrsPriv for str { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future { + use sealed::MaybeReady; + + // First check if the input parses as a socket address + let res: Result = self.parse(); + + if let Ok(addr) = res { + return MaybeReady(sealed::State::Ready(Some(addr))); + } + + // Run DNS lookup on the blocking pool + let s = self.to_owned(); + + MaybeReady(sealed::State::Ready({ + let addrs = wasmedge_wasi_socket::ToSocketAddrs::to_socket_addrs(&s).ok(); + addrs.and_then(|mut addrs|{ + addrs.next() + }) + })) + } + } + // ===== impl (&str, u16) ===== impl ToSocketAddrs for (&str, u16) {} + #[cfg(not(target_os = "wasi"))] impl sealed::ToSocketAddrsPriv for (&str, u16) { type Iter = sealed::OneOrMore; type Future = sealed::MaybeReady; @@ -222,6 +252,43 @@ cfg_net! { } } + + #[cfg(all(target_os = "wasi", tokio_unstable))] + impl sealed::ToSocketAddrsPriv for (&str, u16) { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future { + use sealed::MaybeReady; + + let (host, port) = *self; + + // try to parse the host as a regular IP address first + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV4::new(addr, port); + let addr = SocketAddr::V4(addr); + + return MaybeReady(sealed::State::Ready(Some(addr))); + } + + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + let addr = SocketAddr::V6(addr); + + return MaybeReady(sealed::State::Ready(Some(addr))); + } + + let host = host.to_owned(); + + MaybeReady(sealed::State::Ready({ + let addrs = wasmedge_wasi_socket::ToSocketAddrs::to_socket_addrs(&(&host[..], port)).ok(); + addrs.and_then(|mut addrs|{ + addrs.next() + }) + })) + } + } + // ===== impl (String, u16) ===== impl ToSocketAddrs for (String, u16) {} diff --git a/tokio/src/net/tcp/socket.rs b/tokio/src/net/tcp/socket.rs index a9ba3b0660d..2b21c2bd0d4 100644 --- a/tokio/src/net/tcp/socket.rs +++ b/tokio/src/net/tcp/socket.rs @@ -4,8 +4,8 @@ use std::fmt; use std::io; use std::net::SocketAddr; -#[cfg(unix)] -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +#[cfg(not(windows))] +use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::time::Duration; cfg_windows! { @@ -429,6 +429,7 @@ impl TcpSocket { /// # Ok(()) /// # } /// ``` + #[cfg(not(target_os = "wasi"))] pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { self.inner.set_nodelay(nodelay) } @@ -451,6 +452,7 @@ impl TcpSocket { /// # Ok(()) /// # } /// ``` + #[cfg(not(target_os = "wasi"))] pub fn nodelay(&self) -> io::Result { self.inner.nodelay() } @@ -469,6 +471,7 @@ impl TcpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))] #[cfg_attr( docsrs, @@ -477,6 +480,7 @@ impl TcpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))) )] pub fn tos(&self) -> io::Result { @@ -496,6 +500,7 @@ impl TcpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))] #[cfg_attr( docsrs, @@ -504,6 +509,7 @@ impl TcpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))) )] pub fn set_tos(&self, tos: u32) -> io::Result<()> { @@ -635,7 +641,7 @@ impl TcpSocket { /// ``` pub async fn connect(self, addr: SocketAddr) -> io::Result { if let Err(err) = self.inner.connect(&addr.into()) { - #[cfg(unix)] + #[cfg(any(unix, target_os = "wasi"))] if err.raw_os_error() != Some(libc::EINPROGRESS) { return Err(err); } @@ -644,9 +650,9 @@ impl TcpSocket { return Err(err); } } - #[cfg(unix)] + #[cfg(not(windows))] let mio = { - use std::os::unix::io::{FromRawFd, IntoRawFd}; + use std::os::fd::{FromRawFd, IntoRawFd}; let raw_fd = self.inner.into_raw_fd(); unsafe { mio::net::TcpStream::from_raw_fd(raw_fd) } @@ -700,9 +706,9 @@ impl TcpSocket { /// ``` pub fn listen(self, backlog: u32) -> io::Result { self.inner.listen(backlog as i32)?; - #[cfg(unix)] + #[cfg(not(windows))] let mio = { - use std::os::unix::io::{FromRawFd, IntoRawFd}; + use std::os::fd::{FromRawFd, IntoRawFd}; let raw_fd = self.inner.into_raw_fd(); unsafe { mio::net::TcpListener::from_raw_fd(raw_fd) } @@ -753,9 +759,9 @@ impl TcpSocket { /// } /// ``` pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket { - #[cfg(unix)] + #[cfg(not(windows))] { - use std::os::unix::io::{FromRawFd, IntoRawFd}; + use std::os::fd::{FromRawFd, IntoRawFd}; let raw_fd = std_stream.into_raw_fd(); unsafe { TcpSocket::from_raw_fd(raw_fd) } @@ -772,6 +778,7 @@ impl TcpSocket { } fn convert_address(address: socket2::SockAddr) -> io::Result { + #[cfg(not(target_os = "wasi"))] match address.as_socket() { Some(address) => Ok(address), None => Err(io::Error::new( @@ -779,6 +786,8 @@ fn convert_address(address: socket2::SockAddr) -> io::Result { "invalid address family (not IPv4 or IPv6)", )), } + #[cfg(target_os = "wasi")] + Ok(address) } impl fmt::Debug for TcpSocket { @@ -820,6 +829,39 @@ cfg_unix! { } } +cfg_wasi! { + impl AsRawFd for TcpSocket { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } + } + + impl AsFd for TcpSocket { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } + } + + impl FromRawFd for TcpSocket { + /// Converts a `RawFd` to a `TcpSocket`. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socket is in + /// non-blocking mode. + unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket { + let inner = socket2::Socket::from_raw_fd(fd); + TcpSocket { inner } + } + } + + impl IntoRawFd for TcpSocket { + fn into_raw_fd(self) -> RawFd { + self.inner.into_raw_fd() + } + } +} + cfg_windows! { impl IntoRawSocket for TcpSocket { fn into_raw_socket(self) -> RawSocket { diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index a34f7b9225b..3804f0b6969 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -255,6 +255,15 @@ impl UdpSocket { .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) }) } + #[cfg(all(target_os = "wasi", tokio_unstable))] + { + use std::os::wasi::io::{FromRawFd, IntoRawFd}; + self.io + .into_inner() + .map(IntoRawFd::into_raw_fd) + .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) }) + } + #[cfg(windows)] { use std::os::windows::io::{FromRawSocket, IntoRawSocket}; @@ -1715,15 +1724,22 @@ impl UdpSocket { #[inline] fn peek_sender_inner(&self) -> io::Result { - self.io.try_io(|| { - self.as_socket() - .peek_sender()? - // May be `None` if the platform doesn't populate the sender for some reason. - // In testing, that only occurred on macOS if you pass a zero-sized buffer, - // but the implementation of `Socket::peek_sender()` covers that. - .as_socket() - .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "sender not available")) - }) + #[cfg(not(target_os = "wasi"))] + { + self.io.try_io(|| { + self.as_socket() + .peek_sender()? + // May be `None` if the platform doesn't populate the sender for some reason. + // In testing, that only occurred on macOS if you pass a zero-sized buffer, + // but the implementation of `Socket::peek_sender()` covers that. + .as_socket() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "sender not available")) + }) + } + #[cfg(target_os = "wasi")] + { + self.io.try_io(|| self.as_socket().peek_sender()) + } } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -1864,6 +1880,7 @@ impl UdpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))] #[cfg_attr( docsrs, @@ -1872,6 +1889,7 @@ impl UdpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))) )] pub fn tos(&self) -> io::Result { @@ -1891,6 +1909,7 @@ impl UdpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))] #[cfg_attr( docsrs, @@ -1899,6 +1918,7 @@ impl UdpSocket { target_os = "redox", target_os = "solaris", target_os = "illumos", + target_os = "wasi", )))) )] pub fn set_tos(&self, tos: u32) -> io::Result<()> { @@ -2031,6 +2051,24 @@ mod sys { } } +#[cfg(target_os = "wasi")] +mod sys { + use super::UdpSocket; + use std::os::wasi::prelude::*; + + impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } + } + + impl AsFd for UdpSocket { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } + } +} + cfg_windows! { use crate::os::windows::io::{AsRawSocket, RawSocket}; use crate::os::windows::io::{AsSocket, BorrowedSocket}; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index ac13db68c4c..05a6f69aff5 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -240,8 +240,11 @@ impl Builder { /// Returns a new builder with the multi thread scheduler selected. /// /// Configuration methods can be chained on the return value. - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))) + )] pub fn new_multi_thread() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. Builder::new(Kind::MultiThread, 61) @@ -259,8 +262,8 @@ impl Builder { /// ready**. /// /// Configuration methods can be chained on the return value. - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))))] pub fn new_multi_thread_alt() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. Builder::new(Kind::MultiThreadAlt, 61) diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 16e79e82515..de3ca78e539 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -118,7 +118,7 @@ impl Registration { // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. - #[cfg(not(target_os = "wasi"))] + // #[cfg(not(target_os = "wasi"))] pub(crate) fn poll_read_io( &self, cx: &mut Context<'_>, diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 242c37e0fbc..d70dfd249b3 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -179,8 +179,11 @@ impl Runtime { /// [main]: ../attr.main.html /// [threaded scheduler]: index.html#threaded-scheduler /// [runtime builder]: crate::runtime::Builder - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))) + )] pub fn new() -> std::io::Result { Builder::new_multi_thread().enable_all().build() }