Skip to content

Commit

Permalink
fix(mpsc): try_recv_ref should return RecvRef (#61)
Browse files Browse the repository at this point in the history
`try_recv_ref` should return a `RecvRef` to notify the sender
threads to wake up.
  • Loading branch information
Name1e5s authored May 11, 2022
1 parent e04661f commit 47f16f5
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 86 deletions.
154 changes: 84 additions & 70 deletions src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,26 @@ struct SendRefInner<'a, T, N: Notify> {
_notify: NotifyRx<'a, N>,
}

struct RecvRefInner<'a, T, N: Notify + Unpin> {
// /!\ LOAD BEARING STRUCT DROP ORDER /!\
//
// The `Ref` field *must* be dropped before the `NotifyTx` field, or else
// loom tests will fail. This ensures that the mutable access to the slot is
// considered to have ended *before* the receiver thread/task is notified.
//
// The alternatives to a load-bearing drop order would be:
// (a) put one field inside an `Option` so it can be dropped before the
// other (not great, as it adds a little extra overhead even outside
// of Loom tests),
// (b) use `core::mem::ManuallyDrop` (also not great, requires additional
// unsafe code that in this case we can avoid)
//
// So, given that, relying on struct field drop order seemed like the least
// bad option here. Just don't reorder these fields. :)
slot: Ref<'a, T>,
_notify: crate::mpsc::NotifyTx<'a, N>,
}

struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);

Expand Down Expand Up @@ -356,8 +376,14 @@ where
}
}

fn try_recv_ref<'a, T>(&'a self, slots: &'a [Slot<T>]) -> Result<Ref<'a, T>, TryRecvError> {
self.core.pop_ref(slots)
fn try_recv_ref<'a, T>(
&'a self,
slots: &'a [Slot<T>],
) -> Result<RecvRefInner<'a, T, N>, TryRecvError> {
self.core.pop_ref(slots).map(|slot| RecvRefInner {
_notify: NotifyTx(&self.tx_wait),
slot,
})
}

fn try_recv<T, R>(&self, slots: &[Slot<T>], recycle: &R) -> Result<T, TryRecvError>
Expand Down Expand Up @@ -484,6 +510,52 @@ impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
}
}

// === impl RecvRefInner ===

impl<T, N: Notify + Unpin> core::ops::Deref for RecvRefInner<'_, T, N> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
self.slot.deref()
}
}

impl<T, N: Notify + Unpin> core::ops::DerefMut for RecvRefInner<'_, T, N> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
self.slot.deref_mut()
}
}

impl<T: fmt::Debug, N: Notify + Unpin> fmt::Debug for RecvRefInner<'_, T, N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}

impl<T: fmt::Display, N: Notify + Unpin> fmt::Display for RecvRefInner<'_, T, N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}

impl<T: fmt::Write, N: Notify + Unpin> fmt::Write for RecvRefInner<'_, T, N> {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
self.slot.write_str(s)
}

#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
self.slot.write_char(c)
}

#[inline]
fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
self.slot.write_fmt(f)
}
}

impl<N: Notify> Drop for NotifyRx<'_, N> {
#[inline]
fn drop(&mut self) {
Expand All @@ -500,10 +572,10 @@ impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
}
}

macro_rules! impl_send_ref {
($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
macro_rules! impl_ref_inner {
($(#[$m:meta])*, $inner:ident, $name:ident, $notify:ty) => {
$(#[$m])*
pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);
pub struct $name<'a, T>($inner<'a, T, $notify>);

impl<T> core::ops::Deref for $name<'_, T> {
type Target = T;
Expand Down Expand Up @@ -552,73 +624,15 @@ macro_rules! impl_send_ref {
};
}

macro_rules! impl_recv_ref {
macro_rules! impl_send_ref {
($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
$(#[$m])*
pub struct $name<'recv, T> {
// /!\ LOAD BEARING STRUCT DROP ORDER /!\
//
// The `Ref` field *must* be dropped before the `NotifyTx` field, or else
// loom tests will fail. This ensures that the mutable access to the slot is
// considered to have ended *before* the receiver thread/task is notified.
//
// The alternatives to a load-bearing drop order would be:
// (a) put one field inside an `Option` so it can be dropped before the
// other (not great, as it adds a little extra overhead even outside
// of Loom tests),
// (b) use `core::mem::ManuallyDrop` (also not great, requires additional
// unsafe code that in this case we can avoid)
//
// So, given that, relying on struct field drop order seemed like the least
// bad option here. Just don't reorder these fields. :)
slot: Ref<'recv, T>,
_notify: crate::mpsc::NotifyTx<'recv, $notify>,
}

impl<T> core::ops::Deref for $name<'_, T> {
type Target = T;

#[inline]
fn deref(&self) -> &Self::Target {
self.slot.deref()
}
}

impl<T> core::ops::DerefMut for $name<'_, T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
self.slot.deref_mut()
}
}

impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}

impl<T: fmt::Display> fmt::Display for $name<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.slot.fmt(f)
}
}

