Skip to content

Commit

Permalink
Add support for send/recv timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
sbarral committed May 15, 2024
1 parent e274e9f commit 2f76dac
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
rust:
- stable
- 1.56.0
- 1.77.0
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "tachyonix"
version = "0.2.1"
authors = ["Serge Barral <[email protected]>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.77"
license = "MIT OR Apache-2.0"
repository = "https://github.com/asynchronics/tachyonix"
readme = "README.md"
Expand All @@ -20,10 +20,11 @@ keywords = ["async", "channel", "futures", "mpsc"]
autotests = false

[dependencies]
async-event = "0.1"
async-event = "0.2"
crossbeam-utils = "0.8"
diatomic-waker = "0.1"
futures-core = "0.3"
pin-project-lite = "0.2"

[target.'cfg(tachyonix_loom)'.dependencies]
loom = "0.5"
Expand All @@ -32,6 +33,7 @@ loom = "0.5"
futures-executor = { version = "0.3", default-features = false, features = ["thread-pool"] }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "async-await"] }
futures-time = "3.0"

[target.'cfg(tachyonix_loom)'.dev-dependencies]
waker-fn = "1.1"
Expand Down
198 changes: 176 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
//! # std::thread::sleep(std::time::Duration::from_millis(100)); // MIRI bug workaround
//! ```
//!
// Temporary workaround until the `async_event_loom` flag can be whitelisted
// without a `build.rs` [1].
//
// [1]: (https://github.com/rust-lang/rust/issues/124800).
#![allow(unexpected_cfgs)]
#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]

mod loom_exports;
Expand All @@ -54,6 +60,7 @@ use std::task::Poll;
use async_event::Event;
use diatomic_waker::primitives::DiatomicWaker;
use futures_core::Stream;
use pin_project_lite::pin_project;

use crate::queue::{PopError, PushError, Queue};

Expand Down Expand Up @@ -138,6 +145,58 @@ impl<T> Sender<T> {
}
}

/// Sends a message asynchronously, if necessary waiting until enough
/// capacity becomes available or until the deadline elapses.
///
/// The deadline is specified as a `Future` that is expected to resolves to
/// `()` after some duration, such as a `tokio::time::Sleep` future.
pub async fn send_timeout<'a, D>(
&'a self,
message: T,
deadline: D,
) -> Result<(), SendTimeoutError<T>>
where
D: Future<Output = ()> + 'a,
{
let mut message = Some(message);

let res = self
.inner
.sender_signal
.wait_until_or_timeout(
|| {
match self.inner.queue.push(message.take().unwrap()) {
Ok(()) => Some(()),
Err(PushError::Full(m)) => {
// Recycle the message.
message = Some(m);

None
}
Err(PushError::Closed(m)) => {
// Keep the message so it can be returned in the error
// field.
message = Some(m);

Some(())
}
}
},
deadline,
)
.await;

match (message, res) {
(Some(m), Some(())) => Err(SendTimeoutError::Closed(m)),
(Some(m), None) => Err(SendTimeoutError::Timeout(m)),
_ => {
self.inner.receiver_signal.notify();

Ok(())
}
}
}

/// Closes the queue.
///
/// This prevents any further messages from being sent on the channel.
Expand Down Expand Up @@ -243,6 +302,24 @@ impl<T> Receiver<T> {
RecvFuture { receiver: self }.await
}

/// Receives a message asynchronously, if necessary waiting until one
/// becomes available or until the deadline elapses.
///
/// The deadline is specified as a `Future` that is expected to resolves to
/// `()` after some duration, such as a `tokio::time::Sleep` future.
pub async fn recv_timeout<D>(&mut self, deadline: D) -> Result<T, RecvTimeoutError>
where
D: Future<Output = ()>,
{
// We could of course return the future directly from a plain method,
// but the `async` signature makes the intent more explicit.
RecvTimeoutFuture {
receiver: self,
deadline,
}
.await
}

