Skip to content

Commit

Permalink
ParallelJoin works but require dyn
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Oct 20, 2023
1 parent 5909b1a commit af5edd3
Showing 1 changed file with 17 additions and 42 deletions.
59 changes: 17 additions & 42 deletions src/seq_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::{
task::{Context, Poll},
};
use std::marker::PhantomData;
use async_trait::async_trait;
use clap::builder::TypedValueParser;

use futures::{
stream::{iter, Iter as StreamIter, TryCollect},
Future, Stream, StreamExt, TryStreamExt,
};
use futures::{stream::{iter, Iter as StreamIter, TryCollect}, Future, Stream, StreamExt, TryStreamExt, TryFuture};
use futures_util::future::TryJoinAll;
use futures_util::stream::FuturesOrdered;
use pin_project::pin_project;

use crate::exact::ExactSizeStream;
Expand Down Expand Up @@ -103,38 +102,10 @@ where
}
}

struct ParallelFutures<'a, I, F> {
spawner: UnsafeSpawner<'a, F>,
iterable: I,
}

#[pin_project]
pub struct ParallelFutures2<'a, F>
where F: futures::future::TryFuture,
F::Ok: Send + 'static,
F::Error: Send + 'static
{
spawner: UnsafeSpawner<'a, Result<F::Ok, F::Error>>,
#[pin]
inner: TryJoinAll<UnsafeSpawnerHandle<Result<F::Ok, F::Error>>>,
}

impl <F> Future for ParallelFutures2<'_, F>

where F: futures::future::TryFuture,
F::Ok: Send + 'static,
F::Error: Send + 'static
{
type Output = Result<Vec<F::Ok>, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

/// The `SeqJoin` trait wraps `seq_try_join_all`, providing the `active` parameter
/// from the provided context so that the value can be made consistent.
pub trait SeqJoin {

/// Perform a sequential join of the futures from the provided iterable.
/// This uses [`seq_join`], with the current state of the associated object
/// being used to determine the number of active items to track (see [`active_work`]).
Expand Down Expand Up @@ -166,24 +137,28 @@ pub trait SeqJoin {
}

/// Join multiple tasks in parallel. Only do this if you can't use a sequential join.
fn parallel_join<'a, I, F, O, E>(&self, iterable: I) -> ParallelFutures2<'a, I::Item>
fn parallel_join<I, F, O, E>(&self, iterable: I) -> Pin<Box<dyn Future<Output = Result<Vec<O>, E>> + Send>>
where
I: IntoIterator<Item = F>,
// F: futures::future::TryFuture + Send,
F: Future<Output = Result<O, E>> + Send + 'a,
I: IntoIterator<Item = F> + Send,
F: Future<Output = Result<O, E>> + Send,
O: Send + 'static,
E: Send + 'static
// F::Ok: Send + 'static,
// F::Error: Send + 'static,
{
// let iterable = iterable.into_iter().map(|f| {
// spawner.spawn(f)
// });
// let spawner = UnsafeSpawner::default();
let mut futures = FuturesOrdered::default();
let spawner = UnsafeSpawner::default();
ParallelFutures2 {
spawner,
inner: futures::future::try_join_all(iterable.into_iter().map(|f| spawner.spawn(f))),
for f in iterable.into_iter() {
futures.push_back(spawner.spawn(f.into_future()));
}

Box::pin(async move { futures.try_collect().await })
// ParallelFutures2 {
// spawner,
// inner: futures::future::try_join_all(iterable.into_iter().map(|f| spawner.spawn(f))),
// }
// #[allow(clippy::disallowed_methods)] // Just in this one place.
// futures::future::try_join_all(iterable.into_iter()
// .map(|f| tokio::spawn()))
Expand Down

0 comments on commit af5edd3

Please sign in to comment.