Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky test wake_while_rt_is_dropping #5905

Merged
merged 2 commits into from
Aug 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 12 additions & 33 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,10 +950,6 @@ rt_test! {
#[test]
fn wake_while_rt_is_dropping() {
use tokio::sync::Barrier;
use core::sync::atomic::{AtomicBool, Ordering};

let drop_triggered = Arc::new(AtomicBool::new(false));
let set_drop_triggered = drop_triggered.clone();

struct OnDrop<F: FnMut()>(F);

Expand All @@ -965,56 +961,39 @@ rt_test! {

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();

let barrier = Arc::new(Barrier::new(4));
let barrier = Arc::new(Barrier::new(3));
let barrier1 = barrier.clone();
let barrier2 = barrier.clone();
let barrier3 = barrier.clone();

let rt = rt();

rt.spawn(async move {
let mut tx2 = Some(tx2);
let _d = OnDrop(move || {
let _ = tx2.take().unwrap().send(());
});

// Ensure a waker gets stored in oneshot 1.
let _ = tokio::join!(rx1, barrier1.wait());
tx3.send(()).unwrap();
});

rt.spawn(async move {
let h1 = tokio::runtime::Handle::current();
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
//
// Importantly, the oneshot 1 has a waker already stored, so
// the eventual drop here will try to re-schedule again.
let mut opt_tx1 = Some(tx1);
let mut tx1 = Some(tx1);
let _d = OnDrop(move || {
let tx1 = opt_tx1.take().unwrap();
h1.spawn(async move {
tx1.send(()).unwrap();
});
// Just a sanity check that this entire thing actually happened
set_drop_triggered.store(true, Ordering::Relaxed);
let _ = tx1.take().unwrap().send(());
});
let _ = tokio::join!(rx2, barrier2.wait());
});

rt.spawn(async move {
let _ = tokio::join!(rx3, barrier3.wait());
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
tx2.send(()).unwrap();
// Ensure a waker gets stored in oneshot 2.
let _ = tokio::join!(rx2, barrier2.wait());
});

// Wait until every oneshot channel has been polled.
rt.block_on(barrier.wait());

// Drop the rt
// Drop the rt. Regardless of which task is dropped first, its destructor will wake the
// other task.
drop(rt);

// Make sure that the spawn actually happened
assert!(drop_triggered.load(Ordering::Relaxed));
}

#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
Expand Down
Loading