From 5e749ccc91140063d9287dccd36312f110c42836 Mon Sep 17 00:00:00 2001 From: Eliza Weisman <eliza@buoyant.io> Date: Thu, 2 Dec 2021 09:54:16 -0800 Subject: [PATCH] feat(mpsc): make errors more like other mpscs (#5) Errors for `TrySend` now return values. Signed-off-by: Eliza Weisman <eliza@buoyant.io> --- src/lib.rs | 19 +++++++++++++++---- src/mpsc.rs | 16 +++++++++++----- src/mpsc/async_impl.rs | 19 +++++++++++-------- src/mpsc/sync.rs | 21 ++++++++++++--------- src/mpsc/tests/mpsc_async.rs | 2 +- src/mpsc/tests/mpsc_sync.rs | 2 +- src/static_thingbuf.rs | 6 +++--- src/stringbuf.rs | 4 ++-- src/thingbuf.rs | 6 +++--- 9 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a4ee750..2b3a876 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,8 +37,7 @@ pub struct Ref<'slot, T> { new_state: usize, } -#[derive(Debug)] -pub struct AtCapacity(pub(crate) usize); +pub struct Full<T = ()>(T); #[derive(Debug)] struct Core { @@ -110,7 +109,7 @@ impl Core { self.capacity } - fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, AtCapacity> + fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, Full<()>> where T: Default, S: Index<usize, Output = Slot<T>> + ?Sized, @@ -164,7 +163,7 @@ impl Core { if state.wrapping_add(self.gen) == tail + 1 { if self.head.load(Ordering::SeqCst).wrapping_add(self.gen) == tail { - return Err(AtCapacity(self.capacity())); + return Err(Full(())); } backoff.spin(); @@ -341,3 +340,15 @@ impl<T> Slot<T> { } unsafe impl<T: Sync> Sync for Slot<T> {} + +impl<T> fmt::Debug for Full<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Full(..)") + } +} + +impl<T> fmt::Display for Full<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("channel full") + } +} diff --git a/src/mpsc.rs b/src/mpsc.rs index 2a423cd..936027b 100644 --- a/src/mpsc.rs +++ b/src/mpsc.rs @@ -19,13 +19,19 @@ feature! { #[derive(Debug)] #[non_exhaustive] -pub enum TrySendError { - AtCapacity(crate::AtCapacity), - Closed(Closed), +pub enum TrySendError<T = ()> { + Full(T), + Closed(T), } -#[derive(Debug)] -pub struct Closed(pub(crate) ()); +impl TrySendError { + fn with_value<T>(self, value: T) -> TrySendError<T> { + match self { + Self::Full(()) => TrySendError::Full(value), + Self::Closed(()) => TrySendError::Closed(value), + } + } +} #[cfg(test)] mod tests; diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index f217e4b..e0ea329 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -82,21 +82,24 @@ impl<T: Default> Sender<T> { inner: &*self.inner, slot, }) - .map_err(|e| { + .map_err(|_| { if self.inner.rx_wait.is_rx_closed() { - TrySendError::Closed(Closed(())) + TrySendError::Closed(()) } else { self.inner.rx_wait.notify(); - TrySendError::AtCapacity(e) + TrySendError::Full(()) } }) } - pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.try_send_ref()?.with_mut(|slot| { - *slot = val; - }); - Ok(()) + pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> { + match self.try_send_ref() { + Ok(mut slot) => { + slot.with_mut(|slot| *slot = val); + Ok(()) + } + Err(e) => Err(e.with_value(val)), + } } } diff --git a/src/mpsc/sync.rs b/src/mpsc/sync.rs index 24cc2f7..81777b3 100644 --- a/src/mpsc/sync.rs +++ b/src/mpsc/sync.rs @@ -3,7 +3,7 @@ //! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the //! [`Receiver`] type in this module waits by blocking the current thread, //! rather than asynchronously yielding. -use super::{Closed, TrySendError}; +use super::TrySendError; use crate::{ loom::{ self, @@ -63,21 +63,24 @@ impl<T: Default> Sender<T> { inner: &*self.inner, slot, }) - .map_err(|e| { + .map_err(|_| { if self.inner.rx_wait.is_rx_closed() { - TrySendError::Closed(Closed(())) + TrySendError::Closed(()) } else { self.inner.rx_wait.notify(); - TrySendError::AtCapacity(e) + TrySendError::Full(()) } }) } - pub fn try_send(&self, val: T) -> Result<(), TrySendError> { - self.try_send_ref()?.with_mut(|slot| { - *slot = val; - }); - Ok(()) + pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> { + match self.try_send_ref() { + Ok(mut slot) => { + slot.with_mut(|slot| *slot = val); + Ok(()) + } + Err(e) => Err(e.with_value(val)), + } } } diff --git a/src/mpsc/tests/mpsc_async.rs b/src/mpsc/tests/mpsc_async.rs index 99338c9..f870836 100644 --- a/src/mpsc/tests/mpsc_async.rs +++ b/src/mpsc/tests/mpsc_async.rs @@ -49,7 +49,7 @@ fn rx_closes() { 'send: loop { match tx.try_send(i) { Ok(_) => break 'send, - Err(TrySendError::AtCapacity(_)) => thread::yield_now(), + Err(TrySendError::Full(_)) => thread::yield_now(), Err(TrySendError::Closed(_)) => break 'iters, } } diff --git a/src/mpsc/tests/mpsc_sync.rs b/src/mpsc/tests/mpsc_sync.rs index 9610688..80b1433 100644 --- a/src/mpsc/tests/mpsc_sync.rs +++ b/src/mpsc/tests/mpsc_sync.rs @@ -46,7 +46,7 @@ fn rx_closes() { 'send: loop { match tx.try_send(i) { Ok(_) => break 'send, - Err(TrySendError::AtCapacity(_)) => thread::yield_now(), + Err(TrySendError::Full(_)) => thread::yield_now(), Err(TrySendError::Closed(_)) => break 'iters, } } diff --git a/src/static_thingbuf.rs b/src/static_thingbuf.rs index 015106c..43cfad1 100644 --- a/src/static_thingbuf.rs +++ b/src/static_thingbuf.rs @@ -1,5 +1,5 @@ use crate::loom::atomic::Ordering; -use crate::{AtCapacity, Core, Ref, Slot}; +use crate::{Core, Full, Ref, Slot}; use core::{fmt, ptr}; pub struct StaticThingBuf<T, const CAP: usize> { @@ -39,12 +39,12 @@ impl<T, const CAP: usize> StaticThingBuf<T, CAP> { } impl<T: Default, const CAP: usize> StaticThingBuf<T, CAP> { - pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> { + pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> { self.core.push_ref(&self.slots) } #[inline] - pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> { + pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, Full> { self.push_ref().map(|mut r| r.with_mut(f)) } diff --git a/src/stringbuf.rs b/src/stringbuf.rs index 2b1c160..af7cc7f 100644 --- a/src/stringbuf.rs +++ b/src/stringbuf.rs @@ -29,7 +29,7 @@ impl StringBuf { } #[inline] - pub fn write(&self) -> Result<Ref<'_, String>, AtCapacity> { + pub fn write(&self) -> Result<Ref<'_, String>, Full> { let mut string = self.inner.push_ref()?; string.with_mut(String::clear); Ok(string) @@ -74,7 +74,7 @@ impl<const CAP: usize> StaticStringBuf<CAP> { } #[inline] - pub fn write(&self) -> Result<Ref<'_, String>, AtCapacity> { + pub fn write(&self) -> Result<Ref<'_, String>, Full> { let mut string = self.inner.push_ref()?; string.with_mut(String::clear); Ok(string) diff --git a/src/thingbuf.rs b/src/thingbuf.rs index 7c1e1a0..6be593c 100644 --- a/src/thingbuf.rs +++ b/src/thingbuf.rs @@ -1,5 +1,5 @@ use crate::loom::atomic::Ordering; -use crate::{AtCapacity, Core, Ref, Slot}; +use crate::{Core, Full, Ref, Slot}; use alloc::boxed::Box; use core::{fmt, ptr}; @@ -23,12 +23,12 @@ impl<T: Default> ThingBuf<T> { } } - pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> { + pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> { self.core.push_ref(&*self.slots) } #[inline] - pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> { + pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, Full> { self.push_ref().map(|mut r| r.with_mut(f)) }