diff --git a/Cargo.lock b/Cargo.lock index 3b429c2e148c0..7780fb3a12a04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,7 +813,6 @@ name = "aptos-comparison-testing" version = "0.1.0" dependencies = [ "anyhow", - "aptos-block-executor", "aptos-framework", "aptos-language-e2e-tests", "aptos-rest-client", @@ -1263,7 +1262,6 @@ dependencies = [ "anyhow", "aptos-backup-cli", "aptos-backup-service", - "aptos-block-executor", "aptos-config", "aptos-db", "aptos-executor", @@ -1454,7 +1452,6 @@ name = "aptos-executor" version = "0.1.0" dependencies = [ "anyhow", - "aptos-block-executor", "aptos-cached-packages", "aptos-config", "aptos-consensus-types", @@ -1545,7 +1542,6 @@ dependencies = [ name = "aptos-executor-service" version = "0.1.0" dependencies = [ - "aptos-block-executor", "aptos-block-partitioner", "aptos-config", "aptos-infallible", @@ -1654,7 +1650,6 @@ dependencies = [ name = "aptos-experimental-ptx-executor" version = "0.1.0" dependencies = [ - "aptos-block-executor", "aptos-experimental-runtimes", "aptos-infallible", "aptos-logger", @@ -4296,7 +4291,6 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-api-types", - "aptos-block-executor", "aptos-cached-packages", "aptos-crypto", "aptos-framework", @@ -4594,7 +4588,6 @@ name = "aptos-vm-profiling" version = "0.1.0" dependencies = [ "anyhow", - "aptos-block-executor", "aptos-cached-packages", "aptos-gas-schedule", "aptos-language-e2e-tests", diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index b06f804c4bc18..46fa65725cb90 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -3,9 +3,7 @@ use anyhow::{bail, format_err, Result}; use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; @@ -22,6 +20,7 @@ use aptos_types::{ SignedTransaction, Transaction, TransactionInfo, TransactionOutput, TransactionPayload, Version, }, + txn_provider::{default::DefaultTxnProvider, TxnProvider}, vm_status::VMStatus, }; use aptos_validator_interface::{ @@ -430,14 +429,14 @@ fn is_reconfiguration(vm_output: &TransactionOutput) -> bool { } fn execute_block_no_limit( - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { BlockAptosVM::execute_block::< _, NoOpTransactionCommitHook, - DefaultTxnProvider, + dyn TxnProvider, >( txn_provider, state_view, diff --git a/aptos-move/aptos-e2e-comparison-testing/Cargo.toml b/aptos-move/aptos-e2e-comparison-testing/Cargo.toml index bce1115bb31da..30c851670b4df 100644 --- a/aptos-move/aptos-e2e-comparison-testing/Cargo.toml +++ b/aptos-move/aptos-e2e-comparison-testing/Cargo.toml @@ -12,7 +12,6 @@ default-run = "aptos-comparison-testing" [dependencies] anyhow = { workspace = true } -aptos-block-executor = { workspace = true } aptos-framework = { workspace = true } aptos-language-e2e-tests = { workspace = true } aptos-rest-client = { workspace = true } diff --git a/aptos-move/aptos-e2e-comparison-testing/src/data_collection.rs b/aptos-move/aptos-e2e-comparison-testing/src/data_collection.rs index 4cb63d52a4748..4a02d594f4379 100644 --- a/aptos-move/aptos-e2e-comparison-testing/src/data_collection.rs +++ b/aptos-move/aptos-e2e-comparison-testing/src/data_collection.rs @@ -6,7 +6,6 @@ use crate::{ CompilationCache, DataManager, IndexWriter, PackageInfo, TxnIndex, }; use anyhow::{format_err, Result}; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_framework::natives::code::PackageMetadata; use aptos_rest_client::Client; use aptos_types::{ @@ -15,6 +14,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, Transaction, TransactionOutput, Version, }, + txn_provider::default::DefaultTxnProvider, write_set::TOTAL_SUPPLY_STATE_KEY, }; use aptos_validator_interface::{AptosValidatorInterface, FilterCondition, RestDebuggerInterface}; diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index 8feb67adefb88..3e0ddcc456e21 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -4,9 +4,7 @@ use crate::transactions; use aptos_bitvec::BitVec; use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, @@ -32,6 +30,7 @@ use aptos_types::{ }, ExecutionStatus, Transaction, TransactionOutput, TransactionStatus, }, + txn_provider::{default::DefaultTxnProvider, TxnProvider}, vm_status::VMStatus, }; use aptos_vm::{ @@ -220,7 +219,7 @@ where let output = BlockAptosVM::execute_block::< _, NoOpTransactionCommitHook, - DefaultTxnProvider, + dyn TxnProvider, >( txn_provider, self.state_view.as_ref(), @@ -271,7 +270,7 @@ where let output = BlockAptosVM::execute_block::< _, NoOpTransactionCommitHook, - DefaultTxnProvider, + dyn TxnProvider, >( txn_provider, self.state_view.as_ref(), diff --git a/aptos-move/aptos-transactional-test-harness/Cargo.toml b/aptos-move/aptos-transactional-test-harness/Cargo.toml index 8a96c7366ee5f..c0e44b746718d 100644 --- a/aptos-move/aptos-transactional-test-harness/Cargo.toml +++ b/aptos-move/aptos-transactional-test-harness/Cargo.toml @@ -15,7 +15,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-api-types = { workspace = true } -aptos-block-executor = { workspace = true } aptos-cached-packages = { workspace = true } aptos-crypto = { workspace = true } aptos-framework = { workspace = true } diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index e1db7fe94bb38..cafa991fc0a98 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -4,7 +4,6 @@ use anyhow::{bail, format_err, Result}; use aptos_api_types::AsConverter; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_crypto::{ ed25519::{Ed25519PrivateKey, Ed25519PublicKey}, hash::HashValue, @@ -24,6 +23,7 @@ use aptos_types::{ EntryFunction as TransactionEntryFunction, ExecutionStatus, RawTransaction, Script as TransactionScript, Transaction, TransactionOutput, TransactionStatus, }, + txn_provider::default::DefaultTxnProvider, AptosCoinType, }; use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; diff --git a/aptos-move/aptos-vm-profiling/Cargo.toml b/aptos-move/aptos-vm-profiling/Cargo.toml index a68262976e353..fd466f4389777 100644 --- a/aptos-move/aptos-vm-profiling/Cargo.toml +++ b/aptos-move/aptos-vm-profiling/Cargo.toml @@ -17,7 +17,6 @@ glob = { workspace = true } once_cell = { workspace = true } smallvec = { workspace = true } -aptos-block-executor = { workspace = true } aptos-cached-packages = { workspace = true } aptos-gas-schedule = { workspace = true } aptos-language-e2e-tests = { workspace = true } @@ -36,7 +35,7 @@ move-vm-types = { workspace = true } [[bin]] name = "main" -path = "src/main.rs" +path = "src/main.rs" [[bin]] name = "run-move" diff --git a/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs b/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs index 6d2bd39e1ea96..f4519b16ceec7 100644 --- a/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs +++ b/aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs @@ -2,10 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_language_e2e_tests::{account::AccountData, data_store::FakeDataStore}; use aptos_types::{ transaction::{signature_verified_transaction::SignatureVerifiedTransaction, Transaction}, + txn_provider::default::DefaultTxnProvider, write_set::WriteSet, }; use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index cc94c805c21aa..e11d92cc233b9 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -28,9 +28,7 @@ use crate::{ }; use anyhow::anyhow; use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, + code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, }; use aptos_crypto::HashValue; use aptos_framework::{ @@ -73,6 +71,7 @@ use aptos_types::{ TransactionAuxiliaryData, TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult, ViewFunctionOutput, WriteSetPayload, }, + txn_provider::TxnProvider, vm_status::{AbortLocation, StatusCode, VMStatus}, }; use aptos_utils::aptos_try; @@ -2801,7 +2800,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { fn execute_block( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &(impl StateView + Sync), onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, @@ -2823,7 +2822,7 @@ impl VMBlockExecutor for AptosVMBlockExecutor { let ret = BlockAptosVM::execute_block::< _, NoOpTransactionCommitHook, - DefaultTxnProvider, + dyn TxnProvider, >( txn_provider, state_view, diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 838ebd1af2aed..33bc53fe84ae2 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -14,7 +14,7 @@ use aptos_aggregator::{ use aptos_block_executor::{ code_cache_global_manager::AptosModuleCacheManager, errors::BlockExecutionError, executor::BlockExecutor, task::TransactionOutput as BlockExecutorTransactionOutput, - txn_commit_hook::TransactionCommitHook, txn_provider::TxnProvider, types::InputOutputKey, + txn_commit_hook::TransactionCommitHook, types::InputOutputKey, }; use aptos_infallible::Mutex; use aptos_types::{ @@ -28,6 +28,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, TransactionOutput, TransactionStatus, }, + txn_provider::TxnProvider, write_set::WriteOp, }; use aptos_vm_logging::{flush_speculative_logs, init_speculative_logs}; @@ -391,7 +392,7 @@ impl BlockAptosVM { pub fn execute_block_on_thread_pool< S: StateView + Sync, L: TransactionCommitHook, - TP: TxnProvider + Sync, + TP: TxnProvider + Sync + ?Sized, >( executor_thread_pool: Arc, signature_verified_block: &TP, @@ -466,7 +467,7 @@ impl BlockAptosVM { pub fn execute_block< S: StateView + Sync, L: TransactionCommitHook, - TP: TxnProvider + Sync, + TP: TxnProvider + Sync + ?Sized, >( signature_verified_block: &TP, state_view: &S, diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index a7526bdf03bf5..efdeb58765b90 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -126,7 +126,6 @@ pub mod verifier; pub use crate::aptos_vm::{AptosSimulationVM, AptosVM}; use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_types::{ block_executor::{ config::BlockExecutorConfigFromOnchain, execution_state::TransactionSliceMetadata, @@ -137,6 +136,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, SignedTransaction, TransactionOutput, VMValidatorResult, }, + txn_provider::TxnProvider, vm_status::VMStatus, }; use aptos_vm_types::module_and_script_storage::code_storage::AptosCodeStorage; @@ -169,7 +169,7 @@ pub trait VMBlockExecutor: Send + Sync { /// Executes a block of transactions and returns output for each one of them. fn execute_block( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &(impl StateView + Sync), onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, @@ -179,7 +179,7 @@ pub trait VMBlockExecutor: Send + Sync { /// any block limit. fn execute_block_no_limit( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &(impl StateView + Sync), ) -> Result, VMStatus> { self.execute_block( diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index cc41d58e83050..293f485621e27 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -16,9 +16,7 @@ use crate::{ ExecutorShardCommand, }, }; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, txn_provider::default::DefaultTxnProvider, -}; +use aptos_block_executor::code_cache_global_manager::AptosModuleCacheManager; use aptos_logger::{info, trace}; use aptos_types::{ block_executor::{ @@ -32,6 +30,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, TransactionOutput, }, + txn_provider::default::DefaultTxnProvider, }; use aptos_vm_logging::disable_speculative_logging; use futures::{channel::oneshot, executor::block_on}; diff --git a/aptos-move/aptos-vm/tests/sharded_block_executor.rs b/aptos-move/aptos-vm/tests/sharded_block_executor.rs index 2b79bd0ceb3c3..0aec31e2bf576 100644 --- a/aptos-move/aptos-vm/tests/sharded_block_executor.rs +++ b/aptos-move/aptos-vm/tests/sharded_block_executor.rs @@ -186,7 +186,6 @@ fn test_partitioner_v2_connected_component_sharded_block_executor_with_random_tr } mod test_utils { - use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_block_partitioner::BlockPartitioner; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, @@ -201,6 +200,7 @@ mod test_utils { signature_verified_transaction::SignatureVerifiedTransaction, Transaction, TransactionOutput, }, + txn_provider::default::DefaultTxnProvider, }; use aptos_vm::{ aptos_vm::AptosVMBlockExecutor, diff --git a/aptos-move/block-executor/src/counters.rs b/aptos-move/block-executor/src/counters.rs index e75d1b3354805..17489816445a9 100644 --- a/aptos-move/block-executor/src/counters.rs +++ b/aptos-move/block-executor/src/counters.rs @@ -165,6 +165,17 @@ pub static TASK_EXECUTE_SECONDS: Lazy = Lazy::new(|| { .unwrap() }); +pub static TXN_GET_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + // metric name + "aptos_execution_txn_get_seconds", + // metric description + "The time spent in txn get (for blocking txn provider) in Block STM", + time_buckets(), + ) + .unwrap() +}); + pub static DEPENDENCY_WAIT_SECONDS: Lazy = Lazy::new(|| { register_histogram!( "aptos_execution_dependency_wait", diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 727153f0042b4..477e0fe60a0f1 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -18,7 +18,6 @@ use crate::{ task::{ExecutionStatus, ExecutorTask, TransactionOutput}, txn_commit_hook::TransactionCommitHook, txn_last_input_output::{KeyKind, TxnLastInputOutput}, - txn_provider::TxnProvider, types::ReadWriteSummary, view::{LatestView, ParallelState, SequentialState, ViewState}, }; @@ -43,6 +42,7 @@ use aptos_types::{ transaction::{ block_epilogue::BlockEndInfo, BlockExecutableTransaction as Transaction, BlockOutput, }, + txn_provider::TxnProvider, vm::modules::AptosModuleExtension, write_set::{TransactionWrite, WriteOp}, }; @@ -72,7 +72,7 @@ use std::{ }, }; -pub struct BlockExecutor { +pub struct BlockExecutor { // Number of active concurrent tasks, corresponding to the maximum number of rayon // threads that may be concurrently participating in parallel execution. config: BlockExecutorConfig, @@ -88,7 +88,7 @@ where S: TStateView + Sync, L: TransactionCommitHook, X: Executable + 'static, - TP: TxnProvider + Sync, + TP: TxnProvider + Sync + ?Sized, { /// The caller needs to ensure that concurrency_level > 1 (0 is illegal and 1 should /// be handled by sequential execution) and that concurrency_level <= num_cpus. diff --git a/aptos-move/block-executor/src/lib.rs b/aptos-move/block-executor/src/lib.rs index 39a762126c491..f2d388f769008 100644 --- a/aptos-move/block-executor/src/lib.rs +++ b/aptos-move/block-executor/src/lib.rs @@ -155,7 +155,6 @@ mod scheduler; pub mod task; pub mod txn_commit_hook; pub mod txn_last_input_output; -pub mod txn_provider; pub mod types; #[cfg(test)] mod unit_tests; diff --git a/aptos-move/block-executor/src/proptest_types/bencher.rs b/aptos-move/block-executor/src/proptest_types/bencher.rs index bc77cb100cd0a..02e201badd317 100644 --- a/aptos-move/block-executor/src/proptest_types/bencher.rs +++ b/aptos-move/block-executor/src/proptest_types/bencher.rs @@ -12,11 +12,11 @@ use crate::{ }, }, txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::default::DefaultTxnProvider, }; use aptos_types::{ block_executor::config::BlockExecutorConfig, contract_event::TransactionEvent, executable::ExecutableTestType, state_store::MockStateView, + txn_provider::default::DefaultTxnProvider, }; use criterion::{BatchSize, Bencher as CBencher}; use num_cpus; diff --git a/aptos-move/block-executor/src/proptest_types/tests.rs b/aptos-move/block-executor/src/proptest_types/tests.rs index d34e31b9b0973..a5bccaf55b220 100644 --- a/aptos-move/block-executor/src/proptest_types/tests.rs +++ b/aptos-move/block-executor/src/proptest_types/tests.rs @@ -14,11 +14,11 @@ use crate::{ }, }, txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::default::DefaultTxnProvider, }; use aptos_types::{ block_executor::config::BlockExecutorConfig, contract_event::TransactionEvent, executable::ExecutableTestType, state_store::MockStateView, + txn_provider::default::DefaultTxnProvider, }; use claims::{assert_matches, assert_ok}; use num_cpus; diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index 1b4c5c56ca057..967cf3d83a0d7 100644 --- a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -19,7 +19,6 @@ use crate::{ DependencyResult, ExecutionTaskType, Scheduler, SchedulerTask, TWaitForDependency, }, txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::default::DefaultTxnProvider, }; use aptos_aggregator::{ bounded_math::SignedU128, @@ -32,6 +31,7 @@ use aptos_types::{ contract_event::TransactionEvent, executable::{ExecutableTestType, ModulePath}, state_store::state_value::StateValueMetadata, + txn_provider::default::DefaultTxnProvider, write_set::WriteOpKind, }; use claims::{assert_matches, assert_ok}; diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs index 9c9023c9c4b7d..d6cd789b568e0 100644 --- a/aptos-move/e2e-tests/src/executor.rs +++ b/aptos-move/e2e-tests/src/executor.rs @@ -16,7 +16,6 @@ use aptos_abstract_gas_usage::CalibrationAlgebra; use aptos_bitvec::BitVec; use aptos_block_executor::{ code_cache_global_manager::AptosModuleCacheManager, txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::default::DefaultTxnProvider, }; use aptos_crypto::HashValue; use aptos_framework::ReleaseBundle; @@ -49,6 +48,7 @@ use aptos_types::{ BlockOutput, ExecutionStatus, SignedTransaction, Transaction, TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult, ViewFunctionOutput, }, + txn_provider::default::DefaultTxnProvider, vm_status::VMStatus, write_set::{WriteOp, WriteSet, WriteSetMut}, AptosCoinType, CoinType, diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index 800999c3ec5e0..b96d30ce1cc79 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -232,25 +232,25 @@ impl Default for ConsensusConfig { }, // with execution backpressure, only later start reducing block size PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2500, + back_pressure_pipeline_latency_limit_ms: 5000, max_sending_block_txns_after_filtering_override: 1000, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backpressure_proposal_delay_ms: 300, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 3500, + back_pressure_pipeline_latency_limit_ms: 7000, max_sending_block_txns_after_filtering_override: 200, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backpressure_proposal_delay_ms: 300, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 4500, + back_pressure_pipeline_latency_limit_ms: 9000, max_sending_block_txns_after_filtering_override: 30, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backpressure_proposal_delay_ms: 300, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 6000, + back_pressure_pipeline_latency_limit_ms: 12000, // in practice, latencies and delay make it such that ~2 blocks/s is max, // meaning that most aggressively we limit to ~10 TPS // For transactions that are more expensive than that, we should diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index f245e02f40b7d..e8d7f13632245 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -31,7 +31,7 @@ impl Default for QuorumStoreBackPressureConfig { QuorumStoreBackPressureConfig { // QS will be backpressured if the remaining total txns is more than this number // Roughly, target TPS * commit latency seconds - backlog_txn_limit_count: 36_000, + backlog_txn_limit_count: 72_000, // QS will create batches at the max rate until this number is reached backlog_per_validator_batch_limit_count: 20, decrease_duration_ms: 1000, diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 869a40c683cc4..3c86dfcf2bb72 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -128,11 +128,12 @@ pub struct RejectedTransactionSummary { #[derive(Debug)] pub enum DataStatus { - Cached(Vec), + Cached(Vec<(Arc>, u64)>), Requested( Vec<( HashValue, - oneshot::Receiver>>, + u64, + oneshot::Receiver>>>, )>, ), } @@ -255,7 +256,7 @@ pub enum Payload { InQuorumStore(ProofWithData), InQuorumStoreWithLimit(ProofWithDataWithTxnLimit), QuorumStoreInlineHybrid( - Vec<(BatchInfo, Vec)>, + Vec<(BatchInfo, Arc>)>, ProofWithData, Option, ), @@ -507,7 +508,8 @@ impl Payload { Self::verify_with_cache(&proof_with_data.proofs, validator, proof_cache)?; for (batch, payload) in inline_batches.iter() { // TODO: Can cloning be avoided here? - if BatchPayload::new(batch.author(), payload.clone()).hash() != *batch.digest() + if BatchPayload::new(batch.author(), payload.as_ref().clone()).hash() + != *batch.digest() { return Err(anyhow::anyhow!( "Hash of the received inline batch doesn't match the digest value", diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs index 9514c8d00907c..84aeb2b83144d 100644 --- a/consensus/consensus-types/src/payload.rs +++ b/consensus/consensus-types/src/payload.rs @@ -33,7 +33,7 @@ pub trait TDataInfo { pub struct DataFetchFut { pub iteration: u32, - pub fut: Shared>>>, + pub fut: Shared>, u64)>>>>, } impl fmt::Debug for DataFetchFut { @@ -172,11 +172,11 @@ impl PayloadExecutionLimit { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct InlineBatch { batch_info: BatchInfo, - transactions: Vec, + transactions: Arc>, } impl InlineBatch { - pub fn new(batch_info: BatchInfo, transactions: Vec) -> Self { + pub fn new(batch_info: BatchInfo, transactions: Arc>) -> Self { Self { batch_info, transactions, @@ -210,10 +210,15 @@ impl InlineBatches { self.0.is_empty() } - pub fn transactions(&self) -> Vec { + pub fn transactions(&self) -> Vec<(Arc>, u64)> { self.0 .iter() - .flat_map(|inline_batch| inline_batch.transactions.clone()) + .map(|inline_batch| { + ( + inline_batch.transactions.clone(), + inline_batch.batch_info.gas_bucket_start(), + ) + }) .collect() } @@ -231,8 +236,8 @@ impl From> for InlineBatches { } } -impl From)>> for InlineBatches { - fn from(value: Vec<(BatchInfo, Vec)>) -> Self { +impl From>)>> for InlineBatches { + fn from(value: Vec<(BatchInfo, Arc>)>) -> Self { value .into_iter() .map(|(batch_info, transactions)| InlineBatch::new(batch_info, transactions)) diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index dad1262f03e57..608658f9994e8 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -190,6 +190,8 @@ pub struct PipelinedBlock { pipeline_tx: Arc>>, #[derivative(PartialEq = "ignore")] pipeline_abort_handle: Arc>>>, + #[derivative(PartialEq = "ignore")] + committed_transactions: Arc>>>>>>>, } impl Serialize for PipelinedBlock { @@ -377,10 +379,11 @@ impl PipelinedBlock { pipeline_futs: Arc::new(Mutex::new(None)), pipeline_tx: Arc::new(Mutex::new(None)), pipeline_abort_handle: Arc::new(Mutex::new(None)), + committed_transactions: Arc::new(Mutex::new(None)), } } - pub fn new_ordered(block: Block, window: OrderedBlockWindow) -> Self { + pub fn new_with_window(block: Block, window: OrderedBlockWindow) -> Self { info!( "New Ordered PipelinedBlock with block_id: {}, parent_id: {}, round: {}, epoch: {}, txns: {}", block.id(), @@ -401,6 +404,35 @@ impl PipelinedBlock { pipeline_futs: Arc::new(Mutex::new(None)), pipeline_tx: Arc::new(Mutex::new(None)), pipeline_abort_handle: Arc::new(Mutex::new(None)), + committed_transactions: Arc::new(Mutex::new(None)), + } + } + + pub fn new_recovered(block: Block, committed_transactions: Vec) -> Self { + info!( + "New Recovered PipelinedBlock with block_id: {}, parent_id: {}, round: {}, epoch: {}, txns: {}, committed_txns: {}", + block.id(), + block.parent_id(), + block.round(), + block.epoch(), + block.payload().map_or(0, |p| p.len()), + committed_transactions.len(), + ); + Self { + block, + block_window: OrderedBlockWindow::empty(), + input_transactions: vec![], + state_compute_result: StateComputeResult::new_dummy(), + randomness: OnceCell::new(), + pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), + pre_commit_fut: Arc::new(Mutex::new(None)), + pipeline_futs: Arc::new(Mutex::new(None)), + pipeline_tx: Arc::new(Mutex::new(None)), + pipeline_abort_handle: Arc::new(Mutex::new(None)), + committed_transactions: Arc::new(Mutex::new(Some(Arc::new(OnceCell::with_value(Ok( + Arc::new(committed_transactions), + )))))), } } @@ -507,6 +539,124 @@ impl PipelinedBlock { pub fn get_execution_summary(&self) -> Option { self.execution_summary.get().cloned() } + + pub fn init_committed_transactions(&self) { + info!( + "Init committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + *self.committed_transactions.lock() = Some(Arc::new(OnceCell::new())); + } + + pub fn set_committed_transactions(&self, committed_transactions: Vec) { + info!( + "Setting committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + if let Some(once_cell) = self.committed_transactions.lock().as_ref() { + info!( + "Setting committed transactions: locked: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + once_cell + .set(Ok(Arc::new(committed_transactions))) + .expect("committed_transactions already set") + } else { + warn!( + "Setting committed transactions: no once_cell to set: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + } + info!( + "Setting committed transactions: done: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + } + + pub fn cancel_committed_transactions(&self) { + info!( + "Cancelled committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + if let Some(once_cell) = self.committed_transactions.lock().as_ref() { + once_cell + .set(Err(ExecutorError::CouldNotGetCommittedTransactions)) + .expect("committed_transactions already set") + } else { + warn!( + "Cancelled committed transactions: no once_cell to cancel: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + } + } + + // TODO: change return value + pub fn wait_for_committed_transactions(&self) -> ExecutorResult>> { + if self.block().is_genesis_block() || self.block.is_nil_block() { + return Ok(Arc::new(vec![])); + } + info!( + "Waiting for committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + + let guard = self.committed_transactions.lock(); + info!( + "Waiting for committed transactions: locked: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + let inner = guard.clone(); + drop(guard); + + if let Some(once_cell) = inner { + match once_cell.wait() { + Ok(committed_transactions) => { + info!( + "Done waiting for committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + Ok(committed_transactions.clone()) + }, + Err(_) => { + warn!( + "Failed to wait for committed transactions: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + Err(ExecutorError::CouldNotGetCommittedTransactions) + }, + } + } else { + warn!( + "No committed transactions to wait for: ({}, {}) {}", + self.epoch(), + self.round(), + self.id() + ); + Err(ExecutorError::CouldNotGetCommittedTransactions) + } + } } /// Pipeline related functions diff --git a/consensus/src/block_preparer.rs b/consensus/src/block_preparer.rs index e2787bb43b741..a0f524fdb29ad 100644 --- a/consensus/src/block_preparer.rs +++ b/consensus/src/block_preparer.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - counters::{self, MAX_TXNS_FROM_BLOCK_TO_EXECUTE, TXN_SHUFFLE_SECONDS}, + counters::{self, MAX_TXNS_FROM_BLOCK_TO_EXECUTE}, monitor, payload_manager::TPayloadManager, transaction_deduper::TransactionDeduper, @@ -15,7 +15,7 @@ use aptos_logger::info; use aptos_types::transaction::SignedTransaction; use fail::fail_point; use futures::{stream::FuturesOrdered, StreamExt}; -use std::{sync::Arc, time::Instant}; +use std::{cmp::Reverse, collections::HashSet, sync::Arc, time::Instant}; pub struct BlockPreparer { payload_manager: Arc, @@ -43,7 +43,7 @@ impl BlockPreparer { &self, block: &Block, block_window: &OrderedBlockWindow, - ) -> ExecutorResult<(Vec, Option)> { + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { let mut txns = vec![]; let pipelined_blocks = block_window.pipelined_blocks(); let mut futures = FuturesOrdered::new(); @@ -52,10 +52,14 @@ impl BlockPreparer { .map(|b| b.block()) .chain(std::iter::once(block)) { - futures.push_back(async move { self.payload_manager.get_transactions(block).await }); + futures.push_back(self.payload_manager.get_transactions(block)); } + info!("get_transactions added all futures"); + + let mut idx = 0; let mut max_txns_from_block_to_execute = None; loop { + info!("get_transactions waiting for next: {}", idx); match futures.next().await { // TODO: we are turning off the max txns from block to execute feature for now Some(Ok((block_txns, _max_txns))) => { @@ -67,7 +71,13 @@ impl BlockPreparer { }, None => break, } + idx += 1; } + info!( + "get_transactions finished in block window for ({}, {})", + block.epoch(), + block.round() + ); Ok((txns, max_txns_from_block_to_execute)) } @@ -84,7 +94,9 @@ impl BlockPreparer { }); let start_time = Instant::now(); info!( - "BlockPreparer: Preparing for block {} and window {:?}", + "BlockPreparer: Preparing for block ({}, {}) {} and window {:?}", + block.epoch(), + block.round(), block.id(), block_window .blocks() @@ -93,29 +105,76 @@ impl BlockPreparer { .collect::>() ); - let (txns, max_txns_from_block_to_execute) = monitor!("get_transactions", { + let now = std::time::Instant::now(); + // TODO: we could do this incrementally, but for now just do it every time + let mut committed_transactions = HashSet::new(); + + // TODO: don't materialize these? + let (mut batched_txns, max_txns_from_block_to_execute) = monitor!("get_transactions", { self.get_transactions(block, block_window).await? }); + + let num_blocks_in_window = block_window.pipelined_blocks().len(); + for b in block_window + .pipelined_blocks() + .iter() + .take(num_blocks_in_window.saturating_sub(1)) + { + info!( + "BlockPreparer: Waiting for committed transactions at block {} for block {}", + b.round(), + block.round() + ); + for txn_hash in b.wait_for_committed_transactions()?.iter() { + committed_transactions.insert(*txn_hash); + } + info!( + "BlockPreparer: Waiting for committed transactions at block {} for block {}: Done", + b.round(), + block.round() + ); + } + + info!( + "BlockPreparer: Waiting for part of committed transactions for round {} took {} ms", + block.round(), + now.elapsed().as_millis() + ); + let txn_filter = self.txn_filter.clone(); let txn_deduper = self.txn_deduper.clone(); - let txn_shuffler = self.txn_shuffler.clone(); let block_id = block.id(); let block_timestamp_usecs = block.timestamp_usecs(); // Transaction filtering, deduplication and shuffling are CPU intensive tasks, so we run them in a blocking task. let result = tokio::task::spawn_blocking(move || { - let filtered_txns = txn_filter.filter(block_id, block_timestamp_usecs, txns); - let deduped_txns = txn_deduper.dedup(filtered_txns); - let mut shuffled_txns = { - let _timer = TXN_SHUFFLE_SECONDS.start_timer(); - - txn_shuffler.shuffle(deduped_txns) - }; + // stable sort to ensure batches with same gas are in the same order + batched_txns.sort_by_key(|(_, gas)| Reverse(*gas)); + let batched_txns: Vec> = monitor!( + "filter_committed_transactions", + batched_txns + .into_iter() + .map(|(txns, _)| { + txns.iter() + .filter(|txn| !committed_transactions.contains(&txn.committed_hash())) + .cloned() + .collect() + }) + .collect() + ); + let txns: Vec<_> = monitor!( + "flatten_transactions", + batched_txns.into_iter().flatten().collect() + ); + let filtered_txns = monitor!("filter_transactions", { + txn_filter.filter(block_id, block_timestamp_usecs, txns) + }); + let mut deduped_txns = monitor!("dedup_transactions", txn_deduper.dedup(filtered_txns)); if let Some(max_txns_from_block_to_execute) = max_txns_from_block_to_execute { - shuffled_txns.truncate(max_txns_from_block_to_execute as usize); + deduped_txns.truncate(max_txns_from_block_to_execute as usize); } - MAX_TXNS_FROM_BLOCK_TO_EXECUTE.observe(shuffled_txns.len() as f64); - Ok(shuffled_txns) + MAX_TXNS_FROM_BLOCK_TO_EXECUTE.observe(deduped_txns.len() as f64); + Ok(deduped_txns) }) .await .expect("Failed to spawn blocking task for transaction generation"); diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 88fcfa3dea821..289536432541a 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -34,7 +34,8 @@ use aptos_executor_types::state_compute_result::StateComputeResult; use aptos_infallible::{Mutex, RwLock}; use aptos_logger::prelude::*; use aptos_types::{ - ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator, + account_config::NewBlockEvent, ledger_info::LedgerInfoWithSignatures, + proof::accumulator::InMemoryTransactionAccumulator, transaction::Version, }; use futures::executor::block_on; #[cfg(test)] @@ -253,9 +254,21 @@ impl BlockStore { }; for block in blocks { - block_store.insert_block(block).await.unwrap_or_else(|e| { - panic!("[BlockStore] failed to insert block during build {:?}", e) - }); + if block.round() <= root_block.round() { + block_store + .insert_committed_block(block) + .await + .unwrap_or_else(|e| { + panic!( + "[BlockStore] failed to insert committed block during build {:?}", + e + ) + }); + } else { + block_store.insert_block(block).await.unwrap_or_else(|e| { + panic!("[BlockStore] failed to insert block during build {:?}", e) + }); + } } for qc in quorum_certs { block_store @@ -380,6 +393,88 @@ impl BlockStore { self.try_send_for_execution().await; } + pub async fn insert_committed_block( + &self, + block: Block, + ) -> anyhow::Result> { + // TODO: factor out repeated code + + // ensure local time past the block time + let block_time = Duration::from_micros(block.timestamp_usecs()); + let current_timestamp = self.time_service.get_current_timestamp(); + if let Some(t) = block_time.checked_sub(current_timestamp) { + if t > Duration::from_secs(1) { + warn!("Long wait time {}ms for block {}", t.as_millis(), block); + } + self.time_service.wait_until(block_time).await; + } + if let Some(payload) = block.payload() { + self.payload_manager + .prefetch_payload_data(payload, block.timestamp_usecs()); + } + self.storage + .save_tree(vec![block.clone()], vec![]) + .context("Insert block failed when saving block")?; + let mut block_tree = self.inner.write(); + + info!( + "recovering committed transactions for PipelinedBlock with block_id: {}, parent_id: {}, round: {}, epoch: {}", + block.id(), + block.parent_id(), + block.round(), + block.epoch(), + ); + let aptos_db = self.storage.aptos_db(); + let latest_block_event = aptos_db + .get_latest_block_events(1) + .expect("at least one block"); + let latest_block_event_with_version = + latest_block_event.first().expect("at least one block"); + let latest_new_block_event = latest_block_event_with_version + .event + .expect_new_block_event() + .expect("new block event"); + let mut height = latest_new_block_event.height(); + let committed_transactions; + loop { + let (start_version, end_version, new_block_event): (Version, Version, NewBlockEvent) = + self.storage + .aptos_db() + .get_block_info_by_height(height) + .expect("block id by height"); + if new_block_event.epoch() < block.epoch() { + panic!( + "the epoch of the latest block event {} is less than the block epoch {}", + new_block_event.epoch(), + block.epoch(), + ); + } + if new_block_event.round() < block.round() { + info!( + "the round of the latest block event {} is less than the block round {}", + new_block_event.round(), + block.round(), + ); + committed_transactions = vec![]; + break; + } + if new_block_event.epoch() == block.epoch() && new_block_event.round() == block.round() + { + let iter = aptos_db + .get_transaction_info_iterator(start_version, end_version - start_version + 1) + .expect("iterator"); + committed_transactions = iter + .map(|info| info.expect("info").transaction_hash()) + .collect(); + break; + } + height -= 1; + } + + let pipelined_block = PipelinedBlock::new_recovered(block.clone(), committed_transactions); + block_tree.insert_block(pipelined_block) + } + /// Insert a block if it passes all validation tests. /// Returns the Arc to the block kept in the block store after persisting it to storage /// @@ -423,7 +518,7 @@ impl BlockStore { block_window.blocks().iter().map(|b| format!("{}", b.id())).collect::>(), now.elapsed().as_millis() ); - PipelinedBlock::new_ordered(block.clone(), block_window) + PipelinedBlock::new_with_window(block.clone(), block_window) } else { info!( "no block_window for PipelinedBlock with block_id: {}, parent_id: {}, round: {}, epoch: {}", @@ -433,9 +528,27 @@ impl BlockStore { block.epoch(), ); // TODO: assert that this is an older block than commit root - PipelinedBlock::new_ordered(block.clone(), OrderedBlockWindow::empty()) + PipelinedBlock::new_with_window(block.clone(), OrderedBlockWindow::empty()) }; + // ensure local time past the block time + let block_time = Duration::from_micros(pipelined_block.timestamp_usecs()); + let current_timestamp = self.time_service.get_current_timestamp(); + if let Some(t) = block_time.checked_sub(current_timestamp) { + if t > Duration::from_secs(1) { + warn!( + "Long wait time {}ms for block {}", + t.as_millis(), + pipelined_block.block() + ); + } + self.time_service.wait_until(block_time).await; + } + self.storage + .save_tree(vec![pipelined_block.block().clone()], vec![]) + .context("Insert block failed when saving block")?; + let stored_pipelined_block = self.inner.write().insert_block(pipelined_block)?; + // build pipeline if let Some(pipeline_builder) = &self.pipeline_builder { let parent_block = self @@ -460,7 +573,7 @@ impl BlockStore { } }); pipeline_builder.build( - &pipelined_block, + stored_pipelined_block.clone(), parent_block .pipeline_futs() .expect("Futures should exist when pipeline enabled"), @@ -468,23 +581,7 @@ impl BlockStore { ); } - // ensure local time past the block time - let block_time = Duration::from_micros(block.timestamp_usecs()); - let current_timestamp = self.time_service.get_current_timestamp(); - if let Some(t) = block_time.checked_sub(current_timestamp) { - if t > Duration::from_secs(1) { - warn!("Long wait time {}ms for block {}", t.as_millis(), block); - } - self.time_service.wait_until(block_time).await; - } - if let Some(payload) = block.payload() { - self.payload_manager - .prefetch_payload_data(payload, block.timestamp_usecs()); - } - self.storage - .save_tree(vec![block.clone()], vec![]) - .context("Insert block failed when saving block")?; - self.inner.write().insert_block(pipelined_block) + Ok(stored_pipelined_block) } /// Validates quorum certificates and inserts it into block tree assuming dependencies exist. @@ -616,6 +713,10 @@ impl BlockReader for BlockStore { self.inner.read().commit_root() } + fn window_root(&self) -> Arc { + self.inner.read().window_root() + } + fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option> { self.inner.read().get_quorum_cert_for_block(&block_id) } @@ -628,6 +729,10 @@ impl BlockReader for BlockStore { self.inner.read().path_from_commit_root(block_id) } + fn path_from_window_root(&self, block_id: HashValue) -> Option>> { + self.inner.read().path_from_window_root(block_id) + } + #[cfg(test)] fn highest_certified_block(&self) -> Arc { self.inner.read().highest_certified_block() diff --git a/consensus/src/block_storage/block_tree.rs b/consensus/src/block_storage/block_tree.rs index d0be51e28e107..4f41be81ba328 100644 --- a/consensus/src/block_storage/block_tree.rs +++ b/consensus/src/block_storage/block_tree.rs @@ -219,6 +219,11 @@ impl BlockTree { .expect("Commit root must exist") } + pub(super) fn window_root(&self) -> Arc { + self.get_block(&self.window_root_id) + .expect("Window root must exist") + } + pub(super) fn highest_certified_block(&self) -> Arc { self.get_block(&self.highest_certified_block_id) .expect("Highest cerfified block must exist") @@ -252,11 +257,6 @@ impl BlockTree { self.id_to_quorum_cert.get(block_id).cloned() } - pub(super) fn window_root(&self) -> Arc { - self.get_block(&self.window_root_id) - .expect("Window root not found") - } - // TODO: return an error when not enough blocks? // TODO: how to know if the window is complete? /// Retrieves a Window of Recent Blocks @@ -632,6 +632,13 @@ impl BlockTree { self.path_from_root_to_block(block_id, self.commit_root_id, self.commit_root().round()) } + pub(super) fn path_from_window_root( + &self, + block_id: HashValue, + ) -> Option>> { + self.path_from_root_to_block(block_id, self.window_root_id, self.window_root().round()) + } + pub(super) fn max_pruned_blocks_in_mem(&self) -> usize { self.max_pruned_blocks_in_mem } diff --git a/consensus/src/block_storage/mod.rs b/consensus/src/block_storage/mod.rs index 51e6520f3d05e..02b1741716ca5 100644 --- a/consensus/src/block_storage/mod.rs +++ b/consensus/src/block_storage/mod.rs @@ -34,6 +34,8 @@ pub trait BlockReader: Send + Sync { /// Get the current commit root block of the BlockTree. fn commit_root(&self) -> Arc; + fn window_root(&self) -> Arc; + fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option>; /// Returns all the blocks between the ordered/commit root and the given block, including the given block @@ -47,6 +49,8 @@ pub trait BlockReader: Send + Sync { fn path_from_commit_root(&self, block_id: HashValue) -> Option>>; + fn path_from_window_root(&self, block_id: HashValue) -> Option>>; + /// Return the certified block with the highest round. #[cfg(test)] fn highest_certified_block(&self) -> Arc; diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index 85a79c6bf4e6a..b82fefb983ded 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -20,7 +20,7 @@ use crate::{ network::{IncomingBlockRetrievalRequest, NetworkSender}, network_interface::ConsensusMsg, payload_manager::TPayloadManager, - persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData}, + persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData}, pipeline::execution_client::TExecutionClient, }; use anyhow::{anyhow, bail, Context}; @@ -432,7 +432,8 @@ impl BlockStore { } // Check early that recovery will succeed, and return before corrupting our state in case it will not. - LedgerRecoveryData::new(highest_commit_cert.ledger_info().clone()) + storage + .recover_from_ledger_with_ledger_info(highest_commit_cert.ledger_info().clone()) .find_root( &mut blocks.clone(), &mut quorum_certs.clone(), diff --git a/consensus/src/consensus_observer/network/observer_message.rs b/consensus/src/consensus_observer/network/observer_message.rs index aa274d1982e0a..4935bd29521c5 100644 --- a/consensus/src/consensus_observer/network/observer_message.rs +++ b/consensus/src/consensus_observer/network/observer_message.rs @@ -314,12 +314,15 @@ impl CommitDecision { /// The transaction payload and proof of each block #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct PayloadWithProof { - transactions: Vec, - proofs: Vec, + pub transactions: Vec<(Arc>, u64)>, + pub proofs: Vec, } impl PayloadWithProof { - pub fn new(transactions: Vec, proofs: Vec) -> Self { + pub fn new( + transactions: Vec<(Arc>, u64)>, + proofs: Vec, + ) -> Self { Self { transactions, proofs, @@ -376,7 +379,7 @@ pub enum BlockTransactionPayload { impl BlockTransactionPayload { /// Creates a returns a new InQuorumStore transaction payload pub fn new_in_quorum_store( - transactions: Vec, + transactions: Vec<(Arc>, u64)>, proofs: Vec, ) -> Self { let payload_with_proof = PayloadWithProof::new(transactions, proofs); @@ -385,7 +388,7 @@ impl BlockTransactionPayload { /// Creates a returns a new InQuorumStoreWithLimit transaction payload pub fn new_in_quorum_store_with_limit( - transactions: Vec, + transactions: Vec<(Arc>, u64)>, proofs: Vec, limit: Option, ) -> Self { @@ -396,7 +399,7 @@ impl BlockTransactionPayload { /// Creates a returns a new QuorumStoreInlineHybrid transaction payload pub fn new_quorum_store_inline_hybrid( - transactions: Vec, + transactions: Vec<(Arc>, u64)>, proofs: Vec, limit: Option, inline_batches: Vec, @@ -407,7 +410,7 @@ impl BlockTransactionPayload { } pub fn new_opt_quorum_store( - transactions: Vec, + transactions: Vec<(Arc>, u64)>, proofs: Vec, limit: Option, batch_infos: Vec, @@ -463,7 +466,7 @@ impl BlockTransactionPayload { } /// Returns the transactions in the payload - pub fn transactions(&self) -> Vec { + pub fn transactions(&self) -> Vec<(Arc>, u64)> { match self { BlockTransactionPayload::InQuorumStore(payload) => payload.transactions.clone(), BlockTransactionPayload::InQuorumStoreWithLimit(payload) => { @@ -555,7 +558,7 @@ impl BlockTransactionPayload { /// Verifies the inline batches against the expected inline batches fn verify_inline_batches( &self, - expected_inline_batches: &[(BatchInfo, Vec)], + expected_inline_batches: &[(BatchInfo, Arc>)], ) -> Result<(), Error> { // Get the expected inline batches let expected_inline_batches: Vec<&BatchInfo> = expected_inline_batches @@ -687,7 +690,14 @@ impl BlockPayload { pub fn verify_payload_digests(&self) -> Result<(), Error> { // Get the block info, transactions, payload proofs and inline batches let block_info = self.block.clone(); - let transactions = self.transaction_payload.transactions(); + // Get the transactions, payload proofs and inline batches + // TODO: avoid a clone here? + let transactions: Vec<_> = self + .transaction_payload + .transactions() + .into_iter() + .flat_map(|(txns, _)| txns.as_ref().clone().into_iter()) + .collect(); let payload_proofs = self.transaction_payload.payload_proofs(); let opt_and_inline_batches = self.transaction_payload.optqs_and_inline_batches(); @@ -1021,7 +1031,7 @@ mod test { // Create a quorum store payload with a single inline batch let proof_with_data = ProofWithData::new(vec![]); let ordered_payload = Payload::QuorumStoreInlineHybrid( - vec![(create_batch_info(), vec![])], + vec![(create_batch_info(), Arc::new(vec![]))], proof_with_data, transaction_limit, ); @@ -1093,7 +1103,7 @@ mod test { // Create a quorum store payload with a single inline batch let proof_with_data: ProofBatches = Vec::new().into(); let ordered_payload = Payload::OptQuorumStore(OptQuorumStorePayload::new( - vec![(create_batch_info(), vec![])].into(), + vec![(create_batch_info(), Arc::new(vec![]))].into(), Vec::new().into(), proof_with_data, PayloadExecutionLimit::None, @@ -1139,7 +1149,7 @@ mod test { create_batch_info(), AggregateSignature::empty(), )]; - let inline_batches: InlineBatches = vec![(create_batch_info(), vec![])].into(); + let inline_batches: InlineBatches = vec![(create_batch_info(), Arc::new(vec![]))].into(); let opt_batches: OptBatches = vec![create_batch_info()].into(); let opt_and_inline_batches = [opt_batches.deref().clone(), inline_batches.batch_infos()].concat(); @@ -1666,7 +1676,8 @@ mod test { ) -> BlockPayload { // Create the transaction payload let transaction_payload = BlockTransactionPayload::new_quorum_store_inline_hybrid( - signed_transactions.to_vec(), + // TODO: this seems wrong + vec![(Arc::new(signed_transactions.to_vec()), 0)], proofs.to_vec(), None, inline_batches.to_vec(), @@ -1688,7 +1699,7 @@ mod test { ) -> BlockPayload { // Create the transaction payload let transaction_payload = BlockTransactionPayload::new_opt_quorum_store( - signed_transactions.to_vec(), + vec![(Arc::new(signed_transactions.to_vec()), 0)], proofs.to_vec(), None, opt_and_inline_batches.to_vec(), @@ -1756,7 +1767,7 @@ mod test { BlockType::Genesis, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - Arc::new(PipelinedBlock::new_ordered( + Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )) @@ -1789,7 +1800,7 @@ mod test { // Create the pipelined block let block = Block::new_for_testing(block_info.id(), block_data, None); - Arc::new(PipelinedBlock::new_ordered( + Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )) diff --git a/consensus/src/consensus_observer/observer/active_state.rs b/consensus/src/consensus_observer/observer/active_state.rs index f3b5c63271693..994f22625c75e 100644 --- a/consensus/src/consensus_observer/observer/active_state.rs +++ b/consensus/src/consensus_observer/observer/active_state.rs @@ -549,7 +549,7 @@ mod test { BlockType::Genesis, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - let pipelined_block = Arc::new(PipelinedBlock::new_ordered( + let pipelined_block = Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )); @@ -610,7 +610,7 @@ mod test { BlockType::Genesis, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - Arc::new(PipelinedBlock::new_ordered( + Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )) diff --git a/consensus/src/consensus_observer/observer/consensus_observer.rs b/consensus/src/consensus_observer/observer/consensus_observer.rs index 7b85f4199dd6e..5ed4cbc092f56 100644 --- a/consensus/src/consensus_observer/observer/consensus_observer.rs +++ b/consensus/src/consensus_observer/observer/consensus_observer.rs @@ -786,7 +786,7 @@ impl ConsensusObserver { self.block_payload_store.clone(), ); self.pipeline_builder().build( - block, + block.clone(), self.get_last_pipeline_futs(), commit_callback, ); diff --git a/consensus/src/consensus_observer/observer/ordered_blocks.rs b/consensus/src/consensus_observer/observer/ordered_blocks.rs index 7cf6a696a43ff..22a0230873112 100644 --- a/consensus/src/consensus_observer/observer/ordered_blocks.rs +++ b/consensus/src/consensus_observer/observer/ordered_blocks.rs @@ -718,7 +718,7 @@ mod test { BlockType::Genesis, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - let pipelined_block = Arc::new(PipelinedBlock::new_ordered( + let pipelined_block = Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )); diff --git a/consensus/src/consensus_observer/observer/payload_store.rs b/consensus/src/consensus_observer/observer/payload_store.rs index c16bf76131b83..9853148f5f1bf 100644 --- a/consensus/src/consensus_observer/observer/payload_store.rs +++ b/consensus/src/consensus_observer/observer/payload_store.rs @@ -1130,7 +1130,7 @@ mod test { block_type, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - let pipelined_block = Arc::new(PipelinedBlock::new_ordered( + let pipelined_block = Arc::new(PipelinedBlock::new_with_window( block, OrderedBlockWindow::empty(), )); diff --git a/consensus/src/consensus_observer/observer/pending_blocks.rs b/consensus/src/consensus_observer/observer/pending_blocks.rs index d7b9f219bbfea..a61a05a410611 100644 --- a/consensus/src/consensus_observer/observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/observer/pending_blocks.rs @@ -965,7 +965,7 @@ mod test { BlockType::Genesis, ); let block = Block::new_for_testing(block_info.id(), block_data, None); - let pipelined_block = Arc::new(PipelinedBlock::new_ordered( + let pipelined_block = Arc::new(PipelinedBlock::new_with_window( block, // TODO revisit this, not sure how i would do this right now... OrderedBlockWindow::empty(), diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs index 97d7ca98929f9..7a9f430cdccec 100644 --- a/consensus/src/dag/tests/helpers.rs +++ b/consensus/src/dag/tests/helpers.rs @@ -27,7 +27,7 @@ pub(super) struct MockPayloadManager {} impl TPayloadManager for MockPayloadManager { fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {} - fn notify_commit(&self, _block_timestamp: u64, _blocks: &[Arc]) {} + fn notify_commit(&self, _block_timestamp: u64, _block: Option) {} fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> { unimplemented!() @@ -36,7 +36,7 @@ impl TPayloadManager for MockPayloadManager { async fn get_transactions( &self, _block: &Block, - ) -> ExecutorResult<(Vec, Option)> { + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { Ok((Vec::new(), None)) } } diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index 3bb28c4d5f6ce..c7d3271554e6b 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -12,26 +12,39 @@ use crate::{ }; use aptos_consensus_types::{ block::Block, pipeline_execution_result::PipelineExecutionResult, - pipelined_block::OrderedBlockWindow, + pipelined_block::PipelinedBlock, }; use aptos_crypto::HashValue; use aptos_executor_types::{ state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult, }; use aptos_experimental_runtimes::thread_manager::optimal_min_len; -use aptos_logger::{debug, warn}; +use aptos_logger::{debug, info, warn}; use aptos_types::{ - block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock}, + block_executor::{ + config::BlockExecutorConfigFromOnchain, + partitioner::{ExecutableBlock, ExecutableTransactions}, + }, block_metadata_ext::BlockMetadataExt, transaction::{ - signature_verified_transaction::SignatureVerifiedTransaction, SignedTransaction, + signature_verified_transaction::{ + SignatureVerifiedTransaction, + SignatureVerifiedTransaction::{Invalid, Valid}, + }, + SignedTransaction, + Transaction::UserTransaction, + TransactionStatus, }, + txn_provider::{blocking_txn_provider::BlockingTxnProvider, TxnIndex, TxnProvider}, }; use fail::fail_point; use futures::future::BoxFuture; +use itertools::Itertools; use once_cell::sync::Lazy; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use std::{ + collections::HashSet, + iter::zip, sync::Arc, time::{Duration, Instant}, }; @@ -92,8 +105,7 @@ impl ExecutionPipeline { pub async fn queue( &self, - block: Block, - block_window: OrderedBlockWindow, + block: PipelinedBlock, metadata: BlockMetadataExt, parent_block_id: HashValue, txn_generator: BlockPreparer, @@ -102,11 +114,11 @@ impl ExecutionPipeline { lifetime_guard: CountedRequest<()>, ) -> StateComputeResultFut { let (result_tx, result_rx) = oneshot::channel(); + let block_round = block.round(); let block_id = block.id(); self.prepare_block_tx .send(PrepareBlockCommand { block, - block_window, metadata, block_executor_onchain_config, parent_block_id, @@ -119,14 +131,19 @@ impl ExecutionPipeline { .expect("Failed to send block to execution pipeline."); Box::pin(async move { - result_rx + let result = result_rx .await .map_err(|err| ExecutorError::InternalError { error: format!( "Failed to receive execution result for block {}: {:?}.", block_id, err ), - })? + })?; + info!( + "received result_rx for round {} block {}.", + block_round, block_id + ); + result }) } @@ -136,7 +153,6 @@ impl ExecutionPipeline { ) { let PrepareBlockCommand { block, - block_window, metadata, block_executor_onchain_config, parent_block_id, @@ -148,7 +164,9 @@ impl ExecutionPipeline { } = command; counters::PREPARE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed()); debug!("prepare_block received block {}.", block.id()); - let input_txns = block_preparer.prepare_block(&block, &block_window).await; + let input_txns = block_preparer + .prepare_block(block.block(), block.block_window()) + .await; if let Err(e) = input_txns { result_tx .send(Err(e)) @@ -172,10 +190,12 @@ impl ExecutionPipeline { }); counters::PREPARE_BLOCK_SIG_VERIFICATION_TIME .observe_duration(sig_verification_start.elapsed()); + let block_id = block.id(); execute_block_tx .send(ExecuteBlockCommand { input_txns, - block: (block.id(), sig_verified_txns).into(), + pipelined_block: block, + block: (block_id, sig_verified_txns).into(), parent_block_id, block_executor_onchain_config, pre_commit_hook, @@ -207,8 +227,9 @@ impl ExecutionPipeline { ledger_apply_tx: mpsc::UnboundedSender, executor: Arc, ) { - while let Some(ExecuteBlockCommand { - input_txns, + 'outer: while let Some(ExecuteBlockCommand { + input_txns: _, + pipelined_block, block, parent_block_id, block_executor_onchain_config, @@ -218,43 +239,218 @@ impl ExecutionPipeline { lifetime_guard, }) = block_rx.recv().await { + let now = Instant::now(); + counters::EXECUTE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed()); let block_id = block.block_id; - debug!("execute_stage received block {}.", block_id); + info!("execute_stage received block {}.", block_id); + + let mut committed_transactions = HashSet::new(); + + // TODO: lots of repeated code here + monitor!("execute_wait_for_committed_transactions", { + for b in pipelined_block + .block_window() + .pipelined_blocks() + .iter() + .filter(|window_block| window_block.round() == pipelined_block.round() - 1) + { + info!( + "Execution: Waiting for committed transactions at block {} for block {}", + b.round(), + pipelined_block.round() + ); + let txn_hashes = b.wait_for_committed_transactions(); + match txn_hashes { + Ok(txn_hashes) => { + for txn_hash in txn_hashes.iter() { + committed_transactions.insert(*txn_hash); + } + }, + Err(e) => { + info!( + "Execution: Waiting for committed transactions at block {} for block {}: Failed {}", + b.round(), + pipelined_block.round(), + e + ); + // TODO: can't clone, so make the whole thing return an error, then send it after this block of code. + result_tx + .send(Err(ExecutorError::CouldNotGetCommittedTransactions)) + .unwrap_or_else(log_failed_to_send_result( + "execute_stage", + block_id, + )); + continue 'outer; + }, + } + info!( + "Execution: Waiting for committed transactions at block {} for block {}: Done", + b.round(), + pipelined_block.round() + ); + } + }); + + let (mut txns, blocking_txn_provider) = + monitor!("execute_filter_block_committed_transactions", { + // TODO: Find a better way to do this. + match block.transactions { + ExecutableTransactions::Unsharded(txns) => { + let transactions: Vec<_> = txns + .into_iter() + .filter(|txn| { + if let Valid(UserTransaction(user_txn)) = txn { + !committed_transactions.contains(&user_txn.committed_hash()) + } else { + true + } + }) + .collect(); + let transactions_len = transactions.len(); + (transactions, BlockingTxnProvider::new(transactions_len)) + }, + ExecutableTransactions::UnshardedBlocking(_) => { + unimplemented!("Not expecting this yet.") + }, + ExecutableTransactions::Sharded(_) => { + unimplemented!("Sharded transactions are not supported yet.") + }, + } + }); + + let blocking_txn_writer = blocking_txn_provider.clone(); + let join_shuffle = tokio::task::spawn_blocking(move || { + // TODO: keep this previously split so we don't have to re-split it here + if let Some((first_user_txn_idx, _)) = txns.iter().find_position(|txn| { + let txn = match txn { + Valid(txn) => txn, + Invalid(txn) => txn, + }; + matches!(txn, UserTransaction(_)) + }) { + let timer = Instant::now(); + let validator_txns: Vec<_> = txns.drain(0..first_user_txn_idx).collect(); + info!( + "Execution: Split validator txns from user txns in {} micros", + timer.elapsed().as_micros() + ); + let shuffle_iterator = crate::transaction_shuffler::use_case_aware::iterator::ShuffledTransactionIterator::new(crate::transaction_shuffler::use_case_aware::Config { + sender_spread_factor: 32, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 4, + }).extended_with(txns); + for (idx, txn) in validator_txns + .into_iter() + .chain(shuffle_iterator) + .enumerate() + { + blocking_txn_writer.set_txn(idx as TxnIndex, txn); + } + } else { + // No user transactions in the block. + for (idx, txn) in txns.into_iter().enumerate() { + blocking_txn_writer.set_txn(idx as TxnIndex, txn); + } + } + }); + let transactions = ExecutableTransactions::UnshardedBlocking(blocking_txn_provider); + let transactions_cloned = transactions.clone(); + let block = ExecutableBlock::new(block.block_id, transactions); + let executor = executor.clone(); - let execution_time = monitor!( - "execute_block", - tokio::task::spawn_blocking(move || { - fail_point!("consensus::compute", |_| { - Err(ExecutorError::InternalError { - error: "Injected error in compute".into(), - }) - }); - let start = Instant::now(); - executor - .execute_and_state_checkpoint( - block, - parent_block_id, - block_executor_onchain_config, - ) - .map(|_| start.elapsed()) - }) - .await - ) - .expect("Failed to spawn_blocking."); + let join_execute = tokio::task::spawn_blocking(move || { + fail_point!("consensus::compute", |_| { + Err(ExecutorError::InternalError { + error: "Injected error in compute".into(), + }) + }); + let start = Instant::now(); + info!("execute_and_state_checkpoint start. {}", block_id); + executor + .execute_and_state_checkpoint( + block, + parent_block_id, + block_executor_onchain_config, + ) + .map(|output| { + info!("execute_and_state_checkpoint end. {}", block_id); + (output, start.elapsed()) + }) + }); + + join_shuffle.await.expect("Failed to join_shuffle."); + + let input_txns = monitor!("execute_filter_input_committed_transactions", { + let txns_provider_reader = match &transactions_cloned { + ExecutableTransactions::UnshardedBlocking(txns) => txns.clone(), + ExecutableTransactions::Unsharded(_) => { + unreachable!("Should have been converted to UnshardedBlocking") + }, + ExecutableTransactions::Sharded(_) => { + unreachable!("Should have been converted to UnshardedBlocking") + }, + }; + let mut input_txns = vec![]; + for idx in 0..txns_provider_reader.num_txns() { + match txns_provider_reader.get_txn(idx as TxnIndex) { + Valid(UserTransaction(user_txn)) => { + input_txns.push(user_txn.clone()); + }, + Invalid(UserTransaction(user_txn)) => { + input_txns.push(user_txn.clone()); + }, + _ => {}, + } + } + input_txns + }); + + let state_checkpoint_output = + monitor!("execute_block", join_execute.await).expect("Failed to join_execute."); + + monitor!("execute_update_committed_transactions", { + if let Ok((output, _)) = &state_checkpoint_output { + // Block metadata + validator transactions + let num_system_txns = 1 + pipelined_block + .validator_txns() + .map_or(0, |txns| txns.len()); + let committed_transactions: Vec<_> = + zip(input_txns.iter(), output.iter().skip(num_system_txns)) + .filter_map(|(input_txn, txn_status)| { + if let TransactionStatus::Keep(_) = txn_status { + Some(input_txn.committed_hash()) + } else { + None + } + }) + .collect(); + pipelined_block.set_committed_transactions(committed_transactions); + } else { + warn!("Not doing cancel of committed transactions: execute_block failed for block ({},{}) {}.", pipelined_block.epoch(), pipelined_block.round(), block_id); + // pipelined_block.cancel_committed_transactions(); + } + }); ledger_apply_tx .send(LedgerApplyCommand { input_txns, block_id, parent_block_id, - execution_time, + execution_time: state_checkpoint_output.map(|(_, time)| time), pre_commit_hook, result_tx, command_creation_time: Instant::now(), lifetime_guard, }) .expect("Failed to send block to ledger_apply stage."); + + info!( + "execute_stage for block ({},{}) took {} ms", + pipelined_block.epoch(), + pipelined_block.round(), + now.elapsed().as_millis() + ); } debug!("execute_stage quitting."); } @@ -277,7 +473,7 @@ impl ExecutionPipeline { }) = block_rx.recv().await { counters::APPLY_LEDGER_WAIT_TIME.observe_duration(command_creation_time.elapsed()); - debug!("ledger_apply stage received block {}.", block_id); + info!("ledger_apply stage received block {}.", block_id); let res = async { let execution_duration = execution_time?; let executor = executor.clone(); @@ -371,8 +567,7 @@ impl ExecutionPipeline { } struct PrepareBlockCommand { - block: Block, - block_window: OrderedBlockWindow, + block: PipelinedBlock, metadata: BlockMetadataExt, block_executor_onchain_config: BlockExecutorConfigFromOnchain, // The parent block id. @@ -386,6 +581,7 @@ struct PrepareBlockCommand { struct ExecuteBlockCommand { input_txns: Vec, + pipelined_block: PipelinedBlock, block: ExecutableBlock, parent_block_id: HashValue, block_executor_onchain_config: BlockExecutorConfigFromOnchain, diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index 47bdea4c9ce95..4c019abf51351 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -379,11 +379,11 @@ impl ProposalGenerator { // being executed: pending blocks vector keeps all the pending ancestors of the extended branch. let mut pending_blocks = self .block_store - .path_from_commit_root(hqc.certified_block().id()) + .path_from_window_root(hqc.certified_block().id()) .ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?; // Avoid txn manager long poll if the root block has txns, so that the leader can // deliver the commit proof to others without delay. - pending_blocks.push(self.block_store.commit_root()); + pending_blocks.push(self.block_store.window_root()); // Exclude all the pending transactions: these are all the ancestors of // parent (including) up to the root (including). diff --git a/consensus/src/payload_client/user/quorum_store_client.rs b/consensus/src/payload_client/user/quorum_store_client.rs index c8c541208c863..cd71f9261688e 100644 --- a/consensus/src/payload_client/user/quorum_store_client.rs +++ b/consensus/src/payload_client/user/quorum_store_client.rs @@ -139,7 +139,6 @@ impl UserPayloadClient for QuorumStoreClient { payload_len = payload.len(), return_empty = return_empty, return_non_full = return_non_full, - duration_ms = start_time.elapsed().as_millis() as u64, "Pull payloads from QuorumStore: proposal" ); diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index e5d5493266051..b5593cc99f668 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -42,7 +42,7 @@ use tokio::sync::oneshot; pub trait TPayloadManager: Send + Sync { /// Notify the payload manager that a block has been committed. This indicates that the /// transactions in the block's payload are no longer required for consensus. - fn notify_commit(&self, block_timestamp: u64, blocks: &[Arc]); + fn notify_commit(&self, block_timestamp: u64, block: Option); /// Prefetch the data for a payload. This is used to ensure that the data for a payload is /// available when block is executed. @@ -57,7 +57,7 @@ pub trait TPayloadManager: Send + Sync { async fn get_transactions( &self, block: &Block, - ) -> ExecutorResult<(Vec, Option)>; + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)>; } /// A payload manager that directly returns the transactions in a block's payload. @@ -71,7 +71,7 @@ impl DirectMempoolPayloadManager { #[async_trait] impl TPayloadManager for DirectMempoolPayloadManager { - fn notify_commit(&self, _block_timestamp: u64, _blocks: &[Arc]) {} + fn notify_commit(&self, _block_timestamp: u64, _block: Option) {} fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {} @@ -82,13 +82,13 @@ impl TPayloadManager for DirectMempoolPayloadManager { async fn get_transactions( &self, block: &Block, - ) -> ExecutorResult<(Vec, Option)> { + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { let Some(payload) = block.payload() else { return Ok((Vec::new(), None)); }; match payload { - Payload::DirectMempool(txns) => Ok((txns.clone(), None)), + Payload::DirectMempool(txns) => Ok((vec![(Arc::new(txns.clone()), 0)], None)), _ => unreachable!( "DirectMempoolPayloadManager: Unacceptable payload type {}. Epoch: {}, Round: {}, Block: {}", payload, @@ -132,7 +132,8 @@ impl QuorumStorePayloadManager { batch_reader: Arc, ) -> Vec<( HashValue, - oneshot::Receiver>>, + u64, + oneshot::Receiver>>>, )> { let mut receivers = Vec::new(); for (batch_info, responders) in batches { @@ -144,6 +145,7 @@ impl QuorumStorePayloadManager { if block_timestamp <= batch_info.expiration() { receivers.push(( *batch_info.digest(), + batch_info.gas_bucket_start(), batch_reader.get_batch( *batch_info.digest(), batch_info.expiration(), @@ -224,22 +226,25 @@ impl QuorumStorePayloadManager { #[async_trait] impl TPayloadManager for QuorumStorePayloadManager { - fn notify_commit(&self, block_timestamp: u64, blocks: &[Arc]) { + fn notify_commit(&self, block_timestamp: u64, block: Option) { self.batch_reader .update_certified_timestamp(block_timestamp); - let mut batches_removed = HashSet::new(); - for block in blocks { - for batch in Self::batches_removed_from_window(block) { - batches_removed.insert(batch); - } - } + let batches_to_remove = + block.map_or(vec![], |block| Self::batches_removed_from_window(&block)); + info!( + "batches_to_remove: {}", + batches_to_remove + .iter() + .map(|b| format!("{}", b)) + .join(", ") + ); let mut tx = self.coordinator_tx.clone(); if let Err(e) = tx.try_send(CoordinatorCommand::CommitNotification( block_timestamp, - batches_removed.into_iter().collect(), + batches_to_remove, )) { warn!( "CommitNotification failed. Is the epoch shutting down? error: {}", @@ -418,8 +423,18 @@ impl TPayloadManager for QuorumStorePayloadManager { async fn get_transactions( &self, block: &Block, - ) -> ExecutorResult<(Vec, Option)> { + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { + info!( + "get_transactions for block ({}, {}) started.", + block.epoch(), + block.round() + ); let Some(payload) = block.payload() else { + info!( + "get_transactions for block ({}, {}) finished (empty).", + block.epoch(), + block.round() + ); return Ok((Vec::new(), None)); }; @@ -468,7 +483,7 @@ impl TPayloadManager for QuorumStorePayloadManager { &mut inline_batches .iter() // TODO: Can clone be avoided here? - .flat_map(|(_batch_info, txns)| txns.clone()) + .map(|(batch_info, txns)| (txns.clone(), batch_info.gas_bucket_start())) .collect(), ); all_txns @@ -499,6 +514,7 @@ impl TPayloadManager for QuorumStorePayloadManager { &self.ordered_authors, ) .await?; + // TODO: this is a complete hack, need to add real support for OptQuorumStore let inline_batch_txns = opt_qs_payload.inline_batches().transactions(); let all_txns = [proof_batch_txns, opt_batch_txns, inline_batch_txns].concat(); BlockTransactionPayload::new_opt_quorum_store( @@ -529,6 +545,12 @@ impl TPayloadManager for QuorumStorePayloadManager { consensus_publisher.publish_message(message); } + info!( + "get_transactions for block ({}, {}) finished.", + block.epoch(), + block.round() + ); + Ok(( transaction_payload.transactions(), transaction_payload.limit(), @@ -541,7 +563,7 @@ async fn get_transactions_for_observer( block: &Block, block_payloads: &Arc>>, consensus_publisher: &Option>, -) -> ExecutorResult<(Vec, Option)> { +) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { // The data should already be available (as consensus observer will only ever // forward a block to the executor once the data has been received and verified). let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) { @@ -589,14 +611,14 @@ async fn request_txns_from_quorum_store( batches_and_responders: Vec<(BatchInfo, Vec)>, timestamp: u64, batch_reader: Arc, -) -> ExecutorResult> { +) -> ExecutorResult>, u64)>> { let mut vec_ret = Vec::new(); let receivers = QuorumStorePayloadManager::request_transactions( batches_and_responders, timestamp, batch_reader, ); - for (digest, rx) in receivers { + for (digest, gas_bucket_start, rx) in receivers { match rx.await { Err(e) => { // We probably advanced epoch already. @@ -607,15 +629,14 @@ async fn request_txns_from_quorum_store( return Err(DataNotFound(digest)); }, Ok(Ok(data)) => { - vec_ret.push(data); + vec_ret.push((data, gas_bucket_start)); }, Ok(Err(e)) => { return Err(e); }, } } - let ret: Vec = vec_ret.into_iter().flatten().collect(); - Ok(ret) + Ok(vec_ret) } async fn process_payload_helper( @@ -623,7 +644,7 @@ async fn process_payload_helper( batch_reader: Arc, block: &Block, ordered_authors: &[PeerId], -) -> ExecutorResult> { +) -> ExecutorResult>, u64)>> { let (iteration, fut) = { let data_fut_guard = data_ptr.data_fut.lock(); let data_fut = data_fut_guard.as_ref().expect("must be initialized"); @@ -668,10 +689,16 @@ async fn process_payload( batch_reader: Arc, block: &Block, ordered_authors: &[PeerId], -) -> ExecutorResult> { + // TODO: replace this Vec<(Arc>, u64>> with a struct BatchedTransactions +) -> ExecutorResult>, u64)>> { let status = proof_with_data.status.lock().take(); match status.expect("Should have been updated before.") { DataStatus::Cached(data) => { + info!( + "get_transactions block ({},{}) data is cached.", + block.epoch(), + block.round() + ); counters::QUORUM_BATCH_READY_COUNT.inc(); proof_with_data .status @@ -681,6 +708,11 @@ async fn process_payload( }, DataStatus::Requested(receivers) => { let _timer = counters::BATCH_WAIT_DURATION.start_timer(); + info!( + "get_transactions block ({},{}) data is being requested.", + block.epoch(), + block.round() + ); let mut vec_ret = Vec::new(); if !receivers.is_empty() { debug!( @@ -689,7 +721,7 @@ async fn process_payload( block.round() ); } - for (digest, rx) in receivers { + for (digest, gas_bucket_start, rx) in receivers { match rx.await { Err(e) => { // We probably advanced epoch already. @@ -719,7 +751,7 @@ async fn process_payload( return Err(DataNotFound(digest)); }, Ok(Ok(data)) => { - vec_ret.push(data); + vec_ret.push((data, gas_bucket_start)); }, Ok(Err(e)) => { let new_receivers = QuorumStorePayloadManager::request_transactions( @@ -745,13 +777,12 @@ async fn process_payload( }, } } - let ret: Vec = vec_ret.into_iter().flatten().collect(); // execution asks for the data twice, so data is cached here for the second time. proof_with_data .status .lock() - .replace(DataStatus::Cached(ret.clone())); - Ok(ret) + .replace(DataStatus::Cached(vec_ret.clone())); + Ok(vec_ret) }, } } @@ -775,7 +806,7 @@ impl ConsensusObserverPayloadManager { #[async_trait] impl TPayloadManager for ConsensusObserverPayloadManager { - fn notify_commit(&self, _block_timestamp: u64, _blocks: &[Arc]) { + fn notify_commit(&self, _block_timestamp: u64, _block: Option) { // noop } @@ -790,7 +821,7 @@ impl TPayloadManager for ConsensusObserverPayloadManager { async fn get_transactions( &self, block: &Block, - ) -> ExecutorResult<(Vec, Option)> { + ) -> ExecutorResult<(Vec<(Arc>, u64)>, Option)> { return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher) .await; } diff --git a/consensus/src/persistent_liveness_storage.rs b/consensus/src/persistent_liveness_storage.rs index 07add49887a08..07355ecfc7547 100644 --- a/consensus/src/persistent_liveness_storage.rs +++ b/consensus/src/persistent_liveness_storage.rs @@ -41,6 +41,11 @@ pub trait PersistentLivenessStorage: Send + Sync { /// Construct data that can be recovered from ledger fn recover_from_ledger(&self) -> LedgerRecoveryData; + fn recover_from_ledger_with_ledger_info( + &self, + latest_ledger_info: LedgerInfoWithSignatures, + ) -> LedgerRecoveryData; + /// Construct necessary data to start consensus. fn start(&self, order_vote_enabled: bool, window_size: usize) -> LivenessStorageData; @@ -85,11 +90,18 @@ impl Debug for RootInfo { #[derive(Clone)] pub struct LedgerRecoveryData { storage_ledger: LedgerInfoWithSignatures, + latest_epoch_ending_ledger: LedgerInfoWithSignatures, } impl LedgerRecoveryData { - pub fn new(storage_ledger: LedgerInfoWithSignatures) -> Self { - LedgerRecoveryData { storage_ledger } + pub fn new( + storage_ledger: LedgerInfoWithSignatures, + latest_epoch_ending_ledger: LedgerInfoWithSignatures, + ) -> Self { + LedgerRecoveryData { + storage_ledger, + latest_epoch_ending_ledger, + } } pub fn committed_round(&self) -> Round { @@ -112,20 +124,24 @@ impl LedgerRecoveryData { self.storage_ledger ); + // We always generate the virtual genesis block, + // as block window could extend to the beginning of the epoch. + let genesis = Block::make_genesis_block_from_ledger_info( + self.latest_epoch_ending_ledger.ledger_info(), + ); + let genesis_qc = QuorumCert::certificate_for_genesis_from_ledger_info( + self.latest_epoch_ending_ledger.ledger_info(), + genesis.id(), + ); + let genesis_ledger_info = genesis_qc.ledger_info().clone(); + let genesis_id = genesis.id(); + blocks.push(genesis); + quorum_certs.push(genesis_qc); + // We start from the block that storage's latest ledger info, if storage has end-epoch - // LedgerInfo, we generate the virtual genesis block + // LedgerInfo, we use info from the virtual genesis block let (latest_commit_id, latest_ledger_info_sig) = if self.storage_ledger.ledger_info().ends_epoch() { - let genesis = - Block::make_genesis_block_from_ledger_info(self.storage_ledger.ledger_info()); - let genesis_qc = QuorumCert::certificate_for_genesis_from_ledger_info( - self.storage_ledger.ledger_info(), - genesis.id(), - ); - let genesis_ledger_info = genesis_qc.ledger_info().clone(); - let genesis_id = genesis.id(); - blocks.push(genesis); - quorum_certs.push(genesis_qc); (genesis_id, genesis_ledger_info) } else { ( @@ -433,10 +449,45 @@ impl PersistentLivenessStorage for StorageWriteProxy { .aptos_db .get_latest_ledger_info() .expect("Failed to get latest ledger info."); - LedgerRecoveryData::new(latest_ledger_info) + self.recover_from_ledger_with_ledger_info(latest_ledger_info) + } + + fn recover_from_ledger_with_ledger_info( + &self, + latest_ledger_info: LedgerInfoWithSignatures, + ) -> LedgerRecoveryData { + if latest_ledger_info.ledger_info().ends_epoch() { + return LedgerRecoveryData::new(latest_ledger_info.clone(), latest_ledger_info); + } + + info!( + "Recover from ledger info: {}", + latest_ledger_info.ledger_info() + ); + let latest_epoch_change_proof: EpochChangeProof = self + .aptos_db + .get_epoch_ending_ledger_infos( + latest_ledger_info.ledger_info().next_block_epoch() - 1, + latest_ledger_info.ledger_info().next_block_epoch(), + ) + .expect("Failed to get latest epoch ending ledger info."); + assert_eq!( + latest_epoch_change_proof.ledger_info_with_sigs.len(), + 1, + "Expected exactly one epoch ending ledger info, found {}", + latest_epoch_change_proof.ledger_info_with_sigs.len() + ); + let latest_epoch_ending_ledger_info = latest_epoch_change_proof + .ledger_info_with_sigs + .first() + .expect("Expected exactly one epoch ending ledger info.") + .clone(); + LedgerRecoveryData::new(latest_ledger_info, latest_epoch_ending_ledger_info) } fn start(&self, order_vote_enabled: bool, window_size: usize) -> LivenessStorageData { + // TODO: during consensus recovery, set committed transactions for any blocks in the window + // TODO: where it is missing. at this point aptos_db should be up to date. info!("Start consensus recovery."); let raw_data = self .db @@ -466,6 +517,7 @@ impl PersistentLivenessStorage for StorageWriteProxy { qc_repr.concat() ); // find the block corresponding to storage latest ledger info + let ledger_recovery_data = self.recover_from_ledger(); let latest_ledger_info = self .aptos_db .get_latest_ledger_info() @@ -474,7 +526,6 @@ impl PersistentLivenessStorage for StorageWriteProxy { .aptos_db .get_accumulator_summary(latest_ledger_info.ledger_info().version()) .expect("Failed to get accumulator summary."); - let ledger_recovery_data = LedgerRecoveryData::new(latest_ledger_info); match RecoveryData::new( last_vote, diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index babc1a2366d1e..50c7421d9a6d5 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -571,7 +571,11 @@ impl BufferManager { self.previous_commit_time = Instant::now(); self.commit_proof_rb_handle.take(); // purge the incoming blocks queue - while let Ok(Some(_)) = self.block_rx.try_next() {} + while let Ok(Some(ordered_blocks)) = self.block_rx.try_next() { + for block in &ordered_blocks.ordered_blocks { + block.cancel_committed_transactions(); + } + } // Wait for ongoing tasks to finish before sending back ack. while self.ongoing_tasks.load(Ordering::SeqCst) > 0 { tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/consensus/src/pipeline/execution_schedule_phase.rs b/consensus/src/pipeline/execution_schedule_phase.rs index bef39b890baa4..88803a0a2b495 100644 --- a/consensus/src/pipeline/execution_schedule_phase.rs +++ b/consensus/src/pipeline/execution_schedule_phase.rs @@ -99,8 +99,7 @@ impl StatelessPipeline for ExecutionSchedulePhase { let fut = self .execution_proxy .schedule_compute( - b.block(), - b.block_window(), + b, b.parent_id(), b.randomness().cloned(), lifetime_guard.spawn(()), diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 3ad81c09cad5f..02d8f9c6fd432 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -18,10 +18,9 @@ use aptos_consensus_types::{ common::Round, pipeline::commit_vote::CommitVote, pipelined_block::{ - CommitLedgerResult, CommitVoteResult, ExecuteResult, LedgerUpdateResult, - OrderedBlockWindow, PipelineFutures, PipelineInputRx, PipelineInputTx, PipelinedBlock, - PostCommitResult, PostLedgerUpdateResult, PostPreCommitResult, PreCommitResult, - PrepareResult, TaskError, TaskFuture, TaskResult, + CommitLedgerResult, CommitVoteResult, ExecuteResult, LedgerUpdateResult, PipelineFutures, + PipelineInputRx, PipelineInputTx, PipelinedBlock, PostCommitResult, PostLedgerUpdateResult, + PostPreCommitResult, PreCommitResult, PrepareResult, TaskError, TaskFuture, TaskResult, }, }; use aptos_crypto::HashValue; @@ -223,16 +222,12 @@ impl PipelineBuilder { pub fn build( &self, - pipelined_block: &PipelinedBlock, + pipelined_block: Arc, parent_futs: PipelineFutures, block_store_callback: Box, ) { - let (futs, tx, abort_handles) = self.build_internal( - parent_futs, - Arc::new(pipelined_block.block().clone()), - Arc::new(pipelined_block.block_window().clone()), - block_store_callback, - ); + let (futs, tx, abort_handles) = + self.build_internal(parent_futs, pipelined_block.clone(), block_store_callback); pipelined_block.set_pipeline_futs(futs); pipelined_block.set_pipeline_tx(tx); pipelined_block.set_pipeline_abort_handles(abort_handles); @@ -241,8 +236,7 @@ impl PipelineBuilder { fn build_internal( &self, parent: PipelineFutures, - block: Arc, - block_window: Arc, + block: Arc, block_store_callback: Box, ) -> (PipelineFutures, PipelineInputTx, Vec) { let (tx, rx) = Self::channel(); @@ -256,11 +250,7 @@ impl PipelineBuilder { let mut abort_handles = vec![]; let prepare_fut = spawn_shared_fut( - Self::prepare( - self.block_preparer.clone(), - block.clone(), - block_window.clone(), - ), + Self::prepare(self.block_preparer.clone(), block.clone()), &mut abort_handles, ); let execute_fut = spawn_shared_fut( @@ -334,7 +324,6 @@ impl PipelineBuilder { self.state_sync_notifier.clone(), self.payload_manager.clone(), block.clone(), - block_window.clone(), ), &mut abort_handles, ); @@ -372,13 +361,15 @@ impl PipelineBuilder { /// What it does: Wait for all data becomes available and verify transaction signatures async fn prepare( preparer: Arc, - block: Arc, - block_window: Arc, + block: Arc, ) -> TaskResult { - let _tracker = Tracker::new("prepare", &block); + let _tracker = Tracker::new("prepare", block.block()); // the loop can only be abort by the caller let input_txns = loop { - match preparer.prepare_block(&block, &block_window).await { + match preparer + .prepare_block(block.block(), block.block_window()) + .await + { Ok(input_txns) => break input_txns, Err(e) => { warn!( @@ -411,7 +402,7 @@ impl PipelineBuilder { parent_block_execute_phase: TaskFuture, randomness_rx: oneshot::Receiver>, executor: Arc, - block: Arc, + block: Arc, is_randomness_enabled: bool, validator: Arc<[AccountAddress]>, onchain_execution_config: BlockExecutorConfigFromOnchain, @@ -422,11 +413,13 @@ impl PipelineBuilder { .await .map_err(|_| anyhow!("randomness tx cancelled"))?; - let _tracker = Tracker::new("execute", &block); + let _tracker = Tracker::new("execute", block.block()); let metadata_txn = if is_randomness_enabled { - block.new_metadata_with_randomness(&validator, maybe_rand) + block + .block() + .new_metadata_with_randomness(&validator, maybe_rand) } else { - block.new_block_metadata(&validator).into() + block.block().new_block_metadata(&validator).into() }; let txns = [ vec![SignatureVerifiedTransaction::from(Transaction::from( @@ -443,6 +436,7 @@ impl PipelineBuilder { user_txns.as_ref().clone(), ] .concat(); + // TODO: share the extra logic that is now in execution_pipeline::execute_stage let start = Instant::now(); tokio::task::spawn_blocking(move || { executor @@ -465,11 +459,11 @@ impl PipelineBuilder { execute_phase: TaskFuture, parent_block_ledger_update_phase: TaskFuture, executor: Arc, - block: Arc, + block: Arc, ) -> TaskResult { let (_, _, prev_epoch_end_timestamp) = parent_block_ledger_update_phase.await?; let execution_time = execute_phase.await?; - let _tracker = Tracker::new("ledger_update", &block); + let _tracker = Tracker::new("ledger_update", block.block()); let timestamp = block.timestamp_usecs(); let result = tokio::task::spawn_blocking(move || { executor @@ -495,12 +489,12 @@ impl PipelineBuilder { prepare_fut: TaskFuture, ledger_update: TaskFuture, mempool_notifier: Arc, - block: Arc, + block: Arc, ) -> TaskResult { let user_txns = prepare_fut.await?; let (compute_result, _, _) = ledger_update.await?; - let _tracker = Tracker::new("post_ledger_update", &block); + let _tracker = Tracker::new("post_ledger_update", block.block()); let compute_status = compute_result.compute_status_for_input_txns(); // the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata if user_txns.len() >= compute_status.len() { @@ -545,7 +539,7 @@ impl PipelineBuilder { mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, mut commit_proof_rx: tokio::sync::broadcast::Receiver, signer: Arc, - block: Arc, + block: Arc, ) -> TaskResult { let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?; // either order_vote_rx or order_proof_rx can trigger the next phase @@ -561,8 +555,8 @@ impl PipelineBuilder { } } - let _tracker = Tracker::new("sign_commit_vote", &block); - let mut block_info = block.gen_block_info( + let _tracker = Tracker::new("sign_commit_vote", block.block()); + let mut block_info = block.block().gen_block_info( compute_result.root_hash(), compute_result.last_version_or_0(), compute_result.epoch_state().clone(), @@ -595,7 +589,7 @@ impl PipelineBuilder { mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, mut commit_proof_rx: tokio::sync::broadcast::Receiver, executor: Arc, - block: Arc, + block: Arc, ) -> TaskResult { let (compute_result, _, _) = ledger_update_phase.await?; parent_block_pre_commit_phase.await?; @@ -612,7 +606,7 @@ impl PipelineBuilder { .map_err(|_| anyhow!("commit proof tx cancelled"))?; } - let _tracker = Tracker::new("pre_commit", &block); + let _tracker = Tracker::new("pre_commit", block.block()); tokio::task::spawn_blocking(move || { executor .pre_commit_block(block.id()) @@ -631,14 +625,12 @@ impl PipelineBuilder { parent_post_pre_commit: TaskFuture, state_sync_notifier: Arc, payload_manager: Arc, - block: Arc, - block_window: Arc, + block: Arc, ) -> TaskResult { let compute_result = pre_commit.await?; parent_post_pre_commit.await?; - let _tracker = Tracker::new("post_pre_commit", &block); - let blocks = block_window.pipelined_blocks().clone(); + let _tracker = Tracker::new("post_pre_commit", block.block()); let timestamp = block.timestamp_usecs(); let _timer = counters::OP_COUNTERS.timer("pre_commit_notify"); @@ -653,7 +645,7 @@ impl PipelineBuilder { error!(error = ?e, "Failed to notify state synchronizer"); } - payload_manager.notify_commit(timestamp, &blocks); + payload_manager.notify_commit(timestamp, Some(block.as_ref().clone())); Ok(()) } @@ -664,7 +656,7 @@ impl PipelineBuilder { mut commit_proof_rx: tokio::sync::broadcast::Receiver, parent_block_commit_phase: TaskFuture, executor: Arc, - block: Arc, + block: Arc, ) -> TaskResult { parent_block_commit_phase.await?; pre_commit_fut.await?; @@ -678,7 +670,7 @@ impl PipelineBuilder { return Ok(None); } - let _tracker = Tracker::new("commit_ledger", &block); + let _tracker = Tracker::new("commit_ledger", block.block()); let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone(); tokio::task::spawn_blocking(move || { executor @@ -697,14 +689,14 @@ impl PipelineBuilder { commit_ledger_fut: TaskFuture, parent_post_commit: TaskFuture, block_store_callback: Box, - block: Arc, + block: Arc, ) -> TaskResult { parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; - let _tracker = Tracker::new("post_commit_ledger", &block); - update_counters_for_block(&block); + let _tracker = Tracker::new("post_commit_ledger", block.block()); + update_counters_for_block(block.block()); update_counters_for_compute_result(&compute_result); if let Some(ledger_info_with_sigs) = maybe_ledger_info_with_sigs { diff --git a/consensus/src/quorum_store/batch_proof_queue.rs b/consensus/src/quorum_store/batch_proof_queue.rs index 80e56bb13bd55..fba9f7d9e07d5 100644 --- a/consensus/src/quorum_store/batch_proof_queue.rs +++ b/consensus/src/quorum_store/batch_proof_queue.rs @@ -16,6 +16,7 @@ use aptos_logger::{info, sample, sample::SampleRate, warn}; use aptos_metrics_core::TimerHelper; use aptos_short_hex_str::AsShortHexStr; use aptos_types::{transaction::SignedTransaction, PeerId}; +use dashmap::DashSet; use rand::{prelude::SliceRandom, thread_rng}; use std::{ cmp::Reverse, @@ -522,7 +523,7 @@ impl BatchProofQueue { return_non_full: bool, block_timestamp: Duration, ) -> ( - Vec<(BatchInfo, Vec)>, + Vec<(BatchInfo, Arc>)>, PayloadTxnsSize, u64, ) { @@ -571,25 +572,36 @@ impl BatchProofQueue { block_timestamp: Duration, min_batch_age_usecs: Option, ) -> (Vec<&QueueItem>, PayloadTxnsSize, u64, bool) { + let start_time = Instant::now(); + let mut result = Vec::new(); let mut cur_unique_txns = 0; let mut cur_all_txns = PayloadTxnsSize::zero(); let mut excluded_txns = 0; let mut full = false; - // Set of all the excluded transactions and all the transactions included in the result - let mut filtered_txns = HashSet::new(); - for batch_info in excluded_batches { - let batch_key = BatchKey::from_info(batch_info); - if let Some(txn_summaries) = self - .items - .get(&batch_key) - .and_then(|item| item.txn_summaries.as_ref()) - { - for txn_summary in txn_summaries { - filtered_txns.insert(*txn_summary); - } - } - } + + let filtered_txns: DashSet = DashSet::new(); + // let num_all_txns = excluded_batches + // .iter() + // .map(|batch| batch.num_txns() as usize) + // .sum(); + // let filtered_txns = DashSet::with_capacity(num_all_txns); + // excluded_batches.par_iter().for_each(|batch_info| { + // let batch_key = BatchKey::from_info(batch_info); + // if let Some(txn_summaries) = self + // .items + // .get(&batch_key) + // .and_then(|item| item.txn_summaries.as_ref()) + // { + // for txn_summary in txn_summaries { + // filtered_txns.insert(*txn_summary); + // } + // } + // }); + info!( + "Pull payloads from QuorumStore: building filtered_txns took {} ms", + start_time.elapsed().as_millis() + ); let max_batch_creation_ts_usecs = min_batch_age_usecs .map(|min_age| aptos_infallible::duration_since_epoch().as_micros() as u64 - min_age); @@ -700,6 +712,7 @@ impl BatchProofQueue { result_count = result.len(), full = full, return_non_full = return_non_full, + elapsed_time_ms = start_time.elapsed().as_millis() as u64, "Pull payloads from QuorumStore: internal" ); diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index c7dfe6aeff0c1..3349a4abf9a74 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -22,7 +22,7 @@ use tokio::{sync::oneshot, time}; struct BatchRequesterState { signers: Vec, next_index: usize, - ret_tx: oneshot::Sender>>, + ret_tx: oneshot::Sender>>>, num_retries: usize, retry_limit: usize, } @@ -30,7 +30,7 @@ struct BatchRequesterState { impl BatchRequesterState { fn new( signers: Vec, - ret_tx: oneshot::Sender>>, + ret_tx: oneshot::Sender>>>, retry_limit: usize, ) -> Self { Self { @@ -68,7 +68,7 @@ impl BatchRequesterState { } } - fn serve_request(self, digest: HashValue, maybe_payload: Option>) { + fn serve_request(self, digest: HashValue, maybe_payload: Option>>) { if let Some(payload) = maybe_payload { trace!( "QS: batch to oneshot, digest {}, tx {:?}", @@ -133,9 +133,9 @@ impl BatchRequester { digest: HashValue, expiration: u64, responders: Vec, - ret_tx: oneshot::Sender>>, + ret_tx: oneshot::Sender>>>, mut subscriber_rx: oneshot::Receiver, - ) -> Option<(BatchInfo, Vec)> { + ) -> Option<(BatchInfo, Arc>)> { let validator_verifier = self.validator_verifier.clone(); let mut request_state = BatchRequesterState::new(responders, ret_tx, self.retry_limit); let network_sender = self.network_sender.clone(); @@ -168,7 +168,7 @@ impl BatchRequester { counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); let digest = *batch.digest(); let batch_info = batch.batch_info().clone(); - let payload = batch.into_transactions(); + let payload = Arc::new(batch.into_transactions()); request_state.serve_request(digest, Some(payload.clone())); return Some((batch_info, payload)); } diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index f8d32b23137ff..c99ccd0217162 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -435,7 +435,7 @@ pub trait BatchReader: Send + Sync { digest: HashValue, expiration: u64, signers: Vec, - ) -> oneshot::Receiver>>; + ) -> oneshot::Receiver>>>; fn update_certified_timestamp(&self, certified_time: u64); } @@ -467,7 +467,7 @@ impl BatchReader for Batch digest: HashValue, expiration: u64, signers: Vec, - ) -> oneshot::Receiver>> { + ) -> oneshot::Receiver>>> { let (tx, rx) = oneshot::channel(); let batch_store = self.batch_store.clone(); let batch_requester = self.batch_requester.clone(); diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index fed469c4df93f..441e9f17e28c9 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -56,7 +56,7 @@ fn request_for_test( num_bytes, 0, ), - maybe_payload, + maybe_payload.map(Arc::new), ) } diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs index 2fd9d36ac57f5..be9ffb12ab6cf 100644 --- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs +++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs @@ -34,7 +34,7 @@ impl BatchReader for MockBatchReader { _digest: HashValue, _expiration: u64, _signers: Vec, - ) -> tokio::sync::oneshot::Receiver>> { + ) -> tokio::sync::oneshot::Receiver>>> { unimplemented!() } diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 96b6324c39a14..4321e4b511f3d 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -12,12 +12,13 @@ use serde::{Deserialize, Serialize}; use std::{ fmt::{Display, Formatter}, ops::Deref, + sync::Arc, }; #[derive(Clone, Eq, Deserialize, Serialize, PartialEq, Debug)] pub struct PersistedValue { info: BatchInfo, - maybe_payload: Option>, + maybe_payload: Option>>, } #[derive(PartialEq, Debug)] @@ -27,7 +28,7 @@ pub(crate) enum StorageMode { } impl PersistedValue { - pub(crate) fn new(info: BatchInfo, maybe_payload: Option>) -> Self { + pub(crate) fn new(info: BatchInfo, maybe_payload: Option>>) -> Self { Self { info, maybe_payload, @@ -41,7 +42,7 @@ impl PersistedValue { } } - pub(crate) fn take_payload(&mut self) -> Option> { + pub(crate) fn take_payload(&mut self) -> Option>> { self.maybe_payload.take() } @@ -54,7 +55,7 @@ impl PersistedValue { &self.info } - pub fn payload(&self) -> &Option> { + pub fn payload(&self) -> &Option>> { &self.maybe_payload } @@ -75,7 +76,7 @@ impl PersistedValue { vec![] } - pub fn unpack(self) -> (BatchInfo, Option>) { + pub fn unpack(self) -> (BatchInfo, Option>>) { (self.info, self.maybe_payload) } } @@ -97,9 +98,12 @@ impl TryFrom for Batch { batch_info: value.info, payload: BatchPayload::new( author, + // TODO: can this clone be avoided as well? value .maybe_payload - .ok_or_else(|| anyhow::anyhow!("Payload not exist"))?, + .ok_or_else(|| anyhow::anyhow!("Payload not exist"))? + .as_ref() + .clone(), ), }) } @@ -263,7 +267,7 @@ impl From for PersistedValue { batch_info, payload, } = value; - PersistedValue::new(batch_info, Some(payload.into_transactions())) + PersistedValue::new(batch_info, Some(Arc::new(payload.into_transactions()))) } } diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 1f4877119514d..6f980c605d9fb 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -20,10 +20,8 @@ use crate::{ use anyhow::Result; use aptos_consensus_notifications::ConsensusNotificationSender; use aptos_consensus_types::{ - block::Block, - common::Round, - pipeline_execution_result::PipelineExecutionResult, - pipelined_block::{OrderedBlockWindow, PipelinedBlock}, + common::Round, pipeline_execution_result::PipelineExecutionResult, + pipelined_block::PipelinedBlock, }; use aptos_crypto::HashValue; use aptos_executor_types::{ @@ -135,13 +133,12 @@ impl ExecutionProxy { fn pre_commit_hook( &self, - block: &Block, - block_window: &OrderedBlockWindow, + block: &PipelinedBlock, payload_manager: Arc, ) -> PreCommitHook { let mut pre_commit_notifier = self.pre_commit_notifier.clone(); let state_sync_notifier = self.state_sync_notifier.clone(); - let blocks = block_window.pipelined_blocks().clone(); + let block_cloned = block.clone(); let timestamp = block.timestamp_usecs(); Box::new(move |state_compute_result: &StateComputeResult| { let state_compute_result = state_compute_result.clone(); @@ -162,7 +159,7 @@ impl ExecutionProxy { error!(error = ?e, "Failed to notify state synchronizer"); } - payload_manager.notify_commit(timestamp, &blocks); + payload_manager.notify_commit(timestamp, Some(block_cloned)); })) .await .expect("Failed to send pre-commit notification"); @@ -210,13 +207,14 @@ impl StateComputer for ExecutionProxy { async fn schedule_compute( &self, // The block to be executed. - block: &Block, - block_window: &OrderedBlockWindow, + block: &PipelinedBlock, // The parent block id. parent_block_id: HashValue, randomness: Option, lifetime_guard: CountedRequest<()>, ) -> StateComputeResultFut { + block.init_committed_transactions(); + let block_id = block.id(); debug!( block = %block, @@ -249,9 +247,11 @@ impl StateComputer for ExecutionProxy { let timestamp = block.timestamp_usecs(); let metadata = if is_randomness_enabled { - block.new_metadata_with_randomness(&validators, randomness) + block + .block() + .new_metadata_with_randomness(&validators, randomness) } else { - block.new_block_metadata(&validators).into() + block.block().new_block_metadata(&validators).into() }; let pipeline_entry_time = Instant::now(); @@ -259,12 +259,11 @@ impl StateComputer for ExecutionProxy { .execution_pipeline .queue( block.clone(), - block_window.clone(), metadata.clone(), parent_block_id, transaction_generator, block_executor_onchain_config, - self.pre_commit_hook(block, block_window, payload_manager), + self.pre_commit_hook(block, payload_manager), lifetime_guard, ) .await; @@ -272,8 +271,19 @@ impl StateComputer for ExecutionProxy { counters::PIPELINE_ENTRY_TO_INSERTED_TIME.observe_duration(pipeline_entry_time.elapsed()); let pipeline_inserted_timestamp = Instant::now(); + let block_cloned = block.clone(); Box::pin(async move { - let pipeline_execution_result = fut.await?; + let pipeline_execution_result = match fut.await { + Ok(result) => result, + Err(e) => { + error!( + error = ?e, + "Failed to execute block in pipeline", + ); + block_cloned.cancel_committed_transactions(); + return Err(e); + }, + }; debug!( block_id = block_id, "Got state compute result, post processing." @@ -432,7 +442,7 @@ impl StateComputer for ExecutionProxy { // Might be none if called in the recovery path, or between epoch stop and start. if let Some(inner) = self.state.read().as_ref() { let block_timestamp = target.commit_info().timestamp_usecs(); - inner.payload_manager.notify_commit(block_timestamp, &[]); + inner.payload_manager.notify_commit(block_timestamp, None); } // Inject an error for fail point testing @@ -541,7 +551,7 @@ async fn test_commit_sync_race() { _block: ExecutableBlock, _parent_block_id: HashValue, _onchain_config: BlockExecutorConfigFromOnchain, - ) -> ExecutorResult<()> { + ) -> ExecutorResult> { todo!() } diff --git a/consensus/src/state_computer_tests.rs b/consensus/src/state_computer_tests.rs index 63655a99841d1..cbc3646ecefcc 100644 --- a/consensus/src/state_computer_tests.rs +++ b/consensus/src/state_computer_tests.rs @@ -10,7 +10,9 @@ use crate::{ use aptos_config::config::transaction_filter_type::Filter; use aptos_consensus_notifications::{ConsensusNotificationSender, Error}; use aptos_consensus_types::{ - block::Block, block_data::BlockData, pipelined_block::OrderedBlockWindow, + block::Block, + block_data::BlockData, + pipelined_block::{OrderedBlockWindow, PipelinedBlock}, }; use aptos_crypto::HashValue; use aptos_executor_types::{ @@ -119,9 +121,10 @@ impl BlockExecutorTrait for DummyBlockExecutor { block: ExecutableBlock, _parent_block_id: HashValue, _onchain_config: BlockExecutorConfigFromOnchain, - ) -> ExecutorResult<()> { + ) -> ExecutorResult> { self.blocks_received.lock().push(block); - Ok(()) + // TODO: fix + Ok(vec![]) } fn ledger_update( @@ -186,6 +189,7 @@ async fn should_see_and_notify_validator_txns() { ]), None, ); + let pipelined_block = PipelinedBlock::new_with_window(block, OrderedBlockWindow::empty()); let epoch_state = EpochState::empty(); @@ -200,13 +204,7 @@ async fn should_see_and_notify_validator_txns() { // Ensure the dummy executor has received the txns. let _ = execution_policy - .schedule_compute( - &block, - &OrderedBlockWindow::empty(), - HashValue::zero(), - None, - dummy_guard(), - ) + .schedule_compute(&pipelined_block, HashValue::zero(), None, dummy_guard()) .await .await .unwrap(); diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs index a702ecb7aadd2..10ae6ae587aa0 100644 --- a/consensus/src/state_replication.rs +++ b/consensus/src/state_replication.rs @@ -8,10 +8,7 @@ use crate::{ transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler, }; use anyhow::Result; -use aptos_consensus_types::{ - block::Block, - pipelined_block::{OrderedBlockWindow, PipelinedBlock}, -}; +use aptos_consensus_types::pipelined_block::PipelinedBlock; use aptos_crypto::HashValue; use aptos_executor_types::ExecutorResult; use aptos_types::{ @@ -31,8 +28,7 @@ pub trait StateComputer: Send + Sync { async fn schedule_compute( &self, // The block that will be computed. - _block: &Block, - _block_window: &OrderedBlockWindow, + _block: &PipelinedBlock, // The parent block root hash. _parent_block_id: HashValue, _randomness: Option, diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs index 94afb9d88ac74..5ac8692d26f5e 100644 --- a/consensus/src/test_utils/mock_execution_client.rs +++ b/consensus/src/test_utils/mock_execution_client.rs @@ -80,7 +80,11 @@ impl MockExecutionClient { txns.append(&mut payload_txns); } // they may fail during shutdown - let _ = self.state_sync_client.unbounded_send(txns); + let _ = self.state_sync_client.unbounded_send( + txns.into_iter() + .flat_map(|(txns, _)| txns.as_ref().clone().into_iter()) + .collect(), + ); callback( &ordered_blocks.into_iter().map(Arc::new).collect::>(), diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index 0219e93bdcc1e..00750129f4264 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -13,9 +13,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use aptos_consensus_types::{ - block::Block, - pipeline_execution_result::PipelineExecutionResult, - pipelined_block::{OrderedBlockWindow, PipelinedBlock}, + pipeline_execution_result::PipelineExecutionResult, pipelined_block::PipelinedBlock, }; use aptos_crypto::HashValue; use aptos_executor_types::{ @@ -123,8 +121,7 @@ impl RandomComputeResultStateComputer { impl StateComputer for RandomComputeResultStateComputer { async fn schedule_compute( &self, - _block: &Block, - _block_window: &OrderedBlockWindow, + _block: &PipelinedBlock, parent_block_id: HashValue, _randomness: Option, _lifetime_guard: CountedRequest<()>, diff --git a/consensus/src/test_utils/mock_storage.rs b/consensus/src/test_utils/mock_storage.rs index fa337a93d69d0..b67fd605bbfae 100644 --- a/consensus/src/test_utils/mock_storage.rs +++ b/consensus/src/test_utils/mock_storage.rs @@ -94,10 +94,11 @@ impl MockStorage { } pub fn get_ledger_recovery_data(&self) -> LedgerRecoveryData { - LedgerRecoveryData::new(LedgerInfoWithSignatures::new( + let ledger_info = LedgerInfoWithSignatures::new( self.storage_ledger.lock().clone(), AggregateSignature::empty(), - )) + ); + LedgerRecoveryData::new(ledger_info.clone(), ledger_info) } pub fn try_start(&self, order_vote_enabled: bool, window_size: usize) -> Result { @@ -204,6 +205,13 @@ impl PersistentLivenessStorage for MockStorage { self.get_ledger_recovery_data() } + fn recover_from_ledger_with_ledger_info( + &self, + _latest_ledger_info: LedgerInfoWithSignatures, + ) -> LedgerRecoveryData { + self.recover_from_ledger() + } + fn start(&self, order_vote_enabled: bool, window_size: usize) -> LivenessStorageData { match self.try_start(order_vote_enabled, window_size) { Ok(recovery_data) => LivenessStorageData::FullRecoveryData(recovery_data), @@ -274,10 +282,18 @@ impl PersistentLivenessStorage for EmptyStorage { } fn recover_from_ledger(&self) -> LedgerRecoveryData { - LedgerRecoveryData::new(LedgerInfoWithSignatures::new( + let ledger_info = LedgerInfoWithSignatures::new( LedgerInfo::mock_genesis(None), AggregateSignature::empty(), - )) + ); + LedgerRecoveryData::new(ledger_info.clone(), ledger_info) + } + + fn recover_from_ledger_with_ledger_info( + &self, + _latest_ledger_info: LedgerInfoWithSignatures, + ) -> LedgerRecoveryData { + self.recover_from_ledger() } fn start(&self, order_vote_enabled: bool, window_size: usize) -> LivenessStorageData { diff --git a/consensus/src/transaction_shuffler/mod.rs b/consensus/src/transaction_shuffler/mod.rs index 7abb146c74b27..bee86f037470e 100644 --- a/consensus/src/transaction_shuffler/mod.rs +++ b/consensus/src/transaction_shuffler/mod.rs @@ -7,7 +7,7 @@ use sender_aware::SenderAwareShuffler; use std::sync::Arc; mod sender_aware; -mod use_case_aware; +pub mod use_case_aware; // re-export use case aware shuffler for fuzzer. #[cfg(feature = "fuzzing")] pub mod transaction_shuffler_fuzzing { diff --git a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs index b7cfa0fd1412b..da466ea43f500 100644 --- a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs +++ b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs @@ -10,7 +10,7 @@ use aptos_types::transaction::use_case::UseCaseAwareTransaction; use std::{collections::VecDeque, fmt::Debug}; #[derive(Debug)] -pub(super) struct ShuffledTransactionIterator { +pub struct ShuffledTransactionIterator { input_queue: VecDeque, delayed_queue: DelayedQueue, input_idx: InputIdx, @@ -21,7 +21,7 @@ impl ShuffledTransactionIterator where Txn: UseCaseAwareTransaction + Debug, { - pub(super) fn new(config: Config) -> Self { + pub fn new(config: Config) -> Self { Self { input_queue: VecDeque::new(), delayed_queue: DelayedQueue::new(config), @@ -30,7 +30,7 @@ where } } - pub(super) fn extended_with(mut self, txns: impl IntoIterator) -> Self { + pub fn extended_with(mut self, txns: impl IntoIterator) -> Self { self.input_queue.extend(txns); self } diff --git a/consensus/src/util/db_tool.rs b/consensus/src/util/db_tool.rs index 6ae051b251997..0d86d9ac60768 100644 --- a/consensus/src/util/db_tool.rs +++ b/consensus/src/util/db_tool.rs @@ -69,7 +69,7 @@ fn extract_txns_from_quorum_store( for digest in digests { if let Some(batch) = all_batches.get(&digest) { if let Some(txns) = batch.payload() { - block_txns.extend(txns); + block_txns.extend(txns.as_ref()); } else { bail!("Payload is not found for batch ({digest})."); } @@ -108,7 +108,7 @@ pub fn extract_txns_from_block<'a>( ) .unwrap(); for (_, txns) in inline_batches { - all_txns.extend(txns); + all_txns.extend(txns.as_ref()); } Ok(all_txns) }, diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index 37f2253c685c2..facba59c6a311 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -6,7 +6,6 @@ use crate::{ metrics::TIMER, }; use anyhow::Result; -use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_types::{ account_address::AccountAddress, account_config::{DepositEvent, WithdrawEvent}, @@ -21,6 +20,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, ExecutionStatus, Transaction, TransactionAuxiliaryData, TransactionOutput, TransactionStatus, }, + txn_provider::TxnProvider, vm_status::AbortLocation, write_set::{WriteOp, WriteSet, WriteSetMut}, }; @@ -358,7 +358,7 @@ impl VMBlockExecutor for NativeExecutor { fn execute_block( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &(impl StateView + Sync), _onchain_config: BlockExecutorConfigFromOnchain, _transaction_slice_metadata: TransactionSliceMetadata, diff --git a/execution/executor-service/Cargo.toml b/execution/executor-service/Cargo.toml index 8ef9319e548bc..5a71bb62d8cce 100644 --- a/execution/executor-service/Cargo.toml +++ b/execution/executor-service/Cargo.toml @@ -13,7 +13,6 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] -aptos-block-executor = { workspace = true } aptos-block-partitioner = { workspace = true } aptos-config = { workspace = true } aptos-infallible = { workspace = true } @@ -21,7 +20,7 @@ aptos-language-e2e-tests = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } aptos-node-resource-metrics = { workspace = true } -aptos-push-metrics = { workspace = true } +aptos-push-metrics = { workspace = true } aptos-secure-net = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs index 520efbd3fa340..ce50168ae7eb0 100644 --- a/execution/executor-service/src/test_utils.rs +++ b/execution/executor-service/src/test_utils.rs @@ -1,7 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_block_partitioner::{v2::config::PartitionerV2Config, PartitionerConfig}; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, @@ -18,6 +17,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, Transaction, TransactionOutput, }, + txn_provider::default::DefaultTxnProvider, }; use aptos_vm::{ aptos_vm::AptosVMBlockExecutor, diff --git a/execution/executor-types/src/error.rs b/execution/executor-types/src/error.rs index 0e3367d0ec3ca..e8134aef2f396 100644 --- a/execution/executor-types/src/error.rs +++ b/execution/executor-types/src/error.rs @@ -41,6 +41,9 @@ pub enum ExecutorError { #[error("request timeout")] CouldNotGetData, + + #[error("committed transactions get failed")] + CouldNotGetCommittedTransactions, } impl From for ExecutorError { diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index fab455b1c7c77..dfde7d5fdebad 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -17,7 +17,7 @@ use aptos_types::{ state_store::{state_key::StateKey, state_value::StateValue}, transaction::{ Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, - Version, + TransactionStatus, Version, }, write_set::WriteSet, }; @@ -145,7 +145,7 @@ pub trait BlockExecutorTrait: Send + Sync { block: ExecutableBlock, parent_block_id: HashValue, onchain_config: BlockExecutorConfigFromOnchain, - ) -> ExecutorResult<()>; + ) -> ExecutorResult>; fn ledger_update( &self, diff --git a/execution/executor/Cargo.toml b/execution/executor/Cargo.toml index 2cbc86379c51c..7fc27599e38fd 100644 --- a/execution/executor/Cargo.toml +++ b/execution/executor/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } -aptos-block-executor = { workspace = true } aptos-consensus-types = { workspace = true } aptos-crypto = { workspace = true } aptos-drop-helper = { workspace = true } diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 352f42d93e643..69fcf978555ff 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -36,6 +36,7 @@ use aptos_types::{ }, ledger_info::LedgerInfoWithSignatures, state_store::StateViewId, + transaction::TransactionStatus, }; use aptos_vm::VMBlockExecutor; use block_tree::BlockTree; @@ -95,7 +96,7 @@ where block: ExecutableBlock, parent_block_id: HashValue, onchain_config: BlockExecutorConfigFromOnchain, - ) -> ExecutorResult<()> { + ) -> ExecutorResult> { let _guard = CONCURRENCY_GAUGE.concurrency_with(&["block", "execute_and_state_checkpoint"]); self.maybe_initialize()?; @@ -181,7 +182,7 @@ where block: ExecutableBlock, parent_block_id: HashValue, onchain_config: BlockExecutorConfigFromOnchain, - ) -> ExecutorResult<()> { + ) -> ExecutorResult> { let _timer = BLOCK_EXECUTION_WORKFLOW_WHOLE.start_timer(); let ExecutableBlock { block_id, @@ -260,11 +261,12 @@ where }; let output = PartialStateComputeResult::new(execution_output); output.set_state_checkpoint_output(state_checkpoint_output); + let statuses_for_input_txns = output.execution_output.statuses_for_input_txns.clone(); let _ = self .block_tree .add_block(parent_block_id, block_id, output)?; - Ok(()) + Ok(statuses_for_input_txns) } fn ledger_update( diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 17d415fa9918d..3dfc88e35f1be 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -4,7 +4,6 @@ use crate::block_executor::BlockExecutor; use anyhow::Result; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue}; use aptos_executor_types::BlockExecutorTrait; use aptos_storage_interface::{chunk_to_commit::ChunkToCommit, DbReader, DbReaderWriter, DbWriter}; @@ -22,6 +21,7 @@ use aptos_types::{ }, BlockOutput, Transaction, TransactionOutput, Version, }, + txn_provider::TxnProvider, vm_status::VMStatus, }; use aptos_vm::{ @@ -78,7 +78,7 @@ impl VMBlockExecutor for FakeVM { fn execute_block( &self, - _txn_provider: &DefaultTxnProvider, + _txn_provider: &dyn TxnProvider, _state_view: &impl StateView, _onchain_config: BlockExecutorConfigFromOnchain, _transaction_slice_metadata: TransactionSliceMetadata, diff --git a/execution/executor/src/tests/mock_vm/mock_vm_test.rs b/execution/executor/src/tests/mock_vm/mock_vm_test.rs index 79ec5a32d65b4..6fe768d39ad00 100644 --- a/execution/executor/src/tests/mock_vm/mock_vm_test.rs +++ b/execution/executor/src/tests/mock_vm/mock_vm_test.rs @@ -3,12 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 use super::{balance_ap, encode_mint_transaction, encode_transfer_transaction, seqnum_ap, MockVM}; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_types::{ account_address::AccountAddress, bytes::NumToBytes, state_store::{state_key::StateKey, MockStateView}, transaction::signature_verified_transaction::into_signature_verified_block, + txn_provider::default::DefaultTxnProvider, write_set::WriteOp, }; use aptos_vm::VMBlockExecutor; diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs index b1ab87984c7a4..f1c86847df27a 100644 --- a/execution/executor/src/tests/mock_vm/mod.rs +++ b/execution/executor/src/tests/mock_vm/mod.rs @@ -6,7 +6,6 @@ mod mock_vm_test; use anyhow::Result; -use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_crypto::{ed25519::Ed25519PrivateKey, PrivateKey, Uniform}; use aptos_types::{ account_address::AccountAddress, @@ -27,6 +26,7 @@ use aptos_types::{ TransactionArgument, TransactionAuxiliaryData, TransactionOutput, TransactionPayload, TransactionStatus, WriteSetPayload, }, + txn_provider::TxnProvider, vm_status::{StatusCode, VMStatus}, write_set::{WriteOp, WriteSet, WriteSetMut}, }; @@ -68,7 +68,7 @@ impl VMBlockExecutor for MockVM { fn execute_block( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &impl StateView, _onchain_config: BlockExecutorConfigFromOnchain, _transaction_slice_metadata: TransactionSliceMetadata, diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 2a7d45795a6ca..2ababf70196fb 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -6,9 +6,6 @@ use crate::{ metrics::{EXECUTOR_ERRORS, OTHER_TIMERS}, }; use anyhow::{anyhow, Result}; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; -#[cfg(feature = "consensus-only-perf-test")] -use aptos_block_executor::txn_provider::TxnProvider; use aptos_crypto::HashValue; use aptos_executor_service::{ local_executor_helper::SHARDED_BLOCK_EXECUTOR, @@ -41,6 +38,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockEndInfo, BlockOutput, Transaction, TransactionOutput, TransactionStatus, Version, }, + txn_provider::{default::DefaultTxnProvider, TxnProvider}, write_set::{TransactionWrite, WriteSet}, }; use aptos_vm::VMBlockExecutor; @@ -59,9 +57,19 @@ impl DoGetExecutionOutput { ) -> Result { let out = match transactions { ExecutableTransactions::Unsharded(txns) => { + let txn_provider = DefaultTxnProvider::new(txns); + Self::by_transaction_execution_unsharded::( + executor, + &txn_provider, + state_view, + onchain_config, + transaction_slice_metadata, + )? + }, + ExecutableTransactions::UnshardedBlocking(blocking_txn_provider) => { Self::by_transaction_execution_unsharded::( executor, - txns, + &blocking_txn_provider, state_view, onchain_config, transaction_slice_metadata, @@ -92,17 +100,16 @@ impl DoGetExecutionOutput { fn by_transaction_execution_unsharded( executor: &V, - transactions: Vec, + txn_provider: &dyn TxnProvider, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, ) -> Result { let append_state_checkpoint_to_block = transaction_slice_metadata.append_state_checkpoint_to_block(); - let txn_provider = DefaultTxnProvider::new(transactions); let block_output = Self::execute_block::( executor, - &txn_provider, + txn_provider, &state_view, onchain_config, transaction_slice_metadata, @@ -112,7 +119,7 @@ impl DoGetExecutionOutput { Parser::parse( state_view.next_version(), txn_provider - .txns + .to_vec() .into_iter() .map(|t| t.into_inner()) .collect(), @@ -217,7 +224,7 @@ impl DoGetExecutionOutput { #[cfg(not(feature = "consensus-only-perf-test"))] fn execute_block( executor: &V, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, diff --git a/experimental/execution/ptx-executor/Cargo.toml b/experimental/execution/ptx-executor/Cargo.toml index 4a9ce2a43f317..0da896d31500a 100644 --- a/experimental/execution/ptx-executor/Cargo.toml +++ b/experimental/execution/ptx-executor/Cargo.toml @@ -13,7 +13,6 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] -aptos-block-executor = { workspace = true } aptos-experimental-runtimes = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } diff --git a/experimental/execution/ptx-executor/src/lib.rs b/experimental/execution/ptx-executor/src/lib.rs index 43a6c9ba24683..8074088bb51e9 100644 --- a/experimental/execution/ptx-executor/src/lib.rs +++ b/experimental/execution/ptx-executor/src/lib.rs @@ -21,7 +21,6 @@ use crate::{ analyzer::PtxAnalyzer, finalizer::PtxFinalizer, metrics::TIMER, runner::PtxRunner, scheduler::PtxScheduler, sorter::PtxSorter, state_reader::PtxStateReader, }; -use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::Mutex; use aptos_metrics_core::TimerHelper; @@ -35,6 +34,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput, TransactionOutput, }, + txn_provider::TxnProvider, }; use aptos_vm::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, @@ -52,7 +52,7 @@ impl VMBlockExecutor for PtxBlockExecutor { fn execute_block( &self, - txn_provider: &DefaultTxnProvider, + txn_provider: &dyn TxnProvider, state_view: &(impl StateView + Sync), _onchain_config: BlockExecutorConfigFromOnchain, _transaction_slice_metadata: TransactionSliceMetadata, diff --git a/storage/db-tool/Cargo.toml b/storage/db-tool/Cargo.toml index cfdbaff99bc22..b2feeb952750d 100644 --- a/storage/db-tool/Cargo.toml +++ b/storage/db-tool/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-backup-cli = { workspace = true } -aptos-block-executor = { workspace = true } aptos-config = { workspace = true } aptos-db = { workspace = true, features = ["db-debugger"] } aptos-executor = { workspace = true } diff --git a/storage/db-tool/src/replay_on_archive.rs b/storage/db-tool/src/replay_on_archive.rs index d4bd05161a90c..09250295ae4f2 100644 --- a/storage/db-tool/src/replay_on_archive.rs +++ b/storage/db-tool/src/replay_on_archive.rs @@ -4,7 +4,6 @@ use anyhow::{bail, Error, Ok, Result}; use aptos_backup_cli::utils::{ReplayConcurrencyLevelOpt, RocksdbOpt}; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; use aptos_config::config::{ StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, NO_OP_STORAGE_PRUNER_CONFIG, @@ -18,6 +17,7 @@ use aptos_types::{ signature_verified_transaction::SignatureVerifiedTransaction, Transaction, TransactionInfo, Version, }, + txn_provider::default::DefaultTxnProvider, write_set::WriteSet, }; use aptos_vm::{aptos_vm::AptosVMBlockExecutor, AptosVM, VMBlockExecutor}; @@ -30,6 +30,7 @@ use std::{ sync::{atomic::AtomicU64, Arc}, time::Instant, }; + // Replay Verify controller is responsible for providing legit range with start and end versions. #[derive(Parser)] pub struct Opt { diff --git a/testsuite/smoke-test/src/smoke_test_environment.rs b/testsuite/smoke-test/src/smoke_test_environment.rs index f2c32fa25ad99..ad4438c9cbc04 100644 --- a/testsuite/smoke-test/src/smoke_test_environment.rs +++ b/testsuite/smoke-test/src/smoke_test_environment.rs @@ -17,7 +17,7 @@ use rand::rngs::OsRng; use std::{num::NonZeroUsize, sync::Arc}; use tokio::task::JoinHandle; -const SWARM_BUILD_NUM_RETRIES: u8 = 3; +const SWARM_BUILD_NUM_RETRIES: u8 = 0; #[derive(Clone)] pub struct SwarmBuilder { diff --git a/types/src/block_executor/partitioner.rs b/types/src/block_executor/partitioner.rs index b42eb4efcd8a4..0625fa2926ccc 100644 --- a/types/src/block_executor/partitioner.rs +++ b/types/src/block_executor/partitioner.rs @@ -1,10 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::transaction::{ - analyzed_transaction::{AnalyzedTransaction, StorageLocation}, - signature_verified_transaction::{into_signature_verified_block, SignatureVerifiedTransaction}, - Transaction, +use crate::{ + transaction::{ + analyzed_transaction::{AnalyzedTransaction, StorageLocation}, + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, + }, + txn_provider::{blocking_txn_provider::BlockingTxnProvider, TxnProvider}, }; use aptos_crypto::HashValue; use serde::{Deserialize, Serialize}; @@ -549,6 +554,7 @@ impl PartitionedTransactions { #[derive(Clone)] pub enum ExecutableTransactions { Unsharded(Vec), + UnshardedBlocking(BlockingTxnProvider), Sharded(PartitionedTransactions), } @@ -556,6 +562,7 @@ impl ExecutableTransactions { pub fn num_transactions(&self) -> usize { match self { ExecutableTransactions::Unsharded(transactions) => transactions.len(), + ExecutableTransactions::UnshardedBlocking(provider) => provider.num_txns(), ExecutableTransactions::Sharded(partitioned_txns) => partitioned_txns.num_txns(), } } @@ -563,6 +570,7 @@ impl ExecutableTransactions { pub fn into_txns(self) -> Vec { match self { ExecutableTransactions::Unsharded(txns) => txns, + ExecutableTransactions::UnshardedBlocking(provider) => provider.to_vec(), ExecutableTransactions::Sharded(partitioned) => { PartitionedTransactions::flatten(partitioned) .into_iter() diff --git a/types/src/lib.rs b/types/src/lib.rs index 9081b6c0f0a4d..ba58b5c7db6c2 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -44,6 +44,7 @@ pub mod test_helpers; pub mod timestamp; pub mod transaction; pub mod trusted_state; +pub mod txn_provider; pub mod utility_coin; pub mod validator_config; pub mod validator_info; diff --git a/types/src/on_chain_config/consensus_config.rs b/types/src/on_chain_config/consensus_config.rs index 4d087b3ffb89a..ef9fe42f4eef2 100644 --- a/types/src/on_chain_config/consensus_config.rs +++ b/types/src/on_chain_config/consensus_config.rs @@ -273,7 +273,7 @@ impl OnChainConsensusConfig { // TODO: actually add to onchain config pub fn window_size(&self) -> usize { - 2 + 20 } pub fn is_dag_enabled(&self) -> bool { diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index b557dfc00e877..cb1b322af62b8 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -77,7 +77,12 @@ pub use script::{ TypeArgumentABI, }; use serde::de::DeserializeOwned; -use std::{collections::BTreeSet, hash::Hash, ops::Deref, sync::atomic::AtomicU64}; +use std::{ + collections::BTreeSet, + hash::Hash, + ops::Deref, + sync::{atomic::AtomicU64, Arc}, +}; pub type Version = u64; // Height - also used for MVCC in StateDB pub type AtomicVersion = AtomicU64; @@ -463,7 +468,7 @@ impl WriteSetPayload { /// **IMPORTANT:** The signature of a `SignedTransaction` is not guaranteed to be verified. For a /// transaction whose signature is statically guaranteed to be verified, see /// [`SignatureCheckedTransaction`]. -#[derive(Clone, Eq, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct SignedTransaction { /// The raw transaction raw_txn: RawTransaction, @@ -483,7 +488,11 @@ pub struct SignedTransaction { /// A cached hash of the transaction. #[serde(skip)] - committed_hash: OnceCell, + committed_hash: Arc>, + + /// A cached signature verification result of the transaction. + #[serde(skip)] + valid_signature: Arc>, } /// PartialEq ignores the cached OnceCell fields that may or may not be initialized. @@ -493,6 +502,8 @@ impl PartialEq for SignedTransaction { } } +impl Eq for SignedTransaction {} + /// A transaction for which the signature has been verified. Created by /// [`SignedTransaction::check_signature`] and [`RawTransaction::sign`]. #[derive(Clone, Debug, Eq, PartialEq)] @@ -542,7 +553,8 @@ impl SignedTransaction { authenticator, raw_txn_size: OnceCell::new(), authenticator_size: OnceCell::new(), - committed_hash: OnceCell::new(), + committed_hash: Arc::new(OnceCell::new()), + valid_signature: Arc::new(OnceCell::new()), } } @@ -557,7 +569,8 @@ impl SignedTransaction { authenticator, raw_txn_size: OnceCell::new(), authenticator_size: OnceCell::new(), - committed_hash: OnceCell::new(), + committed_hash: Arc::new(OnceCell::new()), + valid_signature: Arc::new(OnceCell::new()), } } @@ -718,6 +731,12 @@ impl SignedTransaction { Ok(()) } + pub fn is_valid_signature(&self) -> bool { + *self + .valid_signature + .get_or_init(|| self.verify_signature().is_ok()) + } + pub fn contains_duplicate_signers(&self) -> bool { let mut all_signer_addresses = self.authenticator.secondary_signer_addresses(); all_signer_addresses.push(self.sender()); diff --git a/types/src/transaction/signature_verified_transaction.rs b/types/src/transaction/signature_verified_transaction.rs index cd35b573b42c1..ccccc5f919fdc 100644 --- a/types/src/transaction/signature_verified_transaction.rs +++ b/types/src/transaction/signature_verified_transaction.rs @@ -79,9 +79,9 @@ impl BlockExecutableTransaction for SignatureVerifiedTransaction { impl From for SignatureVerifiedTransaction { fn from(txn: Transaction) -> Self { match txn { - Transaction::UserTransaction(txn) => match txn.verify_signature() { - Ok(_) => SignatureVerifiedTransaction::Valid(Transaction::UserTransaction(txn)), - Err(_) => SignatureVerifiedTransaction::Invalid(Transaction::UserTransaction(txn)), + Transaction::UserTransaction(txn) => match txn.is_valid_signature() { + true => SignatureVerifiedTransaction::Valid(Transaction::UserTransaction(txn)), + false => SignatureVerifiedTransaction::Invalid(Transaction::UserTransaction(txn)), }, _ => SignatureVerifiedTransaction::Valid(txn), } diff --git a/types/src/transaction/use_case.rs b/types/src/transaction/use_case.rs index d947b76874b44..098394aa3e5a4 100644 --- a/types/src/transaction/use_case.rs +++ b/types/src/transaction/use_case.rs @@ -1,7 +1,9 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::transaction::SignedTransaction; +use crate::transaction::{ + signature_verified_transaction::SignatureVerifiedTransaction, SignedTransaction, +}; use move_core_types::account_address::AccountAddress; #[derive(Clone, Eq, Hash, PartialEq)] @@ -52,3 +54,27 @@ impl UseCaseAwareTransaction for SignedTransaction { } } } + +impl UseCaseAwareTransaction for SignatureVerifiedTransaction { + fn parse_sender(&self) -> AccountAddress { + let txn = match self { + SignatureVerifiedTransaction::Valid(txn) => txn, + SignatureVerifiedTransaction::Invalid(txn) => txn, + }; + match txn { + crate::transaction::Transaction::UserTransaction(txn) => txn.parse_sender(), + _ => unreachable!("UseCaseAwareTransaction should not be given non-UserTransaction"), + } + } + + fn parse_use_case(&self) -> UseCaseKey { + let txn = match self { + SignatureVerifiedTransaction::Valid(txn) => txn, + SignatureVerifiedTransaction::Invalid(txn) => txn, + }; + match txn { + crate::transaction::Transaction::UserTransaction(txn) => txn.parse_use_case(), + _ => unreachable!("UseCaseAwareTransaction should not be given non-UserTransaction"), + } + } +} diff --git a/aptos-move/block-executor/src/txn_provider/blocking_txns_provider.rs b/types/src/txn_provider/blocking_txn_provider.rs similarity index 57% rename from aptos-move/block-executor/src/txn_provider/blocking_txns_provider.rs rename to types/src/txn_provider/blocking_txn_provider.rs index c866271f41376..9730bf346d179 100644 --- a/aptos-move/block-executor/src/txn_provider/blocking_txns_provider.rs +++ b/types/src/txn_provider/blocking_txn_provider.rs @@ -1,19 +1,22 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::txn_provider::TxnProvider; -use aptos_mvhashmap::types::TxnIndex; -use aptos_types::transaction::BlockExecutableTransaction as Transaction; +use crate::{ + transaction::BlockExecutableTransaction as Transaction, + txn_provider::{TxnIndex, TxnProvider}, +}; use once_cell::sync::OnceCell; +use std::sync::Arc; +#[derive(Clone)] pub struct BlockingTxnProvider { - txns: Vec>, + txns: Arc>>, } #[allow(dead_code)] impl BlockingTxnProvider { pub fn new(num_txns: usize) -> Self { - let txns = vec![OnceCell::new(); num_txns]; + let txns = Arc::new(vec![OnceCell::new(); num_txns]); Self { txns } } @@ -30,6 +33,16 @@ impl TxnProvider for BlockingTxnProvider } fn get_txn(&self, idx: TxnIndex) -> &T { - self.txns[idx as usize].wait() + let res = self.txns[idx as usize].wait(); + res + } + + fn to_vec(&self) -> Vec { + let mut txns = vec![]; + for i in 0..self.num_txns() as TxnIndex { + let txn = self.get_txn(i).clone(); + txns.push(txn); + } + txns } } diff --git a/aptos-move/block-executor/src/txn_provider/default.rs b/types/src/txn_provider/default.rs similarity index 79% rename from aptos-move/block-executor/src/txn_provider/default.rs rename to types/src/txn_provider/default.rs index 3f7881971856b..bb29bbd9439fb 100644 --- a/aptos-move/block-executor/src/txn_provider/default.rs +++ b/types/src/txn_provider/default.rs @@ -1,9 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::txn_provider::TxnProvider; -use aptos_mvhashmap::types::TxnIndex; -use aptos_types::transaction::BlockExecutableTransaction as Transaction; +use crate::{ + transaction::BlockExecutableTransaction as Transaction, + txn_provider::{TxnIndex, TxnProvider}, +}; pub struct DefaultTxnProvider { pub txns: Vec, @@ -27,6 +28,10 @@ impl TxnProvider for DefaultTxnProvider { fn get_txn(&self, idx: TxnIndex) -> &T { &self.txns[idx as usize] } + + fn to_vec(&self) -> Vec { + self.txns.clone() + } } impl Iterator for DefaultTxnProvider { diff --git a/aptos-move/block-executor/src/txn_provider/mod.rs b/types/src/txn_provider/mod.rs similarity index 56% rename from aptos-move/block-executor/src/txn_provider/mod.rs rename to types/src/txn_provider/mod.rs index 88a11f2478b1c..2f178b9eb62f5 100644 --- a/aptos-move/block-executor/src/txn_provider/mod.rs +++ b/types/src/txn_provider/mod.rs @@ -1,16 +1,19 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -mod blocking_txns_provider; +pub mod blocking_txn_provider; pub mod default; -use aptos_mvhashmap::types::TxnIndex; -use aptos_types::transaction::BlockExecutableTransaction as Transaction; +use crate::transaction::BlockExecutableTransaction as Transaction; -pub trait TxnProvider { +pub type TxnIndex = u32; + +pub trait TxnProvider: Send + Sync { /// Get total number of transactions fn num_txns(&self) -> usize; /// Get a reference of the txn object by its index. fn get_txn(&self, idx: TxnIndex) -> &T; + + fn to_vec(&self) -> Vec; }