Skip to content

Commit

Permalink
fix: skip slots with active reading Refs in push_ref (#81)
Browse files Browse the repository at this point in the history
`Core::push_ref` can go into an (almost infinite) spin loop waiting for
a `Ref` (created in `pop_ref`) to this slot to be dropped. This
behaviour can block writers until all such refs are dropped, even when
the buffer still has free space.

In this PR I've added a mechanism to skip such slots (by updating their
`state`) and attempt to write into them on the next lap. 

Fixes #83
Closes #80
  • Loading branch information
tukan authored Apr 6, 2024
1 parent b922903 commit a72a286
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 62 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ jobs:
- mpsc_send_recv_wrap
- mpsc_try_send_recv
- mpsc_try_recv_ref
- mpsc_test_skip_slot
- mpsc_async::rx_close_unconsumed
- mpsc_blocking::rx_close_unconsumed
name: model '${{ matrix.model }}''
Expand Down
177 changes: 130 additions & 47 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};

#[macro_use]
mod macros;

Expand Down Expand Up @@ -43,6 +44,13 @@ use crate::{
util::{Backoff, CachePadded},
};

/// Flag stored in the most significant bit of a slot's `state`; set while a
/// reader `Ref` (created by `pop_ref`) is still borrowing that slot.
const HAS_READER: usize = 1 << (usize::BITS - 1);

/// Maximum capacity of a `ThingBuf`. This is the largest number of elements that
/// can be stored in a `ThingBuf`. It is the largest `usize` value with the most
/// significant bit — reserved for the "has reader" flag — cleared (i.e.
/// `usize::MAX >> 1`, one less than the highest power of two expressible by a
/// `usize`).
pub const MAX_CAPACITY: usize = usize::MAX & !HAS_READER;

/// A reference to an entry in a [`ThingBuf`].
///
/// A `Ref` represents the exclusive permission to mutate a given element in a
Expand All @@ -62,6 +70,7 @@ pub struct Ref<'slot, T> {
ptr: MutPtr<MaybeUninit<T>>,
slot: &'slot Slot<T>,
new_state: usize,
is_pop: bool,
}

