From 9c688ecdc363394d5debfd3f0eb86a52a3d99c5c Mon Sep 17 00:00:00 2001 From: Kestrer Date: Wed, 9 Feb 2022 19:29:21 +0000 Subject: [PATCH] util: add lifetime parameter to ReusableBoxFuture (#3762) Co-authored-by: Toby Lawrence --- tokio-stream/src/wrappers/broadcast.rs | 2 +- tokio-stream/src/wrappers/watch.rs | 2 +- tokio-util/src/sync/mpsc.rs | 2 +- tokio-util/src/sync/poll_semaphore.rs | 2 +- tokio-util/src/sync/reusable_box.rs | 37 ++++++++++++-------------- 5 files changed, 21 insertions(+), 24 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index c8346a68ec7..2064973d733 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct BroadcastStream { - inner: ReusableBoxFuture<(Result, Receiver)>, + inner: ReusableBoxFuture<'static, (Result, Receiver)>, } /// An error returned from the inner stream of a [`BroadcastStream`]. diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index bd3a18a5831..c682c9c271d 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -49,7 +49,7 @@ use tokio::sync::watch::error::RecvError; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream { - inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, } async fn make_future( diff --git a/tokio-util/src/sync/mpsc.rs b/tokio-util/src/sync/mpsc.rs index ebaa4ae14e3..34a47c18911 100644 --- a/tokio-util/src/sync/mpsc.rs +++ b/tokio-util/src/sync/mpsc.rs @@ -44,7 +44,7 @@ enum State { pub struct PollSender { sender: Option>, state: State, - acquire: ReusableBoxFuture, PollSendError>>, + acquire: ReusableBoxFuture<'static, Result, PollSendError>>, } // Creates a future for acquiring a permit from the underlying channel. This is used to ensure diff --git a/tokio-util/src/sync/poll_semaphore.rs b/tokio-util/src/sync/poll_semaphore.rs index fcf943e9b9e..d0b1dedc273 100644 --- a/tokio-util/src/sync/poll_semaphore.rs +++ b/tokio-util/src/sync/poll_semaphore.rs @@ -12,7 +12,7 @@ use super::ReusableBoxFuture; /// [`Semaphore`]: tokio::sync::Semaphore pub struct PollSemaphore { semaphore: Arc, - permit_fut: Option>>, + permit_fut: Option>>, } impl PollSemaphore { diff --git a/tokio-util/src/sync/reusable_box.rs b/tokio-util/src/sync/reusable_box.rs index 0fbc49f064a..3204207db79 100644 --- a/tokio-util/src/sync/reusable_box.rs +++ b/tokio-util/src/sync/reusable_box.rs @@ -6,26 +6,23 @@ use std::ptr::{self, NonNull}; use std::task::{Context, Poll}; use std::{fmt, panic}; -/// A reusable `Pin + Send>>`. +/// A reusable `Pin + Send + 'a>>`. /// /// This type lets you replace the future stored in the box without /// reallocating when the size and alignment permits this. -pub struct ReusableBoxFuture { - boxed: NonNull + Send>, +pub struct ReusableBoxFuture<'a, T> { + boxed: NonNull + Send + 'a>, } -impl ReusableBoxFuture { +impl<'a, T> ReusableBoxFuture<'a, T> { /// Create a new `ReusableBoxFuture` containing the provided future. pub fn new(future: F) -> Self where - F: Future + Send + 'static, + F: Future + Send + 'a, { - let boxed: Box + Send> = Box::new(future); + let boxed: Box + Send + 'a> = Box::new(future); - let boxed = Box::into_raw(boxed); - - // SAFETY: Box::into_raw does not return null pointers. - let boxed = unsafe { NonNull::new_unchecked(boxed) }; + let boxed = NonNull::from(Box::leak(boxed)); Self { boxed } } @@ -36,7 +33,7 @@ impl ReusableBoxFuture { /// different from the layout of the currently stored future. pub fn set(&mut self, future: F) where - F: Future + Send + 'static, + F: Future + Send + 'a, { if let Err(future) = self.try_set(future) { *self = Self::new(future); @@ -50,7 +47,7 @@ impl ReusableBoxFuture { /// future. pub fn try_set(&mut self, future: F) -> Result<(), F> where - F: Future + Send + 'static, + F: Future + Send + 'a, { // SAFETY: The pointer is not dangling. let self_layout = { @@ -78,7 +75,7 @@ impl ReusableBoxFuture { /// same as `self.layout`. unsafe fn set_same_layout(&mut self, future: F) where - F: Future + Send + 'static, + F: Future + Send + 'a, { // Drop the existing future, catching any panics. let result = panic::catch_unwind(AssertUnwindSafe(|| { @@ -116,7 +113,7 @@ impl ReusableBoxFuture { } } -impl Future for ReusableBoxFuture { +impl Future for ReusableBoxFuture<'_, T> { type Output = T; /// Poll the future stored inside this box. @@ -125,18 +122,18 @@ impl Future for ReusableBoxFuture { } } -// The future stored inside ReusableBoxFuture must be Send. -unsafe impl Send for ReusableBoxFuture {} +// The future stored inside ReusableBoxFuture<'_, T> must be Send. +unsafe impl Send for ReusableBoxFuture<'_, T> {} // The only method called on self.boxed is poll, which takes &mut self, so this // struct being Sync does not permit any invalid access to the Future, even if // the future is not Sync. -unsafe impl Sync for ReusableBoxFuture {} +unsafe impl Sync for ReusableBoxFuture<'_, T> {} // Just like a Pin> is always Unpin, so is this type. -impl Unpin for ReusableBoxFuture {} +impl Unpin for ReusableBoxFuture<'_, T> {} -impl Drop for ReusableBoxFuture { +impl Drop for ReusableBoxFuture<'_, T> { fn drop(&mut self) { unsafe { drop(Box::from_raw(self.boxed.as_ptr())); @@ -144,7 +141,7 @@ impl Drop for ReusableBoxFuture { } } -impl fmt::Debug for ReusableBoxFuture { +impl fmt::Debug for ReusableBoxFuture<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReusableBoxFuture").finish() }