Skip to content

Commit

Permalink
fix(mpsc): ensure un-received messages are dropped (#29)
Browse files Browse the repository at this point in the history
This also adds loom leak checking tests. 

I also made `WaitQueue::close` into an RMW op to work around `loom`
not modeling `SeqCst` properly.
Signed-off-by: Eliza Weisman <[email protected]>

* fix(mpsc): ensure un-received messages are dropped

Signed-off-by: Eliza Weisman <[email protected]>

* fix(mpsc): make `WaitQueue::close` an RMW

I *think* this only fails loom because it doesn't fully model SeqCst,
correctly...but making this a swap rather than a store ensures it's an
RMW op, which appears to fix the loom test where the close was missed by
a sender...

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Jan 15, 2022
1 parent 07489d5 commit c444e50
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 47 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,42 @@ jobs:
command: test
args: --profile loom --lib -- mpsc_try_send_recv

async_rx_close_unconsumed:
name: "mpsc::rx_close_unconsumed"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --profile loom --lib -- mpsc::rx_close_unconsumed

sync_rx_close_unconsumed:
name: "sync::rx_close_unconsumed"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --profile loom --lib -- mpsc_sync::rx_close_unconsumed

loom_mpsc_async:
name: "mpsc"
runs-on: ubuntu-latest
Expand Down
42 changes: 41 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg_attr(docsrs, doc = include_str!("../README.md"))]
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use core::{cmp, fmt, mem::MaybeUninit, ops};
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};

#[macro_use]
mod macros;
Expand Down Expand Up @@ -72,6 +72,8 @@ struct Core {
idx_mask: usize,
closed: usize,
capacity: usize,
/// Set when dropping the slots in the ring buffer, to avoid potential double-frees.
has_dropped_slots: bool,
}

struct Slot<T> {
Expand All @@ -94,6 +96,8 @@ impl Core {
closed,
idx_mask,
capacity,

has_dropped_slots: false,
}
}

Expand All @@ -111,6 +115,9 @@ impl Core {
gen_mask,
idx_mask,
capacity,

#[cfg(debug_assertions)]
has_dropped_slots: false,
}
}

Expand Down Expand Up @@ -321,6 +328,39 @@ impl Core {
}
}
}

fn drop_slots<T>(&mut self, slots: &mut [Slot<T>]) {
debug_assert!(
!self.has_dropped_slots,
"tried to drop slots twice! core={:#?}",
self
);
if self.has_dropped_slots {
return;
}

let tail = self.tail.load(SeqCst);
let (idx, gen) = self.idx_gen(tail);
let num_initialized = if gen > 0 { self.capacity() } else { idx };
for slot in &mut slots[..num_initialized] {
unsafe {
slot.value
.with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr()));
}
}

self.has_dropped_slots = true;
}
}

impl Drop for Core {
fn drop(&mut self) {
debug_assert!(
self.has_dropped_slots,
"tried to drop Core without dropping slots! core={:#?}",
self
);
}
}

// === impl Ref ===
Expand Down
23 changes: 16 additions & 7 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,25 @@ impl<T> PinnedDrop for SendRefFuture<'_, T> {
}
}

#[cfg(feature = "alloc")]
impl<T> fmt::Debug for Inner<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Inner")
.field("core", &self.core)
.field("slots", &format_args!("Box<[..]>"))
.finish()
feature! {
#![feature = "alloc"]
impl<T> fmt::Debug for Inner<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Inner")
.field("core", &self.core)
.field("slots", &format_args!("Box<[..]>"))
.finish()
}
}

impl<T> Drop for Inner<T> {
fn drop(&mut self) {
self.core.core.drop_slots(&mut self.slots[..])
}
}
}

#[cfg(feature = "alloc")]
#[cfg(test)]
mod tests {
use super::*;
Expand Down
21 changes: 16 additions & 5 deletions src/mpsc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
sync::Arc,
thread::{self, Thread},
},
util::Backoff,
wait::queue,
Ref,
};
Expand Down Expand Up @@ -385,6 +386,12 @@ impl<T> fmt::Debug for Inner<T> {
}
}

impl<T> Drop for Inner<T> {
fn drop(&mut self) {
self.core.core.drop_slots(&mut self.slots[..])
}
}

