Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
kingwingfly committed Jun 6, 2024
1 parent e1eaa58 commit 22cbfdc
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions fav_core/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ pub trait LocalSetOps: Net + HttpConfig {
/// Fetch one resource set,
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
async fn fetch_set<F, Fut, Any>(&self, set: &mut Self::Set, f: F) -> FavCoreResult<()>
async fn fetch_set<F, Fut, Any>(&self, set: &mut Self::Set, cancelled: F) -> FavCoreResult<()>
where
F: Fn() -> Fut + Send,
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
}
Expand All @@ -106,18 +106,22 @@ pub trait LocalResOps: Net + HttpConfig {
fn fetch_res<F, Fut, Any>(
&self,
resource: &mut Self::Res,
f: F,
cancelled: F,
) -> impl Future<Output = FavCoreResult<()>>
where
F: Fn() -> Fut + Send,
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
/// Pull one resource,
/// `F: Fn() -> Future<...>`, if Future is ready, one can cleanup and
/// shutdown gracefully, then return `FavCoreError::Cancel`.
async fn pull_res<F, Fut, Any>(&self, resource: &mut Self::Res, f: F) -> FavCoreResult<()>
async fn pull_res<F, Fut, Any>(
&self,
resource: &mut Self::Res,
cancelled: F,
) -> FavCoreResult<()>
where
F: Fn() -> Fut + Send,
F: FnOnce() -> Fut + Send,
Fut: Future<Output = Any> + Send,
Any: Send;
}
Expand Down Expand Up @@ -153,14 +157,14 @@ pub trait SetOpsExt: SetOps {
pub async fn batch_op_set<'a, SS, F, T>(sets: &'a mut SS, mut f: F) -> FavCoreResult<()>
where
SS: Sets + 'a,
F: FnMut(&'a mut SS::Set, Box<dyn Fn() -> WaitForCancellationFutureOwned + Send>) -> T,
F: FnMut(&'a mut SS::Set, Box<dyn FnOnce() -> WaitForCancellationFutureOwned + Send>) -> T,
T: Future<Output = FavCoreResult<()>>,
{
let token = CancellationToken::new();
let mut stream = tokio_stream::iter(sets.iter_mut())
.map(|s| {
let token1 = token.clone();
let shutdown = Box::new(move || token1.clone().cancelled_owned());
let shutdown = Box::new(move || token1.cancelled_owned());
let fut = f(s, shutdown);
let token = token.clone();
async move {
Expand All @@ -173,24 +177,24 @@ where
})
.buffer_unordered(8);
let mut result = Ok(());
loop {
tokio::select! {
res = stream.next() => {
tokio::select! {
_ = async {
while let Some(res) = stream.next().await {
match res {
None => break,
Some(Err(FavCoreError::Cancel)) => {}
Some(Err(FavCoreError::NetworkError(e))) if e.is_connect() => {
Err(FavCoreError::Cancel) => {}
Err(FavCoreError::NetworkError(e)) if e.is_connect() => {
token.cancel(); // if already cancelled, it's handled by token itself
result = Err(FavCoreError::NetworkError(e));
}
Some(Err(e)) => error!("{}", e),
Err(e) => error!("{}", e),
_ => {}
}
}
_ = tokio::signal::ctrl_c() => {
token.cancel();
result = Err(FavCoreError::Cancel);
}

} => {}
_ = tokio::signal::ctrl_c() => {
token.cancel();
result = Err(FavCoreError::Cancel);
}
}
result
Expand Down Expand Up @@ -267,14 +271,14 @@ pub trait ResOpsExt: ResOps {
pub async fn batch_op_res<'a, S, F, T>(set: &'a mut S, mut f: F) -> FavCoreResult<()>
where
S: Set + 'a,
F: FnMut(&'a mut S::Res, Box<dyn Fn() -> WaitForCancellationFutureOwned + Send>) -> T,
F: FnMut(&'a mut S::Res, Box<dyn FnOnce() -> WaitForCancellationFutureOwned + Send>) -> T,
T: Future<Output = FavCoreResult<()>>,
{
let token = CancellationToken::new();
let mut stream = tokio_stream::iter(set.iter_mut())
.map(|s| {
let token1 = token.clone();
let shutdown = Box::new(move || token1.clone().cancelled_owned());
let shutdown = Box::new(move || token1.cancelled_owned());
let fut = f(s, shutdown);
let token = token.clone();
async move {
Expand All @@ -287,24 +291,24 @@ where
})
.buffer_unordered(8);
let mut result = Ok(());
loop {
tokio::select! {
res = stream.next() => {
tokio::select! {
_ = async {
while let Some(res) = stream.next().await {
match res {
None => break,
Some(Err(FavCoreError::Cancel)) => {}
Some(Err(FavCoreError::NetworkError(e))) if e.is_connect() => {
token.cancel();
Err(FavCoreError::Cancel) => {}
Err(FavCoreError::NetworkError(e)) if e.is_connect() => {
token.cancel(); // if already cancelled, it's handled by token itself
result = Err(FavCoreError::NetworkError(e));
}
Some(Err(e)) => error!("{}", e),
Err(e) => error!("{}", e),
_ => {}
}
}
_ = tokio::signal::ctrl_c() => {
token.cancel();
result = Err(FavCoreError::Cancel);
}

} => {}
_ = tokio::signal::ctrl_c() => {
token.cancel();
result = Err(FavCoreError::Cancel);
}
}
result
Expand Down

0 comments on commit 22cbfdc

Please sign in to comment.