Skip to content

Commit

Permalink
Provide select! macro (#2152)
Browse files Browse the repository at this point in the history
Provides a `select!` macro for concurrently waiting on multiple async
expressions. The macro has similar goals and syntax as the one provided
by the `futures` crate, but differs significantly in implementation.

First, this implementation does not require special traits to be
implemented on futures or streams (i.e., no `FuseFuture`). A design goal
is to be able to pass a "plain" async fn result into the select! macro.

Even without `FuseFuture`, this `select!` implementation is able to
handle all cases the `futures::select!` macro can handle. It does this
by supporting pre-poll conditions on branches and result pattern
matching. For pre-conditions, each branch is able to include a condition
that disables the branch if it evaluates to false. This allows the user
to guard futures that have already been polled, preventing double
polling. Pattern matching can be used to disable streams that complete.

A second big difference is the macro is implemented almost entirely as a
declarative macro. The biggest advantage to using this strategy is that
the user will not need to alter the rustc recursion limit except in the
most extreme cases.

The resulting future also tends to be smaller in many cases.
  • Loading branch information
carllerche authored Jan 23, 2020
1 parent f9ea576 commit 8cf98d6
Show file tree
Hide file tree
Showing 14 changed files with 1,468 additions and 10 deletions.
9 changes: 8 additions & 1 deletion tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ edition = "2018"
publish = false

[features]
full = ["tokio/full", "tokio-test"]
full = [
"macros",
"rt-core",
"rt-threaded",

"tokio/full",
"tokio-test"
]
macros = ["tokio/macros"]
rt-core = ["tokio/rt-core"]
rt-threaded = ["rt-core", "tokio/rt-threaded"]
Expand Down
33 changes: 33 additions & 0 deletions tests-integration/tests/macros_select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#![cfg(feature = "macros")]

use futures::channel::oneshot;
use futures::executor::block_on;
use std::thread;

#[test]
fn join_with_select() {
block_on(async {
let (tx1, mut rx1) = oneshot::channel::<i32>();
let (tx2, mut rx2) = oneshot::channel::<i32>();

thread::spawn(move || {
tx1.send(123).unwrap();
tx2.send(456).unwrap();
});

let mut a = None;
let mut b = None;

while a.is_none() || b.is_none() {
tokio::select! {
v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()),
v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()),
}
}

let (a, b) = (a.unwrap(), b.unwrap());

assert_eq!(a, 123);
assert_eq!(b, 456);
});
}
1 change: 1 addition & 0 deletions tokio-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ proc-macro = true
[features]

[dependencies]
proc-macro2 = "1.0.7"
quote = "1"
syn = { version = "1.0.3", features = ["full"] }

Expand Down
13 changes: 11 additions & 2 deletions tokio-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

//! Macros for use with Tokio
mod entry;

extern crate proc_macro;

mod entry;
mod select;

use proc_macro::TokenStream;

/// Marks async function to be executed by selected runtime.
Expand Down Expand Up @@ -198,3 +199,11 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
pub fn test_basic(args: TokenStream, item: TokenStream) -> TokenStream {
entry::test(args, item, false)
}

/// Implementation detail of the `select!` macro. This macro is **not** intended
/// to be used as part of the public API and is permitted to change.
#[proc_macro]
#[doc(hidden)]
pub fn select_priv_declare_output_enum(input: TokenStream) -> TokenStream {
select::declare_output_enum(input)
}
43 changes: 43 additions & 0 deletions tokio-macros/src/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use proc_macro::{TokenStream, TokenTree};
use proc_macro2::Span;
use quote::quote;
use syn::Ident;

pub(crate) fn declare_output_enum(input: TokenStream) -> TokenStream {
// passed in is: `(_ _ _)` with one `_` per branch
let branches = match input.into_iter().next() {
Some(TokenTree::Group(group)) => group.stream().into_iter().count(),
_ => panic!("unexpected macro input"),
};

let variants = (0..branches)
.map(|num| Ident::new(&format!("_{}", num), Span::call_site()))
.collect::<Vec<_>>();

// Use a bitfield to track which futures completed
let mask = Ident::new(
if branches <= 8 {
"u8"
} else if branches <= 16 {
"u16"
} else if branches <= 32 {
"u32"
} else if branches <= 64 {
"u64"
} else {
panic!("up to 64 branches supported");
},
Span::call_site(),
);

TokenStream::from(quote! {
pub(super) enum Out<#( #variants ),*> {
#( #variants(#variants), )*
// Include a `Disabled` variant signifying that all select branches
// failed to resolve.
Disabled,
}

pub(super) type Mask = #mask;
})
}
2 changes: 1 addition & 1 deletion tokio/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod maybe_done;
pub(crate) use maybe_done::{maybe_done, MaybeDone};

mod poll_fn;
pub(crate) use poll_fn::poll_fn;
pub use poll_fn::poll_fn;

mod ready;
pub(crate) use ready::{ok, Ready};
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/future/poll_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::pin::Pin;
use std::task::{Context, Poll};

/// Future for the [`poll_fn`] function.
pub(crate) struct PollFn<F> {
pub struct PollFn<F> {
f: F,
}

impl<F> Unpin for PollFn<F> {}

/// Creates a new future wrapping around a function returning [`Poll`].
pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
Expand Down
17 changes: 14 additions & 3 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,20 @@
//! }
//! ```
// macros used internally
// Includes re-exports used by macros.
//
// This module is not intended to be part of the public API. In general, any
// `doc(hidden)` code is not part of Tokio's public and stable API.
#[macro_use]
mod macros;
#[doc(hidden)]
pub mod macros;

cfg_fs! {
pub mod fs;
}

mod future;
#[doc(hidden)]
pub mod future;

pub mod io;
pub mod net;
Expand Down Expand Up @@ -333,6 +338,12 @@ cfg_time! {
mod util;

cfg_macros! {
/// Implementation detail of the `select!` macro. This macro is **not**
/// intended to be used as part of the public API and is permitted to
/// change.
#[doc(hidden)]
pub use tokio_macros::select_priv_declare_output_enum;

doc_rt_core! {
cfg_rt_threaded! {
#[cfg(not(test))] // Work around for rust-lang/rust#62127
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/macros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,14 @@ mod loom;
#[macro_use]
mod ready;

#[macro_use]
mod select;

#[macro_use]
mod thread_local;

cfg_macros! {
// Includes re-exports needed to implement macros
#[doc(hidden)]
pub mod support;
}
Loading

0 comments on commit 8cf98d6

Please sign in to comment.