Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CX_CLEANUP] Create new Task Abstraction async_broadcast Channels #2500

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"crates/libp2p-networking",
"crates/testing-macros",
"crates/task",
"crates/new_task",
"crates/task-impls",
"crates/testing",
"crates/types",
Expand Down
22 changes: 22 additions & 0 deletions crates/new_task/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
authors = ["Espresso Systems <[email protected]>"]
name = "hotshot-task-new"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

futures = "0.3.30"
async-broadcast = "0.6.0"
tracing = { workspace = true }
async-compatibility-layer = { workspace = true }

[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
tokio = { version = "1.35.1", features = ["time", "rt-multi-thread", "macros", "sync"] }
[target.'cfg(all(async_executor_impl = "async-std"))'.dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }

[lints]
workspace = true
268 changes: 268 additions & 0 deletions crates/new_task/src/dependency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use async_broadcast::{Receiver, RecvError};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use std::future::Future;

/// Type which describes the idea of waiting for a dependency to complete
pub trait Dependency<T> {
/// Complete will wait until it gets some value `T` then return the value
fn completed(self) -> impl Future<Output = Option<T>> + Send;
/// Create an or dependency from this dependency and another
fn or<D: Dependency<T> + Send + 'static>(self, dep: D) -> OrDependency<T>
where
T: Send + Sync + Clone + 'static,
Self: Sized + Send + 'static,
{
let mut or = OrDependency::from_deps(vec![self]);
or.add_dep(dep);
or
}
/// Create an and dependency from this dependency and another
fn and<D: Dependency<T> + Send + 'static>(self, dep: D) -> AndDependency<T>
where
T: Send + Sync + Clone + 'static,
Self: Sized + Send + 'static,
{
let mut and = AndDependency::from_deps(vec![self]);
and.add_dep(dep);
and
}
}

/// Used to combine dependencies to create `AndDependency`s or `OrDependency`s
trait CombineDependencies<T: Clone + Send + Sync + 'static>:
Sized + Dependency<T> + Send + 'static
{
}

/// Defines a dependency that completes when all of its deps complete
pub struct AndDependency<T> {
deps: Vec<BoxFuture<'static, Option<T>>>,
}
impl<T: Clone + Send + Sync> Dependency<Vec<T>> for AndDependency<T> {
/// Returns a vector of all of the results from it's dependencies.
/// The results will be in a random order
async fn completed(self) -> Option<Vec<T>> {
let futures = FuturesUnordered::from_iter(self.deps);
futures
.collect::<Vec<Option<T>>>()
.await
.into_iter()
.collect()
}
}

impl<T: Clone + Send + Sync + 'static> AndDependency<T> {
/// Create from a vec of deps
#[must_use]
pub fn from_deps(deps: Vec<impl Dependency<T> + Send + 'static>) -> Self {
let mut pinned = vec![];
for dep in deps {
pinned.push(dep.completed().boxed());
}
Self { deps: pinned }
}
/// Add another dependency
pub fn add_dep(&mut self, dep: impl Dependency<T> + Send + 'static) {
self.deps.push(dep.completed().boxed());
}
/// Add multiple dependencies
pub fn add_deps(&mut self, deps: AndDependency<T>) {
for dep in deps.deps {
self.deps.push(dep);
}
}
}

/// Defines a dependency that complets when one of it's dependencies compeltes
pub struct OrDependency<T> {
deps: Vec<BoxFuture<'static, Option<T>>>,
}
impl<T: Clone + Send + Sync> Dependency<T> for OrDependency<T> {
/// Returns the value of the first completed dependency
async fn completed(self) -> Option<T> {
let mut futures = FuturesUnordered::from_iter(self.deps);
loop {
if let Some(maybe) = futures.next().await {
if maybe.is_some() {
return maybe;
}
} else {
return None;
}
}
}
}

impl<T: Clone + Send + Sync + 'static> OrDependency<T> {
/// Creat an `OrDependency` from a vec of dependencies
#[must_use]
pub fn from_deps(deps: Vec<impl Dependency<T> + Send + 'static>) -> Self {
let mut pinned = vec![];
for dep in deps {
pinned.push(dep.completed().boxed());
}
Self { deps: pinned }
}
/// Add another dependecy
pub fn add_dep(&mut self, dep: impl Dependency<T> + Send + 'static) {
self.deps.push(dep.completed().boxed());
}
}

