Skip to content

Commit

Permalink
task: add task IDs
Browse files Browse the repository at this point in the history
## Motivation

PR #4538 adds a prototype implementation of a `JoinMap` API in
`tokio::task`. In [this comment][1] on that PR, @carllerche pointed out
that a much simpler `JoinMap` type could be implemented outside of
`tokio` (either in `tokio-util` or in user code) if we just modified
`JoinSet` to return a task ID type when spawning new tasks, and when
tasks complete. This seems like a better approach for the following
reasons:

* A `JoinMap`-like type need not become a permanent part of `tokio`'s
  stable API
* Task IDs seem like something that could be generally useful outside of
  a `JoinMap` implementation

## Solution

This branch adds a `tokio::task::Id` type that uniquely identifies a
task relative to all currently spawned tasks. The ID is internally
represented as a `NonZeroUsize` that's based on the address of the
task's header. I thought that it was better to use addresses to generate
IDs than assigning sequential IDs to tasks, because a sequential ID
would mean adding an additional usize field to the task data
somewhere, making it a word bigger. I've added methods to `JoinHandle`
and `AbortHandle` for returning a task's ID.

In addition, I modified `JoinSet` to add a `join_with_id` method that
behaves identically to `join_one` but also returns an ID. This can be
used to implement a `JoinMap` type.

Note that because `join_with_id` must return a task ID regardless of
whether the task completes successfully or returns a `JoinError` (in
order to ensure that dead tasks are always cleaned up from a map), it
inverts the ordering of the `Option` and `Result` returned by `join_one`
--- which we've bikeshedded about a bit [here][2]. This makes the
method's return type inconsistent with the existing `join_one` method,
which feels not great. As I see it, there are three possible solutions
to this:

* change the existing `join_one` method to also swap the `Option` and
  `Result` nesting for consistency.
* change `join_with_id` to return `Result<Option<(Id, T)>, (Id,
  JoinError)>>`, which feels gross...
* add a task ID to `JoinError` as well.

[1]: #4538 (comment)
[2]: #4335 (comment)
  • Loading branch information
hawkw committed Apr 22, 2022
1 parent 1472af5 commit 1f43cb3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 13 deletions.
9 changes: 9 additions & 0 deletions tokio/src/runtime/task/abort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ impl AbortHandle {
raw.remote_abort();
}
}

/// Returns a [task ID] that uniquely identifies this task relative to other
/// currently running tasks.
///
/// [task ID]: crate::task::Id
pub fn id(&self) -> super::Id {
// XXX(eliza): should this return an option instead? probably not...
self.raw.unwrap().id()
}
}

unsafe impl Send for AbortHandle {}
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/runtime/task/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ impl<T> JoinHandle<T> {
});
super::AbortHandle::new(raw)
}

/// Returns a [task ID] that uniquely identifies this task relative to other
/// currently running tasks.
///
/// [task ID]: crate::task::Id
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
// XXX(eliza): should this return an option instead? probably not...
self.raw.unwrap().id()
}
}

impl<T> Unpin for JoinHandle<T> {}
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};

/// An opaque ID that uniquely identifies a task relative to all other currently
/// running tasks.
///
/// # Notes
///
/// - Task IDs are unique relative to other *currently running* tasks. When a
/// task completes, the same ID may be used for another task.
/// - Task IDs are *not* sequential, and do not indicate the order in which
/// tasks are spawned, what runtime a task is spawned on, or any other data.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well...
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Id(std::num::NonZeroUsize);

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
Expand Down
22 changes: 22 additions & 0 deletions tokio/src/runtime/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,28 @@ impl RawTask {
pub(super) fn ref_inc(self) {
self.header().state.ref_inc();
}

#[cfg(tokio_unstable)]
#[inline]
pub(super) fn id(&self) -> super::Id {
use std::num::NonZeroUsize;
let addr = self.ptr.as_ptr() as usize;

#[cfg(debug_assertions)]
let inner = NonZeroUsize::new(addr)
.expect("a `NonNull` pointer will never be 0 when cast to `usize`");

#[cfg(not(debug_assertions))]
let inner = unsafe {
// Safety: `addr` was cast from a `NonNull` pointer, which must
// never be null (0). Since the pointer is not null, the integer
// will never be zero, so this is safe as long as the `NonNull` was
// constructed safely.
NonZeroUsize::new_unchecked(addr)
};

super::Id(inner)
}
}

impl Clone for RawTask {
Expand Down
47 changes: 35 additions & 12 deletions tokio/src/task/join_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use crate::runtime::Handle;
use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet};
use crate::util::IdleNotifiedSet;

/// A collection of tasks spawned on a Tokio runtime.
Expand Down Expand Up @@ -155,6 +155,25 @@ impl<T: 'static> JoinSet<T> {
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
crate::future::poll_fn(|cx| self.poll_join_one(cx))
.await
.map(|(_, res)| res)
.transpose()
}

/// Waits until one of the tasks in the set completes and returns its
/// output, along with the [task ID] of the completed task.
///
/// Returns `None` if the set is empty.
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_with_id` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
///
/// [task ID]: crate::task::Id
pub async fn join_with_id(&mut self) -> Option<(Id, Result<T, JoinError>)> {
crate::future::poll_fn(|cx| self.poll_join_one(cx)).await
}

Expand Down Expand Up @@ -191,8 +210,8 @@ impl<T: 'static> JoinSet<T> {

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Ok(Some(_)))` or `Poll::Ready(Err(_))`, then the task that
/// completed is removed from the set.
/// If this returns `Poll::Ready(Some((_, Ok(_))))` or `Poll::Ready(Some((_,
/// Err(_)))`, then the task that completed is removed from the set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
Expand All @@ -205,24 +224,27 @@ impl<T: 'static> JoinSet<T> {
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Ok(Some(value)))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed.
/// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted.
/// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty.
/// * `Poll::Ready(Some((Id, Ok(value))))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed, while
/// `id` is the [task ID] of that task.
/// * `Poll::Ready(Some((Id, Err(err))))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted
/// task, and `id` is the [task ID] of that task.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<T>, JoinError>> {
/// [task ID]: crate::task::Id
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Id, Result<T, JoinError>)>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(Ok(None));
return Poll::Ready(None);
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
Expand All @@ -233,8 +255,9 @@ impl<T: 'static> JoinSet<T> {
let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));

if let Poll::Ready(res) = res {
entry.remove();
Poll::Ready(Some(res).transpose())
let entry = entry.remove();
let id = entry.id();
Poll::Ready(Some((id, res)))
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ cfg_rt! {
cfg_unstable! {
mod join_set;
pub use join_set::JoinSet;
pub use crate::runtime::task::AbortHandle;
pub use crate::runtime::task::{Id, AbortHandle};
}

cfg_trace! {
Expand Down

0 comments on commit 1f43cb3

Please sign in to comment.