subsystem-bench: Prepare CI output #3158

Merged · 11 commits · Feb 6, 2024
18 changes: 12 additions & 6 deletions polkadot/node/subsystem-bench/src/approval/mod.rs
@@ -29,7 +29,9 @@ use crate::{
 	},
 	core::{
 		configuration::{TestAuthorities, TestConfiguration},
-		environment::{TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT},
+		environment::{
+			BenchmarkUsage, TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT,
+		},
 		mock::{
 			dummy_builder,
 			network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx},
@@ -876,7 +878,11 @@ fn prepare_test_inner(
 	)
 }
 
-pub async fn bench_approvals(env: &mut TestEnvironment, mut state: ApprovalTestState) {
+pub async fn bench_approvals(
+	benchmark_name: &str,
+	env: &mut TestEnvironment,
+	mut state: ApprovalTestState,
+) -> BenchmarkUsage {
 	let producer_rx = state
 		.start_message_production(
 			env.network(),
@@ -885,15 +891,16 @@ pub async fn bench_approvals(env: &mut TestEnvironment, mut state: ApprovalTestS
 			env.registry().clone(),
 		)
 		.await;
-	bench_approvals_run(env, state, producer_rx).await
+	bench_approvals_run(benchmark_name, env, state, producer_rx).await
 }
 
 /// Runs the approval benchmark.
 pub async fn bench_approvals_run(
+	benchmark_name: &str,
 	env: &mut TestEnvironment,
 	state: ApprovalTestState,
 	producer_rx: oneshot::Receiver<()>,
-) {
+) -> BenchmarkUsage {
 	let config = env.config().clone();
 
 	env.metrics().set_n_validators(config.n_validators);
@@ -1054,6 +1061,5 @@ pub async fn bench_approvals_run(
 		state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst)
 	);
 
-	env.display_network_usage();
-	env.display_cpu_usage(&["approval-distribution", "approval-voting"]);
+	env.collect_resource_usage(benchmark_name, &["approval-distribution", "approval-voting"])
 }
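Note: the change above is the pattern applied across all three benchmark files in this PR — the benchmark no longer prints its stats and instead returns them, leaving the rendering to the caller. A minimal standalone sketch of that shape (names here are illustrative, not the PR's API):

```rust
// A benchmark that used to print its stats now hands them back as a value
// implementing Display, so the caller decides how (and whether) to render it.
struct Usage {
	total_cpu_seconds: f64,
}

impl std::fmt::Display for Usage {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		write!(f, "CPU total: {:.3}s", self.total_cpu_seconds)
	}
}

// Before: `fn run_benchmark() { ...; println!(...); }`
// After: the function stays output-agnostic and returns the data.
fn run_benchmark() -> Usage {
	Usage { total_cpu_seconds: 1.234 }
}

fn main() {
	let usage = run_benchmark();
	println!("{}", usage); // the caller picks the output format
}
```

Keeping the benchmark output-agnostic is what lets the CLI later choose between plain text and YAML without touching benchmark code.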
32 changes: 19 additions & 13 deletions polkadot/node/subsystem-bench/src/availability/mod.rs
@@ -13,7 +13,10 @@
 
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
-use crate::{core::mock::ChainApiState, TestEnvironment};
+use crate::{
+	core::{environment::BenchmarkUsage, mock::ChainApiState},
+	TestEnvironment,
+};
 use av_store::NetworkAvailabilityState;
 use bitvec::bitvec;
 use colored::Colorize;
@@ -430,7 +433,11 @@ impl TestState {
 	}
 }
 
-pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: TestState) {
+pub async fn benchmark_availability_read(
+	benchmark_name: &str,
+	env: &mut TestEnvironment,
+	mut state: TestState,
+) -> BenchmarkUsage {
 	let config = env.config().clone();
 
 	env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await;
@@ -490,12 +497,15 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T
 		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
 	);
 
-	env.display_network_usage();
-	env.display_cpu_usage(&["availability-recovery"]);
 	env.stop().await;
+	env.collect_resource_usage(benchmark_name, &["availability-recovery"])
 }
 
-pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state: TestState) {
+pub async fn benchmark_availability_write(
+	benchmark_name: &str,
+	env: &mut TestEnvironment,
+	mut state: TestState,
+) -> BenchmarkUsage {
 	let config = env.config().clone();
 
 	env.metrics().set_n_validators(config.n_validators);
@@ -648,15 +658,11 @@ pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state:
 		format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
 	);
 
-	env.display_network_usage();
-
-	env.display_cpu_usage(&[
-		"availability-distribution",
-		"bitfield-distribution",
-		"availability-store",
-	]);
-
 	env.stop().await;
+	env.collect_resource_usage(
+		benchmark_name,
+		&["availability-distribution", "bitfield-distribution", "availability-store"],
+	)
 }
 
 pub fn peer_bitfield_message_v2(
16 changes: 16 additions & 0 deletions polkadot/node/subsystem-bench/src/cli.rs
@@ -40,6 +40,22 @@ pub enum TestObjective {
 	Unimplemented,
 }
 
+impl std::fmt::Display for TestObjective {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(
+			f,
+			"{}",
+			match self {
+				Self::DataAvailabilityRead(_) => "DataAvailabilityRead",
+				Self::DataAvailabilityWrite => "DataAvailabilityWrite",
+				Self::TestSequence(_) => "TestSequence",
+				Self::ApprovalVoting(_) => "ApprovalVoting",
+				Self::Unimplemented => "Unimplemented",
+			}
+		)
+	}
+}
+
 #[derive(Debug, clap::Parser)]
 #[clap(rename_all = "kebab-case")]
 #[allow(missing_docs)]
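The `Display` impl above gives every objective a stable label, which `subsystem-bench.rs` folds into benchmark names like `{path} #{step} {objective}`. A trimmed, self-contained sketch of the same idea (the enum, path, and step number below are made up for illustration):

```rust
use std::fmt;

// Simplified stand-in for `TestObjective`: each variant maps to a fixed string.
enum Objective {
	DataAvailabilityRead,
	DataAvailabilityWrite,
}

impl fmt::Display for Objective {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let name = match self {
			Self::DataAvailabilityRead => "DataAvailabilityRead",
			Self::DataAvailabilityWrite => "DataAvailabilityWrite",
		};
		write!(f, "{}", name)
	}
}

fn main() {
	// Mirrors `format!("{} #{} {}", &options.path, index + 1, test_config.objective)`.
	let benchmark_name =
		format!("{} #{} {}", "some/sequence.yaml", 1, Objective::DataAvailabilityRead);
	assert_eq!(benchmark_name, "some/sequence.yaml #1 DataAvailabilityRead");
}
```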
120 changes: 83 additions & 37 deletions polkadot/node/subsystem-bench/src/core/environment.rs
@@ -22,6 +22,7 @@ use colored::Colorize;
 use core::time::Duration;
 use futures::{Future, FutureExt};
 use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
+use serde::{Deserialize, Serialize};
 
 use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
 use polkadot_node_subsystem_types::Hash;
@@ -347,57 +348,102 @@ impl TestEnvironment {
 		}
 	}
 
-	/// Display network usage stats.
-	pub fn display_network_usage(&self) {
-		let stats = self.network().peer_stats(0);
-
-		let total_node_received = stats.received() / 1024;
-		let total_node_sent = stats.sent() / 1024;
-
-		println!(
-			"\nPayload bytes received from peers: {}, {}",
-			format!("{:.2} KiB total", total_node_received).blue(),
-			format!("{:.2} KiB/block", total_node_received / self.config().num_blocks)
-				.bright_blue()
-		);
+	pub fn collect_resource_usage(
+		&self,
+		benchmark_name: &str,
+		subsystems_under_test: &[&str],
+	) -> BenchmarkUsage {
+		BenchmarkUsage {
+			benchmark_name: benchmark_name.to_string(),
+			network_usage: self.network_usage(),
+			cpu_usage: self.cpu_usage(subsystems_under_test),
+		}
+	}
 
-		println!(
-			"Payload bytes sent to peers: {}, {}",
-			format!("{:.2} KiB total", total_node_sent).blue(),
-			format!("{:.2} KiB/block", total_node_sent / self.config().num_blocks).bright_blue()
-		);
+	fn network_usage(&self) -> Vec<ResourceUsage> {
+		let stats = self.network().peer_stats(0);
+		let total_node_received = (stats.received() / 1024) as f64;
+		let total_node_sent = (stats.sent() / 1024) as f64;
+		let num_blocks = self.config().num_blocks as f64;
+
+		vec![
+			ResourceUsage {
+				resource_name: "Received from peers".to_string(),
+				total: total_node_received,
+				per_block: total_node_received / num_blocks,
+			},
+			ResourceUsage {
+				resource_name: "Sent to peers".to_string(),
+				total: total_node_sent,
+				per_block: total_node_sent / num_blocks,
+			},
+		]
 	}
 
-	/// Print CPU usage stats in the CLI.
-	pub fn display_cpu_usage(&self, subsystems_under_test: &[&str]) {
+	fn cpu_usage(&self, subsystems_under_test: &[&str]) -> Vec<ResourceUsage> {
 		let test_metrics = super::display::parse_metrics(self.registry());
+		let mut usage = vec![];
+		let num_blocks = self.config().num_blocks as f64;
 
 		for subsystem in subsystems_under_test.iter() {
 			let subsystem_cpu_metrics =
 				test_metrics.subset_with_label_value("task_group", subsystem);
 			let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
-			println!(
-				"{} CPU usage {}",
-				subsystem.to_string().bright_green(),
-				format!("{:.3}s", total_cpu).bright_purple()
-			);
-			println!(
-				"{} CPU usage per block {}",
-				subsystem.to_string().bright_green(),
-				format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()
-			);
+			usage.push(ResourceUsage {
+				resource_name: subsystem.to_string(),
+				total: total_cpu,
+				per_block: total_cpu / num_blocks,
+			});
 		}
 
 		let test_env_cpu_metrics =
 			test_metrics.subset_with_label_value("task_group", "test-environment");
 		let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
-		println!(
-			"Total test environment CPU usage {}",
-			format!("{:.3}s", total_cpu).bright_purple()
-		);
-		println!(
-			"Test environment CPU usage per block {}",
-			format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()
-		);
+
+		usage.push(ResourceUsage {
+			resource_name: "Test environment".to_string(),
+			total: total_cpu,
+			per_block: total_cpu / num_blocks,
+		});
+
+		usage
 	}
 }
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BenchmarkUsage {
+	benchmark_name: String,
+	network_usage: Vec<ResourceUsage>,
+	cpu_usage: Vec<ResourceUsage>,
+}
+
+impl std::fmt::Display for BenchmarkUsage {
+	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+		write!(
+			f,
+			"\n{}\n\n{}\n{}\n\n{}\n{}\n",
+			self.benchmark_name.purple(),
+			format!("{:<32}{:>12}{:>12}", "Network usage, KiB", "total", "per block").blue(),
+			self.network_usage
+				.iter()
+				.map(|v| v.to_string())
+				.collect::<Vec<String>>()
+				.join("\n"),
+			format!("{:<32}{:>12}{:>12}", "CPU usage in seconds", "total", "per block").blue(),
+			self.cpu_usage.iter().map(|v| v.to_string()).collect::<Vec<String>>().join("\n")
+		)
+	}
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ResourceUsage {
+	resource_name: String,
+	total: f64,
+	per_block: f64,
+}
+
+impl std::fmt::Display for ResourceUsage {
+	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+		write!(f, "{:<32}{:>12.3}{:>12.3}", self.resource_name.cyan(), self.total, self.per_block)
+	}
+}
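`BenchmarkUsage` and `ResourceUsage` derive `Serialize`/`Deserialize` and implement `Display`, so the same data feeds both the human-readable table and the machine-readable YAML added for CI. A self-contained sketch of those two output paths, assuming `serde` (with the `derive` feature) and `serde_yaml` as dependencies (a sketch, not the PR's code):

```rust
use serde::Serialize;

#[derive(Debug, Serialize)]
struct ResourceUsage {
	resource_name: String,
	total: f64,
	per_block: f64,
}

impl std::fmt::Display for ResourceUsage {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		// `{:<32}` left-aligns the name in 32 columns; `{:>12.3}` right-aligns
		// each number in 12 columns with three decimals, lining up under the
		// `{:<32}{:>12}{:>12}` header rows used above.
		write!(f, "{:<32}{:>12.3}{:>12.3}", self.resource_name, self.total, self.per_block)
	}
}

fn main() -> Result<(), serde_yaml::Error> {
	let row = ResourceUsage {
		resource_name: "approval-distribution".to_string(),
		total: 12.345,
		per_block: 1.234,
	};
	println!("{}", row); // fixed-width table row for the terminal
	print!("{}", serde_yaml::to_string(&row)?); // YAML document for CI to parse
	Ok(())
}
```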
62 changes: 43 additions & 19 deletions polkadot/node/subsystem-bench/src/subsystem-bench.rs
@@ -100,6 +100,10 @@ struct BenchCli {
 	/// Enable Cache Misses Profiling with Valgrind. Linux only, Valgrind must be in the PATH
 	pub cache_misses: bool,
 
+	#[clap(long, default_value_t = false)]
+	/// Shows the output in YAML format
+	pub yaml_output: bool,
+
 	#[command(subcommand)]
 	pub objective: cli::TestObjective,
 }
@@ -164,34 +168,51 @@ impl BenchCli {
 					format!("Sequence contains {} step(s)", num_steps).bright_purple()
 				);
 				for (index, test_config) in test_sequence.into_iter().enumerate() {
+					let benchmark_name =
+						format!("{} #{} {}", &options.path, index + 1, test_config.objective);
 					gum::info!(target: LOG_TARGET, "{}", format!("Step {}/{}", index + 1, num_steps).bright_purple(),);
 					display_configuration(&test_config);
 
-					match test_config.objective {
+					let usage = match test_config.objective {
 						TestObjective::DataAvailabilityRead(ref _opts) => {
 							let mut state = TestState::new(&test_config);
 							let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
 							env.runtime().block_on(availability::benchmark_availability_read(
-								&mut env, state,
-							));
+								&benchmark_name,
+								&mut env,
+								state,
+							))
 						},
 						TestObjective::ApprovalVoting(ref options) => {
 							let (mut env, state) =
 								approval::prepare_test(test_config.clone(), options.clone());
 
-							env.runtime().block_on(bench_approvals(&mut env, state));
+							env.runtime().block_on(bench_approvals(
+								&benchmark_name,
+								&mut env,
+								state,
+							))
 						},
 						TestObjective::DataAvailabilityWrite => {
 							let mut state = TestState::new(&test_config);
 							let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
 							env.runtime().block_on(availability::benchmark_availability_write(
-								&mut env, state,
-							));
+								&benchmark_name,
+								&mut env,
+								state,
+							))
 						},
 						TestObjective::TestSequence(_) => todo!(),
 						TestObjective::Unimplemented => todo!(),
-					}
+					};
+
+					let output = if self.yaml_output {
+						serde_yaml::to_string(&vec![usage])?
+					} else {
+						usage.to_string()
+					};
+					println!("{}", output);
 				}
 
 				return Ok(())
 			},
 			TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(),
@@ -232,25 +253,28 @@ impl BenchCli {
 		let mut state = TestState::new(&test_config);
 		let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
 
-		match self.objective {
-			TestObjective::DataAvailabilityRead(_options) => {
-				env.runtime()
-					.block_on(availability::benchmark_availability_read(&mut env, state));
-			},
-			TestObjective::DataAvailabilityWrite => {
-				env.runtime()
-					.block_on(availability::benchmark_availability_write(&mut env, state));
-			},
-			TestObjective::TestSequence(_options) => {},
+		let benchmark_name = format!("{}", self.objective);
+		let usage = match self.objective {
+			TestObjective::DataAvailabilityRead(_options) => env.runtime().block_on(
+				availability::benchmark_availability_read(&benchmark_name, &mut env, state),
+			),
+			TestObjective::DataAvailabilityWrite => env.runtime().block_on(
+				availability::benchmark_availability_write(&benchmark_name, &mut env, state),
+			),
+			TestObjective::TestSequence(_options) => todo!(),
 			TestObjective::ApprovalVoting(_) => todo!(),
 			TestObjective::Unimplemented => todo!(),
-		}
+		};
 
 		if let Some(agent_running) = agent_running {
 			let agent_ready = agent_running.stop()?;
 			agent_ready.shutdown();
 		}
 
+		let output =
+			if self.yaml_output { serde_yaml::to_string(&vec![usage])? } else { usage.to_string() };
+		println!("{}", output);
+
 		Ok(())
 	}
 }