/// Error indicating that a `push` operation failed because a queue was at
Expand Down Expand Up @@ -100,12 +109,19 @@ struct Core {

struct Slot<T> {
    /// The stored element; access is coordinated through `state`.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Each slot's state has two components: a flag indicated by the most significant bit (MSB), and the rest of the state.
    /// The MSB is set when a reader is reading from this slot.
    /// The rest of the state helps determine the availability of the slot for reading or writing:
    /// - A slot is available for reading when the state (excluding the MSB) equals head + 1.
    /// - A slot is available for writing when the state (excluding the MSB) equals tail.
    /// At initialization, each slot's state is set to its ordinal index.
    state: AtomicUsize,
}

impl Core {
#[cfg(not(all(loom, test)))]
const fn new(capacity: usize) -> Self {
assert!(capacity <= MAX_CAPACITY);
let closed = (capacity + 1).next_power_of_two();
let idx_mask = closed - 1;
let gen = closed << 1;
Expand Down Expand Up @@ -156,7 +172,7 @@ impl Core {
} else {
// We've reached the end of the current lap, wrap the index around
// to 0.
gen.wrapping_add(self.gen)
wrapping_add(gen, self.gen)
}
}

Expand Down Expand Up @@ -184,8 +200,7 @@ impl Core {
{
test_println!("push_ref");
let mut backoff = Backoff::new();
let mut tail = self.tail.load(Relaxed);

let mut tail = test_dbg!(self.tail.load(Relaxed));
loop {
if test_dbg!(tail & self.closed != 0) {
return Err(TrySendError::Closed(()));
Expand All @@ -210,21 +225,43 @@ impl Core {
);
slots.get_unchecked(idx)
};
let state = test_dbg!(slot.state.load(Acquire));

let raw_state = test_dbg!(slot.state.load(SeqCst));
let state = test_dbg!(clear_has_reader(raw_state));
// slot is writable
if test_dbg!(state == tail) {
// Move the tail index forward by 1.
let next_tail = self.next(idx, gen);
// try to advance the tail
match test_dbg!(self
.tail
.compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
{
Ok(_) if test_dbg!(check_has_reader(raw_state)) => {
test_println!(
"advanced tail {} to {}; has an active reader, skipping slot [{}]",
tail,
next_tail,
idx
);
let next_state = wrapping_add(tail, self.gen);
test_dbg!(slot
.state
.fetch_update(SeqCst, SeqCst, |state| {
Some(state & HAS_READER | next_state)
})
.unwrap_or_else(|_| unreachable!()));
backoff.spin();
continue;
}
Ok(_) => {
// We got the slot! It's now okay to write to it
test_println!("claimed tail slot [{}]", idx);
test_println!(
"advanced tail {} to {}; claimed slot [{}]",
tail,
next_tail,
idx
);
// Claim exclusive ownership over the slot
let ptr = slot.value.get_mut();

// Initialize or recycle the element.
unsafe {
// Safety: we have just claimed exclusive ownership over
Expand All @@ -240,42 +277,39 @@ impl Core {
test_println!("-> recycled");
}
}

return Ok(Ref {
ptr,
new_state: tail + 1,
slot,
is_pop: false,
});
}
Err(actual) => {
// Someone else took this slot and advanced the tail
// index. Try to claim the new tail.
test_println!("failed to advance tail {} to {}", tail, next_tail);
tail = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(head.wrapping_add(self.gen) == tail) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin();
} else {
backoff.spin_yield();
// check if we have any available slots
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...
let head = test_dbg!(self.head.fetch_or(0, SeqCst));
if test_dbg!(wrapping_add(head, self.gen) == tail) {
test_println!("channel full");
return Err(TrySendError::Full(()));
}

backoff.spin_yield();
tail = test_dbg!(self.tail.load(Acquire));
}
}
Expand Down Expand Up @@ -308,43 +342,47 @@ impl Core {
);
slots.get_unchecked(idx)
};
let state = test_dbg!(slot.state.load(Acquire));

// If the slot's state is ahead of the head index by one, we can pop
// it.
if test_dbg!(state == head + 1) {
let next_head = self.next(idx, gen);
let raw_state = test_dbg!(slot.state.load(Acquire));
let next_head = self.next(idx, gen);

// If the slot's state is ahead of the head index by one, we can pop it.
if test_dbg!(raw_state == head + 1) {
// try to advance the head index
match test_dbg!(self
.head
.compare_exchange_weak(head, next_head, SeqCst, Acquire))
{
Ok(_) => {
test_println!("claimed head slot [{}]", idx);
test_println!("advanced head {} to {}", head, next_head);
test_println!("claimed slot [{}]", idx);
let mut new_state = wrapping_add(head, self.gen);
new_state = set_has_reader(new_state);
test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst));
return Ok(Ref {
new_state: head.wrapping_add(self.gen),
new_state,
ptr: slot.value.get_mut(),
slot,
is_pop: true,
});
}
Err(actual) => {
test_println!("failed to advance head, head={}, actual={}", head, actual);
head = actual;
backoff.spin();
continue;
}
}
}

if test_dbg!(state == head) {
} else {
// Maybe we reached the tail index? If so, the buffer is empty.
// fake RMW op to placate loom. this should be equivalent to
// doing a relaxed load after a SeqCst fence (per Godbolt
// https://godbolt.org/z/zb15qfEa9), however, loom understands
// this correctly, while it does not understand an explicit
// SeqCst fence and a load.
// XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
// load it gets reordered differently in the model checker lmao...

let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));

if test_dbg!(tail & !self.closed == head) {
return if test_dbg!(tail & self.closed != 0) {
Err(TryRecvError::Closed)
Expand All @@ -354,16 +392,32 @@ impl Core {
};
}

if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
// Is anyone writing to the slot from this generation?
if test_dbg!(raw_state == head) {
if test_dbg!(backoff.done_spinning()) {
return Err(TryRecvError::Empty);
}
backoff.spin();
continue;
}

backoff.spin();
} else {
backoff.spin_yield();
// The slot is in an invalid state (was skipped). Try to advance the head index.
match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
Ok(_) => {
test_println!("skipped head slot [{}], new head={}", idx, next_head);
}
Err(actual) => {
test_println!(
"failed to skip head slot [{}], head={}, actual={}",
idx,
head,
actual
);
head = actual;
backoff.spin();
}
}
}

head = test_dbg!(self.head.load(Acquire));
}
}

Expand Down Expand Up @@ -409,6 +463,26 @@ impl Core {
}
}

#[inline]
fn check_has_reader(state: usize) -> bool {
state & HAS_READER == HAS_READER
}

/// Returns `state` with the `HAS_READER` flag set, leaving all other bits
/// unchanged.
#[inline]
fn set_has_reader(state: usize) -> usize {
    state | HAS_READER
}

