Port generic workload to network_reliability_test
nikolay-komarevskiy authored and basvandijk committed Apr 19, 2023
1 parent b43f9cd commit 7da7d4f
Showing 3 changed files with 143 additions and 62 deletions.
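
For orientation, the wiring this test is ported to (drawn from the spawn_workload and assertion changes further down) boils down to the condensed sketch below. It is abbreviated and leans on the test crate's own types (Engine, LoadTestMetrics); the request generator, agents, and logger setup are elided, so it is not a standalone program.

// Condensed from the diff below; not runnable outside the test crate.
let aggregator = LoadTestMetrics::new(log.clone())
    .with_requests_duration_thresholds(DURATION_THRESHOLD);
let load_metrics = Engine::new(log, generator, rps, runtime)
    .increase_dispatch_timeout(REQUESTS_DISPATCH_EXTRA_TIMEOUT)
    .execute(aggregator, LoadTestMetrics::aggregator_fn)
    .await
    .expect("Execution of the workload failed.");
// The test then asserts on the share of requests that finished below the threshold.
let ratios = load_metrics.requests_ratio_below_threshold(DURATION_THRESHOLD);
assert!(ratios.iter().all(|(_, ratio)| *ratio > MIN_REQUESTS_RATIO_BELOW_THRESHOLD));
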
6 changes: 1 addition & 5 deletions rs/tests/src/generic_workload_engine/engine.rs
@@ -46,10 +46,9 @@ where
Fold: Fn(A, Out) -> A + Send + 'static,
A: Send + 'static,
{
let log = self.log;
let futures_count = self.rps * self.duration.as_secs() as usize;
let (fut_snd, mut fut_rcv) = tokio::sync::mpsc::unbounded_channel();
let log = self.log;

info!(
log,
"Starting execution of {} futures, expected to be submitted within {} secs.",
@@ -74,7 +73,6 @@
start.elapsed()
}
});

let aggr_jh = task::spawn({
let log = log.clone();
async move {
@@ -91,7 +89,6 @@
aggr
}
});

let dispatch_duration = match dispatch_jh.await {
Ok(v) => v,
Err(_e) => {
@@ -107,7 +104,6 @@
dispatch_duration.as_millis()
)));
}

info!(
log,
"All {} futures started within {} secs and executed to completion within {} secs",
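
The engine.rs hunks above only move a log binding and drop blank lines, but the structure they touch — a dispatcher task that submits rps * duration futures through an unbounded channel, and an aggregator task that folds their outputs — is easy to model in isolation. The following is a minimal, self-contained sketch of that dispatch/aggregate split, not the engine's actual code: the dispatch-timing checks and timeout handling visible above are omitted, and it assumes tokio with the macros, rt-multi-thread, sync, and time features.

use std::time::Duration;
use tokio::task;

#[tokio::main]
async fn main() {
    let rps: usize = 10;
    let duration = Duration::from_secs(2);
    let futures_count = rps * duration.as_secs() as usize;
    let (fut_snd, mut fut_rcv) = tokio::sync::mpsc::unbounded_channel();

    // Dispatcher task: spawns one workload future per planned request and
    // forwards its JoinHandle to the aggregator over the channel.
    let dispatch_jh = task::spawn(async move {
        let interval = Duration::from_millis(1_000 / rps as u64);
        for idx in 0..futures_count {
            let jh = task::spawn(async move { idx }); // stand-in for one request future
            fut_snd.send(jh).expect("aggregator receiver is alive");
            tokio::time::sleep(interval).await;
        }
        // fut_snd is dropped here, which lets the aggregator loop terminate.
    });

    // Aggregator task: awaits every forwarded future and folds its output
    // into an accumulator, mirroring the fold-based aggregation in engine.rs.
    let aggr_jh = task::spawn(async move {
        let mut aggr = 0usize;
        while let Some(jh) = fut_rcv.recv().await {
            aggr += jh.await.expect("workload future panicked");
        }
        aggr
    });

    dispatch_jh.await.expect("dispatcher panicked");
    let aggr = aggr_jh.await.expect("aggregator panicked");
    println!("aggregated outcome: {aggr}");
}
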
90 changes: 75 additions & 15 deletions rs/tests/src/generic_workload_engine/metrics.rs
@@ -14,12 +14,19 @@ use super::engine::Engine;

#[derive(Default, Clone, Debug)]
pub struct RequestDurationBucket {
// threshold: Duration, TODO
threshold: Duration,
requests_above_threshold: u64,
requests_below_threshold: u64,
}

impl RequestDurationBucket {
pub fn new(threshold: Duration) -> Self {
Self {
threshold,
..Default::default()
}
}

pub fn requests_count_below_threshold(&self) -> u64 {
self.requests_below_threshold
}
@@ -40,7 +47,7 @@ impl RequestDurationBucket {

pub type Counter = usize;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RequestMetrics {
errors_max: Counter,
errors_map: HashMap<String, Counter>,
@@ -51,8 +58,8 @@ pub struct RequestMetrics {
failure_calls: Counter,
min_request_duration: Duration,
max_request_duration: Duration,
total_request_duration: Duration,
//requests_duration_buckets: Option<Vec<RequestDurationBucket>>, TODO
total_requests_duration: Duration,
requests_duration_buckets: Vec<RequestDurationBucket>,
}

/// Outcome of a request-based workflow, i.e., r_1, r_2, ..., r_N in which each individual request r_i may depend on the outcome of r_{i-1}
@@ -62,6 +69,7 @@ pub struct LoadTestMetrics {
inner: BTreeMap<String, RequestMetrics>,
last_time_emitted: Instant,
logger: Logger,
requests_duration_thresholds: Vec<Duration>,
}

impl LoadTestMetrics {
@@ -70,9 +78,15 @@ impl LoadTestMetrics {
inner: Default::default(),
last_time_emitted: Instant::now(),
logger,
requests_duration_thresholds: vec![],
}
}

pub fn with_requests_duration_thresholds(mut self, duration_threshold: Duration) -> Self {
self.requests_duration_thresholds.push(duration_threshold);
self
}

pub fn success_calls(&self) -> Counter {
self.inner
.values()
@@ -100,20 +114,58 @@
})
}

pub fn find_request_duration_bucket(
&self,
_threshold: Duration,
) -> Option<RequestDurationBucket> {
todo!()
pub fn requests_count_below_threshold(&self, threshold: Duration) -> Vec<(String, u64)> {
self.inner
.iter()
.map(|(key, val)| {
(
key.clone(),
val.requests_duration_buckets
.iter()
.find(|bucket| bucket.threshold == threshold)
.expect("No bucket with a given threshold exists.")
.requests_count_below_threshold(),
)
})
.collect()
}

pub fn requests_ratio_below_threshold(&self, threshold: Duration) -> Vec<(String, f64)> {
self.inner
.iter()
.map(|(key, val)| {
(
key.clone(),
val.requests_duration_buckets
.iter()
.find(|bucket| bucket.threshold == threshold)
.expect("No bucket with a given threshold exists.")
.requests_ratio_below_threshold(),
)
})
.collect()
}

pub fn aggregate_load_testing_metrics<T, S>(mut self, item: LoadTestOutcome<T, S>) -> Self
where
T: Clone,
S: Clone + Display,
{
// Initialize empty request metrics with duration buckets.
let empty_request_metrics = RequestMetrics {
requests_duration_buckets: self
.requests_duration_thresholds
.iter()
.cloned()
.map(RequestDurationBucket::new)
.collect(),
..Default::default()
};
item.into_iter().for_each(|(req_name, outcome)| {
let entry = self.inner.entry(req_name).or_default();
let entry = self
.inner
.entry(req_name)
.or_insert_with(|| empty_request_metrics.clone());
entry.push(outcome)
});
self.log_throttled();
@@ -176,7 +228,7 @@ impl RequestMetrics {
}

pub fn avg_request_duration(&self) -> Option<Duration> {
self.total_request_duration
self.total_requests_duration
.checked_div(self.total_calls().try_into().unwrap())
}

@@ -219,12 +271,20 @@ impl RequestMetrics {
{
self.min_request_duration = min(self.min_request_duration, item.duration);
self.max_request_duration = max(self.max_request_duration, item.duration);
self.total_request_duration += item.duration;
self.total_requests_duration += item.duration;

self.min_attempts = min(self.min_attempts, item.attempts);
self.max_attempts = max(self.max_attempts, item.attempts);
self.total_attempts += item.attempts;

for bucket in self.requests_duration_buckets.iter_mut() {
if item.duration >= bucket.threshold {
bucket.requests_above_threshold += 1;
} else {
bucket.requests_below_threshold += 1;
}
}

if let Err(error) = item.result {
self.failure_calls += 1;
*self.errors_map.entry(error.to_string()).or_insert(0) += 1;
@@ -249,12 +309,12 @@ impl Default for RequestMetrics {
success_calls: 0,
failure_calls: 0,
min_request_duration: Duration::MAX,
max_request_duration: Duration::default(),
total_request_duration: Duration::default(),
// requests_duration_buckets: None, TODO
max_request_duration: Duration::ZERO,
total_requests_duration: Duration::ZERO,
min_attempts: Counter::MAX,
max_attempts: 0,
total_attempts: 0,
requests_duration_buckets: vec![],
}
}
}
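
The new RequestDurationBucket plumbing in metrics.rs can be exercised on its own: every observed request duration lands either at-or-above or below a configured threshold, and the test later asserts on the ratio of fast requests per request type. Below is a self-contained model of that accounting; names are simplified, and since the body of requests_ratio_below_threshold sits in a collapsed hunk, the ratio computation here is an assumption rather than a copy.

use std::time::Duration;

#[derive(Clone, Debug, Default)]
struct DurationBucket {
    threshold: Duration,
    above: u64,
    below: u64,
}

impl DurationBucket {
    fn new(threshold: Duration) -> Self {
        Self { threshold, ..Default::default() }
    }

    // Mirrors the threshold branch added in metrics.rs: a duration at or
    // above the threshold counts as "slow", anything else as "fast".
    fn observe(&mut self, duration: Duration) {
        if duration >= self.threshold {
            self.above += 1;
        } else {
            self.below += 1;
        }
    }

    fn ratio_below_threshold(&self) -> f64 {
        self.below as f64 / (self.below + self.above) as f64
    }
}

fn main() {
    let mut bucket = DurationBucket::new(Duration::from_secs(20));
    for secs in [1, 3, 7, 25] {
        bucket.observe(Duration::from_secs(secs));
    }
    assert_eq!(bucket.below, 3);
    assert_eq!(bucket.above, 1);
    println!("ratio below threshold: {:.2}", bucket.ratio_below_threshold());
}
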
109 changes: 67 additions & 42 deletions rs/tests/src/networking/network_reliability.rs
@@ -17,17 +17,20 @@ Runbook::
end::catalog[] */

use crate::canister_agent::CanisterAgent;
use crate::canister_api::{CallMode, GenericRequest};
use crate::driver::constants::DEVICE_NAME;
use crate::driver::ic::{AmountOfMemoryKiB, InternetComputer, NrOfVCPUs, Subnet, VmResources};
use crate::driver::test_env::TestEnv;
use crate::driver::test_env_api::{
HasPublicApiUrl, HasTopologySnapshot, IcNodeContainer, IcNodeSnapshot, NnsInstallationExt,
SshSession,
};
use crate::generic_workload_engine::engine::Engine;
use crate::generic_workload_engine::metrics::LoadTestMetrics;
use crate::util::{
self, agent_observes_canister_module, assert_canister_counter_with_retries, block_on,
};
use crate::workload::{CallSpec, Metrics, Request, RoundRobinPlan, Workload};
use ic_agent::{export::Principal, Agent};
use ic_base_types::NodeId;
use ic_registry_subnet_type::SubnetType;
@@ -47,7 +50,7 @@ const RND_SEED: u64 = 42;
// Size of the payload sent to the counter canister in update("write") call.
const PAYLOAD_SIZE_BYTES: usize = 1024;
// Duration of each request is placed into one of two categories - below or above this threshold.
const DURATION_THRESHOLD: Duration = Duration::from_secs(5);
const DURATION_THRESHOLD: Duration = Duration::from_secs(20);
// Ratio of requests with duration < DURATION_THRESHOLD should exceed this parameter.
const MIN_REQUESTS_RATIO_BELOW_THRESHOLD: f64 = 0.9;
// Parameters related to nodes stressing.
@@ -64,7 +67,6 @@ const FRACTION_FROM_REMAINING_DURATION: f64 = 0.25;
const MAX_CANISTER_READ_RETRIES: u32 = 4;
const CANISTER_READ_RETRY_WAIT: Duration = Duration::from_secs(10);
// Parameters related to workload creation.
const RESPONSES_COLLECTION_EXTRA_TIMEOUT: Duration = Duration::from_secs(30); // Responses are collected during the workload execution + this extra time, after all requests had been dispatched.
const REQUESTS_DISPATCH_EXTRA_TIMEOUT: Duration = Duration::from_secs(1); // This param can be slightly tweaked (1-2 sec), if the workload fails to dispatch requests precisely on time.

// Test can be run with different setup/configuration parameters.
@@ -217,7 +219,6 @@ pub fn test(env: TestEnv, config: Config) {
config.rps,
config.runtime,
payload.clone(),
DURATION_THRESHOLD,
);
let handle_workload_app = spawn_workload(
log.clone(),
Expand All @@ -226,7 +227,6 @@ pub fn test(env: TestEnv, config: Config) {
config.rps,
config.runtime,
payload.clone(),
DURATION_THRESHOLD,
);
info!(
&log,
@@ -275,42 +275,43 @@
&log,
"Step 5: Collect metrics from both workloads and perform assertions ..."
);
let metrics_nns = handle_workload_nns
let load_metrics_nns = handle_workload_nns
.join()
.expect("Workload execution against NNS subnet failed.");
let metrics_app = handle_workload_app
let load_metrics_app = handle_workload_app
.join()
.expect("Workload execution against APP subnet failed.");

let duration_bucket_nns = metrics_nns
.find_request_duration_bucket(DURATION_THRESHOLD)
.unwrap();
let duration_bucket_app = metrics_app
.find_request_duration_bucket(DURATION_THRESHOLD)
.unwrap();
info!(
&log,
"Requests below {} sec:\nRequests_count: NNS={} APP={}\nRequests_ratio: NNS={} APP={}.",
DURATION_THRESHOLD.as_secs(),
duration_bucket_nns.requests_count_below_threshold(),
duration_bucket_app.requests_count_below_threshold(),
duration_bucket_nns.requests_ratio_below_threshold(),
duration_bucket_app.requests_ratio_below_threshold(),
);
assert!(
duration_bucket_nns.requests_ratio_below_threshold() > MIN_REQUESTS_RATIO_BELOW_THRESHOLD
);
assert!(
duration_bucket_app.requests_ratio_below_threshold() > MIN_REQUESTS_RATIO_BELOW_THRESHOLD
"Workload execution results for NNS: {load_metrics_nns}"
);
info!(
&log,
"Results of the workload execution for NNS {:?}", metrics_nns
"Workload execution results for APP: {load_metrics_app}"
);
let requests_count_below_threshold_nns =
load_metrics_nns.requests_count_below_threshold(DURATION_THRESHOLD);
let requests_count_below_threshold_app =
load_metrics_app.requests_count_below_threshold(DURATION_THRESHOLD);
let requests_ratio_below_threshold_nns =
load_metrics_nns.requests_ratio_below_threshold(DURATION_THRESHOLD);
let requests_ratio_below_threshold_app =
load_metrics_app.requests_ratio_below_threshold(DURATION_THRESHOLD);
info!(
&log,
"Results of the workload execution for APP {:?}", metrics_app
"Requests below {} sec:\nRequests_count: NNS={:?} APP={:?}\nRequests_ratio: NNS={:?} APP={:?}.",
DURATION_THRESHOLD.as_secs(),
requests_count_below_threshold_nns,
requests_count_below_threshold_app,
requests_ratio_below_threshold_nns,
requests_ratio_below_threshold_app,
);
assert!(requests_ratio_below_threshold_nns
.iter()
.all(|(_, ratio)| *ratio > MIN_REQUESTS_RATIO_BELOW_THRESHOLD));
assert!(requests_ratio_below_threshold_app
.iter()
.all(|(_, ratio)| *ratio > MIN_REQUESTS_RATIO_BELOW_THRESHOLD));
// Create agents to read results from the counter canisters.
let agent_nns = subnet_nns
.nodes()
@@ -477,21 +478,45 @@ fn spawn_workload(
rps: usize,
runtime: Duration,
payload: Vec<u8>,
duration_threshold: Duration,
) -> JoinHandle<Metrics> {
let plan = RoundRobinPlan::new(vec![Request::Update(CallSpec::new(
canister_id,
CANISTER_METHOD,
payload,
))]);
) -> JoinHandle<LoadTestMetrics> {
let agents: Vec<CanisterAgent> = agents.into_iter().map(CanisterAgent::from).collect();
thread::spawn(move || {
block_on(async {
let workload = Workload::new(agents, rps, runtime, plan, log)
.with_responses_collection_extra_timeout(RESPONSES_COLLECTION_EXTRA_TIMEOUT)
.increase_requests_dispatch_timeout(REQUESTS_DISPATCH_EXTRA_TIMEOUT)
.with_requests_duration_bucket(duration_threshold);
workload
.execute()
let agents = agents.clone();
block_on(async move {
let agents = agents.clone();
let payload = payload.clone();
let generator = {
let agents = agents.clone();
let payload = payload.clone();
move |idx: usize| {
let agent = &agents[idx % agents.len()];
let agent = agent.clone();
let payload = payload.clone();
async move {
let agent = agent.clone();
let payload = payload.clone();
let request = GenericRequest::new(
canister_id,
CANISTER_METHOD.to_string(),
payload,
CallMode::Update,
);
agent
.call(&request)
.await
.map(|_| ()) // drop non-error responses
.into_test_outcome()
}
}
};
// Don't log metrics during execution.
let log_null = slog::Logger::root(slog::Discard, slog::o!());
let aggregator = LoadTestMetrics::new(log_null)
.with_requests_duration_thresholds(DURATION_THRESHOLD);
let engine = Engine::new(log.clone(), generator, rps, runtime)
.increase_dispatch_timeout(REQUESTS_DISPATCH_EXTRA_TIMEOUT);
engine
.execute(aggregator, LoadTestMetrics::aggregator_fn)
.await
.expect("Execution of the workload failed.")
})
