diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index 0c821596462..5fd10187627 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -360,6 +360,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559]) [#5513]: https://github.com/tokio-rs/tokio/pull/5513 [#5517]: https://github.com/tokio-rs/tokio/pull/5517 +# 1.25.3 (December 17th, 2023) + +### Fixed +- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221]) + +[#6221]: https://github.com/tokio-rs/tokio/pull/6221 + # 1.25.2 (September 22, 2023) Forward ports 1.20.6 changes. diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 5943e9aa977..c49c08932b2 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -80,23 +80,23 @@ tokio_thread_local! { #[cfg(feature = "rt")] thread_id: Cell::new(None), - /// Tracks the current runtime handle to use when spawning, - /// accessing drivers, etc... + // Tracks the current runtime handle to use when spawning, + // accessing drivers, etc... #[cfg(feature = "rt")] current: current::HandleCell::new(), - /// Tracks the current scheduler internal context + // Tracks the current scheduler internal context #[cfg(feature = "rt")] scheduler: Scoped::new(), #[cfg(feature = "rt")] current_task_id: Cell::new(None), - /// Tracks if the current thread is currently driving a runtime. - /// Note, that if this is set to "entered", the current scheduler - /// handle may not reference the runtime currently executing. This - /// is because other runtime handles may be set to current from - /// within a runtime. + // Tracks if the current thread is currently driving a runtime. + // Note, that if this is set to "entered", the current scheduler + // handle may not reference the runtime currently executing. This + // is because other runtime handles may be set to current from + // within a runtime. #[cfg(feature = "rt")] runtime: Cell::new(EnterRuntime::NotEntered), diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 759589863eb..dc5961086f7 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -219,11 +219,16 @@ impl Registration { loop { let event = self.readiness(interest).await?; + let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await; + match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); } - x => return x, + x => { + coop.made_progress(); + return x; + } } } } diff --git a/tokio/tests/coop_budger.rs b/tokio/tests/coop_budger.rs new file mode 100644 index 00000000000..0c4cc7e6497 --- /dev/null +++ b/tokio/tests/coop_budger.rs @@ -0,0 +1,77 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", target_os = "linux"))] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::net::UdpSocket; + +/// Ensure that UDP sockets have functional budgeting +/// +/// # Design +/// Two sockets communicate by spamming packets from one to the other. +/// +/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the +/// send system call because we are using the loopback interface. +/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the +/// entirety of the lifecycle of a packet within the kernel network stack. +/// +/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop +/// is through budgeting. +/// +/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded. +/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128 +/// and there are two budget events per packet, a send and a recv. +#[tokio::test] +async fn coop_budget_udp_send_recv() { + const BUDGET: usize = 128; + const N_ITERATIONS: usize = 1024; + + const PACKET: &[u8] = b"Hello, world"; + const PACKET_LEN: usize = 12; + + assert_eq!( + PACKET_LEN, + PACKET.len(), + "Defect in test, programmer can't do math" + ); + + // bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface + let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + tx.connect(rx.local_addr().unwrap()).await.unwrap(); + rx.connect(tx.local_addr().unwrap()).await.unwrap(); + + let tracker = Arc::new(AtomicUsize::default()); + + let tracker_clone = Arc::clone(&tracker); + + tokio::task::yield_now().await; + + tokio::spawn(async move { + loop { + tracker_clone.fetch_add(1, Ordering::SeqCst); + + tokio::task::yield_now().await; + } + }); + + for _ in 0..N_ITERATIONS { + tx.send(PACKET).await.unwrap(); + + let mut tmp = [0; PACKET_LEN]; + + // ensure that we aren't somehow accumulating other + assert_eq!( + PACKET_LEN, + rx.recv(&mut tmp).await.unwrap(), + "Defect in test case, received unexpected result from socket" + ); + assert_eq!( + PACKET, &tmp, + "Defect in test case, received unexpected result from socket" + ); + } + + assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst)); +}