/// Closes the queue.
///
/// This prevents any further messages from being sent on the channel.
Expand All @@ -252,8 +329,9 @@ impl<T> Receiver<T> {
///
/// For this reason, no counterpart to [`Sender::is_closed`] is exposed by
/// the receiver as such method could easily be misused and lead to lost
/// messages. Instead, messages should be received until a [`RecvError`] or
/// [`TryRecvError::Closed`] error is returned.
/// messages. Instead, messages should be received until a [`RecvError`],
/// [`RecvTimeoutError::Closed`] or [`TryRecvError::Closed`] error is
/// returned.
pub fn close(&self) {
if !self.inner.queue.is_closed() {
self.inner.queue.close();
Expand Down Expand Up @@ -348,6 +426,40 @@ impl<'a, T> Future for RecvFuture<'a, T> {
}
}

pin_project! {
/// The future returned by the `Receiver::recv_timeout` method.
///
/// This is just a thin wrapper over the `Stream::poll_next` implementation
/// which abandons if the deadline elapses.
struct RecvTimeoutFuture<'a, T, D> where D: Future<Output=()> {
receiver: &'a mut Receiver<T>,
#[pin]
deadline: D,
}
}

impl<'a, T, D> Future for RecvTimeoutFuture<'a, T, D>
where
D: Future<Output = ()>,
{
type Output = Result<T, RecvTimeoutError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let receiver = this.receiver;
let deadline = this.deadline;

match Pin::new(receiver).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Ok(v)),
Poll::Ready(None) => Poll::Ready(Err(RecvTimeoutError::Closed)),
Poll::Pending => match deadline.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => Poll::Ready(Err(RecvTimeoutError::Timeout)),
},
}
}
}

/// Creates a new channel, returning the sending and receiving sides.
///
/// # Panic
Expand Down Expand Up @@ -380,8 +492,48 @@ impl<T: fmt::Debug> error::Error for TrySendError<T> {}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TrySendError::Full(_) => "Full(..)".fmt(f),
TrySendError::Closed(_) => "Closed(..)".fmt(f),
TrySendError::Full(_) => "sending into a full channel".fmt(f),
TrySendError::Closed(_) => "sending into a closed channel".fmt(f),
}
}
}

/// An error returned when an attempt to send a message asynchronously is
/// unsuccessful.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}

impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"sending into a closed channel".fmt(f)
}
}

/// An error returned when an attempt to send a message asynchronously with a
/// deadline is unsuccessful.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendTimeoutError<T> {
/// The deadline has elapsed.
Timeout(T),
/// The channel has been closed.
Closed(T),
}

impl<T: fmt::Debug> error::Error for SendTimeoutError<T> {}

impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendTimeoutError::Timeout(_) => "the deadline for sending has elapsed".fmt(f),
SendTimeoutError::Closed(_) => "sending into a closed channel".fmt(f),
}
}
}
Expand All @@ -407,34 +559,36 @@ impl fmt::Display for TryRecvError {
}
}

/// An error returned when an attempt to send a message asynchronously is
/// An error returned when an attempt to receive a message asynchronously is
/// unsuccessful.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct SendError<T>(pub T);

impl<T: fmt::Debug> error::Error for SendError<T> {}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecvError;

impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}
impl error::Error for RecvError {}

impl<T> fmt::Display for SendError<T> {
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"sending into a closed channel".fmt(f)
"receiving from a closed channel".fmt(f)
}
}

/// An error returned when an attempt to receive a message asynchronously is
/// unsuccessful.
/// An error returned when an attempt to receive a message asynchronously with a
/// deadline is unsuccessful.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RecvError;
pub enum RecvTimeoutError {
/// The deadline has elapsed.
Timeout,
/// All senders have been dropped.
Closed,
}

impl error::Error for RecvError {}
impl error::Error for RecvTimeoutError {}

impl fmt::Display for RecvError {
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"receiving from a closed channel".fmt(f)
match self {
RecvTimeoutError::Timeout => "the deadline for receiving has elapsed".fmt(f),
RecvTimeoutError::Closed => "receiving from a closed channel".fmt(f),
}
}
}
Loading

0 comments on commit 2f76dac

Please sign in to comment.