Skip to content

Commit

Permalink
util: add lifetime parameter to ReusableBoxFuture (tokio-rs#3762)
Browse files Browse the repository at this point in the history
Co-authored-by: Toby Lawrence <[email protected]>
  • Loading branch information
Kestrer and tobz authored Feb 9, 2022
1 parent 52fb93d commit 9c688ec
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tokio-stream/src/wrappers/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::task::{Context, Poll};
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
}

/// An error returned from the inner stream of a [`BroadcastStream`].
Expand Down
2 changes: 1 addition & 1 deletion tokio-stream/src/wrappers/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tokio::sync::watch::error::RecvError;
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
}

async fn make_future<T: Clone + Send + Sync>(
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ enum State<T> {
pub struct PollSender<T> {
sender: Option<Sender<T>>,
state: State<T>,
acquire: ReusableBoxFuture<Result<OwnedPermit<T>, PollSendError<T>>>,
acquire: ReusableBoxFuture<'static, Result<OwnedPermit<T>, PollSendError<T>>>,
}

// Creates a future for acquiring a permit from the underlying channel. This is used to ensure
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/sync/poll_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::ReusableBoxFuture;
/// [`Semaphore`]: tokio::sync::Semaphore
pub struct PollSemaphore {
semaphore: Arc<Semaphore>,
permit_fut: Option<ReusableBoxFuture<Result<OwnedSemaphorePermit, AcquireError>>>,
permit_fut: Option<ReusableBoxFuture<'static, Result<OwnedSemaphorePermit, AcquireError>>>,
}

impl PollSemaphore {
Expand Down
37 changes: 17 additions & 20 deletions tokio-util/src/sync/reusable_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,23 @@ use std::ptr::{self, NonNull};
use std::task::{Context, Poll};
use std::{fmt, panic};

/// A reusable `Pin<Box<dyn Future<Output = T> + Send>>`.
/// A reusable `Pin<Box<dyn Future<Output = T> + Send + 'a>>`.
///
/// This type lets you replace the future stored in the box without
/// reallocating when the size and alignment permits this.
pub struct ReusableBoxFuture<T> {
boxed: NonNull<dyn Future<Output = T> + Send>,
pub struct ReusableBoxFuture<'a, T> {
boxed: NonNull<dyn Future<Output = T> + Send + 'a>,
}

impl<T> ReusableBoxFuture<T> {
impl<'a, T> ReusableBoxFuture<'a, T> {
/// Create a new `ReusableBoxFuture<T>` containing the provided future.
pub fn new<F>(future: F) -> Self
where
F: Future<Output = T> + Send + 'static,
F: Future<Output = T> + Send + 'a,
{
let boxed: Box<dyn Future<Output = T> + Send> = Box::new(future);
let boxed: Box<dyn Future<Output = T> + Send + 'a> = Box::new(future);

let boxed = Box::into_raw(boxed);

// SAFETY: Box::into_raw does not return null pointers.
let boxed = unsafe { NonNull::new_unchecked(boxed) };
let boxed = NonNull::from(Box::leak(boxed));

Self { boxed }
}
Expand All @@ -36,7 +33,7 @@ impl<T> ReusableBoxFuture<T> {
/// different from the layout of the currently stored future.
pub fn set<F>(&mut self, future: F)
where
F: Future<Output = T> + Send + 'static,
F: Future<Output = T> + Send + 'a,
{
if let Err(future) = self.try_set(future) {
*self = Self::new(future);
Expand All @@ -50,7 +47,7 @@ impl<T> ReusableBoxFuture<T> {
/// future.
pub fn try_set<F>(&mut self, future: F) -> Result<(), F>
where
F: Future<Output = T> + Send + 'static,
F: Future<Output = T> + Send + 'a,
{
// SAFETY: The pointer is not dangling.
let self_layout = {
Expand Down Expand Up @@ -78,7 +75,7 @@ impl<T> ReusableBoxFuture<T> {
/// same as `self.layout`.
unsafe fn set_same_layout<F>(&mut self, future: F)
where
F: Future<Output = T> + Send + 'static,
F: Future<Output = T> + Send + 'a,
{
// Drop the existing future, catching any panics.
let result = panic::catch_unwind(AssertUnwindSafe(|| {
Expand Down Expand Up @@ -116,7 +113,7 @@ impl<T> ReusableBoxFuture<T> {
}
}

impl<T> Future for ReusableBoxFuture<T> {
impl<T> Future for ReusableBoxFuture<'_, T> {
type Output = T;

/// Poll the future stored inside this box.
Expand All @@ -125,26 +122,26 @@ impl<T> Future for ReusableBoxFuture<T> {
}
}

// The future stored inside ReusableBoxFuture<T> must be Send.
unsafe impl<T> Send for ReusableBoxFuture<T> {}
// The future stored inside ReusableBoxFuture<'_, T> must be Send.
unsafe impl<T> Send for ReusableBoxFuture<'_, T> {}

// The only method called on self.boxed is poll, which takes &mut self, so this
// struct being Sync does not permit any invalid access to the Future, even if
// the future is not Sync.
unsafe impl<T> Sync for ReusableBoxFuture<T> {}
unsafe impl<T> Sync for ReusableBoxFuture<'_, T> {}

// Just like a Pin<Box<dyn Future>> is always Unpin, so is this type.
impl<T> Unpin for ReusableBoxFuture<T> {}
impl<T> Unpin for ReusableBoxFuture<'_, T> {}

impl<T> Drop for ReusableBoxFuture<T> {
impl<T> Drop for ReusableBoxFuture<'_, T> {
fn drop(&mut self) {
unsafe {
drop(Box::from_raw(self.boxed.as_ptr()));
}
}
}

impl<T> fmt::Debug for ReusableBoxFuture<T> {
impl<T> fmt::Debug for ReusableBoxFuture<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReusableBoxFuture").finish()
}
Expand Down

0 comments on commit 9c688ec

Please sign in to comment.