Skip to content

Commit

Permalink
Support wasmedge
Browse files Browse the repository at this point in the history
Signed-off-by: csh <[email protected]>
  • Loading branch information
L-jasmine authored and CaptainVincent committed Dec 1, 2024
1 parent bb7ca75 commit 1f48654
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 32 deletions.
25 changes: 19 additions & 6 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,24 @@ signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.149" }
nix = { version = "0.29.0", default-features = false, features = ["aio", "fs", "socket"] }
nix = { version = "0.29.0", default-features = false, features = [
"aio",
"fs",
"socket",
] }

[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.52"
optional = true

[target.'cfg(windows)'.dev-dependencies.windows-sys]
version = "0.52"
features = [
"Win32_Foundation",
"Win32_Security_Authorization",
]
features = ["Win32_Foundation", "Win32_Security_Authorization"]

[target.'cfg(all(target_os = "wasi", tokio_unstable))'.dependencies]
libc = { version = "0.2.149", optional = true }
wasmedge_wasi_socket = "0.5"
socket2 = { version = "0.5.5", optional = true, features = ["all"] }

[dev-dependencies]
tokio-test = { version = "0.4.0", path = "../tokio-test" }
Expand Down Expand Up @@ -154,7 +160,14 @@ loom = { version = "0.7", features = ["futures", "checkpoint"] }
[package.metadata.docs.rs]
all-features = true
# enable unstable features in the documentation
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
rustdoc-args = [
"--cfg",
"docsrs",
"--cfg",
"tokio_unstable",
"--cfg",
"tokio_taskdump",
]
# it's necessary to _also_ pass `--cfg tokio_unstable` and `--cfg tokio_taskdump`
# to rustc, or else dependencies will not be enabled, and the docs build will fail.
rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
Expand Down
14 changes: 13 additions & 1 deletion tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ macro_rules! cfg_unix {
}
}

/// Enables Wasi-specific code.
/// Use this macro instead of `cfg(target_os="wasi")` to generate docs properly.
macro_rules! cfg_wasi {
($($item:item)*) => {
$(
#[cfg(any(all(doc, docsrs), target_os="wasi"))]
#[cfg_attr(docsrs, doc(cfg(target_os="wasi")))]
$item
)*
}
}

/// Enables unstable Windows-specific code.
/// Use this macro instead of `cfg(windows)` to generate docs properly.
macro_rules! cfg_unstable_windows {
Expand Down Expand Up @@ -588,7 +600,7 @@ macro_rules! cfg_not_has_const_mutex_new {
macro_rules! cfg_not_wasi {
($($item:item)*) => {
$(
#[cfg(not(target_os = "wasi"))]
#[cfg(any(not(target_os = "wasi"), tokio_unstable))]
$item
)*
}
Expand Down
67 changes: 67 additions & 0 deletions tokio/src/net/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ cfg_net! {

impl ToSocketAddrs for str {}

#[cfg(not(target_os = "wasi"))]
impl sealed::ToSocketAddrsPriv for str {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
Expand All @@ -185,10 +186,39 @@ cfg_net! {
}
}


#[cfg(all(target_os = "wasi", tokio_unstable))]
impl sealed::ToSocketAddrsPriv for str {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;

fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future {
use sealed::MaybeReady;

// First check if the input parses as a socket address
let res: Result<SocketAddr, _> = self.parse();

if let Ok(addr) = res {
return MaybeReady(sealed::State::Ready(Some(addr)));
}

// Run DNS lookup on the blocking pool
let s = self.to_owned();

MaybeReady(sealed::State::Ready({
let addrs = wasmedge_wasi_socket::ToSocketAddrs::to_socket_addrs(&s).ok();
addrs.and_then(|mut addrs|{
addrs.next()
})
}))
}
}

// ===== impl (&str, u16) =====

impl ToSocketAddrs for (&str, u16) {}

#[cfg(not(target_os = "wasi"))]
impl sealed::ToSocketAddrsPriv for (&str, u16) {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
Expand Down Expand Up @@ -222,6 +252,43 @@ cfg_net! {
}
}


#[cfg(all(target_os = "wasi", tokio_unstable))]
impl sealed::ToSocketAddrsPriv for (&str, u16) {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;

fn to_socket_addrs(&self, _: sealed::Internal) -> Self::Future {
use sealed::MaybeReady;

let (host, port) = *self;

// try to parse the host as a regular IP address first
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
let addr = SocketAddr::V4(addr);

return MaybeReady(sealed::State::Ready(Some(addr)));
}

if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0);
let addr = SocketAddr::V6(addr);

return MaybeReady(sealed::State::Ready(Some(addr)));
}

let host = host.to_owned();

MaybeReady(sealed::State::Ready({
let addrs = wasmedge_wasi_socket::ToSocketAddrs::to_socket_addrs(&(&host[..], port)).ok();
addrs.and_then(|mut addrs|{
addrs.next()
})
}))
}
}

// ===== impl (String, u16) =====

impl ToSocketAddrs for (String, u16) {}
Expand Down
60 changes: 51 additions & 9 deletions tokio/src/net/tcp/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::fmt;
use std::io;
use std::net::SocketAddr;

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(not(windows))]
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::time::Duration;