#[inline]
fn recv_ref<'a, T: Default>(
core: &'a ChannelCore<Thread>,
Expand Down Expand Up @@ -422,6 +429,7 @@ fn send_ref<'a, T: Default>(
let mut waiter = queue::Waiter::new();
let mut unqueued = true;
let thread = thread::current();
let mut boff = Backoff::new();
loop {
let node = unsafe {
// Safety: in this case, it's totally safe to pin the waiter, as
Expand All @@ -438,11 +446,14 @@ fn send_ref<'a, T: Default>(

match wait {
WaitResult::Closed => return Err(Closed(())),
WaitResult::Notified => match core.try_send_ref(slots.as_ref()) {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(Closed(())),
_ => {}
},
WaitResult::Notified => {
boff.spin_yield();
match core.try_send_ref(slots.as_ref()) {
Ok(slot) => return Ok(SendRef(slot)),
Err(TrySendError::Closed(_)) => return Err(Closed(())),
_ => {}
}
}
WaitResult::Wait => {
unqueued = false;
thread::park();
Expand Down
81 changes: 76 additions & 5 deletions src/mpsc/tests/mpsc_async.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use crate::{
loom::{self, future, thread},
loom::{self, alloc::Track, future, thread},
ThingBuf,
};

Expand All @@ -13,18 +13,18 @@ fn mpsc_try_send_recv() {
let p1 = {
let tx = tx.clone();
thread::spawn(move || {
*tx.try_send_ref().unwrap() = 1;
*tx.try_send_ref().unwrap() = Track::new(1);
})
};
let p2 = thread::spawn(move || {
*tx.try_send_ref().unwrap() = 2;
*tx.try_send_ref().unwrap() = 3;
*tx.try_send_ref().unwrap() = Track::new(2);
*tx.try_send_ref().unwrap() = Track::new(3);
});

let mut vals = future::block_on(async move {
let mut vals = Vec::new();
while let Some(val) = rx.recv_ref().await {
vals.push(*val);
vals.push(*val.get_ref());
}
vals
});
Expand Down Expand Up @@ -74,6 +74,77 @@ fn rx_closes() {
})
}

#[test]
fn rx_close_unconsumed_spsc() {
// Tests that messages that have not been consumed by the receiver are
// dropped when dropping the channel.
const MESSAGES: usize = 4;

loom::model(|| {
let (tx, rx) = channel(MESSAGES);

let consumer = thread::spawn(move || {
future::block_on(async move {
// recieve one message
let msg = rx.recv().await;
test_println!("recv {:?}", msg);
assert!(msg.is_some());
// drop the receiver...
})
});

future::block_on(async move {
let mut i = 1;
while let Ok(mut slot) = tx.send_ref().await {
test_println!("producer sending {}...", i);
*slot = Track::new(i);
i += 1;
}
});

consumer.join().unwrap();
})
}

#[test]
#[ignore] // This is marked as `ignore` because it takes over an hour to run.
fn rx_close_unconsumed_mpsc() {
const MESSAGES: usize = 2;

async fn do_producer(tx: Sender<Track<i32>>, n: usize) {
let mut i = 1;
while let Ok(mut slot) = tx.send_ref().await {
test_println!("producer {} sending {}...", n, i);
*slot = Track::new(i);
i += 1;
}
}

loom::model(|| {
let (tx, rx) = channel(MESSAGES);

let consumer = thread::spawn(move || {
future::block_on(async move {
// recieve one message
let msg = rx.recv().await;
test_println!("recv {:?}", msg);
assert!(msg.is_some());
// drop the receiver...
})
});

let producer = {
let tx = tx.clone();
thread::spawn(move || future::block_on(do_producer(tx, 1)))
};

future::block_on(do_producer(tx, 2));

producer.join().unwrap();
consumer.join().unwrap();
})
}

#[test]
fn spsc_recv_then_send() {
loom::model(|| {
Expand Down
67 changes: 66 additions & 1 deletion src/mpsc/tests/mpsc_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use crate::{
loom::{self, thread},
loom::{self, alloc::Track, thread},
ThingBuf,
};

Expand Down Expand Up @@ -78,6 +78,71 @@ fn rx_closes() {
})
}

#[test]
fn rx_close_unconsumed_spsc() {
// Tests that messages that have not been consumed by the receiver are
// dropped when dropping the channel.
const MESSAGES: usize = 4;

loom::model(|| {
let (tx, rx) = sync::channel(MESSAGES);

let consumer = thread::spawn(move || {
// recieve one message
let msg = rx.recv();
test_println!("recv {:?}", msg);
assert!(msg.is_some());
// drop the receiver...
});

let mut i = 1;
while let Ok(mut slot) = tx.send_ref() {
test_println!("producer sending {}...", i);
*slot = Track::new(i);
i += 1;
}

consumer.join().unwrap();
drop(tx);
})
}

#[test]
#[ignore] // This is marked as `ignore` because it takes over an hour to run.
fn rx_close_unconsumed_mpsc() {
const MESSAGES: usize = 2;

fn do_producer(tx: sync::Sender<Track<i32>>, n: usize) -> impl FnOnce() + Send + Sync {
move || {
let mut i = 1;
while let Ok(mut slot) = tx.send_ref() {
test_println!("producer {} sending {}...", n, i);
*slot = Track::new(i);
i += 1;
}
}
}

loom::model(|| {
let (tx, rx) = sync::channel(MESSAGES);

let consumer = thread::spawn(move || {
// recieve one message
let msg = rx.recv();
test_println!("recv {:?}", msg);
assert!(msg.is_some());
// drop the receiver...
});

let producer = thread::spawn(do_producer(tx.clone(), 1));

do_producer(tx, 2)();

producer.join().unwrap();
consumer.join().unwrap();
})
}

#[test]
fn spsc_recv_then_try_send() {
loom::model(|| {
Expand Down
Loading

0 comments on commit c444e50

Please sign in to comment.