Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Nov 7, 2024
1 parent 340a6ab commit c8b51ca
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 54 deletions.
1 change: 0 additions & 1 deletion aptos-move/aptos-vm/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const BLOCK_EXECUTION_TIME_BUCKETS: [f64; 16] = [
0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 1.0, 1.25, 1.5, 1.75, 2.0, 3.0, 4.0, 5.0,
];

// TODO - disambiguate against BLOCK_EXECUTOR_EXECUTE_BLOCK
pub static BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
// metric name
Expand Down
3 changes: 2 additions & 1 deletion aptos-move/block-executor/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ pub static BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK: Lazy<Histogram> = Lazy::new(|| {
// metric name
"aptos_executor_block_executor_inner_execute_block_seconds",
// metric description
"The time spent in seconds of BlockExecutor inner block execution in Aptos executor",
"The time spent in the most-inner part of executing a block of transactions, \
i.e. for BlockSTM that is how long parallel or sequential execution took.",
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
)
.unwrap()
Expand Down
1 change: 0 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ aptos-temppath = { workspace = true }
aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
aptos-validator-transaction-pool = { workspace = true }
aptos-vm = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
byteorder = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

/// Smallest number of transactions Rayon should put into a single worker task.
/// Same as in execution/executor-benchmark/src/block_preparation.rs
pub const SIG_VERIFY_RAYON_MIN_THRESHOLD: usize = 32;

pub type PreCommitHook =
Box<dyn 'static + FnOnce(&StateComputeResult) -> BoxFuture<'static, ()> + Send>;

Expand Down Expand Up @@ -156,7 +160,7 @@ impl ExecutionPipeline {
let num_txns = txns_to_execute.len();
txns_to_execute
.into_par_iter()
.with_min_len(optimal_min_len(num_txns, 32))
.with_min_len(optimal_min_len(num_txns, SIG_VERIFY_RAYON_MIN_THRESHOLD))
.map(|t| t.into())
.collect::<Vec<_>>()
});
Expand Down
23 changes: 17 additions & 6 deletions execution/executor-benchmark/src/block_preparation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@ use aptos_types::{
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use std::time::Instant;

/// Smallest number of transactions Rayon should put into a single worker task.
/// Same as in consensus/src/execution_pipeline.rs
pub const SIG_VERIFY_RAYON_MIN_THRESHOLD: usize = 32;

/// Executes preparation stage - set of operations that are
/// executed in a separate stage of the pipeline from execution,
/// like signature verification or block partitioning
pub(crate) struct BlockPreparationStage {
num_executor_shards: usize,
/// Number of blocks processed
num_blocks_processed: usize,
maybe_partitioner: Option<Box<dyn BlockPartitioner>>,
/// Pool of threads for signature verification
sig_verify_pool: rayon::ThreadPool,
/// When execution sharding is enabled, number of executor shards
num_executor_shards: usize,
/// When execution sharding is enabled, partitioner that splits block into shards
maybe_partitioner: Option<Box<dyn BlockPartitioner>>,
}

impl BlockPreparationStage {
pub fn new(
sig_verify_num_threads: usize,
num_sig_verify_threads: usize,
num_shards: usize,
partitioner_config: &dyn PartitionerConfig,
) -> Self {
Expand All @@ -37,10 +48,10 @@ impl BlockPreparationStage {
};

let sig_verify_pool = rayon::ThreadPoolBuilder::new()
.num_threads(sig_verify_num_threads) // More than 8 threads doesn't seem to help much
.num_threads(num_sig_verify_threads)
.thread_name(|index| format!("signature-checker-{}", index))
.build()
.unwrap();
.expect("couldn't create sig_verify thread pool");
Self {
num_executor_shards: num_shards,
num_blocks_processed: 0,
Expand All @@ -67,7 +78,7 @@ impl BlockPreparationStage {
.inc_by(num_txns as u64);

txns.into_par_iter()
.with_min_len(optimal_min_len(num_txns, 32))
.with_min_len(optimal_min_len(num_txns, SIG_VERIFY_RAYON_MIN_THRESHOLD))
.map(|t| t.into())
.collect::<Vec<_>>()
});
Expand Down
10 changes: 5 additions & 5 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use aptos_db::AptosDB;
use aptos_executor::{
block_executor::{BlockExecutor, TransactionBlockExecutor},
metrics::{
BLOCK_EXECUTOR_EXECUTE_BLOCK, COMMIT_BLOCKS, EXECUTE_BLOCK, OTHER_TIMERS,
COMMIT_BLOCKS, GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING, OTHER_TIMERS,
PROCESSED_TXNS_OUTPUT_SIZE, UPDATE_LEDGER,
},
};
Expand Down Expand Up @@ -251,7 +251,7 @@ pub fn run_benchmark<V>(
(num_blocks_created, "raw transfer".to_string())
},
};
if pipeline_config.delay_pipeline_start {
if pipeline_config.generate_then_execute {
overall_measuring.start_time = Instant::now();
}
generator.drop_sender();
Expand Down Expand Up @@ -575,8 +575,8 @@ impl ExecutionTimeMeasurement {

let sig_verify_total = TIMER.with_label_values(&["sig_verify"]).get_sample_sum();
let partitioning_total = TIMER.with_label_values(&["partition"]).get_sample_sum();
let execution_total = EXECUTE_BLOCK.get_sample_sum();
let block_executor_total = BLOCK_EXECUTOR_EXECUTE_BLOCK.get_sample_sum();
let execution_total = TIMER.with_label_values(&["execute"]).get_sample_sum();
let block_executor_total = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum();
let block_executor_inner_total = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.get_sample_sum();

let by_other = OTHER_LABELS
Expand Down Expand Up @@ -719,7 +719,7 @@ impl OverallMeasuring {
num_txns / delta_execution.execution_total_time
);
info!(
"{} fraction of execution {:.4} in block executor (component TPS: {:.1})",
"{} fraction of execution {:.4} in get execution output by executing (component TPS: {:.1})",
prefix,
delta_execution.block_executor_total_time / delta_execution.execution_total_time,
num_txns / delta_execution.block_executor_total_time
Expand Down
33 changes: 30 additions & 3 deletions execution/executor-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,51 @@ impl PrunerOpt {

#[derive(Debug, Parser)]
pub struct PipelineOpt {
/// First generate all transactions for all blocks (and keep them in memory),
/// and only then start the pipeline.
/// Useful when not running a large number of blocks (so they can fit in memory),
/// as generation of blocks takes a non-trivial amount of CPU.
#[clap(long)]
generate_then_execute: bool,
/// Run each stage separately, i.e. each stage waits for the previous stage to finish
/// processing all blocks before starting.
/// Allows seeing the individual throughput of each stage, avoiding resource contention.
#[clap(long)]
split_stages: bool,
/// Skip commit stage - i.e. create executed blocks in memory, but never commit them.
/// Useful when commit is the bottleneck, to see throughput of the rest of the pipeline.
#[clap(long)]
skip_commit: bool,
/// Whether transactions are allowed to abort.
/// By default, the workload generates transactions that are all expected to succeed,
/// so aborts are not allowed - to catch any correctness/configuration issues.
#[clap(long)]
allow_aborts: bool,
/// Whether transactions are allowed to be discarded.
/// By default, the workload generates transactions that are all expected to succeed,
/// so discards are not allowed - to catch any correctness/configuration issues.
#[clap(long)]
allow_discards: bool,
/// Whether transactions are allowed to be retried.
/// By default, the workload generates transactions that are all expected to succeed,
/// so retries are not allowed - to catch any correctness/configuration issues.
#[clap(long)]
allow_retries: bool,
/// Number of worker threads transaction generation will use.
#[clap(long, default_value = "4")]
num_generator_workers: usize,
/// Number of worker threads signature verification will use.
#[clap(long, default_value = "8")]
sig_verify_num_threads: usize,
num_sig_verify_threads: usize,
/// Sharding configuration.
#[clap(flatten)]
sharding_opt: ShardingOpt,
}

impl PipelineOpt {
fn pipeline_config(&self) -> PipelineConfig {
PipelineConfig {
delay_pipeline_start: self.generate_then_execute,
generate_then_execute: self.generate_then_execute,
split_stages: self.split_stages,
skip_commit: self.skip_commit,
allow_aborts: self.allow_aborts,
Expand All @@ -133,7 +154,7 @@ impl PipelineOpt {
num_executor_shards: self.sharding_opt.num_executor_shards,
num_generator_workers: self.num_generator_workers,
partitioner_config: self.sharding_opt.partitioner_config(),
sig_verify_num_threads: self.sig_verify_num_threads,
num_sig_verify_threads: self.num_sig_verify_threads,
}
}
}
Expand Down Expand Up @@ -209,8 +230,14 @@ struct ProfilerOpt {

// Command-line selector for which block executor variant to run.
// Derives `ValueEnum`, and `AptosVMWithBlockSTM` is marked `#[default]`.
// (Plain `//` comments are used for these notes on purpose: `///` comments on
// clap-derived items are picked up as generated help text.)
#[derive(Parser, Debug, ValueEnum, Clone, Default)]
enum BlockExecutorTypeOpt {
/// Transaction execution: AptosVM
/// Executing conflicts: in the input order, via BlockSTM,
/// State: BlockSTM-provided MVHashMap-based view with caching
#[default]
AptosVMWithBlockSTM,
/// Transaction execution: Native rust code producing WriteSet
/// Executing conflicts: All transactions execute on the state at the beginning of the block
/// State: Raw CachedStateView
NativeLooseSpeculative,
// NOTE(review): this variant has no description, unlike its siblings; its
// execution / conflict / state characteristics are not visible here — TODO document.
PtxExecutor,
}
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-benchmark/src/native/native_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl NativeConfig {
pub fn get_concurrency_level() -> usize {
match NATIVE_EXECUTOR_CONCURRENCY_LEVEL.get() {
Some(concurrency_level) => *concurrency_level,
None => 32,
None => 1,
}
}
}
10 changes: 5 additions & 5 deletions execution/executor-benchmark/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct PipelineConfig {
pub delay_pipeline_start: bool,
pub generate_then_execute: bool,
pub split_stages: bool,
pub skip_commit: bool,
pub allow_aborts: bool,
Expand All @@ -41,7 +41,7 @@ pub struct PipelineConfig {
#[derivative(Default(value = "4"))]
pub num_generator_workers: usize,
pub partitioner_config: PartitionerV2Config,
pub sig_verify_num_threads: usize,
pub num_sig_verify_threads: usize,
}

pub struct Pipeline<V> {
Expand All @@ -67,7 +67,7 @@ where
let executor_3 = executor_1.clone();

let (raw_block_sender, raw_block_receiver) = mpsc::sync_channel::<Vec<Transaction>>(
if config.delay_pipeline_start {
if config.generate_then_execute {
(num_blocks.unwrap() + 1).max(50)
} else {
10
Expand Down Expand Up @@ -101,7 +101,7 @@ where
);

let (start_pipeline_tx, start_pipeline_rx) =
create_start_tx_rx(config.delay_pipeline_start);
create_start_tx_rx(config.generate_then_execute);
let (start_execution_tx, start_execution_rx) = create_start_tx_rx(config.split_stages);
let (start_ledger_update_tx, start_ledger_update_rx) =
create_start_tx_rx(config.split_stages);
Expand All @@ -111,7 +111,7 @@ where

// signature verification and partitioning
let mut preparation_stage = BlockPreparationStage::new(
config.sig_verify_num_threads,
config.num_sig_verify_threads,
// Assume the distributed executor and the distributed partitioner share the same worker set.
config.num_executor_shards,
&config.partitioner_config,
Expand Down
12 changes: 7 additions & 5 deletions execution/executor-benchmark/src/transaction_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use aptos_crypto::hash::HashValue;
use aptos_db::metrics::API_LATENCY_SECONDS;
use aptos_executor::{
block_executor::{BlockExecutor, TransactionBlockExecutor},
metrics::{BLOCK_EXECUTOR_EXECUTE_BLOCK, COMMIT_BLOCKS, EXECUTE_BLOCK},
metrics::{
BLOCK_EXECUTION_WORKFLOW_WHOLE, COMMIT_BLOCKS, GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING,
},
};
use aptos_executor_types::BlockExecutorTrait;
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -133,17 +135,17 @@ fn report_block(
);
info!(
"Accumulative total: BlockSTM+VM time: {:.0} secs, executor time: {:.0} secs, commit time: {:.0} secs, DB commit time: {:.0} secs",
BLOCK_EXECUTOR_EXECUTE_BLOCK.get_sample_sum(),
EXECUTE_BLOCK.get_sample_sum() - BLOCK_EXECUTOR_EXECUTE_BLOCK.get_sample_sum(),
GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum(),
BLOCK_EXECUTION_WORKFLOW_WHOLE.get_sample_sum() - GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum(),
COMMIT_BLOCKS.get_sample_sum(),
API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(),
);
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
info!(
"Accumulative per transaction: BlockSTM+VM time: {:.0} ns, executor time: {:.0} ns, commit time: {:.0} ns, DB commit time: {:.0} ns",
BLOCK_EXECUTOR_EXECUTE_BLOCK.get_sample_sum() * NANOS_PER_SEC
GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum() * NANOS_PER_SEC
/ total_versions,
(EXECUTE_BLOCK.get_sample_sum() - BLOCK_EXECUTOR_EXECUTE_BLOCK.get_sample_sum()) * NANOS_PER_SEC
(BLOCK_EXECUTION_WORKFLOW_WHOLE.get_sample_sum() - GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum()) * NANOS_PER_SEC
/ total_versions,
COMMIT_BLOCKS.get_sample_sum() * NANOS_PER_SEC
/ total_versions,
Expand Down
20 changes: 11 additions & 9 deletions execution/executor-benchmark/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::pipeline::LedgerUpdateMessage;
use crate::{metrics::TIMER, pipeline::LedgerUpdateMessage};
use aptos_crypto::hash::HashValue;
use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor};
use aptos_executor_types::BlockExecutorTrait;
Expand Down Expand Up @@ -60,14 +60,16 @@ where
self.num_blocks_processed, block_id
);
let num_input_txns = executable_block.transactions.num_transactions();
self.executor
.execute_and_state_checkpoint(
executable_block,
self.parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
)
.unwrap();

{
let _timer = TIMER.with_label_values(&["execute"]).start_timer();
self.executor
.execute_and_state_checkpoint(
executable_block,
self.parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
)
.unwrap();
}
let msg = LedgerUpdateMessage {
current_block_start_time,
first_block_start_time: *self.maybe_first_block_start_time.as_ref().unwrap(),
Expand Down
14 changes: 10 additions & 4 deletions execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use crate::{
logging::{LogEntry, LogSchema},
metrics::{
BLOCK_EXECUTOR_EXECUTE_BLOCK, COMMIT_BLOCKS, CONCURRENCY_GAUGE, EXECUTE_BLOCK,
OTHER_TIMERS, SAVE_TRANSACTIONS, TRANSACTIONS_SAVED, UPDATE_LEDGER,
BLOCK_EXECUTION_WORKFLOW_WHOLE, COMMIT_BLOCKS, CONCURRENCY_GAUGE,
GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING, OTHER_TIMERS, SAVE_TRANSACTIONS,
TRANSACTIONS_SAVED, UPDATE_LEDGER,
},
types::partial_state_compute_result::PartialStateComputeResult,
workflow::{
Expand Down Expand Up @@ -56,6 +57,11 @@ pub trait TransactionBlockExecutor: Send + Sync {
) -> Result<ExecutionOutput>;
}

/// Production implementation of TransactionBlockExecutor.
///
/// Transaction execution: AptosVM
/// Executing conflicts: in the input order, via BlockSTM,
/// State: BlockSTM-provided MVHashMap-based view with caching
pub struct AptosVMBlockExecutor;

impl TransactionBlockExecutor for AptosVMBlockExecutor {
Expand Down Expand Up @@ -217,7 +223,7 @@ where
parent_block_id: HashValue,
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<()> {
let _timer = EXECUTE_BLOCK.start_timer();
let _timer = BLOCK_EXECUTION_WORKFLOW_WHOLE.start_timer();
let ExecutableBlock {
block_id,
transactions,
Expand Down Expand Up @@ -263,7 +269,7 @@ where
};

let execution_output = {
let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK.start_timer();
let _timer = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.start_timer();
fail_point!("executor::block_executor_execute_block", |_| {
Err(ExecutorError::from(anyhow::anyhow!(
"Injected error in block_executor_execute_block"
Expand Down
Loading

0 comments on commit c8b51ca

Please sign in to comment.