Skip to content

Commit

Permalink
Future-proof the Futures API
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Apr 5, 2019
1 parent 20dbf28 commit 1691e06
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 70 deletions.
6 changes: 3 additions & 3 deletions src/liballoc/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use core::ops::{
CoerceUnsized, DispatchFromDyn, Deref, DerefMut, Receiver, Generator, GeneratorState
};
use core::ptr::{self, NonNull, Unique};
use core::task::{Waker, Poll};
use core::task::{Context, Poll};

use crate::vec::Vec;
use crate::raw_vec::RawVec;
Expand Down Expand Up @@ -914,7 +914,7 @@ impl<G: ?Sized + Generator> Generator for Pin<Box<G>> {
impl<F: ?Sized + Future + Unpin> Future for Box<F> {
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), waker)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), cx)
}
}
18 changes: 10 additions & 8 deletions src/libcore/future/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use marker::Unpin;
use ops;
use pin::Pin;
use task::{Poll, Waker};
use task::{Context, Poll};

/// A future represents an asynchronous computation.
///
Expand Down Expand Up @@ -44,8 +44,9 @@ pub trait Future {
/// Once a future has finished, clients should not `poll` it again.
///
/// When a future is not ready yet, `poll` returns `Poll::Pending` and
/// stores a clone of the [`Waker`] to be woken once the future can
/// make progress. For example, a future waiting for a socket to become
/// stores a clone of the [`Waker`] copied from the current [`Context`].
/// This [`Waker`] is then woken once the future can make progress.
/// For example, a future waiting for a socket to become
/// readable would call `.clone()` on the [`Waker`] and store it.
/// When a signal arrives elsewhere indicating that the socket is readable,
/// `[Waker::wake]` is called and the socket future's task is awoken.
Expand Down Expand Up @@ -88,16 +89,17 @@ pub trait Future {
///
/// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
/// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
/// [`Context`]: ../task/struct.Context.html
/// [`Waker`]: ../task/struct.Waker.html
/// [`Waker::wake`]: ../task/struct.Waker.html#method.wake
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

impl<F: ?Sized + Future + Unpin> Future for &mut F {
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), waker)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), cx)
}
}

Expand All @@ -108,7 +110,7 @@ where
{
type Output = <<P as ops::Deref>::Target as Future>::Output;

fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(waker)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(cx)
}
}
2 changes: 1 addition & 1 deletion src/libcore/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ mod poll;
pub use self::poll::Poll;

mod wake;
pub use self::wake::{Waker, RawWaker, RawWakerVTable};
pub use self::wake::{Context, Waker, RawWaker, RawWakerVTable};
101 changes: 97 additions & 4 deletions src/libcore/task/wake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
issue = "50547")]

use fmt;
use marker::Unpin;
use marker::{PhantomData, Unpin};

