From 8e75c7c160de1fd8559afa3c956459f6a522efb6 Mon Sep 17 00:00:00 2001 From: igor-aptos <110557261+igor-aptos@users.noreply.github.com> Date: Wed, 24 Jul 2024 16:10:32 -0700 Subject: [PATCH] Execution speed backpressure to handle consistent gas miscalibration (#13829) We can have gas miscalibration for various reasons, leading to a block taking longer than intended 300ms even after block gas limit has been applied. And so here we make sure on consensus side to gracefully handle it. We track execution speed of historical blocks, and their sizes - and compute what block size should've been such that it lasted 300ms. We then take p50 of previous 10 blocks - and use that as the block limit for the next block. Once we have improved re-ordering and execution pool, we can additionally use reordering_ovarpacking_factor to create larger blocks - but only execute wanted number of transactions. Currently execution stage is the bottleneck almost always, and so this PR only touches that. But practically - we can do the same for each stage, to make sure to reduce the block if for example commit stage is slow. --- config/src/config/consensus_config.rs | 144 +++++++++------ config/src/config/quorum_store_config.rs | 3 +- consensus/consensus-types/src/common.rs | 23 +++ .../consensus-types/src/pipelined_block.rs | 48 ++++- consensus/src/block_storage/block_store.rs | 168 +++++++++-------- consensus/src/block_storage/mod.rs | 9 +- consensus/src/counters.rs | 27 ++- consensus/src/dag/bootstrap.rs | 2 + consensus/src/dag/health/chain_health.rs | 2 +- consensus/src/dag/health/pipeline_health.rs | 2 +- consensus/src/epoch_manager.rs | 8 +- consensus/src/execution_pipeline.rs | 31 +++- consensus/src/liveness/proposal_generator.rs | 174 ++++++++++++++---- .../src/pipeline/execution_schedule_phase.rs | 8 +- consensus/src/state_computer.rs | 15 +- .../src/test_utils/mock_state_computer.rs | 6 +- execution/executor-types/src/lib.rs | 7 +- testsuite/forge-cli/src/main.rs | 7 +- 18 files changed, 479 insertions(+), 205 deletions(-) diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index 244512572a8b8..cd5429ff81556 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -2,6 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 +use super::DEFEAULT_MAX_BATCH_TXNS; use crate::config::{ config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, Error, NodeConfig, QuorumStoreConfig, ReliableBroadcastConfig, SafetyRulesConfig, BATCH_PADDING_BYTES, @@ -13,12 +14,14 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; // NOTE: when changing, make sure to update QuorumStoreBackPressureConfig::backlog_txn_limit_count as well. -const MAX_SENDING_BLOCK_UNIQUE_TXNS: u64 = 1900; +const MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING: u64 = 1900; const MAX_SENDING_BLOCK_TXNS: u64 = 4500; pub(crate) static MAX_RECEIVING_BLOCK_TXNS: Lazy = Lazy::new(|| 10000.max(2 * MAX_SENDING_BLOCK_TXNS)); // stop reducing size at this point, so 1MB transactions can still go through const MIN_BLOCK_BYTES_OVERRIDE: u64 = 1024 * 1024 + BATCH_PADDING_BYTES as u64; +// We should reduce block size only until two QS batch sizes. +const MIN_BLOCK_TXNS_AFTER_FILTERING: u64 = DEFEAULT_MAX_BATCH_TXNS as u64 * 2; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -26,7 +29,7 @@ pub struct ConsensusConfig { // length of inbound queue of messages pub max_network_channel_size: usize, pub max_sending_block_txns: u64, - pub max_sending_block_unique_txns: u64, + pub max_sending_block_txns_after_filtering: u64, pub max_sending_block_bytes: u64, pub max_sending_inline_txns: u64, pub max_sending_inline_bytes: u64, @@ -65,6 +68,7 @@ pub struct ConsensusConfig { pub intra_consensus_channel_buffer_size: usize, pub quorum_store: QuorumStoreConfig, pub vote_back_pressure_limit: u64, + pub execution_backpressure: Option, pub pipeline_backpressure: Vec, // Used to decide if backoff is needed. // must match one of the CHAIN_HEALTH_WINDOW_SIZES values. @@ -125,10 +129,42 @@ impl Default for DelayedQcAggregatorConfig { } } +/// Execution backpressure which handles gas/s variance, +/// and adjusts block sizes to "recalibrate it" to wanted range. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct ExecutionBackpressureConfig { + /// At what latency does this backpressure level activate + pub back_pressure_pipeline_latency_limit_ms: u64, + /// Look at execution time for this many last blocks + pub num_blocks_to_look_at: usize, + + /// Only blocks above this threshold are treated as potentially needed recalibration + /// This is needed as small blocks have overheads that are irrelevant to the transactions + /// being executed. + pub min_block_time_ms_to_activate: usize, + + /// Backpressure has a second check, where it only activates if + /// at least `min_blocks_to_activate` are above `min_block_time_ms_to_activate` + pub min_blocks_to_activate: usize, + + /// Out of blocks in the window, take this percentile (from 0-1 range), to use for calibration. + /// i.e. 0.5 means take a median of last `num_blocks_to_look_at` blocks. + pub percentile: f64, + /// Recalibrating max block size, to target blocks taking this long. + pub target_block_time_ms: usize, + + /// We compute re-calibrated block size, and use that for max_txns_to_execute. + /// We then set max_txns_in_block to `reordering_ovarpacking_factor` times that. + /// Currently set to 1.0 (i.e. no overpacking), but after execution pool and cost of + /// overpacking being minimal - we should increase it. + pub reordering_ovarpacking_factor: f64, +} + #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub struct PipelineBackpressureValues { + // At what latency does this backpressure level activate pub back_pressure_pipeline_latency_limit_ms: u64, - pub max_sending_block_txns_override: u64, + pub max_sending_block_txns_after_filtering_override: u64, pub max_sending_block_bytes_override: u64, // If there is backpressure, giving some more breathing room to go through the backlog, // and making sure rounds don't go extremely fast (even if they are smaller blocks) @@ -143,7 +179,7 @@ pub struct PipelineBackpressureValues { pub struct ChainHealthBackoffValues { pub backoff_if_below_participating_voting_power_percentage: usize, - pub max_sending_block_txns_override: u64, + pub max_sending_block_txns_after_filtering_override: u64, pub max_sending_block_bytes_override: u64, pub backoff_proposal_delay_ms: u64, @@ -155,7 +191,7 @@ impl Default for ConsensusConfig { ConsensusConfig { max_network_channel_size: 1024, max_sending_block_txns: MAX_SENDING_BLOCK_TXNS, - max_sending_block_unique_txns: MAX_SENDING_BLOCK_UNIQUE_TXNS, + max_sending_block_txns_after_filtering: MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes: 3 * 1024 * 1024, // 3MB max_receiving_block_txns: *MAX_RECEIVING_BLOCK_TXNS, max_sending_inline_txns: 100, @@ -187,6 +223,15 @@ impl Default for ConsensusConfig { // Considering block gas limit and pipeline backpressure should keep number of blocks // in the pipline very low, we can keep this limit pretty low, too. vote_back_pressure_limit: 7, + execution_backpressure: Some(ExecutionBackpressureConfig { + back_pressure_pipeline_latency_limit_ms: 500, + num_blocks_to_look_at: 12, + min_blocks_to_activate: 4, + percentile: 0.5, + target_block_time_ms: 300, + min_block_time_ms_to_activate: 100, + reordering_ovarpacking_factor: 1.0, + }), pipeline_backpressure: vec![ PipelineBackpressureValues { // pipeline_latency looks how long has the oldest block still in pipeline @@ -195,65 +240,55 @@ impl Default for ConsensusConfig { // pipeline once quorum on execution result among validators has been reached // (so-(badly)-called "commit certificate"), meaning 2f+1 validators have finished execution. back_pressure_pipeline_latency_limit_ms: 800, - max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 100, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1100, - max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS, + back_pressure_pipeline_latency_limit_ms: 1200, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 200, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1400, - max_sending_block_txns_override: 2000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, + back_pressure_pipeline_latency_limit_ms: 1600, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, + max_sending_block_bytes_override: 5 * 1024 * 1024, backpressure_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, + // with execution backpressure, only later start reducing block size PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 1700, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 2500, + max_sending_block_txns_after_filtering_override: 1000, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 400, + backpressure_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2000, - max_sending_block_txns_override: 1000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(400), - }, - PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2300, - max_sending_block_txns_override: 1000, - max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(150), - }, - PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 2700, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 3500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(50), + backpressure_proposal_delay_ms: 300, + max_txns_from_block_to_execute: Some(200), }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 3100, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 4500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, - max_txns_from_block_to_execute: Some(20), + backpressure_proposal_delay_ms: 300, + max_txns_from_block_to_execute: Some(30), }, PipelineBackpressureValues { - back_pressure_pipeline_latency_limit_ms: 3500, - max_sending_block_txns_override: 1000, + back_pressure_pipeline_latency_limit_ms: 6000, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backpressure_proposal_delay_ms: 500, + backpressure_proposal_delay_ms: 300, // in practice, latencies and delay make it such that ~2 blocks/s is max, // meaning that most aggressively we limit to ~10 TPS // For transactions that are more expensive than that, we should @@ -265,44 +300,45 @@ impl Default for ConsensusConfig { chain_health_backoff: vec![ ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 80, - max_sending_block_txns_override: 10000, + max_sending_block_txns_after_filtering_override: + MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: 5 * 1024 * 1024, backoff_proposal_delay_ms: 150, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 78, - max_sending_block_txns_override: 2000, + max_sending_block_txns_after_filtering_override: 2000, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 76, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: None, }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 74, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: Some(100), }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 72, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, max_txns_from_block_to_execute: Some(25), }, ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 70, - max_sending_block_txns_override: 500, + max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING, max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE, - backoff_proposal_delay_ms: 500, + backoff_proposal_delay_ms: 300, // in practice, latencies and delay make it such that ~2 blocks/s is max, // meaning that most aggressively we limit to ~10 TPS // For transactions that are more expensive than that, we should @@ -402,7 +438,7 @@ impl ConsensusConfig { for backpressure_values in &config.pipeline_backpressure { recv_batch_send_block_pairs.push(( config.quorum_store.receiver_max_batch_txns as u64, - backpressure_values.max_sending_block_txns_override, + backpressure_values.max_sending_block_txns_after_filtering_override, format!( "backpressure {} ms: txns", backpressure_values.back_pressure_pipeline_latency_limit_ms, @@ -420,7 +456,7 @@ impl ConsensusConfig { for backoff_values in &config.chain_health_backoff { recv_batch_send_block_pairs.push(( config.quorum_store.receiver_max_batch_txns as u64, - backoff_values.max_sending_block_txns_override, + backoff_values.max_sending_block_txns_after_filtering_override, format!( "backoff {} %: txns", backoff_values.backoff_if_below_participating_voting_power_percentage, @@ -640,7 +676,7 @@ mod test { consensus: ConsensusConfig { pipeline_backpressure: vec![PipelineBackpressureValues { back_pressure_pipeline_latency_limit_ms: 0, - max_sending_block_txns_override: 350, + max_sending_block_txns_after_filtering_override: 350, max_sending_block_bytes_override: 0, backpressure_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -671,7 +707,7 @@ mod test { consensus: ConsensusConfig { pipeline_backpressure: vec![PipelineBackpressureValues { back_pressure_pipeline_latency_limit_ms: 0, - max_sending_block_txns_override: 251, + max_sending_block_txns_after_filtering_override: 251, max_sending_block_bytes_override: 100, backpressure_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -698,7 +734,7 @@ mod test { consensus: ConsensusConfig { chain_health_backoff: vec![ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 0, - max_sending_block_txns_override: 100, + max_sending_block_txns_after_filtering_override: 100, max_sending_block_bytes_override: 0, backoff_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, @@ -725,7 +761,7 @@ mod test { consensus: ConsensusConfig { chain_health_backoff: vec![ChainHealthBackoffValues { backoff_if_below_participating_voting_power_percentage: 0, - max_sending_block_txns_override: 0, + max_sending_block_txns_after_filtering_override: 0, max_sending_block_bytes_override: 100, backoff_proposal_delay_ms: 0, max_txns_from_block_to_execute: None, diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 6e442023b1ffa..b1e2cbba2c44d 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; pub const BATCH_PADDING_BYTES: usize = 160; +pub const DEFEAULT_MAX_BATCH_TXNS: usize = 250; const DEFAULT_MAX_NUM_BATCHES: usize = 20; #[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] @@ -101,7 +102,7 @@ impl Default for QuorumStoreConfig { batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 200, batch_generation_max_interval_ms: 250, - sender_max_batch_txns: 250, + sender_max_batch_txns: DEFEAULT_MAX_BATCH_TXNS, // TODO: on next release, remove BATCH_PADDING_BYTES sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES, sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES, diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 6c62a2cceff4c..bee98c827bdd1 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -305,6 +305,29 @@ impl Payload { } } + pub fn len_for_execution(&self) -> u64 { + match self { + Payload::DirectMempool(txns) => txns.len() as u64, + Payload::InQuorumStore(proof_with_status) => proof_with_status.len() as u64, + Payload::InQuorumStoreWithLimit(proof_with_status) => { + // here we return the actual length of the payload; limit is considered at the stage + // where we prepare the block from the payload + (proof_with_status.proof_with_data.len() as u64) + .min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX)) + }, + Payload::QuorumStoreInlineHybrid( + inline_batches, + proof_with_data, + max_txns_to_execute, + ) => ((proof_with_data.len() + + inline_batches + .iter() + .map(|(_, txns)| txns.len()) + .sum::()) as u64) + .min(max_txns_to_execute.unwrap_or(u64::MAX)), + } + } + pub fn is_empty(&self) -> bool { match self { Payload::DirectMempool(txns) => txns.is_empty(), diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 2935cba7dedd5..88f356723452e 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -12,8 +12,11 @@ use crate::{ use aptos_crypto::hash::HashValue; use aptos_executor_types::StateComputeResult; use aptos_types::{ - block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness, - transaction::SignedTransaction, validator_txn::ValidatorTransaction, + block_info::BlockInfo, + contract_event::ContractEvent, + randomness::Randomness, + transaction::{SignedTransaction, TransactionStatus}, + validator_txn::ValidatorTransaction, }; use once_cell::sync::OnceCell; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -38,6 +41,7 @@ pub struct PipelinedBlock { state_compute_result: StateComputeResult, randomness: OnceCell, pipeline_insertion_time: OnceCell, + execution_summary: Arc>, } impl Serialize for PipelinedBlock { @@ -91,6 +95,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock { state_compute_result, randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), }; if let Some(r) = randomness { block.set_randomness(r); @@ -104,9 +109,34 @@ impl PipelinedBlock { mut self, input_transactions: Vec, result: StateComputeResult, + execution_time: Duration, ) -> Self { self.state_compute_result = result; self.input_transactions = input_transactions; + + let mut to_commit = 0; + let mut to_retry = 0; + for txn in self.state_compute_result.compute_status_for_input_txns() { + match txn { + TransactionStatus::Keep(_) => to_commit += 1, + TransactionStatus::Retry => to_retry += 1, + _ => {}, + } + } + + assert!(self + .execution_summary + .set(ExecutionSummary { + payload_len: self + .block + .payload() + .map_or(0, |payload| payload.len_for_execution()), + to_commit, + to_retry, + execution_time, + }) + .is_ok()); + self } @@ -143,6 +173,7 @@ impl PipelinedBlock { state_compute_result, randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), } } @@ -153,6 +184,7 @@ impl PipelinedBlock { state_compute_result: StateComputeResult::new_dummy(), randomness: OnceCell::new(), pipeline_insertion_time: OnceCell::new(), + execution_summary: Arc::new(OnceCell::new()), } } @@ -250,4 +282,16 @@ impl PipelinedBlock { pub fn elapsed_in_pipeline(&self) -> Option { self.pipeline_insertion_time.get().map(|t| t.elapsed()) } + + pub fn get_execution_summary(&self) -> Option { + self.execution_summary.get().cloned() + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ExecutionSummary { + pub payload_len: u64, + pub to_commit: u64, + pub to_retry: u64, + pub execution_time: Duration, } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 36549f458e195..1c099e69aedc8 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -19,8 +19,12 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context}; use aptos_consensus_types::{ - block::Block, common::Round, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, - sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate, + block::Block, + common::Round, + pipelined_block::{ExecutionSummary, PipelinedBlock}, + quorum_cert::QuorumCert, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo, }; use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; @@ -460,6 +464,70 @@ impl BlockStore { .store(back_pressure, Ordering::Relaxed) } + pub fn pending_blocks(&self) -> Arc> { + self.pending_blocks.clone() + } +} + +impl BlockReader for BlockStore { + fn block_exists(&self, block_id: HashValue) -> bool { + self.inner.read().block_exists(&block_id) + } + + fn get_block(&self, block_id: HashValue) -> Option> { + self.inner.read().get_block(&block_id) + } + + fn ordered_root(&self) -> Arc { + self.inner.read().ordered_root() + } + + fn commit_root(&self) -> Arc { + self.inner.read().commit_root() + } + + fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option> { + self.inner.read().get_quorum_cert_for_block(&block_id) + } + + fn path_from_ordered_root(&self, block_id: HashValue) -> Option>> { + self.inner.read().path_from_ordered_root(block_id) + } + + fn path_from_commit_root(&self, block_id: HashValue) -> Option>> { + self.inner.read().path_from_commit_root(block_id) + } + + fn highest_certified_block(&self) -> Arc { + self.inner.read().highest_certified_block() + } + + fn highest_quorum_cert(&self) -> Arc { + self.inner.read().highest_quorum_cert() + } + + fn highest_ordered_cert(&self) -> Arc { + self.inner.read().highest_ordered_cert() + } + + fn highest_commit_cert(&self) -> Arc { + self.inner.read().highest_commit_cert() + } + + fn highest_2chain_timeout_cert(&self) -> Option> { + self.inner.read().highest_2chain_timeout_cert() + } + + fn sync_info(&self) -> SyncInfo { + SyncInfo::new_decoupled( + self.highest_quorum_cert().as_ref().clone(), + self.highest_ordered_cert().as_ref().clone(), + self.highest_commit_cert().as_ref().clone(), + self.highest_2chain_timeout_cert() + .map(|tc| tc.as_ref().clone()), + ) + } + /// Return if the consensus is backpressured fn vote_back_pressure(&self) -> bool { #[cfg(any(test, feature = "fuzzing"))] @@ -476,11 +544,7 @@ impl BlockStore { ordered_round > self.vote_back_pressure_limit + commit_round } - pub fn pending_blocks(&self) -> Arc> { - self.pending_blocks.clone() - } - - pub fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { + fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { let ordered_root = self.ordered_root(); let commit_root = self.commit_root(); let pending_path = self @@ -551,73 +615,31 @@ impl BlockStore { Duration::ZERO } } -} - -impl BlockReader for BlockStore { - fn block_exists(&self, block_id: HashValue) -> bool { - self.inner.read().block_exists(&block_id) - } - - fn get_block(&self, block_id: HashValue) -> Option> { - self.inner.read().get_block(&block_id) - } - - fn ordered_root(&self) -> Arc { - self.inner.read().ordered_root() - } - - fn commit_root(&self) -> Arc { - self.inner.read().commit_root() - } - - fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option> { - self.inner.read().get_quorum_cert_for_block(&block_id) - } - - fn path_from_ordered_root(&self, block_id: HashValue) -> Option>> { - self.inner.read().path_from_ordered_root(block_id) - } - - fn path_from_commit_root(&self, block_id: HashValue) -> Option>> { - self.inner.read().path_from_commit_root(block_id) - } - - fn highest_certified_block(&self) -> Arc { - self.inner.read().highest_certified_block() - } - - fn highest_quorum_cert(&self) -> Arc { - self.inner.read().highest_quorum_cert() - } - - fn highest_ordered_cert(&self) -> Arc { - self.inner.read().highest_ordered_cert() - } - - fn highest_commit_cert(&self) -> Arc { - self.inner.read().highest_commit_cert() - } - - fn highest_2chain_timeout_cert(&self) -> Option> { - self.inner.read().highest_2chain_timeout_cert() - } - fn sync_info(&self) -> SyncInfo { - SyncInfo::new_decoupled( - self.highest_quorum_cert().as_ref().clone(), - self.highest_ordered_cert().as_ref().clone(), - self.highest_commit_cert().as_ref().clone(), - self.highest_2chain_timeout_cert() - .map(|tc| tc.as_ref().clone()), - ) - } - - fn vote_back_pressure(&self) -> bool { - self.vote_back_pressure() - } - - fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration { - self.pipeline_pending_latency(proposal_timestamp) + fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec { + let mut res = vec![]; + let mut cur_block = Some(self.ordered_root()); + loop { + match cur_block { + Some(block) => { + if let Some(execution_time_and_size) = block.get_execution_summary() { + info!( + "Found execution time for {}, {:?}", + block.id(), + execution_time_and_size + ); + res.push(execution_time_and_size); + if res.len() >= num_blocks { + return res; + } + } else { + info!("Couldn't find execution time for {}", block.id()); + } + cur_block = self.get_block(block.parent_id()); + }, + None => return res, + } + } } } diff --git a/consensus/src/block_storage/mod.rs b/consensus/src/block_storage/mod.rs index 83fe9fd0450d7..f11b2e2554f2a 100644 --- a/consensus/src/block_storage/mod.rs +++ b/consensus/src/block_storage/mod.rs @@ -3,8 +3,11 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_consensus_types::{ - pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, sync_info::SyncInfo, - timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo, + pipelined_block::{ExecutionSummary, PipelinedBlock}, + quorum_cert::QuorumCert, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutCertificate, + wrapped_ledger_info::WrappedLedgerInfo, }; use aptos_crypto::HashValue; pub use block_store::{sync_manager::BlockRetriever, BlockStore}; @@ -64,4 +67,6 @@ pub trait BlockReader: Send + Sync { // Return time difference between last committed block and new proposal fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration; + + fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec; } diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 9e5cca0bccc9c..3410cbf2ee3f7 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -284,11 +284,19 @@ pub static WAIT_FOR_FULL_BLOCKS_TRIGGERED: Lazy = Lazy::new(|| { ) }); -/// Counts when chain_health backoff is triggered +/// Counts when pipeline backpressure is triggered pub static PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED: Lazy = Lazy::new(|| { register_avg_counter( "aptos_pipeline_backpressure_on_proposal_triggered", - "Counts when chain_health backoff is triggered", + "Counts when pipeline backpressure is triggered", + ) +}); + +/// Counts when execution backpressure is triggered +pub static EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED: Lazy = Lazy::new(|| { + register_avg_counter( + "aptos_execution_backpressure_on_proposal_triggered", + "Counts when execution backpressure is triggered", ) }); @@ -312,12 +320,11 @@ pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy = Lazy:: }); /// Amount of time (in seconds) proposal is delayed due to backpressure/backoff -pub static PROPOSER_DELAY_PROPOSAL: Lazy = Lazy::new(|| { - register_gauge!( +pub static PROPOSER_DELAY_PROPOSAL: Lazy = Lazy::new(|| { + register_avg_counter( "aptos_proposer_delay_proposal", "Amount of time (in seconds) proposal is delayed due to backpressure/backoff", ) - .unwrap() }); /// Histogram for max number of transactions (after filtering for dedup, expirations, etc) proposer uses when creating block. @@ -330,16 +337,6 @@ pub static PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING: Lazy = Lazy::new( .unwrap() }); -/// Histogram for max number of transactions proposer uses when creating block. -pub static PROPOSER_MAX_BLOCK_TXNS: Lazy = Lazy::new(|| { - register_histogram!( - "aptos_proposer_max_block_txns", - "Histogram for max number of transactions proposer uses when creating block.", - NUM_CONSENSUS_TRANSACTIONS_BUCKETS.to_vec() - ) - .unwrap() -}); - /// Histogram for max number of transactions to execute proposer uses when creating block. pub static PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE: Lazy = Lazy::new(|| { register_histogram!( diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index e178bcd58fa46..8ef6d87d7d7dc 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -621,6 +621,8 @@ impl DagBootstrapper { .health_config .pipeline_backpressure_config .clone(), + // TODO: add pipeline backpressure based on execution speed to DAG config + None, ), ordered_notifier.clone(), ); diff --git a/consensus/src/dag/health/chain_health.rs b/consensus/src/dag/health/chain_health.rs index 26e5d9af256be..6b9845407ae7b 100644 --- a/consensus/src/dag/health/chain_health.rs +++ b/consensus/src/dag/health/chain_health.rs @@ -84,7 +84,7 @@ impl TChainHealth for ChainHealthBackoff { chain_health_backoff.map(|value| { ( - value.max_sending_block_txns_override, + value.max_sending_block_txns_after_filtering_override, value.max_sending_block_bytes_override, ) }) diff --git a/consensus/src/dag/health/pipeline_health.rs b/consensus/src/dag/health/pipeline_health.rs index 3668fd68eec37..f3f5cf5b00fcd 100644 --- a/consensus/src/dag/health/pipeline_health.rs +++ b/consensus/src/dag/health/pipeline_health.rs @@ -68,7 +68,7 @@ impl TPipelineHealth for PipelineLatencyBasedBackpressure { let latency = self.adapter.pipeline_pending_latency(); self.pipeline_config.get_backoff(latency).map(|config| { ( - config.max_sending_block_txns_override, + config.max_sending_block_txns_after_filtering_override, config.max_sending_block_bytes_override, ) }) diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 1ddcb47111c53..543799fe91b15 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -798,8 +798,10 @@ impl EpochManager

{ self.create_proposer_election(&epoch_state, &onchain_consensus_config); let chain_health_backoff_config = ChainHealthBackoffConfig::new(self.config.chain_health_backoff.clone()); - let pipeline_backpressure_config = - PipelineBackpressureConfig::new(self.config.pipeline_backpressure.clone()); + let pipeline_backpressure_config = PipelineBackpressureConfig::new( + self.config.pipeline_backpressure.clone(), + self.config.execution_backpressure.clone(), + ); let safety_rules_container = Arc::new(Mutex::new(safety_rules)); @@ -843,7 +845,7 @@ impl EpochManager

{ self.time_service.clone(), Duration::from_millis(self.config.quorum_store_poll_time_ms), self.config.max_sending_block_txns, - self.config.max_sending_block_unique_txns, + self.config.max_sending_block_txns_after_filtering, self.config.max_sending_block_bytes, self.config.max_sending_inline_txns, self.config.max_sending_inline_bytes, diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index 53a50f01a4659..530ea208d6bdd 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -26,7 +26,10 @@ use aptos_types::{ use fail::fail_point; use once_cell::sync::Lazy; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::sync::Arc; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use tokio::sync::{mpsc, oneshot}; pub static SIG_VERIFY_POOL: Lazy> = Lazy::new(|| { @@ -48,6 +51,7 @@ impl ExecutionPipeline { let (prepare_block_tx, prepare_block_rx) = mpsc::unbounded_channel(); let (execute_block_tx, execute_block_rx) = mpsc::unbounded_channel(); let (ledger_apply_tx, ledger_apply_rx) = mpsc::unbounded_channel(); + runtime.spawn(Self::prepare_block_stage( prepare_block_rx, execute_block_tx, @@ -185,11 +189,14 @@ impl ExecutionPipeline { error: "Injected error in compute".into(), }) }); - executor.execute_and_state_checkpoint( - block, - parent_block_id, - block_executor_onchain_config, - ) + let start = Instant::now(); + executor + .execute_and_state_checkpoint( + block, + parent_block_id, + block_executor_onchain_config, + ) + .map(|output| (output, start.elapsed())) }) .await ) @@ -216,24 +223,28 @@ impl ExecutionPipeline { input_txns, block_id, parent_block_id, - state_checkpoint_output, + state_checkpoint_output: execution_result, result_tx, }) = block_rx.recv().await { debug!("ledger_apply stage received block {}.", block_id); let res = async { + let (state_checkpoint_output, execution_duration) = execution_result?; let executor = executor.clone(); monitor!( "ledger_apply", tokio::task::spawn_blocking(move || { - executor.ledger_update(block_id, parent_block_id, state_checkpoint_output?) + executor.ledger_update(block_id, parent_block_id, state_checkpoint_output) }) ) .await .expect("Failed to spawn_blocking().") + .map(|output| (output, execution_duration)) } .await; - let pipe_line_res = res.map(|output| PipelineExecutionResult::new(input_txns, output)); + let pipe_line_res = res.map(|(output, execution_duration)| { + PipelineExecutionResult::new(input_txns, output, execution_duration) + }); result_tx.send(pipe_line_res).unwrap_or_else(|err| { error!( block_id = block_id, @@ -267,6 +278,6 @@ struct LedgerApplyCommand { input_txns: Vec, block_id: HashValue, parent_block_id: HashValue, - state_checkpoint_output: ExecutorResult, + state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>, result_tx: oneshot::Sender>, } diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index c6a5545b9ac0c..b561b15a9a689 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -8,27 +8,31 @@ use super::{ use crate::{ block_storage::BlockReader, counters::{ - CHAIN_HEALTH_BACKOFF_TRIGGERED, PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, - PROPOSER_DELAY_PROPOSAL, PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, - PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT, - PROPOSER_PENDING_BLOCKS_FILL_FRACTION, + CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, + PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL, + PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, + PROPOSER_PENDING_BLOCKS_COUNT, PROPOSER_PENDING_BLOCKS_FILL_FRACTION, }, payload_client::PayloadClient, util::time_service::TimeService, }; use anyhow::{bail, ensure, format_err, Context}; -use aptos_config::config::{ChainHealthBackoffValues, PipelineBackpressureValues}; +use aptos_config::config::{ + ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues, +}; use aptos_consensus_types::{ block::Block, block_data::BlockData, common::{Author, Payload, PayloadFilter, Round}, + pipelined_block::ExecutionSummary, quorum_cert::QuorumCert, }; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_logger::{error, sample, sample::SampleRate, warn}; +use aptos_logger::{error, info, sample, sample::SampleRate, warn}; use aptos_types::{on_chain_config::ValidatorTxnConfig, validator_txn::ValidatorTransaction}; use aptos_validator_transaction_pool as vtxn_pool; use futures::future::BoxFuture; +use itertools::Itertools; use std::{ collections::{BTreeMap, HashSet}, sync::Arc, @@ -96,23 +100,31 @@ impl ChainHealthBackoffConfig { #[derive(Clone)] pub struct PipelineBackpressureConfig { backoffs: BTreeMap, + execution: Option, } impl PipelineBackpressureConfig { - pub fn new(backoffs: Vec) -> Self { + pub fn new( + backoffs: Vec, + execution: Option, + ) -> Self { let original_len = backoffs.len(); let backoffs = backoffs .into_iter() .map(|v| (v.back_pressure_pipeline_latency_limit_ms, v)) .collect::>(); assert_eq!(original_len, backoffs.len()); - Self { backoffs } + Self { + backoffs, + execution, + } } #[allow(dead_code)] pub fn new_no_backoff() -> Self { Self { backoffs: BTreeMap::new(), + execution: None, } } @@ -139,6 +151,64 @@ impl PipelineBackpressureConfig { v }) } + + pub fn get_execution_block_size_backoff( + &self, + block_execution_times: &[ExecutionSummary], + ) -> Option { + info!( + "Estimated block execution times: {:?}", + block_execution_times + ); + + self.execution.as_ref().and_then(|config| { + let sizes = block_execution_times + .iter() + .flat_map(|summary| { + // for each block, compute target (re-calibrated) block size + + let execution_time_ms = summary.execution_time.as_millis(); + // Only block above the time threshold are considered giving enough signal to support calibration + // so we filter out shorter locks + if execution_time_ms > config.min_block_time_ms_to_activate as u128 { + // TODO: After cost of "retries" is reduced with execution pool, we + // should be computing block gas limit here, simply as: + // `config.target_block_time_ms / execution_time_ms * gas_consumed_by_block`` + // + // Until then, we need to compute wanted block size to create. + // Unfortunatelly, there is multiple layers where transactions are filtered. + // After deduping/reordering logic is applied, max_txns_to_execute limits the transactions + // passed to executor (`summary.payload_len` here), and then some are discarded for various + // reasons, which we approximate are cheaply ignored. + // For the rest, only `summary.to_commit` fraction of `summary.to_commit + summary.to_retry` + // was executed. And so assuming same discard rate, we scale `summary.payload_len` with it. + Some( + ((config.target_block_time_ms as f64 / execution_time_ms as f64 + * (summary.to_commit as f64 + / (summary.to_commit + summary.to_retry) as f64) + * summary.payload_len as f64) + .floor() as u64) + .max(1), + ) + } else { + None + } + }) + .sorted() + // .sorted_by_key(|key| key.unwrap_or(u64::MAX)) + .collect::>(); + info!("Estimated block back-offs block sizes: {:?}", sizes); + if sizes.len() >= config.min_blocks_to_activate { + Some( + *sizes + .get((config.percentile * sizes.len() as f64) as usize) + .unwrap(), + ) + } else { + None + } + }) + } } /// ProposalGenerator is responsible for generating the proposed block on demand: it's typically @@ -327,11 +397,11 @@ impl ProposalGenerator { .await; PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64); - PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe( - max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering) as f64, - ); + if let Some(max_to_execute) = max_txns_from_block_to_execute { + PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64); + } - PROPOSER_DELAY_PROPOSAL.set(proposal_delay.as_secs_f64()); + PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64()); if !proposal_delay.is_zero() { tokio::time::sleep(proposal_delay).await; } @@ -428,7 +498,7 @@ impl ProposalGenerator { timestamp: Duration, round: Round, ) -> (u64, u64, Option, Duration) { - let mut values_max_block_txns = vec![self.max_block_txns_after_filtering]; + let mut values_max_block_txns_after_filtering = vec![self.max_block_txns_after_filtering]; let mut values_max_block_bytes = vec![self.max_block_bytes]; let mut values_proposal_delay = vec![Duration::ZERO]; let mut values_max_txns_from_block_to_execute = vec![]; @@ -437,7 +507,8 @@ impl ProposalGenerator { .chain_health_backoff_config .get_backoff(voting_power_ratio); if let Some(value) = chain_health_backoff { - values_max_block_txns.push(value.max_sending_block_txns_override); + values_max_block_txns_after_filtering + .push(value.max_sending_block_txns_after_filtering_override); values_max_block_bytes.push(value.max_sending_block_bytes_override); if let Some(val) = value.max_txns_from_block_to_execute { values_max_txns_from_block_to_execute.push(val); @@ -448,11 +519,13 @@ impl ProposalGenerator { CHAIN_HEALTH_BACKOFF_TRIGGERED.observe(0.0); } + let pipeline_pending_latency = self.block_store.pipeline_pending_latency(timestamp); let pipeline_backpressure = self .pipeline_backpressure_config - .get_backoff(self.block_store.pipeline_pending_latency(timestamp)); + .get_backoff(pipeline_pending_latency); if let Some(value) = pipeline_backpressure { - values_max_block_txns.push(value.max_sending_block_txns_override); + values_max_block_txns_after_filtering + .push(value.max_sending_block_txns_after_filtering_override); values_max_block_bytes.push(value.max_sending_block_bytes_override); if let Some(val) = value.max_txns_from_block_to_execute { values_max_txns_from_block_to_execute.push(val); @@ -463,23 +536,62 @@ impl ProposalGenerator { PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe(0.0); }; - let max_block_txns = values_max_block_txns.into_iter().min().unwrap(); + let mut execution_backpressure_applied = false; + if let Some(config) = &self.pipeline_backpressure_config.execution { + if pipeline_pending_latency.as_millis() + > config.back_pressure_pipeline_latency_limit_ms as u128 + { + let execution_backpressure = self + .pipeline_backpressure_config + .get_execution_block_size_backoff( + &self + .block_store + .get_recent_block_execution_times(config.num_blocks_to_look_at), + ); + if let Some(execution_backpressure_block_size) = execution_backpressure { + values_max_block_txns_after_filtering.push( + (execution_backpressure_block_size as f64 + * config.reordering_ovarpacking_factor.max(1.0)) + as u64, + ); + values_max_txns_from_block_to_execute.push(execution_backpressure_block_size); + execution_backpressure_applied = true; + } + } + } + EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe( + if execution_backpressure_applied { + 1.0 + } else { + 0.0 + }, + ); + + let max_block_txns = values_max_block_txns_after_filtering + .into_iter() + .min() + .unwrap(); let max_block_bytes = values_max_block_bytes.into_iter().min().unwrap(); let proposal_delay = values_proposal_delay.into_iter().max().unwrap(); - let max_txns_from_block_to_execute = - values_max_txns_from_block_to_execute.into_iter().min(); - if pipeline_backpressure.is_some() || chain_health_backoff.is_some() { - warn!( - "Generating proposal: reducing limits to {} txns (filtered to {:?}) and {} bytes, due to pipeline_backpressure: {}, chain health backoff: {}. Delaying sending proposal by {}ms. Round: {}", - max_block_txns, - max_txns_from_block_to_execute, - max_block_bytes, - pipeline_backpressure.is_some(), - chain_health_backoff.is_some(), - proposal_delay.as_millis(), - round, - ); - } + let max_txns_from_block_to_execute = values_max_txns_from_block_to_execute + .into_iter() + .min() + .filter(|v| *v < max_block_txns); + + warn!( + pipeline_pending_latency = pipeline_pending_latency.as_millis(), + proposal_delay_ms = proposal_delay.as_millis(), + max_block_txns = max_block_txns, + max_txns_from_block_to_execute = + max_txns_from_block_to_execute.unwrap_or(max_block_txns), + max_block_bytes = max_block_bytes, + is_pipeline_backpressure = pipeline_backpressure.is_some(), + is_execution_backpressure = execution_backpressure_applied, + is_chain_health_backoff = chain_health_backoff.is_some(), + round = round, + "Proposal generation backpressure details", + ); + ( max_block_txns, max_block_bytes, diff --git a/consensus/src/pipeline/execution_schedule_phase.rs b/consensus/src/pipeline/execution_schedule_phase.rs index 96586273649c1..312f476174849 100644 --- a/consensus/src/pipeline/execution_schedule_phase.rs +++ b/consensus/src/pipeline/execution_schedule_phase.rs @@ -93,8 +93,12 @@ impl StatelessPipeline for ExecutionSchedulePhase { let mut results = vec![]; for (block, fut) in itertools::zip_eq(ordered_blocks, futs) { debug!("try to receive compute result for block {}", block.id()); - let PipelineExecutionResult { input_txns, result } = fut.await?; - results.push(block.set_execution_result(input_txns, result)); + let PipelineExecutionResult { + input_txns, + result, + execution_time, + } = fut.await?; + results.push(block.set_execution_result(input_txns, result, execution_time)); } drop(lifetime_guard); Ok(results) diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index db4240cee67b2..6c0184e53469b 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -34,7 +34,7 @@ use aptos_types::{ }; use fail::fail_point; use futures::{future::BoxFuture, SinkExt, StreamExt}; -use std::{boxed::Box, sync::Arc}; +use std::{boxed::Box, sync::Arc, time::Duration}; use tokio::sync::Mutex as AsyncMutex; pub type StateComputeResultFut = BoxFuture<'static, ExecutorResult>; @@ -43,11 +43,20 @@ pub type StateComputeResultFut = BoxFuture<'static, ExecutorResult, pub result: StateComputeResult, + pub execution_time: Duration, } impl PipelineExecutionResult { - pub fn new(input_txns: Vec, result: StateComputeResult) -> Self { - Self { input_txns, result } + pub fn new( + input_txns: Vec, + result: StateComputeResult, + execution_time: Duration, + ) -> Self { + Self { + input_txns, + result, + execution_time, + } } } diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs index 286dc9137ffd6..16f226ef3f253 100644 --- a/consensus/src/test_utils/mock_state_computer.rs +++ b/consensus/src/test_utils/mock_state_computer.rs @@ -22,7 +22,7 @@ use aptos_types::{ }; use futures::SinkExt; use futures_channel::mpsc::UnboundedSender; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; pub struct EmptyStateComputer { executor_channel: UnboundedSender, @@ -45,6 +45,7 @@ impl StateComputer for EmptyStateComputer { Ok(PipelineExecutionResult::new( vec![], StateComputeResult::new_dummy(), + Duration::from_secs(0), )) } @@ -129,7 +130,8 @@ impl StateComputer for RandomComputeResultStateComputer { self.random_compute_result_root_hash, )) }; - let pipeline_execution_res = res.map(|res| PipelineExecutionResult::new(vec![], res)); + let pipeline_execution_res = + res.map(|res| PipelineExecutionResult::new(vec![], res, Duration::from_secs(0))); Box::pin(async move { pipeline_execution_res }) } diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 5f87e40dc1456..27c01ca149446 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -427,8 +427,11 @@ impl StateComputeResult { } pub fn transactions_to_commit_len(&self) -> usize { - // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration - self.compute_status_for_input_txns().len() + self.compute_status_for_input_txns() + .iter() + .filter(|status| matches!(status, TransactionStatus::Keep(_))) + .count() + // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration + (if self.has_reconfiguration() { 0 } else { 1 }) } diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index c515fd24fbdc2..eab46a1ce8825 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -829,7 +829,7 @@ fn optimize_for_maximum_throughput( ) { mempool_config_practically_non_expiring(&mut config.mempool); - config.consensus.max_sending_block_unique_txns = max_txns_per_block as u64; + config.consensus.max_sending_block_txns_after_filtering = max_txns_per_block as u64; config.consensus.max_sending_block_txns = config .consensus .max_sending_block_txns @@ -2253,9 +2253,10 @@ pub fn changing_working_quorum_test_helper( } else { for (i, item) in chain_health_backoff.iter_mut().enumerate() { // as we have lower TPS, make limits smaller - item.max_sending_block_txns_override = + item.max_sending_block_txns_after_filtering_override = (block_size / 2_u64.pow(i as u32 + 1)).max(2); - min_block_txns = min_block_txns.min(item.max_sending_block_txns_override); + min_block_txns = + min_block_txns.min(item.max_sending_block_txns_after_filtering_override); // as we have fewer nodes, make backoff triggered earlier: item.backoff_if_below_participating_voting_power_percentage = 90 - i * 5; }