impl<T: fmt::Write> fmt::Write for $name<'_, T> {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
self.slot.write_str(s)
}

#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
self.slot.write_char(c)
}
impl_ref_inner!($(#[$m])*, SendRefInner, $name, $notify);
};
}

#[inline]
fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
self.slot.write_fmt(f)
}
}
macro_rules! impl_recv_ref {
($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
impl_ref_inner!($(#[$m])*, RecvRefInner, $name, $notify);
};
}

Expand Down
17 changes: 9 additions & 8 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{
loom::atomic::{self, Ordering},
recycling::{self, Recycle},
wait::queue,
Ref,
};
use core::{
fmt,
Expand Down Expand Up @@ -449,11 +448,11 @@ feature! {
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.inner.core.try_recv_ref(self.inner.slots.as_ref())
self.inner.core.try_recv_ref(self.inner.slots.as_ref()).map(RecvRef)
}

/// Attempts to receive the next message for this receiver by reference
Expand Down Expand Up @@ -1154,11 +1153,11 @@ feature! {
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv_ref(self.slots.as_ref())
self.core.try_recv_ref(self.slots.as_ref()).map(RecvRef)
}

/// Attempts to receive the next message for this receiver by reference
Expand Down Expand Up @@ -1394,9 +1393,11 @@ fn poll_recv_ref<'a, T>(
) -> Poll<Option<RecvRef<'a, T>>> {
core.poll_recv_ref(slots, || cx.waker().clone())
.map(|some| {
some.map(|slot| RecvRef {
_notify: super::NotifyTx(&core.tx_wait),
slot,
some.map(|slot| {
RecvRef(RecvRefInner {
_notify: super::NotifyTx(&core.tx_wait),
slot,
})
})
})
}
Expand Down
20 changes: 12 additions & 8 deletions src/mpsc/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::{
recycling::{self, Recycle},
util::Backoff,
wait::queue,
Ref,
};
use core::{fmt, pin::Pin};
use errors::*;
Expand Down Expand Up @@ -602,11 +601,11 @@ feature! {
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
where
R: Recycle<T>,
{
self.core.try_recv_ref(self.slots.as_ref())
self.core.try_recv_ref(self.slots.as_ref()).map(RecvRef)
}

/// Attempts to receive the next message for this receiver by value
Expand Down Expand Up @@ -1061,8 +1060,11 @@ impl<T, R> Receiver<T, R> {
/// ```
///
/// [`recv_ref`]: Self::recv_ref
pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError> {
self.inner.core.try_recv_ref(self.inner.slots.as_ref())
pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError> {
self.inner
.core
.try_recv_ref(self.inner.slots.as_ref())
.map(RecvRef)
}

/// Attempts to receive the next message for this receiver by value
Expand Down Expand Up @@ -1148,9 +1150,11 @@ fn recv_ref<'a, T>(core: &'a ChannelCore<Thread>, slots: &'a [Slot<T>]) -> Optio
loop {
match core.poll_recv_ref(slots, thread::current) {
Poll::Ready(r) => {
return r.map(|slot| RecvRef {
_notify: super::NotifyTx(&core.tx_wait),
slot,
return r.map(|slot| {
RecvRef(RecvRefInner {
_notify: super::NotifyTx(&core.tx_wait),
slot,
})
})
}
Poll::Pending => {
Expand Down

0 comments on commit 47f16f5

Please sign in to comment.