cfg_windows! {
Expand Down Expand Up @@ -429,6 +429,7 @@ impl TcpSocket {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.inner.set_nodelay(nodelay)
}
Expand All @@ -451,6 +452,7 @@ impl TcpSocket {
/// # Ok(())
/// # }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn nodelay(&self) -> io::Result<bool> {
self.inner.nodelay()
}
Expand All @@ -469,6 +471,7 @@ impl TcpSocket {
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
target_os = "wasi",
)))]
#[cfg_attr(
docsrs,
Expand All @@ -477,6 +480,7 @@ impl TcpSocket {
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
target_os = "wasi",
))))
)]
pub fn tos(&self) -> io::Result<u32> {
Expand All @@ -496,6 +500,7 @@ impl TcpSocket {
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
target_os = "wasi",
)))]
#[cfg_attr(
docsrs,
Expand All @@ -504,6 +509,7 @@ impl TcpSocket {
target_os = "redox",
target_os = "solaris",
target_os = "illumos",
target_os = "wasi",
))))
)]
pub fn set_tos(&self, tos: u32) -> io::Result<()> {
Expand Down Expand Up @@ -635,7 +641,7 @@ impl TcpSocket {
/// ```
pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
if let Err(err) = self.inner.connect(&addr.into()) {
#[cfg(unix)]
#[cfg(any(unix, target_os = "wasi"))]
if err.raw_os_error() != Some(libc::EINPROGRESS) {
return Err(err);
}
Expand All @@ -644,9 +650,9 @@ impl TcpSocket {
return Err(err);
}
}
#[cfg(unix)]
#[cfg(not(windows))]
let mio = {
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::fd::{FromRawFd, IntoRawFd};

let raw_fd = self.inner.into_raw_fd();
unsafe { mio::net::TcpStream::from_raw_fd(raw_fd) }
Expand Down Expand Up @@ -700,9 +706,9 @@ impl TcpSocket {
/// ```
pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
self.inner.listen(backlog as i32)?;
#[cfg(unix)]
#[cfg(not(windows))]
let mio = {
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::fd::{FromRawFd, IntoRawFd};

let raw_fd = self.inner.into_raw_fd();
unsafe { mio::net::TcpListener::from_raw_fd(raw_fd) }
Expand Down Expand Up @@ -753,9 +759,9 @@ impl TcpSocket {
/// }
/// ```
pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket {
#[cfg(unix)]
#[cfg(not(windows))]
{
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::fd::{FromRawFd, IntoRawFd};

let raw_fd = std_stream.into_raw_fd();
unsafe { TcpSocket::from_raw_fd(raw_fd) }
Expand All @@ -772,13 +778,16 @@ impl TcpSocket {
}

fn convert_address(address: socket2::SockAddr) -> io::Result<SocketAddr> {
#[cfg(not(target_os = "wasi"))]
match address.as_socket() {
Some(address) => Ok(address),
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid address family (not IPv4 or IPv6)",
)),
}
#[cfg(target_os = "wasi")]
Ok(address)
}

impl fmt::Debug for TcpSocket {
Expand Down Expand Up @@ -820,6 +829,39 @@ cfg_unix! {
}
}

cfg_wasi! {
impl AsRawFd for TcpSocket {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl AsFd for TcpSocket {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
}
}

impl FromRawFd for TcpSocket {
/// Converts a `RawFd` to a `TcpSocket`.
///
/// # Notes
///
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
let inner = socket2::Socket::from_raw_fd(fd);
TcpSocket { inner }
}
}

impl IntoRawFd for TcpSocket {
fn into_raw_fd(self) -> RawFd {
self.inner.into_raw_fd()
}
}
}

cfg_windows! {
impl IntoRawSocket for TcpSocket {
fn into_raw_socket(self) -> RawSocket {
Expand Down
Loading

0 comments on commit 1f48654

Please sign in to comment.