/// A dependency that listens on a chanel for an event
/// that matches what some value it wants.
pub struct EventDependency<T: Clone + Send + Sync> {
/// Channel of incomming events
pub(crate) event_rx: Receiver<T>,
/// Closure which returns true if the incoming `T` is the
/// thing that completes this dependency
pub(crate) match_fn: Box<dyn Fn(&T) -> bool + Send>,
}

impl<T: Clone + Send + Sync + 'static> EventDependency<T> {
/// Create a new `EventDependency`
#[must_use]
pub fn new(receiver: Receiver<T>, match_fn: Box<dyn Fn(&T) -> bool + Send>) -> Self {
Self {
event_rx: receiver,
match_fn: Box::new(match_fn),
}
}
}

impl<T: Clone + Send + Sync + 'static> Dependency<T> for EventDependency<T> {
async fn completed(mut self) -> Option<T> {
loop {
match self.event_rx.recv_direct().await {
Ok(event) => {
if (self.match_fn)(&event) {
return Some(event);
}
}
Err(RecvError::Overflowed(n)) => {
tracing::error!("Dependency Task overloaded, skipping {} events", n);
}
Err(RecvError::Closed) => {
return None;
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::{AndDependency, Dependency, EventDependency, OrDependency};
use async_broadcast::{broadcast, Receiver};

fn eq_dep(rx: Receiver<usize>, val: usize) -> EventDependency<usize> {
EventDependency {
event_rx: rx,
match_fn: Box::new(move |v| *v == val),
}
}

#[cfg_attr(
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn it_works() {
let (tx, rx) = broadcast(10);

let mut deps = vec![];
for i in 0..5 {
tx.broadcast(i).await.unwrap();
deps.push(eq_dep(rx.clone(), 5));
}

let and = AndDependency::from_deps(deps);
tx.broadcast(5).await.unwrap();
let result = and.completed().await;
assert_eq!(result, Some(vec![5; 5]));
}
#[cfg_attr(
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn or_dep() {
let (tx, rx) = broadcast(10);

tx.broadcast(5).await.unwrap();
let mut deps = vec![];
for _ in 0..5 {
deps.push(eq_dep(rx.clone(), 5));
}
let or = OrDependency::from_deps(deps);
let result = or.completed().await;
assert_eq!(result, Some(5));
}

#[cfg_attr(
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn and_or_dep() {
let (tx, rx) = broadcast(10);

tx.broadcast(1).await.unwrap();
tx.broadcast(2).await.unwrap();
tx.broadcast(3).await.unwrap();
tx.broadcast(5).await.unwrap();
tx.broadcast(6).await.unwrap();

let or1 = OrDependency::from_deps([eq_dep(rx.clone(), 4), eq_dep(rx.clone(), 6)].into());
let or2 = OrDependency::from_deps([eq_dep(rx.clone(), 4), eq_dep(rx.clone(), 5)].into());
let and = AndDependency::from_deps([or1, or2].into());
let result = and.completed().await;
assert_eq!(result, Some(vec![6, 5]));
}

#[cfg_attr(
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn or_and_dep() {
let (tx, rx) = broadcast(10);

tx.broadcast(1).await.unwrap();
tx.broadcast(2).await.unwrap();
tx.broadcast(3).await.unwrap();
tx.broadcast(4).await.unwrap();
tx.broadcast(5).await.unwrap();

let and1 = eq_dep(rx.clone(), 4).and(eq_dep(rx.clone(), 6));
let and2 = eq_dep(rx.clone(), 4).and(eq_dep(rx.clone(), 5));
let or = and1.or(and2);
let result = or.completed().await;
assert_eq!(result, Some(vec![4, 5]));
}

#[cfg_attr(
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn many_and_dep() {
let (tx, rx) = broadcast(10);

tx.broadcast(1).await.unwrap();
tx.broadcast(2).await.unwrap();
tx.broadcast(3).await.unwrap();
tx.broadcast(4).await.unwrap();
tx.broadcast(5).await.unwrap();
tx.broadcast(6).await.unwrap();

let mut and1 = eq_dep(rx.clone(), 4).and(eq_dep(rx.clone(), 6));
let and2 = eq_dep(rx.clone(), 4).and(eq_dep(rx.clone(), 5));
and1.add_deps(and2);
let result = and1.completed().await;
assert_eq!(result, Some(vec![4, 6, 4, 5]));
}
}
bfish713 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading