diff --git a/fav_core/src/ops.rs b/fav_core/src/ops.rs index 69b73fe..06a7ede 100644 --- a/fav_core/src/ops.rs +++ b/fav_core/src/ops.rs @@ -85,9 +85,9 @@ pub trait LocalSetOps: Net + HttpConfig { /// Fetch one resource set, /// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and /// shutdown gracefully, then return `FavCoreError::Cancel`. - async fn fetch_set(&self, set: &mut Self::Set, f: F) -> FavCoreResult<()> + async fn fetch_set(&self, set: &mut Self::Set, cancelled: F) -> FavCoreResult<()> where - F: Fn() -> Fut + Send, + F: FnOnce() -> Fut + Send, Fut: Future + Send, Any: Send; } @@ -106,18 +106,22 @@ pub trait LocalResOps: Net + HttpConfig { fn fetch_res( &self, resource: &mut Self::Res, - f: F, + cancelled: F, ) -> impl Future> where - F: Fn() -> Fut + Send, + F: FnOnce() -> Fut + Send, Fut: Future + Send, Any: Send; /// Pull one resource, /// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and /// shutdown gracefully, then return `FavCoreError::Cancel`. - async fn pull_res(&self, resource: &mut Self::Res, f: F) -> FavCoreResult<()> + async fn pull_res( + &self, + resource: &mut Self::Res, + cancelled: F, + ) -> FavCoreResult<()> where - F: Fn() -> Fut + Send, + F: FnOnce() -> Fut + Send, Fut: Future + Send, Any: Send; } @@ -153,14 +157,14 @@ pub trait SetOpsExt: SetOps { pub async fn batch_op_set<'a, SS, F, T>(sets: &'a mut SS, mut f: F) -> FavCoreResult<()> where SS: Sets + 'a, - F: FnMut(&'a mut SS::Set, Box WaitForCancellationFutureOwned + Send>) -> T, + F: FnMut(&'a mut SS::Set, Box WaitForCancellationFutureOwned + Send>) -> T, T: Future>, { let token = CancellationToken::new(); let mut stream = tokio_stream::iter(sets.iter_mut()) .map(|s| { let token1 = token.clone(); - let shutdown = Box::new(move || token1.clone().cancelled_owned()); + let shutdown = Box::new(move || token1.cancelled_owned()); let fut = f(s, shutdown); let token = token.clone(); async move { @@ -173,24 +177,24 @@ where }) .buffer_unordered(8); let mut result = Ok(()); - loop { - tokio::select! { - res = stream.next() => { + tokio::select! { + _ = async { + while let Some(res) = stream.next().await { match res { - None => break, - Some(Err(FavCoreError::Cancel)) => {} - Some(Err(FavCoreError::NetworkError(e))) if e.is_connect() => { + Err(FavCoreError::Cancel) => {} + Err(FavCoreError::NetworkError(e)) if e.is_connect() => { token.cancel(); // if already cancelled, it's handled by token itself result = Err(FavCoreError::NetworkError(e)); } - Some(Err(e)) => error!("{}", e), + Err(e) => error!("{}", e), _ => {} } } - _ = tokio::signal::ctrl_c() => { - token.cancel(); - result = Err(FavCoreError::Cancel); - } + + } => {} + _ = tokio::signal::ctrl_c() => { + token.cancel(); + result = Err(FavCoreError::Cancel); } } result @@ -267,14 +271,14 @@ pub trait ResOpsExt: ResOps { pub async fn batch_op_res<'a, S, F, T>(set: &'a mut S, mut f: F) -> FavCoreResult<()> where S: Set + 'a, - F: FnMut(&'a mut S::Res, Box WaitForCancellationFutureOwned + Send>) -> T, + F: FnMut(&'a mut S::Res, Box WaitForCancellationFutureOwned + Send>) -> T, T: Future>, { let token = CancellationToken::new(); let mut stream = tokio_stream::iter(set.iter_mut()) .map(|s| { let token1 = token.clone(); - let shutdown = Box::new(move || token1.clone().cancelled_owned()); + let shutdown = Box::new(move || token1.cancelled_owned()); let fut = f(s, shutdown); let token = token.clone(); async move { @@ -287,24 +291,24 @@ where }) .buffer_unordered(8); let mut result = Ok(()); - loop { - tokio::select! { - res = stream.next() => { + tokio::select! { + _ = async { + while let Some(res) = stream.next().await { match res { - None => break, - Some(Err(FavCoreError::Cancel)) => {} - Some(Err(FavCoreError::NetworkError(e))) if e.is_connect() => { - token.cancel(); + Err(FavCoreError::Cancel) => {} + Err(FavCoreError::NetworkError(e)) if e.is_connect() => { + token.cancel(); // if already cancelled, it's handled by token itself result = Err(FavCoreError::NetworkError(e)); } - Some(Err(e)) => error!("{}", e), + Err(e) => error!("{}", e), _ => {} } } - _ = tokio::signal::ctrl_c() => { - token.cancel(); - result = Err(FavCoreError::Cancel); - } + + } => {} + _ = tokio::signal::ctrl_c() => { + token.cancel(); + result = Err(FavCoreError::Cancel); } } result