/// Returns `state` with the `HAS_READER` flag cleared, leaving all other bits
/// unchanged.
#[inline]
fn clear_has_reader(state: usize) -> usize {
    state & !HAS_READER
}

/// Adds `a` and `b`, wrapping around within the `MAX_CAPACITY` index space
/// (i.e. the result always has the `HAS_READER` bit cleared).
///
/// Uses `usize::wrapping_add` rather than `a + b` so the function lives up to
/// its name: a plain `+` would panic in debug builds if the raw sum ever
/// overflowed (e.g. if a state value with the MSB still set were passed in).
/// For all in-range inputs the mask makes both forms produce identical
/// results.
#[inline]
fn wrapping_add(a: usize, b: usize) -> usize {
    a.wrapping_add(b) & MAX_CAPACITY
}

impl Drop for Core {
fn drop(&mut self) {
debug_assert!(
Expand Down Expand Up @@ -475,8 +549,17 @@ impl<T> ops::DerefMut for Ref<'_, T> {
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        if self.is_pop {
            // This `Ref` came from `pop_ref`, which already stored the
            // advanced state (with `HAS_READER` set) when the slot was
            // claimed. Releasing the reader only needs to clear the flag,
            // preserving the rest of the state so a writer that skipped this
            // slot can claim it on a later lap.
            test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
            test_dbg!(self.slot.state.fetch_and(!HAS_READER, SeqCst));
        } else {
            // This `Ref` came from `push_ref`: publish the written element by
            // storing the precomputed `new_state` (tail + 1). The `Release`
            // store makes the element's initialization visible to the reader
            // that observes the new state.
            test_println!(
                "drop Ref<{}> (push), new_state = {}",
                core::any::type_name::<T>(),
                self.new_state
            );
            test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
        }
    }
}

Expand Down
9 changes: 8 additions & 1 deletion src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use errors::*;
feature! {
#![feature = "alloc"]

use crate::loom::sync::Arc;
use crate::{MAX_CAPACITY, loom::sync::Arc};
use alloc::boxed::Box;

/// Returns a new asynchronous multi-producer, single consumer (MPSC)
Expand All @@ -32,10 +32,17 @@ feature! {
/// Returns a new asynchronous multi-producer, single consumer (MPSC)
/// channel with the provided capacity and [recycling policy].
///
/// # Panics
///
/// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This value
/// represents the highest power of two that can be expressed by a `usize`, excluding the most
/// significant bit.
///
/// [recycling policy]: crate::recycling::Recycle
#[must_use]
pub fn with_recycle<T, R: Recycle<T>>(capacity: usize, recycle: R) -> (Sender<T, R>, Receiver<T, R>) {
assert!(capacity > 0);
assert!(capacity <= MAX_CAPACITY);
let inner = Arc::new(Inner {
core: ChannelCore::new(capacity),
slots: Slot::make_boxed_array(capacity),
Expand Down
21 changes: 11 additions & 10 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@
//! [`Receiver`] types in this module wait by blocking the current thread,
//! rather than asynchronously yielding.
use super::*;
use crate::{
loom::{
atomic::{self, Ordering},
sync::Arc,
thread::{self, Thread},
},
recycling::{self, Recycle},
util::Backoff,
wait::queue,
};
use crate::{loom::{
atomic::{self, Ordering},
sync::Arc,
thread::{self, Thread},
}, MAX_CAPACITY, recycling::{self, Recycle}, util::Backoff, wait::queue};
use core::{fmt, pin::Pin};
use errors::*;
use std::time::{Duration, Instant};
Expand All @@ -32,13 +27,19 @@ pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>)
/// Returns a new synchronous multi-producer, single consumer channel with
/// the provided [recycling policy].
///
/// # Panics
///
/// Panics if the capacity exceeds `usize::MAX & !(1 << (usize::BITS - 1))`. This value represents
/// the highest power of two that can be expressed by a `usize`, excluding the most significant bit.
///
/// [recycling policy]: crate::recycling::Recycle
#[must_use]
pub fn with_recycle<T, R: Recycle<T>>(
capacity: usize,
recycle: R,
) -> (Sender<T, R>, Receiver<T, R>) {
assert!(capacity > 0);
assert!(capacity <= MAX_CAPACITY);
let inner = Arc::new(Inner {
core: ChannelCore::new(capacity),
slots: Slot::make_boxed_array(capacity),
Expand Down
Loading

0 comments on commit a72a286

Please sign in to comment.