Skip to content

Commit

Permalink
Add multi-producer, multi-consumer channel (mpmc)
Browse files Browse the repository at this point in the history
  • Loading branch information
obeis committed Jul 3, 2024
1 parent 25c9f2c commit 0bd9f90
Show file tree
Hide file tree
Showing 8 changed files with 1,759 additions and 52 deletions.
4 changes: 3 additions & 1 deletion library/std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
//! the [`io`], [`fs`], and [`net`] modules.
//!
//! The [`thread`] module contains Rust's threading abstractions. [`sync`]
//! contains further primitive shared memory types, including [`atomic`] and
//! contains further primitive shared memory types, including [`atomic`], [`mpmc`] and
//! [`mpsc`], which contains the channel types for message passing.
//!
//! # Use before and after `main()`
Expand All @@ -173,6 +173,7 @@
//! - after-main use of thread-locals, which also affects additional features:
//! - [`thread::current()`]
//! - [`thread::scope()`]
//! - [`sync::mpmc`]
//! - [`sync::mpsc`]
//! - before-main stdio file descriptors are not guaranteed to be open on unix platforms
//!
Expand All @@ -198,6 +199,7 @@
//! [`atomic`]: sync::atomic
//! [`for`]: ../book/ch03-05-control-flow.html#looping-through-a-collection-with-for
//! [`str`]: prim@str
//! [`mpmc`]: sync::mpmc
//! [`mpsc`]: sync::mpsc
//! [`std::cmp`]: cmp
//! [`std::slice`]: mod@slice
Expand Down
9 changes: 8 additions & 1 deletion library/std/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
//! inter-thread synchronisation mechanism, at the cost of some
//! extra memory.
//!
//! - [`mpmc`]: Multi-producer, multi-consumer queues, used for
//! message-based communication. Can provide a lightweight
//! inter-thread synchronisation mechanism, at the cost of some
//! extra memory.
//!
//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at
//! most one thread at a time is able to access some data.
//!
Expand All @@ -150,6 +155,7 @@
//! [`Arc`]: crate::sync::Arc
//! [`Barrier`]: crate::sync::Barrier
//! [`Condvar`]: crate::sync::Condvar
//! [`mpmc`]: crate::sync::mpmc
//! [`mpsc`]: crate::sync::mpsc
//! [`Mutex`]: crate::sync::Mutex
//! [`Once`]: crate::sync::Once
Expand Down Expand Up @@ -191,12 +197,13 @@ pub use self::once_lock::OnceLock;
#[unstable(feature = "reentrant_lock", issue = "121440")]
pub use self::reentrant_lock::{ReentrantLock, ReentrantLockGuard};

#[unstable(feature = "mpmc_channel", issue = "125712")]
pub mod mpmc;
pub mod mpsc;

mod barrier;
mod condvar;
mod lazy_lock;
mod mpmc;
mod mutex;
pub(crate) mod once;
mod once_lock;
Expand Down
5 changes: 5 additions & 0 deletions library/std/src/sync/mpmc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError
///
/// [`send_timeout`]: super::Sender::send_timeout
#[derive(PartialEq, Eq, Clone, Copy)]
#[unstable(feature = "mpmc_channel", issue = "125712")]
pub enum SendTimeoutError<T> {
/// The message could not be sent because the channel is full and the operation timed out.
///
Expand All @@ -20,12 +21,14 @@ pub enum SendTimeoutError<T> {
Disconnected(T),
}

#[unstable(feature = "mpmc_channel", issue = "125712")]
impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
}
}

#[unstable(feature = "mpmc_channel", issue = "125712")]
impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Expand All @@ -35,8 +38,10 @@ impl<T> fmt::Display for SendTimeoutError<T> {
}
}

#[unstable(feature = "mpmc_channel", issue = "125712")]
impl<T> error::Error for SendTimeoutError<T> {}

#[unstable(feature = "mpmc_channel", issue = "125712")]
impl<T> From<SendError<T>> for SendTimeoutError<T> {
fn from(err: SendError<T>) -> SendTimeoutError<T> {
match err {
Expand Down
Loading

0 comments on commit 0bd9f90

Please sign in to comment.