Skip to content

Commit

Permalink
task: add join_set::Builder for configuring JoinSet tasks (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored May 30, 2022
1 parent 88e8c62 commit 2bad98f
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 3 deletions.
2 changes: 1 addition & 1 deletion tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<'a> Builder<'a> {
/// Spawns `!Send` a task on the current [`LocalSet`] with this builder's
/// settings.
///
/// The spawned future will be run on the same thread that called `spawn_local.`
/// The spawned future will be run on the same thread that called `spawn_local`.
/// This may only be called from the context of a [local task set][`LocalSet`].
///
/// # Panics
Expand Down
156 changes: 155 additions & 1 deletion tokio/src/task/join_set.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
//! A collection of tasks spawned on a Tokio runtime.
//!
//! This module provides the [`JoinSet`] type, a collection which stores a set
//! of spawned tasks and allows asynchronously awaiting the output of those
//! tasks as they complete. See the documentation for the [`JoinSet`] type for
//! details.
use std::fmt;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -53,6 +59,18 @@ pub struct JoinSet<T> {
inner: IdleNotifiedSet<JoinHandle<T>>,
}

/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
/// than on the current default runtime.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
joinset: &'a mut JoinSet<T>,
builder: super::Builder<'a>,
}

impl<T> JoinSet<T> {
/// Create a new `JoinSet`.
pub fn new() -> Self {
Expand All @@ -73,6 +91,33 @@ impl<T> JoinSet<T> {
}

impl<T: 'static> JoinSet<T> {
/// Returns a [`Builder`] that can be used to configure a task prior to
/// spawning it on this `JoinSet`.
///
/// # Examples
///
/// ```
/// use tokio::task::JoinSet;
///
/// #[tokio::main]
/// async fn main() {
/// let mut set = JoinSet::new();
///
/// // Use the builder to configure a task's name before spawning it.
/// set.build_task()
/// .name("my_task")
/// .spawn(async { /* ... */ });
/// }
/// ```
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
pub fn build_task(&mut self) -> Builder<'_, T> {
Builder {
builder: super::Builder::new(),
joinset: self,
}
}

/// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
/// that can be used to remotely cancel the task.
///
Expand Down Expand Up @@ -107,7 +152,7 @@ impl<T: 'static> JoinSet<T> {
}

/// Spawn the provided task on the current [`LocalSet`] and store it in this
/// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
/// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
/// # Panics
Expand Down Expand Up @@ -289,3 +334,112 @@ impl<T> Default for JoinSet<T> {
Self::new()
}
}

// === impl Builder ===

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T: 'static> Builder<'a, T> {
/// Assigns a name to the task which will be spawned.
pub fn name(self, name: &'a str) -> Self {
let builder = self.builder.name(name);
Self { builder, ..self }
}

/// Spawn the provided task with this builder's settings and store it in the
/// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn<F>(self, future: F) -> AbortHandle
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn(future))
}

/// Spawn the provided task on the provided [runtime handle] with this
/// builder's settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
///
/// [`AbortHandle`]: crate::task::AbortHandle
/// [runtime handle]: crate::runtime::Handle
#[track_caller]
pub fn spawn_on<F>(mut self, future: F, handle: &Handle) -> AbortHandle
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn_on(future, handle))
}

/// Spawn the provided task on the current [`LocalSet`] with this builder's
/// settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// # Panics
///
/// This method panics if it is called outside of a `LocalSet`.
///
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local<F>(self, future: F) -> AbortHandle
where
F: Future<Output = T>,
F: 'static,
{
self.joinset.insert(self.builder.spawn_local(future))
}

/// Spawn the provided task on the provided [`LocalSet`] with this builder's
/// settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> AbortHandle
where
F: Future<Output = T>,
F: 'static,
{
self.joinset
.insert(self.builder.spawn_local_on(future, local_set))
}
}

// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
// `Debug`.
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T> fmt::Debug for Builder<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("join_set::Builder")
.field("joinset", &self.joinset)
.field("builder", &self.builder)
.finish()
}
}
6 changes: 6 additions & 0 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ impl LocalSet {
{
let handle = self.context.spawn(future, name);

// Because a task was spawned from *outside* the `LocalSet`, wake the
// `LocalSet` future to execute the new task, if it hasn't been woken.
//
// Spawning via the free fn `spawn` does not require this, as it can
// only be called from *within* a future executing on the `LocalSet` —
// in that case, the `LocalSet` must already be awake.
self.context.shared.waker.wake();
handle
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ cfg_rt! {
pub use unconstrained::{unconstrained, Unconstrained};

cfg_unstable! {
mod join_set;
pub mod join_set;
#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::{Id, AbortHandle};
}
Expand Down

0 comments on commit 2bad98f

Please sign in to comment.