/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`]
/// which provides customized wakeup behavior.
Expand Down Expand Up @@ -36,6 +36,10 @@ impl RawWaker {
/// The `vtable` customizes the behavior of a `Waker` which gets created
/// from a `RawWaker`. For each operation on the `Waker`, the associated
/// function in the `vtable` of the underlying `RawWaker` will be called.
#[rustc_promotable]
#[unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
pub const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker {
RawWaker {
data,
Expand Down Expand Up @@ -63,21 +67,105 @@ pub struct RawWakerVTable {
/// required for this additional instance of a [`RawWaker`] and associated
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
/// of the same task that would have been awoken by the original [`RawWaker`].
pub clone: unsafe fn(*const ()) -> RawWaker,
clone: unsafe fn(*const ()) -> RawWaker,

/// This function will be called when `wake` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// The implemention of this function must not consume the provided data
/// pointer.
pub wake: unsafe fn(*const ()),
wake: unsafe fn(*const ()),

/// This function gets called when a [`RawWaker`] gets dropped.
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
drop: unsafe fn(*const ()),
}

impl RawWakerVTable {
/// Creates a new `RawWakerVTable` from the provided `clone`, `wake`, and
/// `drop` functions.
///
/// # `clone`
///
/// This function will be called when the [`RawWaker`] gets cloned, e.g. when
/// the [`Waker`] in which the [`RawWaker`] is stored gets cloned.
///
/// The implementation of this function must retain all resources that are
/// required for this additional instance of a [`RawWaker`] and associated
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
/// of the same task that would have been awoken by the original [`RawWaker`].
///
/// # `wake`
///
/// This function will be called when `wake` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// The implemention of this function must not consume the provided data
/// pointer.
///
/// # `drop`
///
/// This function gets called when a [`RawWaker`] gets dropped.
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
pub drop: unsafe fn(*const ()),
#[rustc_promotable]
#[unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
pub const fn new(
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
) -> Self {
Self {
clone,
wake,
drop,
}
}
}

/// The `Context` of an asynchronous task.
///
/// Currently, `Context` only serves to provide access to a `&Waker`
/// which can be used to wake the current task.
pub struct Context<'a> {
waker: &'a Waker,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

impl<'a> Context<'a> {
/// Create a new `Context` from a `&Waker`.
#[inline]
pub fn from_waker(waker: &'a Waker) -> Self {
Context {
waker,
_marker: PhantomData,
}
}

/// Returns a reference to the `Waker` for the current task.
#[inline]
pub fn waker(&self) -> &'a Waker {
&self.waker
}
}

impl fmt::Debug for Context<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Context")
.field("waker", &self.waker)
.finish()
}
}

/// A `Waker` is a handle for waking up a task by notifying its executor that it
Expand All @@ -98,6 +186,7 @@ unsafe impl Sync for Waker {}

impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
pub fn wake(&self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
Expand All @@ -115,6 +204,7 @@ impl Waker {
/// returns `true`, it is guaranteed that the `Waker`s will awaken the same task.
///
/// This function is primarily used for optimization purposes.
#[inline]
pub fn will_wake(&self, other: &Waker) -> bool {
self.waker == other.waker
}
Expand All @@ -124,6 +214,7 @@ impl Waker {
/// The behavior of the returned `Waker` is undefined if the contract defined
/// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld.
/// Therefore this method is unsafe.
#[inline]
pub unsafe fn new_unchecked(waker: RawWaker) -> Waker {
Waker {
waker,
Expand All @@ -132,6 +223,7 @@ impl Waker {
}

impl Clone for Waker {
#[inline]
fn clone(&self) -> Self {
Waker {
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
Expand All @@ -143,6 +235,7 @@ impl Clone for Waker {
}

impl Drop for Waker {
#[inline]
fn drop(&mut self) {
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
// to initialize `drop` and `data` requiring the user to acknowledge
Expand Down
61 changes: 36 additions & 25 deletions src/libstd/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::marker::Unpin;
use core::pin::Pin;
use core::option::Option;
use core::ptr::NonNull;
use core::task::{Waker, Poll};
use core::task::{Context, Poll};
use core::ops::{Drop, Generator, GeneratorState};

#[doc(inline)]
Expand All @@ -32,72 +32,83 @@ impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
#[unstable(feature = "gen_future", issue = "50547")]
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safe because we're !Unpin + !Drop mapping to a ?Unpin value
let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
set_task_waker(waker, || match gen.resume() {
set_task_context(cx, || match gen.resume() {
GeneratorState::Yielded(()) => Poll::Pending,
GeneratorState::Complete(x) => Poll::Ready(x),
})
}
}

thread_local! {
static TLS_WAKER: Cell<Option<NonNull<Waker>>> = Cell::new(None);
static TLS_CX: Cell<Option<NonNull<Context<'static>>>> = Cell::new(None);
}

struct SetOnDrop(Option<NonNull<Waker>>);
struct SetOnDrop(Option<NonNull<Context<'static>>>);

impl Drop for SetOnDrop {
fn drop(&mut self) {
TLS_WAKER.with(|tls_waker| {
tls_waker.set(self.0.take());
TLS_CX.with(|tls_cx| {
tls_cx.set(self.0.take());
});
}
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Sets the thread-local task context used by async/await futures.
pub fn set_task_waker<F, R>(waker: &Waker, f: F) -> R
pub fn set_task_context<F, R>(cx: &mut Context<'_>, f: F) -> R
where
F: FnOnce() -> R
{
let old_waker = TLS_WAKER.with(|tls_waker| {
tls_waker.replace(Some(NonNull::from(waker)))
// transmute the context's lifetime to 'static so we can store it.
let cx = unsafe {
core::mem::transmute::<&mut Context<'_>, &mut Context<'static>>(cx)
};
let old_cx = TLS_CX.with(|tls_cx| {
tls_cx.replace(Some(NonNull::from(cx)))
});
let _reset_waker = SetOnDrop(old_waker);
let _reset = SetOnDrop(old_cx);
f()
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Retrieves the thread-local task waker used by async/await futures.
/// Retrieves the thread-local task context used by async/await futures.
///
/// This function acquires exclusive access to the task waker.
/// This function acquires exclusive access to the task context.
///
/// Panics if no waker has been set or if the waker has already been
/// retrieved by a surrounding call to get_task_waker.
pub fn get_task_waker<F, R>(f: F) -> R
/// Panics if no context has been set or if the context has already been
/// retrieved by a surrounding call to get_task_context.
pub fn get_task_context<F, R>(f: F) -> R
where
F: FnOnce(&Waker) -> R
F: FnOnce(&mut Context<'_>) -> R
{
let waker_ptr = TLS_WAKER.with(|tls_waker| {
let cx_ptr = TLS_CX.with(|tls_cx| {
// Clear the entry so that nested `get_task_waker` calls
// will fail or set their own value.
tls_waker.replace(None)
tls_cx.replace(None)
});
let _reset_waker = SetOnDrop(waker_ptr);
let _reset = SetOnDrop(cx_ptr);

let waker_ptr = waker_ptr.expect(
"TLS Waker not set. This is a rustc bug. \
let mut cx_ptr = cx_ptr.expect(
"TLS Context not set. This is a rustc bug. \
Please file an issue on https://github.com/rust-lang/rust.");
unsafe { f(waker_ptr.as_ref()) }

// Safety: we've ensured exclusive access to the context by
// removing the pointer from TLS, only to be replaced once
// we're done with it.
//
// The pointer that was inserted came from an `&mut Context<'_>`,
// so it is safe to treat as mutable.
unsafe { f(cx_ptr.as_mut()) }
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Polls a future in the current thread-local task waker.
pub fn poll_with_tls_waker<F>(f: Pin<&mut F>) -> Poll<F::Output>
pub fn poll_with_tls_context<F>(f: Pin<&mut F>) -> Poll<F::Output>
where
F: Future
{
get_task_waker(|waker| F::poll(f, waker))
get_task_context(|cx| F::poll(f, cx))
}
2 changes: 1 addition & 1 deletion src/libstd/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ macro_rules! r#await {
let mut pinned = $e;
loop {
if let $crate::task::Poll::Ready(x) =
$crate::future::poll_with_tls_waker(unsafe {
$crate::future::poll_with_tls_context(unsafe {
$crate::pin::Pin::new_unchecked(&mut pinned)
})
{
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::panicking;
use crate::ptr::{Unique, NonNull};
use crate::rc::Rc;
use crate::sync::{Arc, Mutex, RwLock, atomic};
use crate::task::{Waker, Poll};
use crate::task::{Context, Poll};
use crate::thread::Result;

#[stable(feature = "panic_hooks", since = "1.10.0")]
Expand Down Expand Up @@ -323,9 +323,9 @@ impl<T: fmt::Debug> fmt::Debug for AssertUnwindSafe<T> {
impl<F: Future> Future for AssertUnwindSafe<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
F::poll(pinned_field, waker)
F::poll(pinned_field, cx)
}
}

Expand Down
Loading

0 comments on commit 1691e06

Please sign in to comment.