diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index 244512572a8b8..cd5429ff81556 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -2,6 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 +use super::DEFEAULT_MAX_BATCH_TXNS; use crate::config::{ config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, Error, NodeConfig, QuorumStoreConfig, ReliableBroadcastConfig, SafetyRulesConfig, BATCH_PADDING_BYTES, @@ -13,12 +14,14 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; // NOTE: when changing, make sure to update QuorumStoreBackPressureConfig::backlog_txn_limit_count as well. -const MAX_SENDING_BLOCK_UNIQUE_TXNS: u64 = 1900; +const MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING: u64 = 1900; const MAX_SENDING_BLOCK_TXNS: u64 = 4500; pub(crate) static MAX_RECEIVING_BLOCK_TXNS: Lazy = Lazy::new(|| 10000.max(2 * MAX_SENDING_BLOCK_TXNS)); // stop reducing size at this point, so 1MB transactions can still go through const MIN_BLOCK_BYTES_OVERRIDE: u64 = 1024 * 1024 + BATCH_PADDING_BYTES as u64; +// We should reduce block size only until two QS batch sizes. +const MIN_BLOCK_TXNS_AFTER_FILTERING: u64 = DEFEAULT_MAX_BATCH_TXNS as u64 * 2; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -26,7 +29,7 @@ pub struct ConsensusConfig { // length of inbound queue of messages pub max_network_channel_size: usize, pub max_sending_block_txns: u64, - pub max_sending_block_unique_txns: u64, + pub max_sending_block_txns_after_filtering: u64, pub max_sending_block_bytes: u64, pub max_sending_inline_txns: u64, pub max_sending_inline_bytes: u64, @@ -65,6 +68,7 @@ pub struct ConsensusConfig { pub intra_consensus_channel_buffer_size: usize, pub quorum_store: QuorumStoreConfig, pub vote_back_pressure_limit: u64, + pub execution_backpressure: Option, pub pipeline_backpressure: Vec, // Used to decide if backoff is needed. // must match one of the CHAIN_HEALTH_WINDOW_SIZES values. @@ -125,10 +129,42 @@ impl Default for DelayedQcAggregatorConfig { } } +/// Execution backpressure which handles gas/s variance, +/// and adjusts block sizes to "recalibrate it" to wanted range. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ExecutionBackpressureConfig { + /// At what latency does this backpressure level activate + pub back_pressure_pipeline_latency_limit_ms: u64, + /// Look at execution time for this many last blocks + pub num_blocks_to_look_at: usize, + + /// Only blocks above this threshold are treated as potentially needed recalibration + /// This is needed as small blocks have overheads that are irrelevant to the transactions + /// being executed. + pub min_block_time_ms_to_activate: usize, + + /// Backpressure has a second check, where it only activates if + /// at least `min_blocks_to_activate` are above `min_block_time_ms_to_activate` + pub min_blocks_to_activate: usize, + + /// Out of blocks in the window, take this percentile (from 0-1 range), to use for calibration. + /// i.e. 0.5 means take a median of last `num_blocks_to_look_at` blocks. + pub percentile: f64, + /// Recalibrating max block size, to target blocks taking this long. + pub target_block_time_ms: usize, + + /// We compute re-calibrated block size, and use that for max_txns_to_execute. + /// We then set max_txns_in_block to `reordering_ovarpacking_factor` times that. + /// Currently set to 1.0 (i.e. 
no overpacking), but after execution pool and cost of + /// overpacking being minimal - we should increase it. + pub reordering_ovarpacking_factor: f64, +} + #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub struct PipelineBackpressureValues { + // At what latency does this backpressure level activate pub back_pressure_pipeline_latency_limit_ms: u64, - pub max_sending_block_txns_override: u64, + pub max_sending_block_txns_after_filtering_override: u64, pub max_sending_block_bytes_override: u64, // If there is backpressure, giving some more breathing room to go through the backlog, // and making sure rounds don't go extremely fast (even if they are smaller blocks) @@ -143,7 +179,7 @@ pub struct PipelineBackpressureValues { pub struct ChainHealthBackoffValues { pub backoff_if_below_participating_voting_power_percentage: usize, - pub max_sending_block_txns_override: u64, + pub max_sending_block_txns_after_filtering_override: u64, pub max_sending_block_bytes_override: u64, pub backoff_proposal_delay_ms: u64, @@ -155,7 +191,7 @@ impl Default for ConsensusConfig { ConsensusConfig { max_network_channel_size: 1024, max_sending_block_txns: MAX_SENDING_BLOCK_TXNS, - max_sending_block_unique_txns: MAX_SENDING_BLOCK_UNIQUE_TXNS, + max_sending_block_txns_after_filtering: MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes: 3 * 1024 * 1024, // 3MB max_receiving_block_txns: *MAX_RECEIVING_BLOCK_TXNS, max_sending_inline_txns: 100, @@ -187,6 +223,15 @@ impl Default for ConsensusConfig { // Considering block gas limit and pipeline backpressure should keep number of blocks // in the pipline very low, we can keep this limit pretty low, too. vote_back_pressure_limit: 7, + execution_backpressure: Some(ExecutionBackpressureConfig { + back_pressure_pipeline_latency_limit_ms: 500, + num_blocks_to_look_at: 12, + min_blocks_to_activate: 4, + percentile: 0.5, + target_block_time_ms: 300, + min_block_time_ms_to_activate: 100, + reordering_ovarpacking_factor: 1.0, + }), pipeline_backpressure: vec![ PipelineBackpressureValues { // pipeline_latency looks how long has the oldest block still in pipeline @@ -195,65 +240,55 @@ impl Default for ConsensusConfig { // pipeline once quorum on execution result among validators has been reached // (so-(badly)-called "commit certificate"), meaning 2f+1 validators have finished execution. 
back_pressure_pipeline_latency_limit_ms: 800, - max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 100, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1100, - max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS, + back_pressure_pipeline_latency_limit_ms: 1200, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 200, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1400, - max_sending_block_txns_override: 2000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, + back_pressure_pipeline_latency_limit_ms: 1600, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, + max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, + // with execution backpressure, only later start reducing block size PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1700, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 2500, + max_sending_block_txns_after_filtering_override: 1000, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 400, + backpressure_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2000, - max_sending_block_txns_override: 1000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(400), - }, - PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2300, - max_sending_block_txns_override: 1000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(150), - }, - PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2700, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 3500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(50), + backpressure_proposal_delay_ms: 300, + max_txns_from_block_to_execute: Some(200), }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 3100, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 4500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(20), + backpressure_proposal_delay_ms: 300, + max_txns_from_block_to_execute: Some(30), }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 3500, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 6000, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, + backpressure_proposal_delay_ms: 
300, // in practice, latencies and delay make it such that ~2 blocks/s is max, // meaning that most aggressively we limit to ~10 TPS // For transactions that are more expensive than that, we should @@ -265,44 +300,45 @@ impl Default for ConsensusConfig { chain_health_backoff: vec![ ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 80, - max_sending_block_txns_override: 10000, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backoff_proposal_delay_ms: 150, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 78, - max_sending_block_txns_override: 2000, + max_sending_block_txns_after_filtering_override: 2000, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 76, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 74, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: Some(100), }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 72, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: Some(25), }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 70, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, // in practice, latencies and delay make it such that ~2 blocks/s is max, // meaning that most aggressively we limit to ~10 TPS // For transactions that are more expensive than that, we should @@ -402,7 +438,7 @@ impl ConsensusConfig { for backpressure_values in &config.pipeline_backpressure { recv_batch_send_block_pairs.push(( config.quorum_store.receiver_max_batch_txns as u64, - backpressure_values.max_sending_block_txns_override, + backpressure_values.max_sending_block_txns_after_filtering_override, format!( "backpressure {} ms: txns", backpressure_values.back_pressure_pipeline_latency_limit_ms, @@ -420,7 +456,7 @@ impl ConsensusConfig { for backoff_values in &config.chain_health_backoff { recv_batch_send_block_pairs.push(( config.quorum_store.receiver_max_batch_txns as u64, - backoff_values.max_sending_block_txns_override, + backoff_values.max_sending_block_txns_after_filtering_override, format!( "backoff {} %: txns", backoff_values.backoff_if_below_participating_voting_power_percentage, @@ -640,7 +676,7 @@ mod test { consensus: ConsensusConfig { pipeline_backpressure: vec![PipelineBackpressureValues { 
back_pressure_pipeline_latency_limit_ms: 0, - max_sending_block_txns_override: 350, + max_sending_block_txns_after_filtering_override: 350, max_sending_block_bytes_override: 0, backpressure_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -671,7 +707,7 @@ mod test { consensus: ConsensusConfig { pipeline_backpressure: vec![PipelineBackpressureValues { back_pressure_pipeline_latency_limit_ms: 0, - max_sending_block_txns_override: 251, + max_sending_block_txns_after_filtering_override: 251, max_sending_block_bytes_override: 100, backpressure_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -698,7 +734,7 @@ mod test { consensus: ConsensusConfig { chain_health_backoff: vec![ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 0, - max_sending_block_txns_override: 100, + max_sending_block_txns_after_filtering_override: 100, max_sending_block_bytes_override: 0, backoff_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -725,7 +761,7 @@ mod test { consensus: ConsensusConfig { chain_health_backoff: vec![ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 0, - max_sending_block_txns_override: 0, + max_sending_block_txns_after_filtering_override: 0, max_sending_block_bytes_override: 100, backoff_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 6e442023b1ffa..b1e2cbba2c44d 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; pub const BATCH_PADDING_BYTES: usize = 160; +pub const DEFEAULT_MAX_BATCH_TXNS: usize = 250; const DEFAULT_MAX_NUM_BATCHES: usize = 20; #[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] @@ -101,7 +102,7 @@ impl Default for QuorumStoreConfig { batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 200, batch_generation_max_interval_ms: 250, - sender_max_batch_txns: 250, + sender_max_batch_txns: DEFEAULT_MAX_BATCH_TXNS, // TODO: on next release, remove BATCH_PADDING_BYTES sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES, sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES, diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 6c62a2cceff4c..bee98c827bdd1 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -305,6 +305,29 @@ impl Payload { } } + pub fn len_for_execution(&self) -> u64 { + match self { + Payload::DirectMempool(txns) => txns.len() as u64, + Payload::InQuorumStore(proof_with_status) => proof_with_status.len() as u64, + Payload::InQuorumStoreWithLimit(proof_with_status) => { + // here we return the actual length of the payload; limit is considered at the stage + // where we prepare the block from the payload + (proof_with_status.proof_with_data.len() as u64) + .min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX)) + }, + Payload::QuorumStoreInlineHybrid( + inline_batches, + proof_with_data, + max_txns_to_execute, + ) => ((proof_with_data.len() + + inline_batches + .iter() + .map(|(_, txns)| txns.len()) + .sum::()) as u64) + .min(max_txns_to_execute.unwrap_or(u64::MAX)), + } + } + pub fn is_empty(&self) -> bool { match self { Payload::DirectMempool(txns) => txns.is_empty(), diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs 
index 2935cba7dedd5..88f356723452e 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -12,8 +12,11 @@ use crate::{ use aptos_crypto::hash::HashValue; use aptos_executor_types::StateComputeResult; use aptos_types::{ - block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness, - transaction::SignedTransaction, validator_txn::ValidatorTransaction, + block_info::BlockInfo, + contract_event::ContractEvent, + randomness::Randomness, + transaction::{SignedTransaction, TransactionStatus}, + validator_txn::ValidatorTransaction, }; use once_cell::sync::OnceCell; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -38,6 +41,7 @@ pub struct PipelinedBlock { state_compute_result: StateComputeResult, randomness: OnceCell, pipeline_insertion_time: OnceCell, + execution_summary: Arc>, } impl Serialize for PipelinedBlock { @@ -91,6 +95,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock { state_compute_result, randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), }; if let Some(r) = randomness { block.set_randomness(r); @@ -104,9 +109,34 @@ impl PipelinedBlock { mut self, input_transactions: Vec, result: StateComputeResult, + execution_time: Duration, ) -> Self { self.state_compute_result = result; self.input_transactions = input_transactions; + + let mut to_commit = 0; + let mut to_retry = 0; + for txn in self.state_compute_result.compute_status_for_input_txns() { + match txn { + TransactionStatus::Keep(_) => to_commit += 1, + TransactionStatus::Retry => to_retry += 1, + _ => {}, + } + } + + assert!(self + .execution_summary + .set(ExecutionSummary { + payload_len: self + .block + .payload() + .map_or(0, |payload| payload.len_for_execution()), + to_commit, + to_retry, + execution_time, + }) + .is_ok()); + self } @@ -143,6 +173,7 @@ impl PipelinedBlock { state_compute_result, randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), } } @@ -153,6 +184,7 @@ impl PipelinedBlock { state_compute_result: StateComputeResult::new_dummy(), randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), } } @@ -250,4 +282,16 @@ impl PipelinedBlock { pub fn elapsed_in_pipeline(&self) -> Option { self.pipeline_insertion_time.get().map(|t| t.elapsed()) } + + pub fn get_execution_summary(&self) -> Option { + self.execution_summary.get().cloned() + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ExecutionSummary { + pub payload_len: u64, + pub to_commit: u64, + pub to_retry: u64, + pub execution_time: Duration, } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 36549f458e195..1c099e69aedc8 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -19,8 +19,12 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context}; use aptos_consensus_types::{ - block::Block, common::Round, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, - sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate, + block::Block, + common::Round, + pipelined_block::{ExecutionSummary, PipelinedBlock}, + quorum_cert::QuorumCert, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo, }; use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; @@ -460,6 +464,70 @@ impl 
BlockStore { .store(back_pressure, Ordering::Relaxed) } + pub fn pending_blocks(&self) -> Arc> { + self.pending_blocks.clone() + } +} + +impl BlockReader for BlockStore { + fn block_exists(&self, block_id: HashValue) -> bool { + self.inner.read().block_exists(&block_id) + } + + fn get_block(&self, block_id: HashValue) -> Option> { + self.inner.read().get_block(&block_id) + } + + fn ordered_root(&self) -> Arc { + self.inner.read().ordered_root() + } + + fn commit_root(&self) -> Arc { + self.inner.read().commit_root() + } + + fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option> { + self.inner.read().get_quorum_cert_for_block(&block_id) + } + + fn path_from_ordered_root(&self, block_id: HashValue) -> Option>> { + self.inner.read().path_from_ordered_root(block_id) + } + + fn path_from_commit_root(&self, block_id: HashValue) -> Option>> { + self.inner.read().path_from_commit_root(block_id) + } + + fn highest_certified_block(&self) -> Arc { + self.inner.read().highest_certified_block() + } + + fn highest_quorum_cert(&self) -> Arc { + self.inner.read().highest_quorum_cert() + } + + fn highest_ordered_cert(&self) -> Arc { + self.inner.read().highest_ordered_cert() + } + + fn highest_commit_cert(&self) -> Arc { + self.inner.read().highest_commit_cert() + } + + fn highest_2chain_timeout_cert(&self) -> Option> { + self.inner.read().highest_2chain_timeout_cert() + } + + fn sync_info(&self) -> SyncInfo { + SyncInfo::new_decoupled( + self.highest_quorum_cert().as_ref().clone(), + self.highest_ordered_cert().as_ref().clone(), + self.highest_commit_cert().as_ref().clone(), + self.highest_2chain_timeout_cert() + .map(|tc| tc.as_ref().clone()), + ) + } + /// Return if the consensus is backpressured fn vote_back_pressure(&self) -> bool { #[cfg(any(test, feature = "fuzzing"))] @@ -476,11 +544,7 @@ impl BlockStore { ordered_round > self.vote_back_pressure_limit + commit_round } - pub fn pending_blocks(&self) -> Arc> { - self.pending_blocks.clone() - } - - pub fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { + fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { let ordered_root = self.ordered_root(); let commit_root = self.commit_root(); let pending_path = self @@ -551,73 +615,31 @@ impl BlockStore { Duration::ZERO } } -} - -impl BlockReader for BlockStore { - fn block_exists(&self, block_id: HashValue) -> bool { - self.inner.read().block_exists(&block_id) - } - - fn get_block(&self, block_id: HashValue) -> Option> { - self.inner.read().get_block(&block_id) - } - - fn ordered_root(&self) -> Arc { - self.inner.read().ordered_root() - } - - fn commit_root(&self) -> Arc { - self.inner.read().commit_root() - } - - fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option> { - self.inner.read().get_quorum_cert_for_block(&block_id) - } - - fn path_from_ordered_root(&self, block_id: HashValue) -> Option>> { - self.inner.read().path_from_ordered_root(block_id) - } - - fn path_from_commit_root(&self, block_id: HashValue) -> Option>> { - self.inner.read().path_from_commit_root(block_id) - } - - fn highest_certified_block(&self) -> Arc { - self.inner.read().highest_certified_block() - } - - fn highest_quorum_cert(&self) -> Arc { - self.inner.read().highest_quorum_cert() - } - - fn highest_ordered_cert(&self) -> Arc { - self.inner.read().highest_ordered_cert() - } - - fn highest_commit_cert(&self) -> Arc { - self.inner.read().highest_commit_cert() - } - - fn highest_2chain_timeout_cert(&self) -> Option> { - 
self.inner.read().highest_2chain_timeout_cert() - } - fn sync_info(&self) -> SyncInfo { - SyncInfo::new_decoupled( - self.highest_quorum_cert().as_ref().clone(), - self.highest_ordered_cert().as_ref().clone(), - self.highest_commit_cert().as_ref().clone(), - self.highest_2chain_timeout_cert() - .map(|tc| tc.as_ref().clone()), - ) - } - - fn vote_back_pressure(&self) -> bool { - self.vote_back_pressure() - } - - fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { - self.pipeline_pending_latency(proposal_timestamp) + fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec { + let mut res = vec![]; + let mut cur_block = Some(self.ordered_root()); + loop { + match cur_block { + Some(block) => { + if let Some(execution_time_and_size) = block.get_execution_summary() { + info!( + "Found execution time for {}, {:?}", + block.id(), + execution_time_and_size + ); + res.push(execution_time_and_size); + if res.len() >= num_blocks { + return res; + } + } else { + info!("Couldn't find execution time for {}", block.id()); + } + cur_block = self.get_block(block.parent_id()); + }, + None => return res, + } + } } } diff --git a/consensus/src/block_storage/mod.rs b/consensus/src/block_storage/mod.rs index 83fe9fd0450d7..f11b2e2554f2a 100644 --- a/consensus/src/block_storage/mod.rs +++ b/consensus/src/block_storage/mod.rs @@ -3,8 +3,11 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_consensus_types::{ - pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, sync_info::SyncInfo, - timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo, + pipelined_block::{ExecutionSummary, PipelinedBlock}, + quorum_cert::QuorumCert, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutCertificate, + wrapped_ledger_info::WrappedLedgerInfo, }; use aptos_crypto::HashValue; pub use block_store::{sync_manager::BlockRetriever, BlockStore}; @@ -64,4 +67,6 @@ pub trait BlockReader: Send + Sync { // Return time difference between last committed block and new proposal fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration; + + fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec; } diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 9e5cca0bccc9c..3410cbf2ee3f7 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -284,11 +284,19 @@ pub static WAIT_FOR_FULL_BLOCKS_TRIGGERED: Lazy = Lazy::new(|| { ) }); -/// Counts when chain_health backoff is triggered +/// Counts when pipeline backpressure is triggered pub static PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED: Lazy = Lazy::new(|| { register_avg_counter( "aptos_pipeline_backpressure_on_proposal_triggered", - "Counts when chain_health backoff is triggered", + "Counts when pipeline backpressure is triggered", + ) +}); + +/// Counts when execution backpressure is triggered +pub static EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED: Lazy = Lazy::new(|| { + register_avg_counter( + "aptos_execution_backpressure_on_proposal_triggered", + "Counts when execution backpressure is triggered", ) }); @@ -312,12 +320,11 @@ pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy = Lazy:: }); /// Amount of time (in seconds) proposal is delayed due to backpressure/backoff -pub static PROPOSER_DELAY_PROPOSAL: Lazy = Lazy::new(|| { - register_gauge!( +pub static PROPOSER_DELAY_PROPOSAL: Lazy = Lazy::new(|| { + register_avg_counter( "aptos_proposer_delay_proposal", "Amount of time (in seconds) proposal is delayed due to backpressure/backoff", ) - .unwrap() }); 
/// Histogram for max number of transactions (after filtering for dedup, expirations, etc) proposer uses when creating block. @@ -330,16 +337,6 @@ pub static PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING: Lazy = Lazy::new( .unwrap() }); -/// Histogram for max number of transactions proposer uses when creating block. -pub static PROPOSER_MAX_BLOCK_TXNS: Lazy = Lazy::new(|| { - register_histogram!( - "aptos_proposer_max_block_txns", - "Histogram for max number of transactions proposer uses when creating block.", - NUM_CONSENSUS_TRANSACTIONS_BUCKETS.to_vec() - ) - .unwrap() -}); - /// Histogram for max number of transactions to execute proposer uses when creating block. pub static PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE: Lazy = Lazy::new(|| { register_histogram!( diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index e178bcd58fa46..8ef6d87d7d7dc 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -621,6 +621,8 @@ impl DagBootstrapper { .health_config .pipeline_backpressure_config .clone(), + // TODO: add pipeline backpressure based on execution speed to DAG config + None, ), ordered_notifier.clone(), ); diff --git a/consensus/src/dag/health/chain_health.rs b/consensus/src/dag/health/chain_health.rs index 26e5d9af256be..6b9845407ae7b 100644 --- a/consensus/src/dag/health/chain_health.rs +++ b/consensus/src/dag/health/chain_health.rs @@ -84,7 +84,7 @@ impl TChainHealth for ChainHealthBackoff { chain_health_backoff.map(|value| { ( - value.max_sending_block_txns_override, + value.max_sending_block_txns_after_filtering_override, value.max_sending_block_bytes_override, ) }) diff --git a/consensus/src/dag/health/pipeline_health.rs b/consensus/src/dag/health/pipeline_health.rs index 3668fd68eec37..f3f5cf5b00fcd 100644 --- a/consensus/src/dag/health/pipeline_health.rs +++ b/consensus/src/dag/health/pipeline_health.rs @@ -68,7 +68,7 @@ impl TPipelineHealth for PipelineLatencyBasedBackpressure { let latency = self.adapter.pipeline_pending_latency(); self.pipeline_config.get_backoff(latency).map(|config| { ( - config.max_sending_block_txns_override, + config.max_sending_block_txns_after_filtering_override, config.max_sending_block_bytes_override, ) }) diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 1ddcb47111c53..543799fe91b15 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -798,8 +798,10 @@ impl EpochManager

{ self.create_proposer_election(&epoch_state, &onchain_consensus_config); let chain_health_backoff_config = ChainHealthBackoffConfig::new(self.config.chain_health_backoff.clone()); - let pipeline_backpressure_config = - PipelineBackpressureConfig::new(self.config.pipeline_backpressure.clone()); + let pipeline_backpressure_config = PipelineBackpressureConfig::new( + self.config.pipeline_backpressure.clone(), + self.config.execution_backpressure.clone(), + ); let safety_rules_container = Arc::new(Mutex::new(safety_rules)); @@ -843,7 +845,7 @@ impl EpochManager

{ self.time_service.clone(), Duration::from_millis(self.config.quorum_store_poll_time_ms), self.config.max_sending_block_txns, - self.config.max_sending_block_unique_txns, + self.config.max_sending_block_txns_after_filtering, self.config.max_sending_block_bytes, self.config.max_sending_inline_txns, self.config.max_sending_inline_bytes, diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index 53a50f01a4659..530ea208d6bdd 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -26,7 +26,10 @@ use aptos_types::{ use fail::fail_point; use once_cell::sync::Lazy; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::sync::Arc; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use tokio::sync::{mpsc, oneshot}; pub static SIG_VERIFY_POOL: Lazy> = Lazy::new(|| { @@ -48,6 +51,7 @@ impl ExecutionPipeline { let (prepare_block_tx, prepare_block_rx) = mpsc::unbounded_channel(); let (execute_block_tx, execute_block_rx) = mpsc::unbounded_channel(); let (ledger_apply_tx, ledger_apply_rx) = mpsc::unbounded_channel(); + runtime.spawn(Self::prepare_block_stage( prepare_block_rx, execute_block_tx, @@ -185,11 +189,14 @@ impl ExecutionPipeline { error: "Injected error in compute".into(), }) }); - executor.execute_and_state_checkpoint( - block, - parent_block_id, - block_executor_onchain_config, - ) + let start = Instant::now(); + executor + .execute_and_state_checkpoint( + block, + parent_block_id, + block_executor_onchain_config, + ) + .map(|output| (output, start.elapsed())) }) .await ) @@ -216,24 +223,28 @@ impl ExecutionPipeline { input_txns, block_id, parent_block_id, - state_checkpoint_output, + state_checkpoint_output: execution_result, result_tx, }) = block_rx.recv().await { debug!("ledger_apply stage received block {}.", block_id); let res = async { + let (state_checkpoint_output, execution_duration) = execution_result?; let executor = executor.clone(); monitor!( "ledger_apply", tokio::task::spawn_blocking(move || { - executor.ledger_update(block_id, parent_block_id, state_checkpoint_output?) 
+ executor.ledger_update(block_id, parent_block_id, state_checkpoint_output) }) ) .await .expect("Failed to spawn_blocking().") + .map(|output| (output, execution_duration)) } .await; - let pipe_line_res = res.map(|output| PipelineExecutionResult::new(input_txns, output)); + let pipe_line_res = res.map(|(output, execution_duration)| { + PipelineExecutionResult::new(input_txns, output, execution_duration) + }); result_tx.send(pipe_line_res).unwrap_or_else(|err| { error!( block_id = block_id, @@ -267,6 +278,6 @@ struct LedgerApplyCommand { input_txns: Vec, block_id: HashValue, parent_block_id: HashValue, - state_checkpoint_output: ExecutorResult, + state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>, result_tx: oneshot::Sender>, } diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index c6a5545b9ac0c..b561b15a9a689 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -8,27 +8,31 @@ use super::{ use crate::{ block_storage::BlockReader, counters::{ - CHAIN_HEALTH_BACKOFF_TRIGGERED, PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, - PROPOSER_DELAY_PROPOSAL, PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, - PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT, - PROPOSER_PENDING_BLOCKS_FILL_FRACTION, + CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, + PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL, + PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, + PROPOSER_PENDING_BLOCKS_COUNT, PROPOSER_PENDING_BLOCKS_FILL_FRACTION, }, payload_client::PayloadClient, util::time_service::TimeService, }; use anyhow::{bail, ensure, format_err, Context}; -use aptos_config::config::{ChainHealthBackoffValues, PipelineBackpressureValues}; +use aptos_config::config::{ + ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues, +}; use aptos_consensus_types::{ block::Block, block_data::BlockData, common::{Author, Payload, PayloadFilter, Round}, + pipelined_block::ExecutionSummary, quorum_cert::QuorumCert, }; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_logger::{error, sample, sample::SampleRate, warn}; +use aptos_logger::{error, info, sample, sample::SampleRate, warn}; use aptos_types::{on_chain_config::ValidatorTxnConfig, validator_txn::ValidatorTransaction}; use aptos_validator_transaction_pool as vtxn_pool; use futures::future::BoxFuture; +use itertools::Itertools; use std::{ collections::{BTreeMap, HashSet}, sync::Arc, @@ -96,23 +100,31 @@ impl ChainHealthBackoffConfig { #[derive(Clone)] pub struct PipelineBackpressureConfig { backoffs: BTreeMap, + execution: Option, } impl PipelineBackpressureConfig { - pub fn new(backoffs: Vec) -> Self { + pub fn new( + backoffs: Vec, + execution: Option, + ) -> Self { let original_len = backoffs.len(); let backoffs = backoffs .into_iter() .map(|v| (v.back_pressure_pipeline_latency_limit_ms, v)) .collect::>(); assert_eq!(original_len, backoffs.len()); - Self { backoffs } + Self { + backoffs, + execution, + } } #[allow(dead_code)] pub fn new_no_backoff() -> Self { Self { backoffs: BTreeMap::new(), + execution: None, } } @@ -139,6 +151,64 @@ impl PipelineBackpressureConfig { v }) } + + pub fn get_execution_block_size_backoff( + &self, + block_execution_times: &[ExecutionSummary], + ) -> Option { + info!( + "Estimated block execution times: {:?}", + block_execution_times + ); + + self.execution.as_ref().and_then(|config| { + let 
sizes = block_execution_times
+                .iter()
+                .flat_map(|summary| {
+                    // For each block, compute the target (re-calibrated) block size.
+
+                    let execution_time_ms = summary.execution_time.as_millis();
+                    // Only blocks above the time threshold are considered to give enough signal
+                    // to support calibration, so we filter out shorter blocks.
+                    if execution_time_ms > config.min_block_time_ms_to_activate as u128 {
+                        // TODO: After the cost of "retries" is reduced with execution pool, we
+                        // should be computing the block gas limit here, simply as:
+                        // `config.target_block_time_ms / execution_time_ms * gas_consumed_by_block`
+                        //
+                        // Until then, we need to compute the wanted block size to create.
+                        // Unfortunately, there are multiple layers where transactions are filtered.
+                        // After deduping/reordering logic is applied, max_txns_to_execute limits the
+                        // transactions passed to the executor (`summary.payload_len` here), and then
+                        // some are discarded for various reasons, which we approximate as cheaply ignored.
+                        // Of the rest, only the `summary.to_commit` fraction of
+                        // `summary.to_commit + summary.to_retry` was executed, so, assuming the same
+                        // discard rate, we scale `summary.payload_len` by it.
+                        Some(
+                            ((config.target_block_time_ms as f64 / execution_time_ms as f64
+                                * (summary.to_commit as f64
+                                    / (summary.to_commit + summary.to_retry) as f64)
+                                * summary.payload_len as f64)
+                                .floor() as u64)
+                                .max(1),
+                        )
+                    } else {
+                        None
+                    }
+                })
+                .sorted()
+                // .sorted_by_key(|key| key.unwrap_or(u64::MAX))
+                .collect::<Vec<_>>();
+            info!("Estimated block back-offs block sizes: {:?}", sizes);
+            if sizes.len() >= config.min_blocks_to_activate {
+                Some(
+                    *sizes
+                        .get((config.percentile * sizes.len() as f64) as usize)
+                        .unwrap(),
+                )
+            } else {
+                None
+            }
+        })
+    }
 }
 
 /// ProposalGenerator is responsible for generating the proposed block on demand: it's typically
@@ -327,11 +397,11 @@ impl ProposalGenerator {
             .await;
 
         PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64);
-        PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(
-            max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering) as f64,
-        );
+        if let Some(max_to_execute) = max_txns_from_block_to_execute {
+            PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64);
+        }
 
-        PROPOSER_DELAY_PROPOSAL.set(proposal_delay.as_secs_f64());
+        PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64());
         if !proposal_delay.is_zero() {
             tokio::time::sleep(proposal_delay).await;
         }
@@ -428,7 +498,7 @@ impl ProposalGenerator {
         timestamp: Duration,
         round: Round,
     ) -> (u64, u64, Option<u64>, Duration) {
-        let mut values_max_block_txns = vec![self.max_block_txns_after_filtering];
+        let mut values_max_block_txns_after_filtering = vec![self.max_block_txns_after_filtering];
         let mut values_max_block_bytes = vec![self.max_block_bytes];
         let mut values_proposal_delay = vec![Duration::ZERO];
         let mut values_max_txns_from_block_to_execute = vec![];
@@ -437,7 +507,8 @@
             .chain_health_backoff_config
             .get_backoff(voting_power_ratio);
         if let Some(value) = chain_health_backoff {
-            values_max_block_txns.push(value.max_sending_block_txns_override);
+            values_max_block_txns_after_filtering
+                .push(value.max_sending_block_txns_after_filtering_override);
             values_max_block_bytes.push(value.max_sending_block_bytes_override);
             if let Some(val) = value.max_txns_from_block_to_execute {
                 values_max_txns_from_block_to_execute.push(val);
@@ -448,11 +519,13 @@
             CHAIN_HEALTH_BACKOFF_TRIGGERED.observe(0.0);
         }
 
+        let pipeline_pending_latency =
self.block_store.pipeline_pending_latency(timestamp); let pipeline_backpressure = self .pipeline_backpressure_config - .get_backoff(self.block_store.pipeline_pending_latency(timestamp)); + .get_backoff(pipeline_pending_latency); if let Some(value) = pipeline_backpressure { - values_max_block_txns.push(value.max_sending_block_txns_override); + values_max_block_txns_after_filtering + .push(value.max_sending_block_txns_after_filtering_override); values_max_block_bytes.push(value.max_sending_block_bytes_override); if let Some(val) = value.max_txns_from_block_to_execute { values_max_txns_from_block_to_execute.push(val); @@ -463,23 +536,62 @@ impl ProposalGenerator { PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe(0.0); }; - let max_block_txns = values_max_block_txns.into_iter().min().unwrap(); + let mut execution_backpressure_applied = false; + if let Some(config) = &self.pipeline_backpressure_config.execution { + if pipeline_pending_latency.as_millis() + > config.back_pressure_pipeline_latency_limit_ms as u128 + { + let execution_backpressure = self + .pipeline_backpressure_config + .get_execution_block_size_backoff( + &self + .block_store + .get_recent_block_execution_times(config.num_blocks_to_look_at), + ); + if let Some(execution_backpressure_block_size) = execution_backpressure { + values_max_block_txns_after_filtering.push( + (execution_backpressure_block_size as f64 + * config.reordering_ovarpacking_factor.max(1.0)) + as u64, + ); + values_max_txns_from_block_to_execute.push(execution_backpressure_block_size); + execution_backpressure_applied = true; + } + } + } + EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe( + if execution_backpressure_applied { + 1.0 + } else { + 0.0 + }, + ); + + let max_block_txns = values_max_block_txns_after_filtering + .into_iter() + .min() + .unwrap(); let max_block_bytes = values_max_block_bytes.into_iter().min().unwrap(); let proposal_delay = values_proposal_delay.into_iter().max().unwrap(); - let max_txns_from_block_to_execute = - values_max_txns_from_block_to_execute.into_iter().min(); - if pipeline_backpressure.is_some() || chain_health_backoff.is_some() { - warn!( - "Generating proposal: reducing limits to {} txns (filtered to {:?}) and {} bytes, due to pipeline_backpressure: {}, chain health backoff: {}. Delaying sending proposal by {}ms. 
Round: {}", - max_block_txns, - max_txns_from_block_to_execute, - max_block_bytes, - pipeline_backpressure.is_some(), - chain_health_backoff.is_some(), - proposal_delay.as_millis(), - round, - ); - } + let max_txns_from_block_to_execute = values_max_txns_from_block_to_execute + .into_iter() + .min() + .filter(|v| *v < max_block_txns); + + warn!( + pipeline_pending_latency = pipeline_pending_latency.as_millis(), + proposal_delay_ms = proposal_delay.as_millis(), + max_block_txns = max_block_txns, + max_txns_from_block_to_execute = + max_txns_from_block_to_execute.unwrap_or(max_block_txns), + max_block_bytes = max_block_bytes, + is_pipeline_backpressure = pipeline_backpressure.is_some(), + is_execution_backpressure = execution_backpressure_applied, + is_chain_health_backoff = chain_health_backoff.is_some(), + round = round, + "Proposal generation backpressure details", + ); + ( max_block_txns, max_block_bytes, diff --git a/consensus/src/pipeline/execution_schedule_phase.rs b/consensus/src/pipeline/execution_schedule_phase.rs index 96586273649c1..312f476174849 100644 --- a/consensus/src/pipeline/execution_schedule_phase.rs +++ b/consensus/src/pipeline/execution_schedule_phase.rs @@ -93,8 +93,12 @@ impl StatelessPipeline for ExecutionSchedulePhase { let mut results = vec![]; for (block, fut) in itertools::zip_eq(ordered_blocks, futs) { debug!("try to receive compute result for block {}", block.id()); - let PipelineExecutionResult { input_txns, result } = fut.await?; - results.push(block.set_execution_result(input_txns, result)); + let PipelineExecutionResult { + input_txns, + result, + execution_time, + } = fut.await?; + results.push(block.set_execution_result(input_txns, result, execution_time)); } drop(lifetime_guard); Ok(results) diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index db4240cee67b2..6c0184e53469b 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -34,7 +34,7 @@ use aptos_types::{ }; use fail::fail_point; use futures::{future::BoxFuture, SinkExt, StreamExt}; -use std::{boxed::Box, sync::Arc}; +use std::{boxed::Box, sync::Arc, time::Duration}; use tokio::sync::Mutex as AsyncMutex; pub type StateComputeResultFut = BoxFuture<'static, ExecutorResult>; @@ -43,11 +43,20 @@ pub type StateComputeResultFut = BoxFuture<'static, ExecutorResult, pub result: StateComputeResult, + pub execution_time: Duration, } impl PipelineExecutionResult { - pub fn new(input_txns: Vec, result: StateComputeResult) -> Self { - Self { input_txns, result } + pub fn new( + input_txns: Vec, + result: StateComputeResult, + execution_time: Duration, + ) -> Self { + Self { + input_txns, + result, + execution_time, + } } } diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index 286dc9137ffd6..16f226ef3f253 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -22,7 +22,7 @@ use aptos_types::{ }; use futures::SinkExt; use futures_channel::mpsc::UnboundedSender; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; pub struct EmptyStateComputer { executor_channel: UnboundedSender, @@ -45,6 +45,7 @@ impl StateComputer for EmptyStateComputer { Ok(PipelineExecutionResult::new( vec![], StateComputeResult::new_dummy(), + Duration::from_secs(0), )) } @@ -129,7 +130,8 @@ impl StateComputer for RandomComputeResultStateComputer { self.random_compute_result_root_hash, )) }; - let pipeline_execution_res = res.map(|res| 
PipelineExecutionResult::new(vec![], res)); + let pipeline_execution_res = + res.map(|res| PipelineExecutionResult::new(vec![], res, Duration::from_secs(0))); Box::pin(async move { pipeline_execution_res }) } diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 5f87e40dc1456..27c01ca149446 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -427,8 +427,11 @@ impl StateComputeResult { } pub fn transactions_to_commit_len(&self) -> usize { - // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration - self.compute_status_for_input_txns().len() + self.compute_status_for_input_txns() + .iter() + .filter(|status| matches!(status, TransactionStatus::Keep(_))) + .count() + // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration + (if self.has_reconfiguration() { 0 } else { 1 }) } diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index c515fd24fbdc2..eab46a1ce8825 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -829,7 +829,7 @@ fn optimize_for_maximum_throughput( ) { mempool_config_practically_non_expiring(&mut config.mempool); - config.consensus.max_sending_block_unique_txns = max_txns_per_block as u64; + config.consensus.max_sending_block_txns_after_filtering = max_txns_per_block as u64; config.consensus.max_sending_block_txns = config .consensus .max_sending_block_txns @@ -2253,9 +2253,10 @@ pub fn changing_working_quorum_test_helper( } else { for (i, item) in chain_health_backoff.iter_mut().enumerate() { // as we have lower TPS, make limits smaller - item.max_sending_block_txns_override = + item.max_sending_block_txns_after_filtering_override = (block_size / 2_u64.pow(i as u32 + 1)).max(2); - min_block_txns = min_block_txns.min(item.max_sending_block_txns_override); + min_block_txns = + min_block_txns.min(item.max_sending_block_txns_after_filtering_override); // as we have fewer nodes, make backoff triggered earlier: item.backoff_if_below_participating_voting_power_percentage = 90 - i * 5; }
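
Note on the core change: the new execution backpressure recalibrates the proposer's block size from recent per-block `ExecutionSummary` data (see `get_execution_block_size_backoff` in the proposal generator diff above). The standalone Rust sketch below restates that computation in isolation. The struct and field names mirror the diff, but the helper function `block_size_backoff` and the percentile-index clamp are illustrative assumptions, not part of the change.

use std::time::Duration;

// Mirrors consensus-types' ExecutionSummary from the diff above.
struct ExecutionSummary {
    payload_len: u64,
    to_commit: u64,
    to_retry: u64,
    execution_time: Duration,
}

// Mirrors the relevant fields of ExecutionBackpressureConfig.
struct ExecutionBackpressureConfig {
    min_block_time_ms_to_activate: usize,
    min_blocks_to_activate: usize,
    percentile: f64,
    target_block_time_ms: usize,
}

// Illustrative helper (hypothetical name): estimate a block size that should keep
// execution near `target_block_time_ms`, given recent block execution summaries.
fn block_size_backoff(
    config: &ExecutionBackpressureConfig,
    summaries: &[ExecutionSummary],
) -> Option<u64> {
    let mut sizes: Vec<u64> = summaries
        .iter()
        // Short blocks carry mostly fixed overhead, so they are ignored for calibration.
        .filter(|s| s.execution_time.as_millis() > config.min_block_time_ms_to_activate as u128)
        .map(|s| {
            // Scale the executed payload by (target time / actual time) and by the
            // fraction of transactions that actually committed (vs. were retried).
            let commit_fraction = s.to_commit as f64 / (s.to_commit + s.to_retry) as f64;
            let time_scale =
                config.target_block_time_ms as f64 / s.execution_time.as_millis() as f64;
            ((time_scale * commit_fraction * s.payload_len as f64).floor() as u64).max(1)
        })
        .collect();
    sizes.sort_unstable();

    // Back off only when enough blocks provided a signal; then pick the configured
    // percentile (0.5 == median). The index is clamped so percentile == 1.0 stays in bounds.
    (sizes.len() >= config.min_blocks_to_activate).then(|| {
        let idx = ((config.percentile * sizes.len() as f64) as usize).min(sizes.len() - 1);
        sizes[idx]
    })
}

In the diff, the proposer then uses this recalibrated value for `max_txns_from_block_to_execute`, multiplies it by `reordering_ovarpacking_factor` (currently 1.0) for the block-size limit, and takes the minimum across the chain-health, pipeline, and execution backpressure limits.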