Skip to content

Commit

Permalink
feat(orchestrator): Add wait methods for log line count (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepoviola authored Jun 14, 2024
1 parent 4bf3fcf commit 85993c8
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ axum-extra = { version = "0.9" }
tower = { version = "0.4" }
tower-http = { version = "0.5" }
tracing-subscriber = { version = "0.3" }
glob-match = "0.2.1"

# Zombienet workspace crates:
support = { package = "zombienet-support", version = "0.2.5", path = "crates/support" }
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ reqwest = { workspace = true }
tracing = { workspace = true }
pjs-rs = { version = "0.1.2", optional = true }
uuid = { workspace = true }
regex = { workspace = true }
glob-match = { workspace = true }

# Zombienet deps
configuration = { workspace = true }
Expand Down
97 changes: 74 additions & 23 deletions crates/orchestrator/src/network/node.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use anyhow::anyhow;
use glob_match::glob_match;
use prom_metrics_parser::MetricMap;
use provider::DynNode;
use regex::Regex;
use subxt::{backend::rpc::RpcClient, OnlineClient};
use support::{constants::THIS_IS_A_BUG, net::wait_ws_ready};
use support::net::wait_ws_ready;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, trace, warn};
use tracing::{debug, trace};

use crate::network_spec::node::NodeSpec;
#[cfg(feature = "pjs")]
Expand Down Expand Up @@ -66,12 +68,6 @@ impl NetworkNode {
&self.ws_uri
}

/// Get the logs of the node
/// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
pub async fn logs(&self) -> Result<String, anyhow::Error> {
Ok(self.inner.logs().await?)
}

// Subxt

/// Get the rpc client for the node
Expand Down Expand Up @@ -138,7 +134,7 @@ impl NetworkNode {
Ok(())
}

// Assertions
// Metrics assertions

/// Get metric value 'by name' from prometheus (exposed by the node)
/// metric name can be:
Expand Down Expand Up @@ -189,20 +185,22 @@ impl NetworkNode {
}
}

// Wait methods
// Wait methods for metrics

/// Wait until a metric value pass the `predicate`
pub async fn wait_metric(
&self,
metric_name: impl Into<String>,
predicate: impl Fn(f64) -> bool,
) -> Result<bool, anyhow::Error> {
) -> Result<(), anyhow::Error> {
let metric_name = metric_name.into();
debug!("waiting until metric {metric_name} pass the predicate");
loop {
let res = self.assert_with(&metric_name, &predicate).await;
match res {
Ok(res) => {
if res {
return Ok(true);
return Ok(());
}
},
Err(e) => {
Expand Down Expand Up @@ -234,12 +232,14 @@ impl NetworkNode {
}
}

/// Wait until a metric value pass the `predicate`
/// with a timeout (secs)
pub async fn wait_metric_with_timeout(
&self,
metric_name: impl Into<String>,
predicate: impl Fn(f64) -> bool,
timeout_secs: impl Into<u64>,
) -> Result<bool, anyhow::Error> {
) -> Result<(), anyhow::Error> {
let metric_name = metric_name.into();
let secs = timeout_secs.into();
debug!("waiting until metric {metric_name} pass the predicate");
Expand All @@ -251,12 +251,7 @@ impl NetworkNode {

if let Ok(inner_res) = res {
match inner_res {
Ok(true) => Ok(true),
Ok(false) => {
// should not happens
warn!("wait_metric return false");
Err(anyhow!("wait_metric return false, {THIS_IS_A_BUG}"))
},
Ok(_) => Ok(()),
Err(e) => Err(anyhow!("Error waiting for metric: {}", e)),
}
} else {
Expand All @@ -267,11 +262,67 @@ impl NetworkNode {
}
}

// Logs

/// Get the logs of the node
/// TODO: do we need the `since` param, maybe we could be handy later for loop filtering
pub async fn logs(&self) -> Result<String, anyhow::Error> {
Ok(self.inner.logs().await?)
}

/// Wait until a the number of matching log lines is reach
pub async fn wait_log_line_count<'a>(
&self,
pattern: impl Into<String>,
is_glob: bool,
count: usize,
) -> Result<(), anyhow::Error> {
let pattern: String = pattern.into();
debug!("waiting until we find pattern {pattern} {count} times");
let match_fn: Box<dyn Fn(&str) -> bool> = if is_glob {
Box::new(|line: &str| -> bool { glob_match(&pattern, line) })
} else {
let re = Regex::new(&pattern)?;
Box::new(move |line: &str| -> bool { re.is_match(line) })
};

loop {
let mut q = 0_usize;
let logs = self.logs().await?;
for line in logs.lines() {
println!("line is {line}");
if match_fn(line) {
println!("pattern {pattern} match in line {line}");
q += 1;
if q >= count {
return Ok(());
}
}
}

tokio::time::sleep(Duration::from_secs(2)).await;
}
}

/// Wait until a the number of matching log lines is reach
/// with timeout (secs)
pub async fn wait_log_line_count_with_timeout(
&self,
substring: impl Into<String>,
is_glob: bool,
count: usize,
timeout_secs: impl Into<u64>,
) -> Result<(), anyhow::Error> {
let secs = timeout_secs.into();
debug!("waiting until match {count} lines");
tokio::time::timeout(
Duration::from_secs(secs),
self.wait_log_line_count(substring, is_glob, count),
)
.await?
}

// TODO: impl
// wait_log_line_count
// wait_log_line_count_with_timeout
// wait_subxt_client
// wait_subxt_client_with_timeout
// wait_event_count
// wait_event_count_with_timeout

Expand Down
11 changes: 7 additions & 4 deletions crates/sdk/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,22 @@ async fn ci_k8s_basic_functionalities_should_works() {
.await;
assert!(r.is_err());

let (best_block_pass, client) = try_join!(
let (_best_block_pass, client) = try_join!(
alice.wait_metric(BEST_BLOCK_METRIC, |x| x > 5_f64),
alice.wait_client::<subxt::PolkadotConfig>()
)
.unwrap();
// check best block through metrics without timeout
assert!(best_block_pass);

alice
.wait_log_line_count("*rted #1*", true, 10)
.await
.unwrap();

// check best block through metrics with timeout
assert!(alice
.wait_metric_with_timeout(BEST_BLOCK_METRIC, |x| x > 10_f64, 45_u32)
.await
.unwrap());
.is_ok());

// ensure timeout error
let best_block = alice.reports(BEST_BLOCK_METRIC).await.unwrap();
Expand Down

0 comments on commit 85993c8

Please sign in to comment.