diff --git a/README.md b/README.md index fb567dd6a23..6fe5b34ef82 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.39.3", features = ["full"] } +tokio = { version = "1.40.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/spellcheck.dic b/spellcheck.dic index 238e24f7dc3..83b9d684dcb 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -285 +286 & + < @@ -99,6 +99,7 @@ errored EWMA expirations fcntl +fd fd's FIFOs filename diff --git a/tests-integration/src/bin/test-mem.rs b/tests-integration/src/bin/test-mem.rs index 98aa971ac60..c38c41b4ff2 100644 --- a/tests-integration/src/bin/test-mem.rs +++ b/tests-integration/src/bin/test-mem.rs @@ -1,4 +1,4 @@ -use futures::future::poll_fn; +use std::future::poll_fn; fn main() { let rt = tokio::runtime::Builder::new_multi_thread() diff --git a/tests-integration/tests/process_stdio.rs b/tests-integration/tests/process_stdio.rs index 526fd9ca607..df883ef767f 100644 --- a/tests-integration/tests/process_stdio.rs +++ b/tests-integration/tests/process_stdio.rs @@ -10,6 +10,7 @@ use futures::future::{self, FutureExt}; use std::env; use std::io; use std::process::{ExitStatus, Stdio}; +use std::task::ready; fn cat() -> Command { let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat")); @@ -205,13 +206,13 @@ async fn vectored_writes() { let mut input = Bytes::from_static(b"hello\n").chain(Bytes::from_static(b"world!\n")); let mut writes_completed = 0; - futures::future::poll_fn(|cx| loop { + std::future::poll_fn(|cx| loop { let mut slices = [IoSlice::new(&[]); 2]; let vectored = input.chunks_vectored(&mut slices); if vectored == 0 { return std::task::Poll::Ready(std::io::Result::Ok(())); } - let n = futures::ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?; + let n = ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?; writes_completed += 1; input.advance(n); }) diff --git a/tokio-stream/CHANGELOG.md b/tokio-stream/CHANGELOG.md index 7f4ed6c32e3..8f0d2e30832 100644 --- a/tokio-stream/CHANGELOG.md +++ b/tokio-stream/CHANGELOG.md @@ -1,3 +1,15 @@ +# 0.1.16 (September 5th, 2024) + +This release bumps the MSRV of tokio-stream to 1.70. + +- stream: add `next_many` and `poll_next_many` to `StreamMap` ([#6409]) +- stream: make stream adapters public ([#6658]) +- readme: add readme for tokio-stream ([#6456]) + +[#6409]: https://github.com/tokio-rs/tokio/pull/6409 +[#6658]: https://github.com/tokio-rs/tokio/pull/6658 +[#6456]: https://github.com/tokio-rs/tokio/pull/6456 + # 0.1.15 (March 14th, 2024) This release bumps the MSRV of tokio-stream to 1.63. diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 87b2bf6db0a..3c6868b688c 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-stream" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.15" +version = "0.1.16" edition = "2021" rust-version = "1.70" authors = ["Tokio Contributors "] diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index 21f3fc92943..f2b463bcb9a 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -74,9 +74,6 @@ #[macro_use] mod macros; -mod poll_fn; -pub(crate) use poll_fn::poll_fn; - pub mod wrappers; mod stream_ext; diff --git a/tokio-stream/src/macros.rs b/tokio-stream/src/macros.rs index 1e3b61bac72..5aa797b4f41 100644 --- a/tokio-stream/src/macros.rs +++ b/tokio-stream/src/macros.rs @@ -57,12 +57,3 @@ macro_rules! cfg_signal { )* } } - -macro_rules! ready { - ($e:expr $(,)?) => { - match $e { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - } - }; -} diff --git a/tokio-stream/src/poll_fn.rs b/tokio-stream/src/poll_fn.rs deleted file mode 100644 index 744f22f02b4..00000000000 --- a/tokio-stream/src/poll_fn.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub(crate) struct PollFn { - f: F, -} - -pub(crate) fn poll_fn(f: F) -> PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safety: We never construct a `Pin<&mut F>` anywhere, so accessing `f` - // mutably in an unpinned way is sound. - // - // This use of unsafe cannot be replaced with the pin-project macro - // because: - // * If we put `#[pin]` on the field, then it gives us a `Pin<&mut F>`, - // which we can't use to call the closure. - // * If we don't put `#[pin]` on the field, then it makes `PollFn` be - // unconditionally `Unpin`, which we also don't want. - let me = unsafe { Pin::into_inner_unchecked(self) }; - (me.f)(cx) - } -} diff --git a/tokio-stream/src/stream_ext/all.rs b/tokio-stream/src/stream_ext/all.rs index b4dbc1e97c3..6ce00a434c1 100644 --- a/tokio-stream/src/stream_ext/all.rs +++ b/tokio-stream/src/stream_ext/all.rs @@ -3,7 +3,7 @@ use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -42,7 +42,7 @@ where // Take a maximum of 32 items from the stream before yielding. for _ in 0..32 { - match futures_core::ready!(stream.as_mut().poll_next(cx)) { + match ready!(stream.as_mut().poll_next(cx)) { Some(v) => { if !(me.f)(v) { return Poll::Ready(false); diff --git a/tokio-stream/src/stream_ext/any.rs b/tokio-stream/src/stream_ext/any.rs index 31394f249b8..52deec77415 100644 --- a/tokio-stream/src/stream_ext/any.rs +++ b/tokio-stream/src/stream_ext/any.rs @@ -3,7 +3,7 @@ use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -42,7 +42,7 @@ where // Take a maximum of 32 items from the stream before yielding. for _ in 0..32 { - match futures_core::ready!(stream.as_mut().poll_next(cx)) { + match ready!(stream.as_mut().poll_next(cx)) { Some(v) => { if (me.f)(v) { return Poll::Ready(true); diff --git a/tokio-stream/src/stream_ext/chain.rs b/tokio-stream/src/stream_ext/chain.rs index bd64f33ce4e..f3d360d5397 100644 --- a/tokio-stream/src/stream_ext/chain.rs +++ b/tokio-stream/src/stream_ext/chain.rs @@ -2,7 +2,7 @@ use crate::stream_ext::Fuse; use crate::Stream; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 48acd9328bc..89b8d49a64d 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -4,7 +4,7 @@ use tokio::time::{sleep, Sleep}; use core::future::Future; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; use std::time::Duration; diff --git a/tokio-stream/src/stream_ext/collect.rs b/tokio-stream/src/stream_ext/collect.rs index 60b94d3fd0e..eb9e2197f14 100644 --- a/tokio-stream/src/stream_ext/collect.rs +++ b/tokio-stream/src/stream_ext/collect.rs @@ -4,7 +4,7 @@ use core::future::Future; use core::marker::PhantomPinned; use core::mem; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; // Do not export this struct until `FromStream` can be unsealed. diff --git a/tokio-stream/src/stream_ext/filter.rs b/tokio-stream/src/stream_ext/filter.rs index f3dd8716b48..1d5defb195b 100644 --- a/tokio-stream/src/stream_ext/filter.rs +++ b/tokio-stream/src/stream_ext/filter.rs @@ -2,7 +2,7 @@ use crate::Stream; use core::fmt; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/filter_map.rs b/tokio-stream/src/stream_ext/filter_map.rs index fe604a6f4b5..6658d71f05e 100644 --- a/tokio-stream/src/stream_ext/filter_map.rs +++ b/tokio-stream/src/stream_ext/filter_map.rs @@ -2,7 +2,7 @@ use crate::Stream; use core::fmt; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/fold.rs b/tokio-stream/src/stream_ext/fold.rs index e2e97d8f375..39bace6ec6b 100644 --- a/tokio-stream/src/stream_ext/fold.rs +++ b/tokio-stream/src/stream_ext/fold.rs @@ -3,7 +3,7 @@ use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/fuse.rs b/tokio-stream/src/stream_ext/fuse.rs index 2500641d95d..9d117cf0895 100644 --- a/tokio-stream/src/stream_ext/fuse.rs +++ b/tokio-stream/src/stream_ext/fuse.rs @@ -2,7 +2,7 @@ use crate::Stream; use pin_project_lite::pin_project; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Stream returned by [`fuse()`][super::StreamExt::fuse]. diff --git a/tokio-stream/src/stream_ext/skip.rs b/tokio-stream/src/stream_ext/skip.rs index 80a0a0aff0d..dd310b8562d 100644 --- a/tokio-stream/src/stream_ext/skip.rs +++ b/tokio-stream/src/stream_ext/skip.rs @@ -2,7 +2,7 @@ use crate::Stream; use core::fmt; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/skip_while.rs b/tokio-stream/src/stream_ext/skip_while.rs index 985a92666e0..d1accd52956 100644 --- a/tokio-stream/src/stream_ext/skip_while.rs +++ b/tokio-stream/src/stream_ext/skip_while.rs @@ -2,7 +2,7 @@ use crate::Stream; use core::fmt; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_ext/throttle.rs b/tokio-stream/src/stream_ext/throttle.rs index 50001392ee7..4e71debf3ce 100644 --- a/tokio-stream/src/stream_ext/throttle.rs +++ b/tokio-stream/src/stream_ext/throttle.rs @@ -5,7 +5,7 @@ use tokio::time::{Duration, Instant, Sleep}; use std::future::Future; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{self, ready, Poll}; use pin_project_lite::pin_project; diff --git a/tokio-stream/src/stream_ext/timeout.rs b/tokio-stream/src/stream_ext/timeout.rs index 17d1349022e..d863af1dbdb 100644 --- a/tokio-stream/src/stream_ext/timeout.rs +++ b/tokio-stream/src/stream_ext/timeout.rs @@ -4,7 +4,7 @@ use tokio::time::{Instant, Sleep}; use core::future::Future; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; use std::fmt; use std::time::Duration; diff --git a/tokio-stream/src/stream_ext/timeout_repeating.rs b/tokio-stream/src/stream_ext/timeout_repeating.rs index 253d2fd677e..4822261d870 100644 --- a/tokio-stream/src/stream_ext/timeout_repeating.rs +++ b/tokio-stream/src/stream_ext/timeout_repeating.rs @@ -3,7 +3,7 @@ use crate::{Elapsed, Stream}; use tokio::time::Interval; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{ready, Context, Poll}; use pin_project_lite::pin_project; pin_project! { diff --git a/tokio-stream/src/stream_map.rs b/tokio-stream/src/stream_map.rs index cefedcd7e19..f276f99210e 100644 --- a/tokio-stream/src/stream_map.rs +++ b/tokio-stream/src/stream_map.rs @@ -1,9 +1,10 @@ -use crate::{poll_fn, Stream}; +use crate::Stream; use std::borrow::Borrow; +use std::future::poll_fn; use std::hash::Hash; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Combine many streams into one, indexing each source stream with a unique /// key. diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 711066466a0..21677971d5f 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -6,7 +6,7 @@ use futures_core::Stream; use tokio_util::sync::ReusableBoxFuture; use std::fmt; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. /// diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index a1ea646035a..a1e4fbfa03a 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -5,7 +5,7 @@ use futures_core::Stream; use tokio_util::sync::ReusableBoxFuture; use std::fmt; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index c31d5be5de2..6f597ba2c9e 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -23,13 +23,13 @@ use tokio::sync::mpsc; use tokio::time::{self, Duration, Instant, Sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; -use futures_core::{ready, Stream}; +use futures_core::Stream; use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::{self, Poll, Waker}; +use std::task::{self, ready, Poll, Waker}; use std::{cmp, io}; /// An I/O object that follows a predefined script. diff --git a/tokio-test/src/stream_mock.rs b/tokio-test/src/stream_mock.rs index a3f3c776502..1a8cf04df1b 100644 --- a/tokio-test/src/stream_mock.rs +++ b/tokio-test/src/stream_mock.rs @@ -36,10 +36,10 @@ use std::collections::VecDeque; use std::pin::Pin; -use std::task::Poll; +use std::task::{ready, Poll}; use std::time::Duration; -use futures_core::{ready, Stream}; +use futures_core::Stream; use std::future::Future; use tokio::time::{sleep_until, Instant, Sleep}; diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 729c0352df6..db589136bab 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,27 @@ +# 0.7.12 (September 5th, 2024) + +This release bumps the MSRV to 1.70. ([#6645]) + +### Added +- sync: Add `run_until_cancelled` to `tokio_util::sync::CancellationToken` ([#6618]) +- task: add `AbortOnDropHandle` type ([#6786]) + +### Changed +- deps: no default features for hashbrown ([#6541]) +- time: wake `DelayQueue` when removing last item ([#6752]) +- deps: enable the full feature when compiled for the playground ([#6818]) + +### Documented +- task: fix typo in `TaskTracker` docs ([#6792]) + +[#6645]: https://github.com/tokio-rs/tokio/pull/6645 +[#6541]: https://github.com/tokio-rs/tokio/pull/6541 +[#6618]: https://github.com/tokio-rs/tokio/pull/6618 +[#6752]: https://github.com/tokio-rs/tokio/pull/6752 +[#6786]: https://github.com/tokio-rs/tokio/pull/6786 +[#6792]: https://github.com/tokio-rs/tokio/pull/6792 +[#6818]: https://github.com/tokio-rs/tokio/pull/6818 + # 0.7.11 (May 4th, 2024) This release updates the MSRV to 1.63. ([#6126]) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 23a577a37fe..a73eec8799a 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-util" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-util-0.7.x" git tag. -version = "0.7.11" +version = "0.7.12" edition = "2021" rust-version = "1.70" authors = ["Tokio Contributors "] @@ -65,3 +65,6 @@ rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] # it's necessary to _also_ pass `--cfg tokio_unstable` to rustc, or else # dependencies will not be enabled, and the docs build will fail. rustc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] + +[package.metadata.playground] +features = ["full"] diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index e7cb691aed5..5647023ff9a 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -5,13 +5,12 @@ use futures_core::Stream; use tokio::io::{AsyncRead, AsyncWrite}; use bytes::BytesMut; -use futures_core::ready; use futures_sink::Sink; use pin_project_lite::pin_project; use std::borrow::{Borrow, BorrowMut}; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { #[derive(Debug)] diff --git a/tokio-util/src/compat.rs b/tokio-util/src/compat.rs index 423bd956da0..b71e2b1b0f2 100644 --- a/tokio-util/src/compat.rs +++ b/tokio-util/src/compat.rs @@ -1,10 +1,9 @@ //! Compatibility between the `tokio::io` and `futures-io` versions of the //! `AsyncRead` and `AsyncWrite` traits. -use futures_core::ready; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// A compatibility layer that allows conversion between the diff --git a/tokio-util/src/io/inspect.rs b/tokio-util/src/io/inspect.rs index 7604d9a3de1..48fda6170e7 100644 --- a/tokio-util/src/io/inspect.rs +++ b/tokio-util/src/io/inspect.rs @@ -1,8 +1,7 @@ -use futures_core::ready; use pin_project_lite::pin_project; use std::io::{IoSlice, Result}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; diff --git a/tokio-util/src/io/sink_writer.rs b/tokio-util/src/io/sink_writer.rs index e0789528751..134acbe807d 100644 --- a/tokio-util/src/io/sink_writer.rs +++ b/tokio-util/src/io/sink_writer.rs @@ -1,11 +1,10 @@ -use futures_core::ready; use futures_sink::Sink; use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; pin_project! { diff --git a/tokio-util/src/sync/poll_semaphore.rs b/tokio-util/src/sync/poll_semaphore.rs index 4960a7c8bd0..ae23d8b73d0 100644 --- a/tokio-util/src/sync/poll_semaphore.rs +++ b/tokio-util/src/sync/poll_semaphore.rs @@ -1,8 +1,8 @@ -use futures_core::{ready, Stream}; +use futures_core::Stream; use std::fmt; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError}; use super::ReusableBoxFuture; diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index 55dd311a03e..9dadc3f00ae 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -6,7 +6,6 @@ use crate::time::wheel::{self, Wheel}; -use futures_core::ready; use tokio::time::{sleep_until, Duration, Instant, Sleep}; use core::ops::{Index, IndexMut}; @@ -19,7 +18,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{self, Poll, Waker}; +use std::task::{self, ready, Poll, Waker}; /// A queue of delayed elements. /// @@ -74,9 +73,8 @@ use std::task::{self, Poll, Waker}; /// ```rust,no_run /// use tokio_util::time::{DelayQueue, delay_queue}; /// -/// use futures::ready; /// use std::collections::HashMap; -/// use std::task::{Context, Poll}; +/// use std::task::{ready, Context, Poll}; /// use std::time::Duration; /// # type CacheKey = String; /// # type Value = String; diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index d094c04c6da..38e790ae980 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -4,10 +4,9 @@ use futures_core::Stream; use tokio::{io::ReadBuf, net::UdpSocket}; use bytes::{BufMut, BytesMut}; -use futures_core::ready; use futures_sink::Sink; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{ borrow::Borrow, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, diff --git a/tokio-util/src/util/poll_buf.rs b/tokio-util/src/util/poll_buf.rs index 5a72bc4c822..04985d4922f 100644 --- a/tokio-util/src/util/poll_buf.rs +++ b/tokio-util/src/util/poll_buf.rs @@ -1,11 +1,10 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use bytes::{Buf, BufMut}; -use futures_core::ready; use std::io::{self, IoSlice}; use std::mem::MaybeUninit; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait. /// @@ -18,7 +17,7 @@ use std::task::{Context, Poll}; /// use tokio_stream as stream; /// use tokio::io::Result; /// use tokio_util::io::{StreamReader, poll_read_buf}; -/// use futures::future::poll_fn; +/// use std::future::poll_fn; /// use std::pin::Pin; /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { @@ -96,9 +95,9 @@ pub fn poll_read_buf( /// use tokio::fs::File; /// /// use bytes::Buf; +/// use std::future::poll_fn; /// use std::io::Cursor; /// use std::pin::Pin; -/// use futures::future::poll_fn; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { diff --git a/tokio-util/tests/io_inspect.rs b/tokio-util/tests/io_inspect.rs index e6319afcf1b..ee8b3f0c604 100644 --- a/tokio-util/tests/io_inspect.rs +++ b/tokio-util/tests/io_inspect.rs @@ -1,5 +1,5 @@ -use futures::future::poll_fn; use std::{ + future::poll_fn, io::IoSlice, pin::Pin, task::{Context, Poll}, diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index cf4dcd55f63..d53c81c2a9a 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -1,5 +1,5 @@ -use futures::future::poll_fn; use futures::sink::SinkExt; +use std::future::poll_fn; use tokio::sync::mpsc::channel; use tokio_test::task::spawn; use tokio_test::{ diff --git a/tokio-util/tests/poll_semaphore.rs b/tokio-util/tests/poll_semaphore.rs index 28beca19fa3..fe947f9164a 100644 --- a/tokio-util/tests/poll_semaphore.rs +++ b/tokio-util/tests/poll_semaphore.rs @@ -9,7 +9,7 @@ type SemRet = Option; fn semaphore_poll( sem: &mut PollSemaphore, ) -> tokio_test::task::Spawn + '_> { - let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx)); + let fut = std::future::poll_fn(move |cx| sem.poll_acquire(cx)); tokio_test::task::spawn(fut) } @@ -17,7 +17,7 @@ fn semaphore_poll_many( sem: &mut PollSemaphore, permits: u32, ) -> tokio_test::task::Spawn + '_> { - let fut = futures::future::poll_fn(move |cx| sem.poll_acquire_many(cx, permits)); + let fut = std::future::poll_fn(move |cx| sem.poll_acquire_many(cx, permits)); tokio_test::task::spawn(fut) } diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index 4082b1b9392..bddb5c421b6 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,52 @@ +# 1.40.0 (August 30th, 2024) + +### Added + +- io: add `util::SimplexStream` ([#6589]) +- process: stabilize `Command::process_group` ([#6731]) +- sync: add `{TrySendError,SendTimeoutError}::into_inner` ([#6755]) +- task: add `JoinSet::join_all` ([#6784]) + +### Added (unstable) + +- runtime: add `Builder::{on_task_spawn, on_task_terminate}` ([#6742]) + +### Changed + +- io: use vectored io for `write_all_buf` when possible ([#6724]) +- runtime: prevent niche-optimization to avoid triggering miri ([#6744]) +- sync: mark mpsc types as `UnwindSafe` ([#6783]) +- sync,time: make `Sleep` and `BatchSemaphore` instrumentation explicit roots ([#6727]) +- task: use `NonZeroU64` for `task::Id` ([#6733]) +- task: include panic message when printing `JoinError` ([#6753]) +- task: add `#[must_use]` to `JoinHandle::abort_handle` ([#6762]) +- time: eliminate timer wheel allocations ([#6779]) + +### Documented + +- docs: clarify that `[build]` section doesn't go in Cargo.toml ([#6728]) +- io: clarify zero remaining capacity case ([#6790]) +- macros: improve documentation for `select!` ([#6774]) +- sync: document mpsc channel allocation behavior ([#6773]) + +[#6589]: https://github.com/tokio-rs/tokio/pull/6589 +[#6724]: https://github.com/tokio-rs/tokio/pull/6724 +[#6727]: https://github.com/tokio-rs/tokio/pull/6727 +[#6728]: https://github.com/tokio-rs/tokio/pull/6728 +[#6731]: https://github.com/tokio-rs/tokio/pull/6731 +[#6733]: https://github.com/tokio-rs/tokio/pull/6733 +[#6742]: https://github.com/tokio-rs/tokio/pull/6742 +[#6744]: https://github.com/tokio-rs/tokio/pull/6744 +[#6753]: https://github.com/tokio-rs/tokio/pull/6753 +[#6755]: https://github.com/tokio-rs/tokio/pull/6755 +[#6762]: https://github.com/tokio-rs/tokio/pull/6762 +[#6773]: https://github.com/tokio-rs/tokio/pull/6773 +[#6774]: https://github.com/tokio-rs/tokio/pull/6774 +[#6779]: https://github.com/tokio-rs/tokio/pull/6779 +[#6783]: https://github.com/tokio-rs/tokio/pull/6783 +[#6784]: https://github.com/tokio-rs/tokio/pull/6784 +[#6790]: https://github.com/tokio-rs/tokio/pull/6790 + # 1.39.3 (August 17th, 2024) This release fixes a regression where the unix socket api stopped accepting diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 37dd5e7b7b8..4c5f7d46acb 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.39.3" +version = "1.40.0" edition = "2021" rust-version = "1.70" authors = ["Tokio Contributors "] diff --git a/tokio/README.md b/tokio/README.md index fb567dd6a23..6fe5b34ef82 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.39.3", features = ["full"] } +tokio = { version = "1.40.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 27d91debbe8..63dd8af3e98 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -14,8 +14,7 @@ use std::io::{self, Seek, SeekFrom}; use std::path::Path; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; +use std::task::{ready, Context, Poll}; #[cfg(test)] use super::mocks::JoinHandle; @@ -937,7 +936,7 @@ cfg_windows! { impl Inner { async fn complete_inflight(&mut self) { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.poll_complete_inflight(cx)).await; } diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 20d74967cd4..2e7288adb34 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -8,8 +8,7 @@ use std::io; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; +use std::task::{ready, Context, Poll}; #[cfg(test)] use super::mocks::spawn_blocking; @@ -77,7 +76,7 @@ impl ReadDir { /// /// This method is cancellation safe. pub async fn next_entry(&mut self) -> io::Result> { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.poll_next_entry(cx)).await } diff --git a/tokio/src/future/maybe_done.rs b/tokio/src/future/maybe_done.rs index 9ae795f7a7f..ed2136526fc 100644 --- a/tokio/src/future/maybe_done.rs +++ b/tokio/src/future/maybe_done.rs @@ -3,7 +3,7 @@ use pin_project_lite::pin_project; use std::future::{Future, IntoFuture}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// A future that may have completed. diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index 12b6bbc4945..3cf23642dc0 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -5,10 +5,6 @@ #[cfg(any(feature = "macros", feature = "process"))] pub(crate) mod maybe_done; -mod poll_fn; -#[allow(unused_imports)] -pub use poll_fn::poll_fn; - cfg_process! { mod try_join; pub(crate) use try_join::try_join3; diff --git a/tokio/src/future/poll_fn.rs b/tokio/src/future/poll_fn.rs deleted file mode 100644 index 074d9438eb4..00000000000 --- a/tokio/src/future/poll_fn.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![allow(dead_code)] - -//! Definition of the `PollFn` adapter combinator. - -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -// This struct is intentionally `!Unpin` when `F` is `!Unpin`. This is to -// mitigate the issue where rust puts noalias on mutable references to the -// `PollFn` type if it is `Unpin`. If the closure has ownership of a future, -// then this "leaks" and the future is affected by noalias too, which we don't -// want. -// -// See this thread for more information: -// -// -// The fact that `PollFn` is not `Unpin` when it shouldn't be is tested in -// `tests/async_send_sync.rs`. - -/// Future for the [`poll_fn`] function. -pub struct PollFn { - f: F, -} - -/// Creates a new future wrapping around a function returning [`Poll`]. -pub fn poll_fn(f: F) -> PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - PollFn { f } -} - -impl fmt::Debug for PollFn { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PollFn").finish() - } -} - -impl Future for PollFn -where - F: FnMut(&mut Context<'_>) -> Poll, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safety: We never construct a `Pin<&mut F>` anywhere, so accessing `f` - // mutably in an unpinned way is sound. - // - // This use of unsafe cannot be replaced with the pin-project macro - // because: - // * If we put `#[pin]` on the field, then it gives us a `Pin<&mut F>`, - // which we can't use to call the closure. - // * If we don't put `#[pin]` on the field, then it makes `PollFn` be - // unconditionally `Unpin`, which we also don't want. - let me = unsafe { Pin::into_inner_unchecked(self) }; - (me.f)(cx) - } -} diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 96d0518a6e5..9a511441d66 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -7,7 +7,7 @@ use std::error::Error; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::{task::Context, task::Poll}; +use std::task::{ready, Context, Poll}; /// Associates an IO object backed by a Unix file descriptor with the tokio /// reactor, allowing for readiness to be polled. The file descriptor must be of @@ -21,11 +21,13 @@ use std::{task::Context, task::Poll}; /// the [`AsyncFd`] is dropped. /// /// The [`AsyncFd`] takes ownership of an arbitrary object to represent the IO -/// object. It is intended that this object will handle closing the file +/// object. It is intended that the inner object will handle closing the file /// descriptor when it is dropped, avoiding resource leaks and ensuring that the /// [`AsyncFd`] can clean up the registration before closing the file descriptor. /// The [`AsyncFd::into_inner`] function can be used to extract the inner object -/// to retake control from the tokio IO reactor. +/// to retake control from the tokio IO reactor. The [`OwnedFd`] type is often +/// used as the inner object, as it is the simplest type that closes the fd on +/// drop. /// /// The inner object is required to implement [`AsRawFd`]. This file descriptor /// must not change while [`AsyncFd`] owns the inner object, i.e. the @@ -71,11 +73,10 @@ use std::{task::Context, task::Poll}; /// and using the IO traits [`AsyncRead`] and [`AsyncWrite`]. /// /// ```no_run -/// use futures::ready; /// use std::io::{self, Read, Write}; /// use std::net::TcpStream; /// use std::pin::Pin; -/// use std::task::{Context, Poll}; +/// use std::task::{ready, Context, Poll}; /// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; /// use tokio::io::unix::AsyncFd; /// @@ -176,6 +177,7 @@ use std::{task::Context, task::Poll}; /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream /// [`AsyncRead`]: trait@crate::io::AsyncRead /// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`OwnedFd`]: struct@std::os::fd::OwnedFd pub struct AsyncFd { registration: Registration, // The inner value is always present. the Option is required for `drop` and `into_inner`. diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 52aa798c4fe..f189136b52e 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -6,7 +6,7 @@ use std::future::Future; use std::io; use std::io::prelude::*; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// `T` should not implement _both_ Read and Write. #[derive(Debug)] diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs index 708ca7484e7..086ba6d93bc 100644 --- a/tokio/src/io/bsd/poll_aio.rs +++ b/tokio/src/io/bsd/poll_aio.rs @@ -11,7 +11,7 @@ use std::io; use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::RawFd; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Like [`mio::event::Source`], but for POSIX AIO only. /// diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 73cb8e5809e..0e34fbe3c22 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -7,6 +7,7 @@ use std::fmt; use std::io; use std::ops::Deref; use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::task::ready; cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index e64205d9cf6..f28faa85455 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::io::{self, SeekFrom}; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. diff --git a/tokio/src/io/util/buf_reader.rs b/tokio/src/io/util/buf_reader.rs index d9307202c13..c89fe43dc1a 100644 --- a/tokio/src/io/util/buf_reader.rs +++ b/tokio/src/io/util/buf_reader.rs @@ -4,7 +4,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::io::{self, IoSlice, SeekFrom}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{cmp, fmt, mem}; pin_project! { diff --git a/tokio/src/io/util/buf_writer.rs b/tokio/src/io/util/buf_writer.rs index 2971a8e057a..ea9076ea693 100644 --- a/tokio/src/io/util/buf_writer.rs +++ b/tokio/src/io/util/buf_writer.rs @@ -5,7 +5,7 @@ use pin_project_lite::pin_project; use std::fmt; use std::io::{self, IoSlice, SeekFrom, Write}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Wraps a writer and buffers its output. diff --git a/tokio/src/io/util/chain.rs b/tokio/src/io/util/chain.rs index f96f42f4a99..9f1912771b8 100644 --- a/tokio/src/io/util/chain.rs +++ b/tokio/src/io/util/chain.rs @@ -4,7 +4,7 @@ use pin_project_lite::pin_project; use std::fmt; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Stream for the [`chain`](super::AsyncReadExt::chain) method. diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index 4fcec907578..87592608bed 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -3,7 +3,7 @@ use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use std::future::Future; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; #[derive(Debug)] pub(super) struct CopyBuffer { diff --git a/tokio/src/io/util/copy_bidirectional.rs b/tokio/src/io/util/copy_bidirectional.rs index ce90141e5a5..ea40a3b4c95 100644 --- a/tokio/src/io/util/copy_bidirectional.rs +++ b/tokio/src/io/util/copy_bidirectional.rs @@ -1,11 +1,11 @@ use super::copy::CopyBuffer; -use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite}; +use std::future::poll_fn; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; enum TransferState { Running(CopyBuffer), diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs index c23fc9a2b4c..db622e94aae 100644 --- a/tokio/src/io/util/copy_buf.rs +++ b/tokio/src/io/util/copy_buf.rs @@ -2,7 +2,7 @@ use crate::io::{AsyncBufRead, AsyncWrite}; use std::future::Future; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { /// A future that asynchronously copies the entire contents of a reader into a diff --git a/tokio/src/io/util/empty.rs b/tokio/src/io/util/empty.rs index cc86e8b09a1..8821f4f2e35 100644 --- a/tokio/src/io/util/empty.rs +++ b/tokio/src/io/util/empty.rs @@ -4,7 +4,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; use std::fmt; use std::io::{self, SeekFrom}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { /// `Empty` ignores any data written via [`AsyncWrite`], and will always be empty diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 717f633f950..9472557b498 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -5,7 +5,7 @@ use pin_project_lite::pin_project; use std::io; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Reads lines from an [`AsyncBufRead`]. @@ -67,7 +67,7 @@ where /// # } /// ``` pub async fn next_line(&mut self) -> io::Result> { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 15916cc0052..bb5ab1e39e2 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -7,7 +7,7 @@ use bytes::{Buf, BytesMut}; use std::{ pin::Pin, sync::Arc, - task::{self, Poll, Waker}, + task::{self, ready, Poll, Waker}, }; /// A bidirectional pipe to read and write bytes in memory. diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index b2f8618c7ee..44556e867ca 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -88,7 +88,7 @@ cfg_io_util! { cfg_coop! { fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> { - let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let coop = std::task::ready!(crate::runtime::coop::poll_proceed(cx)); coop.made_progress(); std::task::Poll::Ready(()) } diff --git a/tokio/src/io/util/read.rs b/tokio/src/io/util/read.rs index a1f9c8a0505..26e59c61fa2 100644 --- a/tokio/src/io/util/read.rs +++ b/tokio/src/io/util/read.rs @@ -6,7 +6,7 @@ use std::io; use std::marker::PhantomPinned; use std::marker::Unpin; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Tries to read some bytes directly into the given `buf` in asynchronous /// manner, returning a future type. diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs index 750497c9e0c..c211e042243 100644 --- a/tokio/src/io/util/read_buf.rs +++ b/tokio/src/io/util/read_buf.rs @@ -6,7 +6,7 @@ use std::future::Future; use std::io; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B> where diff --git a/tokio/src/io/util/read_exact.rs b/tokio/src/io/util/read_exact.rs index 217315dcb91..e9e5afbf0b9 100644 --- a/tokio/src/io/util/read_exact.rs +++ b/tokio/src/io/util/read_exact.rs @@ -6,7 +6,7 @@ use std::io; use std::marker::PhantomPinned; use std::marker::Unpin; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// A future which can be used to easily read exactly enough bytes to fill /// a buffer. diff --git a/tokio/src/io/util/read_line.rs b/tokio/src/io/util/read_line.rs index eacc7d59602..fdcbc417e21 100644 --- a/tokio/src/io/util/read_line.rs +++ b/tokio/src/io/util/read_line.rs @@ -8,7 +8,7 @@ use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; use std::string::FromUtf8Error; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Future for the [`read_line`](crate::io::AsyncBufReadExt::read_line) method. diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs index b56a940eb50..05665026d29 100644 --- a/tokio/src/io/util/read_to_end.rs +++ b/tokio/src/io/util/read_to_end.rs @@ -7,7 +7,7 @@ use std::io; use std::marker::PhantomPinned; use std::mem::{self, MaybeUninit}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { #[derive(Debug)] diff --git a/tokio/src/io/util/read_to_string.rs b/tokio/src/io/util/read_to_string.rs index b3d82a26bae..18196c52502 100644 --- a/tokio/src/io/util/read_to_string.rs +++ b/tokio/src/io/util/read_to_string.rs @@ -7,7 +7,7 @@ use pin_project_lite::pin_project; use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{io, mem}; pin_project! { diff --git a/tokio/src/io/util/read_until.rs b/tokio/src/io/util/read_until.rs index fb6fb22d9f1..fbe2609a64c 100644 --- a/tokio/src/io/util/read_until.rs +++ b/tokio/src/io/util/read_until.rs @@ -7,7 +7,7 @@ use std::io; use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method. diff --git a/tokio/src/io/util/repeat.rs b/tokio/src/io/util/repeat.rs index 4a3ac78e49e..ecdbc7d062c 100644 --- a/tokio/src/io/util/repeat.rs +++ b/tokio/src/io/util/repeat.rs @@ -3,7 +3,7 @@ use crate::io::{AsyncRead, ReadBuf}; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { /// An async reader which yields one byte over and over and over and over and diff --git a/tokio/src/io/util/sink.rs b/tokio/src/io/util/sink.rs index 1c0102d4b2f..01a4b111cc0 100644 --- a/tokio/src/io/util/sink.rs +++ b/tokio/src/io/util/sink.rs @@ -4,7 +4,7 @@ use crate::io::AsyncWrite; use std::fmt; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { /// An async writer which will move data into the void. diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index 7489c242811..1afd6bbefb9 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -5,7 +5,7 @@ use pin_project_lite::pin_project; use std::io; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method. @@ -59,7 +59,7 @@ where /// # } /// ``` pub async fn next_segment(&mut self) -> io::Result>> { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| Pin::new(&mut *self).poll_next_segment(cx)).await } diff --git a/tokio/src/io/util/take.rs b/tokio/src/io/util/take.rs index 0787defbe05..b49e00bfcb0 100644 --- a/tokio/src/io/util/take.rs +++ b/tokio/src/io/util/take.rs @@ -3,7 +3,7 @@ use crate::io::{AsyncBufRead, AsyncRead, ReadBuf}; use pin_project_lite::pin_project; use std::convert::TryFrom; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::{cmp, io}; pin_project! { diff --git a/tokio/src/io/util/write_all.rs b/tokio/src/io/util/write_all.rs index abd3e39d310..8330d89e1fc 100644 --- a/tokio/src/io/util/write_all.rs +++ b/tokio/src/io/util/write_all.rs @@ -6,7 +6,7 @@ use std::io; use std::marker::PhantomPinned; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { #[derive(Debug)] diff --git a/tokio/src/io/util/write_all_buf.rs b/tokio/src/io/util/write_all_buf.rs index dd4709aa810..02054913807 100644 --- a/tokio/src/io/util/write_all_buf.rs +++ b/tokio/src/io/util/write_all_buf.rs @@ -6,7 +6,7 @@ use std::future::Future; use std::io::{self, IoSlice}; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// A future to write some of the buffer to an `AsyncWrite`. diff --git a/tokio/src/io/util/write_buf.rs b/tokio/src/io/util/write_buf.rs index 82fd7a759f6..879dabcb594 100644 --- a/tokio/src/io/util/write_buf.rs +++ b/tokio/src/io/util/write_buf.rs @@ -6,7 +6,7 @@ use std::future::Future; use std::io; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pin_project! { /// A future to write some of the buffer to an `AsyncWrite`. diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs index c25018e7e8c..cfcbb2967e1 100644 --- a/tokio/src/loom/mocked.rs +++ b/tokio/src/loom/mocked.rs @@ -2,7 +2,7 @@ pub(crate) use loom::*; pub(crate) mod sync { - pub(crate) use loom::sync::MutexGuard; + pub(crate) use loom::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard}; #[derive(Debug)] pub(crate) struct Mutex(loom::sync::Mutex); @@ -30,6 +30,38 @@ pub(crate) mod sync { self.0.get_mut().unwrap() } } + + #[derive(Debug)] + pub(crate) struct RwLock(loom::sync::RwLock); + + #[allow(dead_code)] + impl RwLock { + #[inline] + pub(crate) fn new(t: T) -> Self { + Self(loom::sync::RwLock::new(t)) + } + + #[inline] + pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> { + self.0.read().unwrap() + } + + #[inline] + pub(crate) fn try_read(&self) -> Option> { + self.0.try_read().ok() + } + + #[inline] + pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> { + self.0.write().unwrap() + } + + #[inline] + pub(crate) fn try_write(&self) -> Option> { + self.0.try_write().ok() + } + } + pub(crate) use loom::sync::*; pub(crate) mod atomic { diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 985d8d73aeb..d446f2ee804 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -8,6 +8,7 @@ mod barrier; mod mutex; #[cfg(all(feature = "parking_lot", not(miri)))] mod parking_lot; +mod rwlock; mod unsafe_cell; pub(crate) mod cell { @@ -64,11 +65,14 @@ pub(crate) mod sync { #[cfg(not(all(feature = "parking_lot", not(miri))))] #[allow(unused_imports)] - pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult}; + pub(crate) use std::sync::{Condvar, MutexGuard, RwLockReadGuard, WaitTimeoutResult}; #[cfg(not(all(feature = "parking_lot", not(miri))))] pub(crate) use crate::loom::std::mutex::Mutex; + #[cfg(not(all(feature = "parking_lot", not(miri))))] + pub(crate) use crate::loom::std::rwlock::RwLock; + pub(crate) mod atomic { pub(crate) use crate::loom::std::atomic_u16::AtomicU16; pub(crate) use crate::loom::std::atomic_u32::AtomicU32; diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index 9b9a81d35b0..6a8375b0787 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -96,12 +96,20 @@ impl RwLock { RwLock(PhantomData, parking_lot::RwLock::new(t)) } - pub(crate) fn read(&self) -> LockResult> { - Ok(RwLockReadGuard(PhantomData, self.1.read())) + pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> { + RwLockReadGuard(PhantomData, self.1.read()) } - pub(crate) fn write(&self) -> LockResult> { - Ok(RwLockWriteGuard(PhantomData, self.1.write())) + pub(crate) fn try_read(&self) -> Option> { + Some(RwLockReadGuard(PhantomData, self.1.read())) + } + + pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> { + RwLockWriteGuard(PhantomData, self.1.write()) + } + + pub(crate) fn try_write(&self) -> Option> { + Some(RwLockWriteGuard(PhantomData, self.1.write())) } } diff --git a/tokio/src/loom/std/rwlock.rs b/tokio/src/loom/std/rwlock.rs new file mode 100644 index 00000000000..2b2c5f3fcde --- /dev/null +++ b/tokio/src/loom/std/rwlock.rs @@ -0,0 +1,48 @@ +use std::sync::{self, RwLockReadGuard, RwLockWriteGuard, TryLockError}; + +/// Adapter for `std::sync::RwLock` that removes the poisoning aspects +/// from its api. +#[derive(Debug)] +pub(crate) struct RwLock(sync::RwLock); + +#[allow(dead_code)] +impl RwLock { + #[inline] + pub(crate) fn new(t: T) -> Self { + Self(sync::RwLock::new(t)) + } + + #[inline] + pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> { + match self.0.read() { + Ok(guard) => guard, + Err(p_err) => p_err.into_inner(), + } + } + + #[inline] + pub(crate) fn try_read(&self) -> Option> { + match self.0.try_read() { + Ok(guard) => Some(guard), + Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()), + Err(TryLockError::WouldBlock) => None, + } + } + + #[inline] + pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> { + match self.0.write() { + Ok(guard) => guard, + Err(p_err) => p_err.into_inner(), + } + } + + #[inline] + pub(crate) fn try_write(&self) -> Option> { + match self.0.try_write() { + Ok(guard) => Some(guard), + Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()), + Err(TryLockError::WouldBlock) => None, + } + } +} diff --git a/tokio/src/macros/join.rs b/tokio/src/macros/join.rs index d82fd5cc26d..163053b25cb 100644 --- a/tokio/src/macros/join.rs +++ b/tokio/src/macros/join.rs @@ -1,60 +1,72 @@ -/// Waits on multiple concurrent branches, returning when **all** branches -/// complete. -/// -/// The `join!` macro must be used inside of async functions, closures, and -/// blocks. -/// -/// The `join!` macro takes a list of async expressions and evaluates them -/// concurrently on the same task. Each async expression evaluates to a future -/// and the futures from each expression are multiplexed on the current task. -/// -/// When working with async expressions returning `Result`, `join!` will wait -/// for **all** branches complete regardless if any complete with `Err`. Use -/// [`try_join!`] to return early when `Err` is encountered. -/// -/// [`try_join!`]: crate::try_join -/// -/// # Notes -/// -/// The supplied futures are stored inline and do not require allocating a -/// `Vec`. -/// -/// ### Runtime characteristics -/// -/// By running all async expressions on the current task, the expressions are -/// able to run **concurrently** but not in **parallel**. This means all -/// expressions are run on the same thread and if one branch blocks the thread, -/// all other expressions will be unable to continue. If parallelism is -/// required, spawn each async expression using [`tokio::spawn`] and pass the -/// join handle to `join!`. -/// -/// [`tokio::spawn`]: crate::spawn -/// -/// # Examples -/// -/// Basic join with two branches -/// -/// ``` -/// async fn do_stuff_async() { -/// // async work -/// } -/// -/// async fn more_async_work() { -/// // more here -/// } -/// -/// #[tokio::main] -/// async fn main() { -/// let (first, second) = tokio::join!( -/// do_stuff_async(), -/// more_async_work()); -/// -/// // do something with the values -/// } -/// ``` -#[macro_export] -#[cfg_attr(docsrs, doc(cfg(feature = "macros")))] -macro_rules! join { +macro_rules! doc { + ($join:item) => { + /// Waits on multiple concurrent branches, returning when **all** branches + /// complete. + /// + /// The `join!` macro must be used inside of async functions, closures, and + /// blocks. + /// + /// The `join!` macro takes a list of async expressions and evaluates them + /// concurrently on the same task. Each async expression evaluates to a future + /// and the futures from each expression are multiplexed on the current task. + /// + /// When working with async expressions returning `Result`, `join!` will wait + /// for **all** branches complete regardless if any complete with `Err`. Use + /// [`try_join!`] to return early when `Err` is encountered. + /// + /// [`try_join!`]: crate::try_join + /// + /// # Notes + /// + /// The supplied futures are stored inline and do not require allocating a + /// `Vec`. + /// + /// ### Runtime characteristics + /// + /// By running all async expressions on the current task, the expressions are + /// able to run **concurrently** but not in **parallel**. This means all + /// expressions are run on the same thread and if one branch blocks the thread, + /// all other expressions will be unable to continue. If parallelism is + /// required, spawn each async expression using [`tokio::spawn`] and pass the + /// join handle to `join!`. + /// + /// [`tokio::spawn`]: crate::spawn + /// + /// # Examples + /// + /// Basic join with two branches + /// + /// ``` + /// async fn do_stuff_async() { + /// // async work + /// } + /// + /// async fn more_async_work() { + /// // more here + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let (first, second) = tokio::join!( + /// do_stuff_async(), + /// more_async_work()); + /// + /// // do something with the values + /// } + /// ``` + #[macro_export] + #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] + $join + }; +} + +#[cfg(doc)] +doc! {macro_rules! join { + ($($future:expr),*) => { unimplemented!() } +}} + +#[cfg(not(doc))] +doc! {macro_rules! join { (@ { // One `_` for each branch in the `join!` macro. This is not used once // normalization is complete. @@ -163,4 +175,4 @@ macro_rules! join { }; () => { async {}.await } -} +}} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index 82f42dbff35..acf3d010d69 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -9,9 +9,6 @@ mod loom; #[macro_use] mod pin; -#[macro_use] -mod ready; - #[macro_use] mod thread_local; diff --git a/tokio/src/macros/ready.rs b/tokio/src/macros/ready.rs deleted file mode 100644 index 1f48623b801..00000000000 --- a/tokio/src/macros/ready.rs +++ /dev/null @@ -1,8 +0,0 @@ -macro_rules! ready { - ($e:expr $(,)?) => { - match $e { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - } - }; -} diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index d077a0823c7..8588f75c323 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -1,7 +1,8 @@ cfg_macros! { - pub use crate::future::poll_fn; pub use crate::future::maybe_done::maybe_done; + pub use std::future::poll_fn; + #[doc(hidden)] pub fn thread_rng_n(n: u32) -> u32 { crate::runtime::context::thread_rng_n(n) diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs index fb8248fb3ba..a098990bb79 100644 --- a/tokio/src/net/addr.rs +++ b/tokio/src/net/addr.rs @@ -274,7 +274,7 @@ pub(crate) mod sealed { use std::option; use std::pin::Pin; - use std::task::{Context, Poll}; + use std::task::{ready,Context, Poll}; use std::vec; #[doc(hidden)] diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 3f6592abe19..618da62b477 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -8,7 +8,7 @@ cfg_not_wasi! { use std::fmt; use std::io; use std::net::{self, SocketAddr}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_net! { /// A TCP socket server, listening for connections. diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 1a1e22253f2..8cf53abf53f 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -8,10 +8,10 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::TcpStream; +use std::future::poll_fn; use std::io; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; @@ -69,7 +69,7 @@ impl ReadHalf<'_> { /// use tokio::io::{self, ReadBuf}; /// use tokio::net::TcpStream; /// - /// use futures::future::poll_fn; + /// use std::future::poll_fn; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index 6771d6497a2..025d2fe73ec 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -8,11 +8,11 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::TcpStream; use std::error::Error; +use std::future::poll_fn; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::sync::Arc; @@ -124,7 +124,7 @@ impl OwnedReadHalf { /// use tokio::io::{self, ReadBuf}; /// use tokio::net::TcpStream; /// - /// use futures::future::poll_fn; + /// use std::future::poll_fn; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e20473e5cc3..2714592d3a3 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,6 +1,6 @@ cfg_not_wasi! { - use crate::future::poll_fn; use crate::net::{to_socket_addrs, ToSocketAddrs}; + use std::future::poll_fn; use std::time::Duration; } @@ -12,7 +12,7 @@ use std::fmt; use std::io; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { use bytes::BufMut; @@ -340,7 +340,7 @@ impl TcpStream { /// use tokio::io::{self, ReadBuf}; /// use tokio::net::TcpStream; /// - /// use futures::future::poll_fn; + /// use std::future::poll_fn; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 03a66585047..a34f7b9225b 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -4,7 +4,7 @@ use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { use bytes::BufMut; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index d7786ca82d7..0fde57da133 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -7,7 +7,7 @@ use std::net::Shutdown; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use std::path::Path; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_util! { use bytes::BufMut; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 5b28dc03f8f..ddd9669f6d1 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -12,7 +12,7 @@ use std::os::unix::ffi::OsStrExt; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr as StdSocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_net_unix! { /// A Unix socket which can accept connections from other Unix sockets. diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index a8b6479f1f8..466ed21c02e 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -1,4 +1,3 @@ -use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; @@ -6,6 +5,7 @@ use crate::net::unix::ucred::{self, UCred}; use crate::net::unix::SocketAddr; use std::fmt; +use std::future::poll_fn; use std::io::{self, Read, Write}; use std::net::Shutdown; #[cfg(target_os = "android")] diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index cf1952ac189..bd4a7ecee7b 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -250,8 +250,7 @@ use std::io; use std::path::Path; use std::pin::Pin; use std::process::{Command as StdCommand, ExitStatus, Output, Stdio}; -use std::task::Context; -use std::task::Poll; +use std::task::{ready, Context, Poll}; #[cfg(unix)] use std::os::unix::process::CommandExt; diff --git a/tokio/src/process/unix/pidfd_reaper.rs b/tokio/src/process/unix/pidfd_reaper.rs index 3b7d622b34b..3c540b2b4f2 100644 --- a/tokio/src/process/unix/pidfd_reaper.rs +++ b/tokio/src/process/unix/pidfd_reaper.rs @@ -19,7 +19,7 @@ use std::{ pin::Pin, process::ExitStatus, sync::atomic::{AtomicBool, Ordering::Relaxed}, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; #[derive(Debug)] diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index f2afa75c9c4..aaca8b6baa2 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -255,7 +255,7 @@ mod test { #[test] fn budgeting() { - use futures::future::poll_fn; + use std::future::poll_fn; use tokio_test::*; assert!(get().0.is_none()); @@ -312,7 +312,7 @@ mod test { } let mut task = task::spawn(poll_fn(|cx| { - let coop = ready!(poll_proceed(cx)); + let coop = std::task::ready!(poll_proceed(cx)); coop.made_progress(); Poll::Ready(()) })); diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index dc5961086f7..16e79e82515 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -7,7 +7,7 @@ use crate::runtime::scheduler; use mio::event::Source; use std::io; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; cfg_io_driver! { /// Associates an I/O resource with the reactor instance that drives it. @@ -219,7 +219,7 @@ impl Registration { loop { let event = self.readiness(interest).await?; - let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await; + let coop = std::future::poll_fn(crate::runtime::coop::poll_proceed).await; match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 28861d98305..e99036a4f31 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -1,4 +1,3 @@ -use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; @@ -15,7 +14,7 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::sync::atomic::Ordering::{AcqRel, Release}; use std::task::Poll::{Pending, Ready}; use std::task::Waker; diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 7e47e568730..aad6094b5ab 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; -use std::task::{Context, Poll, Waker}; +use std::task::{ready, Context, Poll, Waker}; cfg_rt! { /// An owned permission to join on a task (await its termination). diff --git a/tokio/src/runtime/tests/loom_local.rs b/tokio/src/runtime/tests/loom_local.rs index 89d025b811c..e3060d72dce 100644 --- a/tokio/src/runtime/tests/loom_local.rs +++ b/tokio/src/runtime/tests/loom_local.rs @@ -23,7 +23,7 @@ fn wake_during_shutdown() { ls.spawn_local(async move { let mut send = Some(send); - let () = futures::future::poll_fn(|cx| { + let () = std::future::poll_fn(|cx| { if let Some(send) = send.take() { send.send(cx.waker().clone()); } diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index c5980c226e0..ddd14b7fb3f 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -8,7 +8,6 @@ mod yield_now; /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test. /// /// In order to speed up the C -use crate::future::poll_fn; use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; use crate::{spawn, task}; @@ -18,10 +17,10 @@ use loom::sync::atomic::{AtomicBool, AtomicUsize}; use loom::sync::Arc; use pin_project_lite::pin_project; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; mod atomic_take { use loom::sync::atomic::AtomicBool; diff --git a/tokio/src/runtime/tests/loom_multi_thread_alt.rs b/tokio/src/runtime/tests/loom_multi_thread_alt.rs index c8d140e09e3..3ca1335a207 100644 --- a/tokio/src/runtime/tests/loom_multi_thread_alt.rs +++ b/tokio/src/runtime/tests/loom_multi_thread_alt.rs @@ -10,7 +10,6 @@ mod yield_now; /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test. /// /// In order to speed up the C -use crate::future::poll_fn; use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; use crate::{spawn, task}; @@ -20,10 +19,10 @@ use loom::sync::atomic::{AtomicBool, AtomicUsize}; use loom::sync::Arc; use pin_project_lite::pin_project; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; mod atomic_take { use loom::sync::atomic::AtomicBool; diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 4035653283d..ea48b8e5199 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -228,7 +228,7 @@ fn shutdown_immediately() { // Test for https://github.com/tokio-rs/tokio/issues/6729 #[test] fn spawn_niche_in_task() { - use crate::future::poll_fn; + use std::future::poll_fn; use std::task::{Context, Poll, Waker}; with(|rt| { diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 50603ed9ef4..56e0ba64d9c 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -20,7 +20,7 @@ pub(crate) use source::TimeSource; mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; -use crate::loom::sync::Mutex; +use crate::loom::sync::{Mutex, RwLock}; use crate::runtime::driver::{self, IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; @@ -28,7 +28,6 @@ use crate::util::WakeList; use crate::loom::sync::atomic::AtomicU64; use std::fmt; -use std::sync::RwLock; use std::{num::NonZeroU64, ptr::NonNull}; struct AtomicOptionNonZeroU64(AtomicU64); @@ -199,12 +198,7 @@ impl Driver { // Finds out the min expiration time to park. let expiration_time = { - let mut wheels_lock = rt_handle - .time() - .inner - .wheels - .write() - .expect("Timer wheel shards poisoned"); + let mut wheels_lock = rt_handle.time().inner.wheels.write(); let expiration_time = wheels_lock .0 .iter_mut() @@ -324,11 +318,7 @@ impl Handle { // Returns the next wakeup time of this shard. pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option { let mut waker_list = WakeList::new(); - let mut wheels_lock = self - .inner - .wheels - .read() - .expect("Timer wheel shards poisoned"); + let mut wheels_lock = self.inner.wheels.read(); let mut lock = wheels_lock.lock_sharded_wheel(id); if now < lock.elapsed() { @@ -355,11 +345,7 @@ impl Handle { waker_list.wake_all(); - wheels_lock = self - .inner - .wheels - .read() - .expect("Timer wheel shards poisoned"); + wheels_lock = self.inner.wheels.read(); lock = wheels_lock.lock_sharded_wheel(id); } } @@ -384,11 +370,7 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let wheels_lock = self - .inner - .wheels - .read() - .expect("Timer wheel shards poisoned"); + let wheels_lock = self.inner.wheels.read(); let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id()); if entry.as_ref().might_be_registered() { @@ -412,11 +394,7 @@ impl Handle { entry: NonNull, ) { let waker = unsafe { - let wheels_lock = self - .inner - .wheels - .read() - .expect("Timer wheel shards poisoned"); + let wheels_lock = self.inner.wheels.read(); let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id()); diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 0e453433691..a2271b6fb9d 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -54,10 +54,7 @@ fn single_timer() { ); pin!(entry); - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); + block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); }); thread::yield_now(); @@ -126,10 +123,7 @@ fn change_waker() { .as_mut() .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); + block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); }); thread::yield_now(); @@ -167,10 +161,7 @@ fn reset_future() { entry.as_mut().reset(start + Duration::from_secs(2), true); // shouldn't complete before 2s - block_on(futures::future::poll_fn(|cx| { - entry.as_mut().poll_elapsed(cx) - })) - .unwrap(); + block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); finished_early_.store(true, Ordering::Relaxed); }); diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs index 5778f22ed12..cca3963e113 100644 --- a/tokio/src/signal/mod.rs +++ b/tokio/src/signal/mod.rs @@ -84,7 +84,7 @@ impl RxFuture { } async fn recv(&mut self) -> Option<()> { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 84f88fb390e..aabee0f5c0e 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -28,7 +28,7 @@ use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::Ordering::*; -use std::task::{Context, Poll, Waker}; +use std::task::{ready, Context, Poll, Waker}; use std::{cmp, fmt}; /// An asynchronous counting semaphore which permits waiting on multiple permits at once. diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ba0a44fb8b9..67d67a666e3 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -128,7 +128,7 @@ use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; -use std::task::{Context, Poll, Waker}; +use std::task::{ready, Context, Poll, Waker}; /// Sending-half of the [`broadcast`] channel. /// @@ -599,7 +599,7 @@ impl Sender { tail.pos = tail.pos.wrapping_add(1); // Get the slot - let mut slot = self.shared.buffer[idx].write().unwrap(); + let mut slot = self.shared.buffer[idx].write(); // Track the position slot.pos = pos; @@ -695,7 +695,7 @@ impl Sender { while low < high { let mid = low + (high - low) / 2; let idx = base_idx.wrapping_add(mid) & self.shared.mask; - if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 { + if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 { low = mid + 1; } else { high = mid; @@ -737,7 +737,7 @@ impl Sender { let tail = self.shared.tail.lock(); let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize; - self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 + self.shared.buffer[idx].read().rem.load(SeqCst) == 0 } /// Returns the number of active receivers. @@ -1057,7 +1057,7 @@ impl Receiver { let idx = (self.next & self.shared.mask as u64) as usize; // The slot holding the next value to read - let mut slot = self.shared.buffer[idx].read().unwrap(); + let mut slot = self.shared.buffer[idx].read(); if slot.pos != self.next { // Release the `slot` lock before attempting to acquire the `tail` @@ -1074,7 +1074,7 @@ impl Receiver { let mut tail = self.shared.tail.lock(); // Acquire slot lock again - slot = self.shared.buffer[idx].read().unwrap(); + slot = self.shared.buffer[idx].read(); // Make sure the position did not change. This could happen in the // unlikely event that the buffer is wrapped between dropping the diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index beda7fe1bf4..a6aecf007ca 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -238,7 +238,7 @@ impl Receiver { /// } /// ``` pub async fn recv(&mut self) -> Option { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.chan.recv(cx)).await } @@ -314,7 +314,7 @@ impl Receiver { /// } /// ``` pub async fn recv_many(&mut self, buffer: &mut Vec, limit: usize) -> usize { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 1edd2a755ae..f4cedf0d4dd 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -13,7 +13,7 @@ use std::panic; use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Channel sender. pub(crate) struct Tx { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 47e1b6c7c77..f794f4073d2 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -165,7 +165,7 @@ impl UnboundedReceiver { /// } /// ``` pub async fn recv(&mut self) -> Option { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } @@ -239,7 +239,7 @@ impl UnboundedReceiver { /// } /// ``` pub async fn recv_many(&mut self, buffer: &mut Vec, limit: usize) -> usize { - use crate::future::poll_fn; + use std::future::poll_fn; poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await } diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 5d344f70411..99c44295776 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -379,6 +379,7 @@ impl Drop for NotifyWaitersList<'_> { /// This future is fused, so once it has completed, any future calls to poll /// will immediately return `Poll::Ready`. #[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -1041,7 +1042,7 @@ impl Notified<'_> { #[cfg(tokio_taskdump)] if let Some(waker) = waker { let mut ctx = Context::from_waker(waker); - ready!(crate::trace::trace_leaf(&mut ctx)); + std::task::ready!(crate::trace::trace_leaf(&mut ctx)); } if waiter.notification.load(Acquire).is_some() { @@ -1135,7 +1136,7 @@ impl Notified<'_> { #[cfg(tokio_taskdump)] if let Some(waker) = waker { let mut ctx = Context::from_waker(waker); - ready!(crate::trace::trace_leaf(&mut ctx)); + std::task::ready!(crate::trace::trace_leaf(&mut ctx)); } return Poll::Ready(()); } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index ab29b3e3edd..2b346eae81c 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -135,7 +135,7 @@ use std::mem::MaybeUninit; use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll, Waker}; +use std::task::{ready, Context, Poll, Waker}; /// Sends a value to the associated [`Receiver`]. /// @@ -698,7 +698,7 @@ impl Sender { /// } /// ``` pub async fn closed(&mut self) { - use crate::future::poll_fn; + use std::future::poll_fn; #[cfg(all(tokio_unstable, feature = "tracing"))] let resource_span = self.resource_span.clone(); @@ -775,7 +775,7 @@ impl Sender { /// ``` /// use tokio::sync::oneshot; /// - /// use futures::future::poll_fn; + /// use std::future::poll_fn; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/sync/tests/loom_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs index f8bae65d130..688bf95b662 100644 --- a/tokio/src/sync/tests/loom_atomic_waker.rs +++ b/tokio/src/sync/tests/loom_atomic_waker.rs @@ -1,9 +1,9 @@ use crate::sync::task::AtomicWaker; -use futures::future::poll_fn; use loom::future::block_on; use loom::sync::atomic::AtomicUsize; use loom::thread; +use std::future::poll_fn; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index 1dbe5ea419c..039b87a7734 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -1,9 +1,9 @@ use crate::sync::mpsc; -use futures::future::poll_fn; use loom::future::block_on; use loom::sync::Arc; use loom::thread; +use std::future::poll_fn; use tokio_test::assert_ok; #[test] diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs index a4ded1d35bc..e58a78ff1c7 100644 --- a/tokio/src/sync/tests/loom_notify.rs +++ b/tokio/src/sync/tests/loom_notify.rs @@ -108,8 +108,7 @@ fn notify_multi() { #[test] fn notify_drop() { - use crate::future::poll_fn; - use std::future::Future; + use std::future::{poll_fn, Future}; use std::task::Poll; loom::model(|| { diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index 717edcfd2a3..62e1251c26e 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -1,8 +1,8 @@ use crate::sync::oneshot; -use futures::future::poll_fn; use loom::future::block_on; use loom::thread; +use std::future::poll_fn; use std::task::Poll::{Pending, Ready}; #[test] diff --git a/tokio/src/sync/tests/loom_semaphore_batch.rs b/tokio/src/sync/tests/loom_semaphore_batch.rs index 85cd584d2d4..27a459521ed 100644 --- a/tokio/src/sync/tests/loom_semaphore_batch.rs +++ b/tokio/src/sync/tests/loom_semaphore_batch.rs @@ -1,10 +1,9 @@ use crate::sync::batch_semaphore::*; -use futures::future::poll_fn; use loom::future::block_on; use loom::sync::atomic::AtomicUsize; use loom::thread; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 366066797f1..490b9e4df88 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -575,7 +575,7 @@ impl Receiver { /// assert_eq!(*rx.borrow(), "hello"); /// ``` pub fn borrow(&self) -> Ref<'_, T> { - let inner = self.shared.value.read().unwrap(); + let inner = self.shared.value.read(); // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. @@ -622,7 +622,7 @@ impl Receiver { /// [`changed`]: Receiver::changed /// [`borrow`]: Receiver::borrow pub fn borrow_and_update(&mut self) -> Ref<'_, T> { - let inner = self.shared.value.read().unwrap(); + let inner = self.shared.value.read(); // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. @@ -813,7 +813,7 @@ impl Receiver { let mut closed = false; loop { { - let inner = self.shared.value.read().unwrap(); + let inner = self.shared.value.read(); let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; @@ -1087,7 +1087,7 @@ impl Sender { { { // Acquire the write lock and update the value. - let mut lock = self.shared.value.write().unwrap(); + let mut lock = self.shared.value.write(); // Update the value and catch possible panic inside func. let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock))); @@ -1164,7 +1164,7 @@ impl Sender { /// assert_eq!(*tx.borrow(), "hello"); /// ``` pub fn borrow(&self) -> Ref<'_, T> { - let inner = self.shared.value.read().unwrap(); + let inner = self.shared.value.read(); // The sender/producer always sees the current version let has_changed = false; diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/consume_budget.rs index 1e398d73b2a..85ef1bfb2d2 100644 --- a/tokio/src/task/consume_budget.rs +++ b/tokio/src/task/consume_budget.rs @@ -1,4 +1,4 @@ -use std::task::Poll; +use std::task::{ready, Poll}; /// Consumes a unit of budget and returns the execution back to the Tokio /// runtime *if* the task's coop budget was exhausted. @@ -27,7 +27,7 @@ use std::task::Poll; pub async fn consume_budget() { let mut status = Poll::Pending; - crate::future::poll_fn(move |cx| { + std::future::poll_fn(move |cx| { ready!(crate::trace::trace_leaf(cx)); if status.is_ready() { return status; diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index a9cd8f52d55..4bbd1d91a95 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -281,7 +281,7 @@ impl JoinSet { /// statement and some other branch completes first, it is guaranteed that no tasks were /// removed from this `JoinSet`. pub async fn join_next(&mut self) -> Option> { - crate::future::poll_fn(|cx| self.poll_join_next(cx)).await + std::future::poll_fn(|cx| self.poll_join_next(cx)).await } /// Waits until one of the tasks in the set completes and returns its @@ -303,7 +303,7 @@ impl JoinSet { #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub async fn join_next_with_id(&mut self) -> Option> { - crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await + std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await } /// Tries to join one of the tasks in the set that has completed and return its output. diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 4f648160dec..ccc7725c261 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1280,7 +1280,7 @@ mod tests { })); // poll the run until future once - crate::future::poll_fn(|cx| { + std::future::poll_fn(|cx| { let _ = run_until.as_mut().poll(cx); Poll::Ready(()) }) diff --git a/tokio/src/task/yield_now.rs b/tokio/src/task/yield_now.rs index 70a5de53dea..27c1479671f 100644 --- a/tokio/src/task/yield_now.rs +++ b/tokio/src/task/yield_now.rs @@ -2,7 +2,7 @@ use crate::runtime::context; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Yields execution back to the Tokio runtime. /// diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 2b5246acfa4..0153a567f10 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -1,11 +1,10 @@ -use crate::future::poll_fn; use crate::time::{sleep_until, Duration, Instant, Sleep}; use crate::util::trace; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::panic::Location; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Creates new [`Interval`] that yields with interval of `period`. The first /// tick completes immediately. The default [`MissedTickBehavior`] is diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index d5a68f09554..7e393d0d17a 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -6,7 +6,7 @@ use pin_project_lite::pin_project; use std::future::Future; use std::panic::Location; use std::pin::Pin; -use std::task::{self, Poll}; +use std::task::{self, ready, Poll}; /// Waits until `deadline` is reached. /// diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index f4dcfcf0927..856ac6db19b 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -400,12 +400,12 @@ async fn poll_fns() { let read_fut = tokio::spawn(async move { // Move waker onto this task first - assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2 .as_ref() .poll_read_ready(cx)))); barrier_clone.wait().await; - let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await; + let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await; }); let afd_a_2 = afd_a.clone(); @@ -414,12 +414,12 @@ async fn poll_fns() { let mut write_fut = tokio::spawn(async move { // Move waker onto this task first - assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2 .as_ref() .poll_write_ready(cx)))); barrier_clone.wait().await; - let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await; + let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await; }); r_barrier.wait().await; @@ -530,11 +530,11 @@ fn driver_shutdown_wakes_pending_race() { } async fn poll_readable(fd: &AsyncFd) -> std::io::Result> { - futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await + std::future::poll_fn(|cx| fd.poll_read_ready(cx)).await } async fn poll_writable(fd: &AsyncFd) -> std::io::Result> { - futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await + std::future::poll_fn(|cx| fd.poll_write_ready(cx)).await } #[test] diff --git a/tokio/tests/io_copy.rs b/tokio/tests/io_copy.rs index 82d92a9688b..3bde8e7fa69 100644 --- a/tokio/tests/io_copy.rs +++ b/tokio/tests/io_copy.rs @@ -2,12 +2,11 @@ #![cfg(feature = "full")] use bytes::BytesMut; -use futures::ready; use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_test::assert_ok; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; #[tokio::test] async fn copy() { diff --git a/tokio/tests/io_take.rs b/tokio/tests/io_take.rs index 539f17f3a2d..1ae5f6908f5 100644 --- a/tokio/tests/io_take.rs +++ b/tokio/tests/io_take.rs @@ -33,7 +33,7 @@ async fn issue_4435() { let mut read_buf = ReadBuf::new(&mut buf); read_buf.put_slice(b"AB"); - futures::future::poll_fn(|cx| rd.as_mut().poll_read(cx, &mut read_buf)) + std::future::poll_fn(|cx| rd.as_mut().poll_read(cx, &mut read_buf)) .await .unwrap(); assert_eq!(&buf, &b"ABhell\0\0"[..]); diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 6c05a3fda0d..fdf7fde1342 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -11,7 +11,7 @@ use tokio::test as maybe_tokio_test; use tokio::sync::oneshot; use tokio_test::{assert_ok, assert_pending, assert_ready}; -use futures::future::poll_fn; +use std::future::poll_fn; use std::task::Poll::Ready; #[maybe_tokio_test] diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 75a20057166..12f9e16592f 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -112,8 +112,7 @@ rt_test! { use tokio_test::assert_err; use tokio_test::assert_ok; - use futures::future::poll_fn; - use std::future::Future; + use std::future::{poll_fn, Future}; use std::pin::Pin; #[cfg(not(target_os="wasi"))] @@ -696,7 +695,7 @@ rt_test! { loop { // Don't use Tokio's `yield_now()` to avoid special defer // logic. - futures::future::poll_fn::<(), _>(|cx| { + std::future::poll_fn::<(), _>(|cx| { cx.waker().wake_by_ref(); std::task::Poll::Pending }).await; @@ -785,9 +784,9 @@ rt_test! { barrier.wait(); let (fail_test, fail_test_recv) = oneshot::channel::<()>(); - + let flag_clone = flag.clone(); let jh = tokio::spawn(async move { - // Create a TCP litener + // Create a TCP listener let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); @@ -798,7 +797,7 @@ rt_test! { // Yield until connected let mut cnt = 0; - while !flag.load(SeqCst){ + while !flag_clone.load(SeqCst){ tokio::task::yield_now().await; cnt += 1; @@ -814,7 +813,7 @@ rt_test! { }, async { let _ = listener.accept().await.unwrap(); - flag.store(true, SeqCst); + flag_clone.store(true, SeqCst); } ); }); @@ -824,6 +823,11 @@ rt_test! { let success = fail_test_recv.await.is_err(); if success { + // Setting flag to true ensures that the tasks we spawned at + // the beginning of the test will exit. + // If we don't do this, the test will hang since the runtime waits + // for all spawned tasks to finish when dropping. + flag.store(true, SeqCst); // Check for panics in spawned task. jh.abort(); jh.await.unwrap(); diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index a4742dd234e..b6666616f9f 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -8,8 +8,7 @@ use tokio::runtime; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; -use futures::future::poll_fn; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicUsize, Ordering}; diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 7d723605a27..f7e52af83dd 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -9,8 +9,7 @@ use tokio::runtime; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; -use futures::future::poll_fn; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs index d547c48daa3..c72dcea39d6 100644 --- a/tokio/tests/tcp_accept.rs +++ b/tokio/tests/tcp_accept.rs @@ -96,8 +96,7 @@ async fn no_extra_poll() { #[tokio::test] async fn accept_many() { - use futures::future::poll_fn; - use std::future::Future; + use std::future::{poll_fn, Future}; use std::sync::atomic::AtomicBool; const N: usize = 50; diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 725a60169ea..b06628f03a0 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -7,12 +7,11 @@ use tokio::try_join; use tokio_test::task; use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; +use std::future::poll_fn; use std::io; use std::task::Poll; use std::time::Duration; -use futures::future::poll_fn; - #[tokio::test] async fn set_linger() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index eea281c2316..586ceb42e5b 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind or UDP -use futures::future::poll_fn; +use std::future::poll_fn; use std::io; use std::sync::Arc; use tokio::{io::ReadBuf, net::UdpSocket}; diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index ad22a0b99dd..126af1cac7c 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -2,11 +2,11 @@ #![cfg(feature = "full")] #![cfg(unix)] -use futures::future::poll_fn; use tokio::io::ReadBuf; use tokio::net::UnixDatagram; use tokio::try_join; +use std::future::poll_fn; use std::io; use std::sync::Arc;