Skip to content

Commit

Permalink
[BlockSTM] Iterable txns input to BlockSTM
Browse files Browse the repository at this point in the history
Currently BlockSTM takes in a block (vec) of txns and executes them.
This commits adds a capability where we don't need to provide all the
txns in the block upfront, rather provide them as per any desired logic
in the system.

The commit has a default implementation 'DefaultTxnProvider' where all
txns are provided upfront as per current logic, and also a reference
implementation of 'BlockingTxnsProvider' where txns can be provided
after BlockSTM starts execution.

Note: One should be careful while using 'BlockingTxnsProvider' because
if BlockSTM chooses to execute a txn that is not yet provided, then that
thread gets blocked until such a txn is provided. This could lead to
performance degradation.
  • Loading branch information
manudhundi committed Sep 9, 2024
1 parent 7dfcb77 commit f8ed8c1
Show file tree
Hide file tree
Showing 33 changed files with 362 additions and 132 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 17 additions & 9 deletions aptos-move/aptos-debugger/src/aptos_debugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::{bail, format_err, Result};
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_executor::{
txn_commit_hook::NoOpTransactionCommitHook,
txn_provider::{default::DefaultTxnProvider, TxnProvider},
};
use aptos_gas_profiling::{GasProfiler, TransactionGasLog};
use aptos_rest_client::Client;
use aptos_types::{
Expand Down Expand Up @@ -59,22 +62,23 @@ impl AptosDebugger {
) -> Result<Vec<TransactionOutput>> {
let sig_verified_txns: Vec<SignatureVerifiedTransaction> =
txns.into_iter().map(|x| x.into()).collect::<Vec<_>>();
let txn_provider = DefaultTxnProvider::new(sig_verified_txns);
let state_view = DebuggerStateView::new(self.debugger.clone(), version);

print_transaction_stats(&sig_verified_txns, version);
print_transaction_stats(txn_provider.get_txns(), version);

let mut result = None;

for concurrency_level in concurrency_levels {
for i in 0..repeat_execution_times {
let start_time = Instant::now();
let cur_result =
execute_block_no_limit(&sig_verified_txns, &state_view, *concurrency_level)
execute_block_no_limit(&txn_provider, &state_view, *concurrency_level)
.map_err(|err| format_err!("Unexpected VM Error: {:?}", err))?;

println!(
"[{} txns from {}] Finished execution round {}/{} with concurrency_level={} in {}ms",
sig_verified_txns.len(),
txn_provider.num_txns(),
version,
i + 1,
repeat_execution_times,
Expand All @@ -98,7 +102,7 @@ impl AptosDebugger {
}

let result = result.unwrap();
assert_eq!(sig_verified_txns.len(), result.len());
assert_eq!(txn_provider.num_txns(), result.len());
Ok(result)
}

Expand Down Expand Up @@ -350,7 +354,7 @@ impl AptosDebugger {
}
}

fn print_transaction_stats(sig_verified_txns: &[SignatureVerifiedTransaction], version: u64) {
fn print_transaction_stats(sig_verified_txns: &[Arc<SignatureVerifiedTransaction>], version: u64) {
let transaction_types = sig_verified_txns
.iter()
.map(|txn| txn.expect_valid().type_name().to_string())
Expand Down Expand Up @@ -420,12 +424,16 @@ fn is_reconfiguration(vm_output: &TransactionOutput) -> bool {
}

fn execute_block_no_limit(
sig_verified_txns: &[SignatureVerifiedTransaction],
txn_provider: &DefaultTxnProvider<SignatureVerifiedTransaction>,
state_view: &DebuggerStateView,
concurrency_level: usize,
) -> Result<Vec<TransactionOutput>, VMStatus> {
BlockAptosVM::execute_block::<_, NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>>(
sig_verified_txns,
BlockAptosVM::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
DefaultTxnProvider<SignatureVerifiedTransaction>,
>(
txn_provider,
state_view,
BlockExecutorConfig {
local: BlockExecutorLocalConfig {
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-e2e-comparison-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ default-run = "aptos-comparison-testing"

[dependencies]
anyhow = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-framework = { workspace = true }
aptos-language-e2e-tests = { workspace = true }
aptos-rest-client = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
CompilationCache, DataManager, IndexWriter, PackageInfo, TxnIndex,
};
use anyhow::{format_err, Result};
use aptos_block_executor::txn_provider::default::DefaultTxnProvider;
use aptos_framework::natives::code::PackageMetadata;
use aptos_rest_client::Client;
use aptos_types::{
Expand Down Expand Up @@ -92,7 +93,8 @@ impl DataCollection {
// FIXME(#10412): remove the assert
let val = debugger_state_view.get_state_value(TOTAL_SUPPLY_STATE_KEY.deref());
assert!(val.is_ok() && val.unwrap().is_some());
AptosVM::execute_block_no_limit(&sig_verified_txns, debugger_state_view)
let txn_provider = DefaultTxnProvider::new(sig_verified_txns);
AptosVM::execute_block_no_limit(&txn_provider, debugger_state_view)
.map_err(|err| format_err!("Unexpected VM Error: {:?}", err))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use crate::transactions;
use aptos_bitvec::BitVec;
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_executor::{
txn_commit_hook::NoOpTransactionCommitHook,
txn_provider::{default::DefaultTxnProvider, TxnProvider},
};
use aptos_block_partitioner::{
v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig,
};
Expand Down Expand Up @@ -189,16 +192,16 @@ where
pub(crate) fn execute_sequential(mut self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
let txns = self.gen_transaction();
self.execute_benchmark_sequential(&txns, None);
let txn_provider = DefaultTxnProvider::new(self.gen_transaction());
self.execute_benchmark_sequential(&txn_provider, None);
}

/// Executes this state in a single block.
pub(crate) fn execute_parallel(mut self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
let txns = self.gen_transaction();
self.execute_benchmark_parallel(&txns, num_cpus::get(), None);
let txn_provider = DefaultTxnProvider::new(self.gen_transaction());
self.execute_benchmark_parallel(&txn_provider, num_cpus::get(), None);
}

fn is_shareded(&self) -> bool {
Expand All @@ -207,16 +210,17 @@ where

fn execute_benchmark_sequential(
&self,
transactions: &[SignatureVerifiedTransaction],
txn_provider: &DefaultTxnProvider<SignatureVerifiedTransaction>,
maybe_block_gas_limit: Option<u64>,
) -> (Vec<TransactionOutput>, usize) {
let block_size = transactions.len();
let block_size = txn_provider.num_txns();
let timer = Instant::now();
let output = BlockAptosVM::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
DefaultTxnProvider<SignatureVerifiedTransaction>,
>(
transactions,
txn_provider,
self.state_view.as_ref(),
BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit),
None,
Expand Down Expand Up @@ -254,17 +258,18 @@ where

fn execute_benchmark_parallel(
&self,
transactions: &[SignatureVerifiedTransaction],
txn_provider: &DefaultTxnProvider<SignatureVerifiedTransaction>,
concurrency_level_per_shard: usize,
maybe_block_gas_limit: Option<u64>,
) -> (Vec<TransactionOutput>, usize) {
let block_size = transactions.len();
let block_size = txn_provider.num_txns();
let timer = Instant::now();
let output = BlockAptosVM::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
DefaultTxnProvider<SignatureVerifiedTransaction>,
>(
transactions,
txn_provider,
self.state_view.as_ref(),
BlockExecutorConfig::new_maybe_block_limit(
concurrency_level_per_shard,
Expand All @@ -288,6 +293,7 @@ where
conurrency_level_per_shard: usize,
maybe_block_gas_limit: Option<u64>,
) -> (usize, usize) {
let txn_provider = DefaultTxnProvider::new(transactions);
let (output, par_tps) = if run_par {
println!("Parallel execution starts...");
let (output, tps) = if self.is_shareded() {
Expand All @@ -298,7 +304,7 @@ where
)
} else {
self.execute_benchmark_parallel(
&transactions,
&txn_provider,
conurrency_level_per_shard,
maybe_block_gas_limit,
)
Expand All @@ -317,7 +323,7 @@ where
let (output, seq_tps) = if run_seq {
println!("Sequential execution starts...");
let (output, tps) =
self.execute_benchmark_sequential(&transactions, maybe_block_gas_limit);
self.execute_benchmark_sequential(&txn_provider, maybe_block_gas_limit);
println!("Sequential execution finishes, TPS = {}", tps);
(output, tps)
} else {
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-transactional-test-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-api-types = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-cached-packages = { workspace = true }
aptos-crypto = { workspace = true }
aptos-framework = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use anyhow::{bail, format_err, Result};
use aptos_api_types::AsConverter;
use aptos_block_executor::txn_provider::default::DefaultTxnProvider;
use aptos_crypto::{
ed25519::{Ed25519PrivateKey, Ed25519PublicKey},
hash::HashValue,
Expand Down Expand Up @@ -516,13 +517,14 @@ impl<'a> AptosTestAdapter<'a> {
fn run_transaction(&mut self, txn: Transaction) -> Result<TransactionOutput> {
let txn_block = vec![txn];
let sig_verified_block = into_signature_verified_block(txn_block);
let txn_provider = DefaultTxnProvider::new(sig_verified_block);
let onchain_config = BlockExecutorConfigFromOnchain {
// TODO fetch values from state?
// Or should we just use execute_block_no_limit ?
block_gas_limit_type: BlockGasLimitType::Limit(30000),
};
let (mut outputs, _) =
AptosVM::execute_block(&sig_verified_block, &self.storage.clone(), onchain_config)?
AptosVM::execute_block(&txn_provider, &self.storage.clone(), onchain_config)?
.into_inner();

assert_eq!(outputs.len(), 1);
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-vm-profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ glob = { workspace = true }
once_cell = { workspace = true }
smallvec = { workspace = true }

aptos-block-executor = { workspace = true }
aptos-cached-packages = { workspace = true }
aptos-gas-schedule = { workspace = true }
aptos-language-e2e-tests = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use aptos_block_executor::txn_provider::default::DefaultTxnProvider;
use aptos_language_e2e_tests::{account::AccountData, data_store::FakeDataStore};
use aptos_types::{
transaction::{signature_verified_transaction::SignatureVerifiedTransaction, Transaction},
Expand Down Expand Up @@ -48,7 +49,8 @@ fn main() -> Result<()> {
})
.collect();

let res = AptosVM::execute_block_no_limit(&txns, &state_store)?;
let txn_provider = DefaultTxnProvider::new(txns);
let res = AptosVM::execute_block_no_limit(&txn_provider, &state_store)?;
for i in 0..NUM_TXNS {
assert!(res[i as usize].status().status().unwrap().is_success());
}
Expand Down
14 changes: 9 additions & 5 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::{
VMExecutor, VMValidator,
};
use anyhow::anyhow;
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_executor::{
txn_commit_hook::NoOpTransactionCommitHook,
txn_provider::{default::DefaultTxnProvider, TxnProvider},
};
use aptos_crypto::HashValue;
use aptos_framework::{
natives::{code::PublishRequest, randomness::RandomnessContext},
Expand Down Expand Up @@ -2554,7 +2557,7 @@ impl VMExecutor for AptosVM {
/// mutability. Writes to be applied to the data view are encoded in the write set part of a
/// transaction output.
fn execute_block(
transactions: &[SignatureVerifiedTransaction],
txn_provider: &DefaultTxnProvider<SignatureVerifiedTransaction>,
state_view: &(impl StateView + Sync),
onchain_config: BlockExecutorConfigFromOnchain,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Expand All @@ -2568,15 +2571,16 @@ impl VMExecutor for AptosVM {
info!(
log_context,
"Executing block, transaction count: {}",
transactions.len()
txn_provider.num_txns()
);

let count = transactions.len();
let count = txn_provider.num_txns();
let ret = BlockAptosVM::execute_block::<
_,
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
DefaultTxnProvider<SignatureVerifiedTransaction>,
>(
transactions,
txn_provider,
state_view,
BlockExecutorConfig {
local: BlockExecutorLocalConfig {
Expand Down
Loading

0 comments on commit f8ed8c1

Please sign in to comment.