Skip to content

Commit

Permalink
feat!: introduce Runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Apr 19, 2024
1 parent a40cdf9 commit c8a46cc
Show file tree
Hide file tree
Showing 59 changed files with 1,390 additions and 696 deletions.
3 changes: 2 additions & 1 deletion async-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ keywords = ["trillium", "framework", "async"]
categories = ["web-programming::http-server", "web-programming"]

[dependencies]
async-std = "1.12.0"
async-std = { version = "1.12.0", features = ["unstable"] }
futures-lite = "2.3.0"
log = "0.4.20"
trillium = { path = "../trillium", version = "0.2.19" }
trillium-http = { path = "../http", version = "0.3.16" }
Expand Down
16 changes: 5 additions & 11 deletions async-std/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::net::TcpStream;
use std::{
future::Future,
io::{Error, ErrorKind, Result},
};
use std::io::{Error, ErrorKind, Result};
use trillium_server_common::{
url::{Host, Url},
Connector, Transport,
Expand Down Expand Up @@ -45,6 +42,7 @@ impl ClientConfig {

impl Connector for ClientConfig {
type Transport = AsyncStdTransport<TcpStream>;
type Runtime = AsyncStdRuntime;

async fn connect(&self, url: &Url) -> Result<Self::Transport> {
if url.scheme() != "http" {
Expand Down Expand Up @@ -80,11 +78,7 @@ impl Connector for ClientConfig {
Ok(tcp)
}

fn spawn<Fut: Future<Output = ()> + Send + 'static>(&self, fut: Fut) {
async_std::task::spawn(fut);
}

async fn delay(&self, duration: std::time::Duration) {
let _ = async_std::future::timeout(duration, std::future::pending::<()>()).await;
fn runtime(&self) -> Self::Runtime {
AsyncStdRuntime::default()
}
}
8 changes: 2 additions & 6 deletions async-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ async fn main() {
```
*/

use std::future::Future;

use trillium::Handler;
pub use trillium_server_common::{Binding, Swansong};

Expand Down Expand Up @@ -113,7 +111,5 @@ pub fn config() -> Config<()> {
Config::new()
}

/// spawn and detach a Future that returns ()
pub fn spawn<Fut: Future<Output = ()> + Send + 'static>(future: Fut) {
async_std::task::spawn(future);
}
mod runtime;
pub use runtime::AsyncStdRuntime;
86 changes: 86 additions & 0 deletions async-std/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use futures_lite::future::FutureExt;
use std::{future::Future, time::Duration};
use trillium_server_common::{DroppableFuture, Runtime, RuntimeTrait, Stream};

/// async-std runtime
#[derive(Clone, Copy, Default, Debug)]
pub struct AsyncStdRuntime(());

impl RuntimeTrait for AsyncStdRuntime {
fn spawn<Fut>(
&self,
fut: Fut,
) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let join_handle = async_std::task::spawn(fut);
DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
}

async fn delay(&self, duration: Duration) {
async_std::task::sleep(duration).await
}

fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
async_std::stream::interval(period)
}

fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
async_std::task::block_on(fut)
}
}

impl AsyncStdRuntime {
/// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
///
/// Spawned tasks conform to the following behavior:
///
/// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
/// continue to execute until completion.
///
/// * unwinding: If the spawned future panics, this must not propagate to the join
/// handle. Instead, the awaiting the join handle returns None in case of panic.
pub fn spawn<Fut>(
&self,
fut: Fut,
) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let join_handle = async_std::task::spawn(fut);
DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
}

/// Wake in this amount of wall time
pub async fn delay(&self, duration: Duration) {
async_std::task::sleep(duration).await
}

/// Returns a [`Stream`] that yields a `()` on the provided period
pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
async_std::stream::interval(period)
}

/// Runtime implementation hook for blocking on a top level future.
pub fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
async_std::task::block_on(fut)
}

/// Race a future against the provided duration, returning None in case of timeout.
pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
where
Fut: Future + Send,
Fut::Output: Send + 'static,
{
RuntimeTrait::timeout(self, duration, fut).await
}
}

impl From<AsyncStdRuntime> for Runtime {
fn from(value: AsyncStdRuntime) -> Self {
Runtime::new(value)
}
}
1 change: 0 additions & 1 deletion async-std/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod unix;
#[cfg(unix)]
pub use unix::AsyncStdServer;

#[cfg(not(unix))]
mod tcp;
#[cfg(not(unix))]
pub use tcp::AsyncStdServer;
Expand Down
16 changes: 6 additions & 10 deletions async-std/src/server/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::net::{TcpListener, TcpStream};
use async_std::task::{block_on, spawn};
use std::{convert::TryInto, env, future::Future, io::Result};
use std::{env, io::Result};
use trillium::Info;
use trillium_server_common::Server;

Expand All @@ -20,6 +19,7 @@ impl From<std::net::TcpListener> for AsyncStdServer {
}

impl Server for AsyncStdServer {
type Runtime = AsyncStdRuntime;
type Transport = AsyncStdTransport<TcpStream>;
const DESCRIPTION: &'static str = concat!(
" (",
Expand All @@ -34,18 +34,14 @@ impl Server for AsyncStdServer {
}

fn listener_from_tcp(tcp: std::net::TcpListener) -> Self {
Self(tcp.try_into().unwrap())
Self(tcp.into())
}

fn info(&self) -> Info {
self.0.local_addr().unwrap().into()
}

fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
spawn(fut);
}

fn block_on(fut: impl Future<Output = ()> + 'static) {
block_on(fut)
fn runtime() -> Self::Runtime {
AsyncStdRuntime::default()
}
}
15 changes: 6 additions & 9 deletions async-std/src/server/unix.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::{
net::{TcpListener, TcpStream},
os::unix::net::{UnixListener, UnixStream},
stream::StreamExt,
task::{block_on, spawn},
};
use std::{env, future::Future, io::Result};
use std::{env, io::Result};
use trillium::{log_error, Info};
use trillium_server_common::{
Binding::{self, *},
Expand Down Expand Up @@ -39,6 +38,8 @@ impl From<std::os::unix::net::UnixListener> for AsyncStdServer {

#[cfg(unix)]
impl Server for AsyncStdServer {
type Runtime = AsyncStdRuntime;

type Transport = Binding<AsyncStdTransport<TcpStream>, AsyncStdTransport<UnixStream>>;
const DESCRIPTION: &'static str = concat!(
" (",
Expand Down Expand Up @@ -94,12 +95,8 @@ impl Server for AsyncStdServer {
}
}

fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
spawn(fut);
}

fn block_on(fut: impl Future<Output = ()> + 'static) {
block_on(fut);
fn runtime() -> Self::Runtime {
AsyncStdRuntime::default()
}

async fn clean_up(self) {
Expand Down
Loading

0 comments on commit c8a46cc

Please sign in to comment.