Skip to content

Commit

Permalink
refactor(jstzd): use poll in retry
Browse files Browse the repository at this point in the history
  • Loading branch information
huancheng-trili committed Nov 28, 2024
1 parent db88c9e commit 65e9bf6
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 26 deletions.
104 changes: 98 additions & 6 deletions crates/jstzd/src/task/utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,107 @@
pub async fn retry<'a, F>(retries: u16, interval_ms: u64, f: impl Fn() -> F) -> bool
pub async fn retry<'a, F>(
retries: u16,
interval_ms: u64,
f: impl Fn() -> F + Sync,
) -> bool
where
F: std::future::Future<Output = anyhow::Result<bool>> + Send + 'a,
{
let duration = tokio::time::Duration::from_millis(interval_ms);
for _ in 0..retries {
tokio::time::sleep(duration).await;
poll(retries, interval_ms, || async {
if let Ok(v) = f().await {
if v {
return true;
return Some(true);
}
}
None
})
.await
.unwrap_or(false)
}

pub async fn poll<'a, F, T>(
max_attempts: u16,
interval_ms: u64,
f: impl Fn() -> F,
) -> Option<T>
where
F: std::future::Future<Output = Option<T>> + Send + 'a,
{
let duration = tokio::time::Duration::from_millis(interval_ms);
for _ in 0..max_attempts {
tokio::time::sleep(duration).await;
if let Some(v) = f().await {
return Some(v);
}
}
None
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::test]
async fn retry() {
async fn check(locked: Arc<Mutex<i32>>, result: bool) -> anyhow::Result<bool> {
let mut v = locked.lock().await;
if *v == 5 {
return Ok(result);
}
*v += 1;
Err(anyhow::anyhow!(""))
}

// retry till the end and get a positive result
let locked = Arc::new(Mutex::new(1));
assert!(super::retry(5, 1, || async { check(locked.clone(), true).await }).await);

// retry till the end and get a negative result
let locked = Arc::new(Mutex::new(1));
assert!(
!super::retry(5, 1, || async { check(locked.clone(), false).await }).await
);

// not waiting long enough
let locked = Arc::new(Mutex::new(1));
assert!(
!super::retry(2, 1, || async { check(locked.clone(), true).await }).await
);
}

#[tokio::test]
async fn poll() {
async fn check(locked: Arc<Mutex<i32>>, result: bool) -> Option<bool> {
let mut v = locked.lock().await;
if *v == 5 {
return Some(result);
}
*v += 1;
None
}

// poll till the end and get a positive result
let locked = Arc::new(Mutex::new(1));
assert!(
super::poll(5, 1, || async { check(locked.clone(), true).await })
.await
.unwrap()
);

// poll till the end and get a negative result
let locked = Arc::new(Mutex::new(1));
assert!(
!super::poll(5, 1, || async { check(locked.clone(), false).await })
.await
.unwrap()
);

// not waiting long enough
let locked = Arc::new(Mutex::new(1));
assert!(
super::poll(2, 1, || async { check(locked.clone(), true).await })
.await
.is_none()
);
}
false
}
4 changes: 2 additions & 2 deletions crates/jstzd/tests/octez_client_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use jstz_crypto::public_key_hash::PublicKeyHash;
use jstzd::task::Task;
use jstzd::task::{utils::poll, Task};
use octez::r#async::{
client::{OctezClient, OctezClientConfigBuilder, Signature},
endpoint::Endpoint,
Expand All @@ -23,7 +23,7 @@ mod utils;
use std::path::PathBuf;
use utils::{
activate_alpha, create_client, get_head_block_hash, get_operation_kind, get_request,
import_activator, poll, setup, spawn_octez_node, spawn_rollup, ACTIVATOR_SECRET_KEY,
import_activator, setup, spawn_octez_node, spawn_rollup, ACTIVATOR_SECRET_KEY,
};

fn read_file(path: &Path) -> Value {
Expand Down
18 changes: 0 additions & 18 deletions crates/jstzd/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@ pub const ACTIVATOR_SECRET_KEY: &str =
"unencrypted:edsk31vznjHSSpGExDMHYASz45VZqXN4DPxvsa4hAyY8dHM28cZzp6";
pub const ROLLUP_ADDRESS: &str = "sr1PuFMgaRUN12rKQ3J2ae5psNtwCxPNmGNK";

pub async fn poll<'a, F, T>(
max_attempts: u16,
interval_ms: u64,
f: impl Fn() -> F,
) -> Option<T>
where
F: std::future::Future<Output = Option<T>> + Send + 'a,
{
let duration = tokio::time::Duration::from_millis(interval_ms);
for _ in 0..max_attempts {
tokio::time::sleep(duration).await;
if let Some(v) = f().await {
return Some(v);
}
}
None
}

pub async fn setup(
param_file_path: Option<PathBuf>,
) -> (OctezNode, OctezClient, octez_baker::OctezBaker) {
Expand Down

0 comments on commit 65e9bf6

Please sign in to comment.