From d33a4f83a9e2f38eeaa86d56d80d61f8c546b2f2 Mon Sep 17 00:00:00 2001 From: Javier Viola Date: Fri, 14 Jun 2024 16:37:35 +0200 Subject: [PATCH] feat(orchestrator): Add wait methods for log line count --- Cargo.toml | 1 + crates/orchestrator/Cargo.toml | 2 + crates/orchestrator/src/network/node.rs | 100 ++++++++++++++++++------ crates/sdk/tests/smoke.rs | 8 +- 4 files changed, 86 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 118a7a83c..3fa7e4953 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ axum-extra = { version = "0.9" } tower = { version = "0.4" } tower-http = { version = "0.5" } tracing-subscriber = { version = "0.3" } +glob-match = "0.2.1" # Zombienet workspace crates: support = { package = "zombienet-support", version = "0.2.4", path = "crates/support" } diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index d1bb614f5..72827618d 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -30,6 +30,8 @@ reqwest = { workspace = true } tracing = { workspace = true } pjs-rs = { version = "0.1.2", optional = true } uuid = { workspace = true } +regex = { workspace = true } +glob-match = { workspace = true } # Zombienet deps configuration = { workspace = true } diff --git a/crates/orchestrator/src/network/node.rs b/crates/orchestrator/src/network/node.rs index 1568f72b6..3ea473c18 100644 --- a/crates/orchestrator/src/network/node.rs +++ b/crates/orchestrator/src/network/node.rs @@ -4,10 +4,12 @@ use anyhow::anyhow; use prom_metrics_parser::MetricMap; use provider::DynNode; use subxt::{backend::rpc::RpcClient, OnlineClient}; -use support::{constants::THIS_IS_A_BUG, net::wait_ws_ready}; +use support::net::wait_ws_ready; use thiserror::Error; use tokio::sync::RwLock; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; +use regex::Regex; +use glob_match::glob_match; use crate::network_spec::node::NodeSpec; #[cfg(feature = "pjs")] @@ -66,12 +68,6 @@ impl NetworkNode { &self.ws_uri } - /// Get the logs of the node - /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering - pub async fn logs(&self) -> Result { - Ok(self.inner.logs().await?) - } - // Subxt /// Get the rpc client for the node @@ -138,7 +134,7 @@ impl NetworkNode { Ok(()) } - // Assertions + // Metrics assertions /// Get metric value 'by name' from prometheus (exposed by the node) /// metric name can be: @@ -189,12 +185,14 @@ impl NetworkNode { } } - // Wait methods + // Wait methods for metrics + + /// Wait until a metric value pass the `predicate` pub async fn wait_metric( &self, metric_name: impl Into, predicate: impl Fn(f64) -> bool, - ) -> Result { + ) -> Result<(), anyhow::Error> { let metric_name = metric_name.into(); debug!("waiting until metric {metric_name} pass the predicate"); loop { @@ -202,7 +200,7 @@ impl NetworkNode { match res { Ok(res) => { if res { - return Ok(true); + return Ok(()); } }, Err(e) => { @@ -234,12 +232,14 @@ impl NetworkNode { } } + /// Wait until a metric value pass the `predicate` + /// with a timeout (secs) pub async fn wait_metric_with_timeout( &self, metric_name: impl Into, predicate: impl Fn(f64) -> bool, timeout_secs: impl Into, - ) -> Result { + ) -> Result<(), anyhow::Error> { let metric_name = metric_name.into(); let secs = timeout_secs.into(); debug!("waiting until metric {metric_name} pass the predicate"); @@ -251,12 +251,7 @@ impl NetworkNode { if let Ok(inner_res) = res { match inner_res { - Ok(true) => Ok(true), - Ok(false) => { - // should not happens - warn!("wait_metric return false"); - Err(anyhow!("wait_metric return false, {THIS_IS_A_BUG}")) - }, + Ok(_) => Ok(()), Err(e) => Err(anyhow!("Error waiting for metric: {}", e)), } } else { @@ -267,11 +262,70 @@ impl NetworkNode { } } + // Logs + + /// Get the logs of the node + /// TODO: do we need the `since` param, maybe we could be handy later for loop filtering + pub async fn logs(&self) -> Result { + Ok(self.inner.logs().await?) + } + + /// Wait until a the number of matching log lines is reach + pub async fn wait_log_line_count<'a>( + &self, + pattern: impl Into, + is_glob: bool, + count: usize, + ) -> Result<(), anyhow::Error> { + let pattern: String = pattern.into(); + debug!("waiting until we find pattern {pattern} {count} times"); + let match_fn: Box bool> = if is_glob { + Box::new(|line: &str| -> bool { + glob_match(&pattern, line) + }) + } else { + let re = Regex::new(&pattern)?; + Box::new(move |line: &str| -> bool { re.is_match(line) }) + }; + + + loop { + let mut q = 0_usize; + let logs = self.logs().await?; + for line in logs.lines() { + println!("line is {line}"); + if match_fn(line) { + println!("pattern {pattern} match in line {line}"); + q += 1; + if q >= count { + return Ok(()) + } + } + } + + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + + /// Wait until a the number of matching log lines is reach + /// with timeout (secs) + pub async fn wait_log_line_count_with_timeout( + &self, + substring: impl Into, + is_glob: bool, + count: usize, + timeout_secs: impl Into, + ) -> Result<(), anyhow::Error> { + let secs = timeout_secs.into(); + debug!("waiting until match {count} lines"); + tokio::time::timeout( + Duration::from_secs(secs), + self.wait_log_line_count(substring, is_glob, count), + ) + .await? + } + // TODO: impl - // wait_log_line_count - // wait_log_line_count_with_timeout - // wait_subxt_client - // wait_subxt_client_with_timeout // wait_event_count // wait_event_count_with_timeout diff --git a/crates/sdk/tests/smoke.rs b/crates/sdk/tests/smoke.rs index 4ab437f54..a98e27536 100644 --- a/crates/sdk/tests/smoke.rs +++ b/crates/sdk/tests/smoke.rs @@ -58,13 +58,17 @@ async fn ci_k8s_basic_functionalities_should_works() { ) .unwrap(); // check best block through metrics without timeout - assert!(best_block_pass); + assert_eq!(best_block_pass, ()); + + + + alice.wait_log_line_count("*rted #1*", true, 10).await.unwrap(); // check best block through metrics with timeout assert!(alice .wait_metric_with_timeout(BEST_BLOCK_METRIC, |x| x > 10_f64, 45_u32) .await - .unwrap()); + .is_ok()); // ensure timeout error let best_block = alice.reports(BEST_BLOCK_METRIC).await.unwrap();