Skip to content

Commit

Permalink
Executor benchmark revamps
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Oct 31, 2024
1 parent 42279bc commit e350fa7
Show file tree
Hide file tree
Showing 23 changed files with 742 additions and 301 deletions.
7 changes: 5 additions & 2 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use aptos_config::{
};
use aptos_crypto::{ed25519::Ed25519PrivateKey, hash::HashValue, SigningKey};
use aptos_db::AptosDB;
use aptos_executor::{block_executor::BlockExecutor, db_bootstrapper};
use aptos_executor::{
block_executor::{AptosVMBlockExecutor, BlockExecutor},
db_bootstrapper,
};
use aptos_executor_types::BlockExecutorTrait;
use aptos_framework::BuiltPackage;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::MockInternalIndexerDBService;
Expand Down Expand Up @@ -204,7 +207,7 @@ pub fn new_test_context_inner(
rng,
root_key,
validator_owner,
Box::new(BlockExecutor::<AptosVM>::new(db_rw)),
Box::new(BlockExecutor::<AptosVMBlockExecutor>::new(db_rw)),
mempool,
db,
test_name,
Expand Down
4 changes: 2 additions & 2 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub struct AptosTransactionOutput {
}

impl AptosTransactionOutput {
pub(crate) fn new(output: VMOutput) -> Self {
pub fn new(output: VMOutput) -> Self {
Self {
vm_output: Mutex::new(Some(output)),
committed_output: OnceCell::new(),
Expand All @@ -138,7 +138,7 @@ impl AptosTransactionOutput {
self.committed_output.get().unwrap()
}

fn take_output(mut self) -> TransactionOutput {
pub fn take_output(mut self) -> TransactionOutput {
match self.committed_output.take() {
Some(output) => output,
// TODO: revisit whether we should always get it via committed, or o.w. create a
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-vm/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const BLOCK_EXECUTION_TIME_BUCKETS: [f64; 16] = [
0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 1.0, 1.25, 1.5, 1.75, 2.0, 3.0, 4.0, 5.0,
];

// TODO - disambiguate against BLOCK_EXECUTOR_EXECUTE_BLOCK
pub static BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
// metric name
Expand Down
11 changes: 11 additions & 0 deletions aptos-move/block-executor/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ fn output_buckets() -> std::vec::Vec<f64> {
.unwrap()
}

pub static BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
// metric name
"aptos_executor_block_executor_inner_execute_block_seconds",
// metric description
"The time spent in seconds of BlockExecutor inner block execution in Aptos executor",
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
)
.unwrap()
});

/// Count of times the module publishing fallback was triggered in parallel execution.
pub static MODULE_PUBLISHING_FALLBACK_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
Expand Down
8 changes: 5 additions & 3 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

use crate::{
code_cache_global::ImmutableModuleCache,
counters,
counters::{
PARALLEL_EXECUTION_SECONDS, RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS,
TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS,
self, BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, PARALLEL_EXECUTION_SECONDS,
RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, TASK_VALIDATE_SECONDS, VM_INIT_SECONDS,
WORK_WITH_TASK_SECONDS,
},
errors::*,
executor_utilities::*,
Expand Down Expand Up @@ -1692,6 +1692,8 @@ where
signature_verified_block: &[T],
base_view: &S,
) -> BlockExecutionResult<BlockOutput<E::Output>, E::Error> {
let _timer = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.start_timer();

if self.config.local.concurrency_level > 1 {
let parallel_result =
self.execute_transactions_parallel(&env, signature_verified_block, base_view);
Expand Down
7 changes: 3 additions & 4 deletions consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ use aptos_channels::aptos_channel::Receiver;
use aptos_config::config::NodeConfig;
use aptos_consensus_notifications::ConsensusNotificationSender;
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_executor::block_executor::BlockExecutor;
use aptos_executor::block_executor::{AptosVMBlockExecutor, BlockExecutor};
use aptos_logger::prelude::*;
use aptos_mempool::QuorumStoreRequest;
use aptos_network::application::interface::{NetworkClient, NetworkServiceEvents};
use aptos_storage_interface::DbReaderWriter;
use aptos_time_service::TimeService;
use aptos_validator_transaction_pool::VTxnPoolState;
use aptos_vm::AptosVM;
use futures::channel::mpsc;
use move_core_types::account_address::AccountAddress;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -65,7 +64,7 @@ pub fn start_consensus(
));

let execution_proxy = ExecutionProxy::new(
Arc::new(BlockExecutor::<AptosVM>::new(aptos_db)),
Arc::new(BlockExecutor::<AptosVMBlockExecutor>::new(aptos_db)),
txn_notifier,
state_sync_notifier,
runtime.handle(),
Expand Down Expand Up @@ -158,7 +157,7 @@ pub fn start_consensus_observer(
node_config.consensus.mempool_executed_txn_timeout_ms,
));
let execution_proxy = ExecutionProxy::new(
Arc::new(BlockExecutor::<AptosVM>::new(aptos_db.clone())),
Arc::new(BlockExecutor::<AptosVMBlockExecutor>::new(aptos_db.clone())),
txn_notifier,
state_sync_notifier,
consensus_observer_runtime.handle(),
Expand Down
55 changes: 34 additions & 21 deletions execution/executor-benchmark/src/block_preparation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{metrics::TIMER, pipeline::ExecuteBlockMessage};
use crate::{
metrics::{NUM_TXNS, TIMER},
pipeline::ExecuteBlockMessage,
};
use aptos_block_partitioner::{BlockPartitioner, PartitionerConfig};
use aptos_crypto::HashValue;
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
Expand All @@ -10,39 +13,39 @@ use aptos_types::{
block_executor::partitioner::{ExecutableBlock, ExecutableTransactions},
transaction::{signature_verified_transaction::SignatureVerifiedTransaction, Transaction},
};
use once_cell::sync::Lazy;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use std::{sync::Arc, time::Instant};

pub static SIG_VERIFY_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(8) // More than 8 threads doesn't seem to help much
.thread_name(|index| format!("signature-checker-{}", index))
.build()
.unwrap(),
)
});
use std::time::Instant;

pub(crate) struct BlockPreparationStage {
num_executor_shards: usize,
num_blocks_processed: usize,
maybe_partitioner: Option<Box<dyn BlockPartitioner>>,
sig_verify_pool: rayon::ThreadPool,
}

impl BlockPreparationStage {
pub fn new(num_shards: usize, partitioner_config: &dyn PartitionerConfig) -> Self {
pub fn new(
sig_verify_num_threads: usize,
num_shards: usize,
partitioner_config: &dyn PartitionerConfig,
) -> Self {
let maybe_partitioner = if num_shards == 0 {
None
} else {
let partitioner = partitioner_config.build();
Some(partitioner)
};

let sig_verify_pool = rayon::ThreadPoolBuilder::new()
.num_threads(sig_verify_num_threads) // More than 8 threads doesn't seem to help much
.thread_name(|index| format!("signature-checker-{}", index))
.build()
.unwrap();
Self {
num_executor_shards: num_shards,
num_blocks_processed: 0,
maybe_partitioner,
sig_verify_pool,
}
}

Expand All @@ -54,16 +57,26 @@ impl BlockPreparationStage {
txns.len()
);
let block_id = HashValue::random();
let sig_verified_txns: Vec<SignatureVerifiedTransaction> = SIG_VERIFY_POOL.install(|| {
let num_txns = txns.len();
txns.into_par_iter()
.with_min_len(optimal_min_len(num_txns, 32))
.map(|t| t.into())
.collect::<Vec<_>>()
});
let sig_verified_txns: Vec<SignatureVerifiedTransaction> =
self.sig_verify_pool.install(|| {
let _timer = TIMER.with_label_values(&["sig_verify"]).start_timer();

let num_txns = txns.len();
NUM_TXNS
.with_label_values(&["sig_verify"])
.inc_by(num_txns as u64);

txns.into_par_iter()
.with_min_len(optimal_min_len(num_txns, 32))
.map(|t| t.into())
.collect::<Vec<_>>()
});
let block: ExecutableBlock = match &self.maybe_partitioner {
None => (block_id, sig_verified_txns).into(),
Some(partitioner) => {
NUM_TXNS
.with_label_values(&["partition"])
.inc_by(sig_verified_txns.len() as u64);
let analyzed_transactions =
sig_verified_txns.into_iter().map(|t| t.into()).collect();
let timer = TIMER.with_label_values(&["partition"]).start_timer();
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-benchmark/src/db_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn create_db_with_accounts<V>(
);
}

fn bootstrap_with_genesis(
pub(crate) fn bootstrap_with_genesis(
db_dir: impl AsRef<Path>,
enable_storage_sharding: bool,
init_features: Features,
Expand Down
Loading

0 comments on commit e350fa7

Please sign in to comment.