diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs
index 8bff185d5052d3..538546fd448c40 100644
--- a/config/src/config/consensus_config.rs
+++ b/config/src/config/consensus_config.rs
@@ -2,6 +2,7 @@
 // Parts of the project are originally copyright © Meta Platforms, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use super::DEFEAULT_MAX_BATCH_TXNS;
 use crate::config::{
     config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, Error, NodeConfig,
     QuorumStoreConfig, ReliableBroadcastConfig, SafetyRulesConfig, BATCH_PADDING_BYTES,
@@ -13,12 +14,14 @@ use serde::{Deserialize, Serialize};
 use std::path::PathBuf;
 
 // NOTE: when changing, make sure to update QuorumStoreBackPressureConfig::backlog_txn_limit_count as well.
-const MAX_SENDING_BLOCK_UNIQUE_TXNS: u64 = 1900;
-const MAX_SENDING_BLOCK_TXNS: u64 = 4500;
+const MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING: u64 = 3000;
+const MAX_SENDING_BLOCK_TXNS: u64 = 6500;
 pub(crate) static MAX_RECEIVING_BLOCK_TXNS: Lazy<u64> =
     Lazy::new(|| 10000.max(2 * MAX_SENDING_BLOCK_TXNS));
 // stop reducing size at this point, so 1MB transactions can still go through
 const MIN_BLOCK_BYTES_OVERRIDE: u64 = 1024 * 1024 + BATCH_PADDING_BYTES as u64;
+// We should reduce block size only down to two QS batch sizes.
+const MIN_BLOCK_TXNS_AFTER_FILTERING: u64 = DEFEAULT_MAX_BATCH_TXNS as u64 * 2;
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 #[serde(default, deny_unknown_fields)]
@@ -26,7 +29,7 @@ pub struct ConsensusConfig {
     // length of inbound queue of messages
     pub max_network_channel_size: usize,
     pub max_sending_block_txns: u64,
-    pub max_sending_block_unique_txns: u64,
+    pub max_sending_block_txns_after_filtering: u64,
     pub max_sending_block_bytes: u64,
     pub max_sending_inline_txns: u64,
     pub max_sending_inline_bytes: u64,
@@ -126,21 +129,42 @@ impl Default for DelayedQcAggregatorConfig {
     }
 }
 
+/// Execution backpressure, which handles gas/s variance by adjusting block
+/// sizes to "recalibrate" execution time into the wanted range.
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 pub struct ExecutionBackpressureConfig {
+    /// The latency at which this backpressure level activates.
     pub back_pressure_pipeline_latency_limit_ms: u64,
+    /// Look at execution times of this many most recent blocks.
     pub num_blocks_to_look_at: usize,
+
+    /// Only blocks above this threshold are treated as potentially needing recalibration.
+    /// This is needed because small blocks have overheads that are irrelevant to the
+    /// transactions being executed.
+    pub min_block_time_ms_to_activate: usize,
+
+    /// Backpressure has a second check: it only activates if at least
+    /// `min_blocks_to_activate` blocks are above `min_block_time_ms_to_activate`.
     pub min_blocks_to_activate: usize,
+
+    /// Out of the blocks in the window, take this percentile (in the 0-1 range) to use for
+    /// calibration, i.e. 0.5 means take the median of the last `num_blocks_to_look_at` blocks.
     pub percentile: f64,
+    /// Recalibrate the max block size to target blocks taking this long.
     pub target_block_time_ms: usize,
-    pub min_block_time_ms_to_activate: usize,
+
+    /// We compute the re-calibrated block size and use it for max_txns_to_execute.
+    /// We then set max_txns_in_block to `reordering_ovarpacking_factor` times that.
+    /// Currently set to 1.0 (i.e. no overpacking); once execution pool lands and the
+    /// cost of overpacking becomes minimal, we should increase it.
+    pub reordering_ovarpacking_factor: f64,
 }
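Note: these fields drive the computation added to proposal_generator.rs further down. A std-only sketch of the whole calibration, with `BlockSummary` standing in for the crate's `ExecutionSummary` (the names and the free function are illustrative, not the crate's API):

```rust
// Hypothetical, self-contained restatement of the percentile-based
// recalibration the doc comments above describe.
struct BlockSummary {
    execution_time_ms: u64,
    payload_len: u64,
    to_commit: u64,
    to_retry: u64,
}

/// Filter out blocks at or below `min_block_time_ms_to_activate`, rescale each
/// survivor to `target_block_time_ms`, then take `percentile` of the sorted
/// estimates (0.5 = median) once `min_blocks_to_activate` blocks qualify.
fn calibrated_block_size(
    blocks: &[BlockSummary],
    min_block_time_ms_to_activate: u64,
    min_blocks_to_activate: usize,
    percentile: f64,
    target_block_time_ms: u64,
) -> Option<u64> {
    let mut sizes: Vec<u64> = blocks
        .iter()
        .filter(|b| b.execution_time_ms > min_block_time_ms_to_activate)
        .map(|b| {
            // Scale payload by how much slower than target the block ran,
            // and by the fraction of executed txns that actually committed.
            let commit_fraction = b.to_commit as f64 / (b.to_commit + b.to_retry) as f64;
            ((target_block_time_ms as f64 / b.execution_time_ms as f64
                * commit_fraction
                * b.payload_len as f64)
                .floor() as u64)
                .max(1)
        })
        .collect();
    sizes.sort_unstable();
    if sizes.len() >= min_blocks_to_activate {
        Some(sizes[(percentile * sizes.len() as f64) as usize])
    } else {
        None
    }
}

fn main() {
    let blocks = vec![
        BlockSummary { execution_time_ms: 400, payload_len: 2000, to_commit: 800, to_retry: 200 },
        BlockSummary { execution_time_ms: 350, payload_len: 1500, to_commit: 1000, to_retry: 0 },
        BlockSummary { execution_time_ms: 50, payload_len: 100, to_commit: 100, to_retry: 0 }, // too short, filtered
    ];
    // Target 200 ms, median (0.5) of the two qualifying blocks: [800, 857] -> 857.
    assert_eq!(calibrated_block_size(&blocks, 100, 2, 0.5, 200), Some(857));
}
```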
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub struct PipelineBackpressureValues {
+    // Latency at which this backpressure level activates.
     pub back_pressure_pipeline_latency_limit_ms: u64,
-    pub max_sending_block_txns_override: u64,
+    pub max_sending_block_txns_after_filtering_override: u64,
     pub max_sending_block_bytes_override: u64,
     // If there is backpressure, giving some more breathing room to go through the backlog,
     // and making sure rounds don't go extremely fast (even if they are smaller blocks)
@@ -155,7 +179,7 @@ pub struct PipelineBackpressureValues {
 pub struct ChainHealthBackoffValues {
     pub backoff_if_below_participating_voting_power_percentage: usize,
 
-    pub max_sending_block_txns_override: u64,
+    pub max_sending_block_txns_after_filtering_override: u64,
     pub max_sending_block_bytes_override: u64,
 
     pub backoff_proposal_delay_ms: u64,
@@ -167,7 +191,7 @@ impl Default for ConsensusConfig {
         ConsensusConfig {
             max_network_channel_size: 1024,
             max_sending_block_txns: MAX_SENDING_BLOCK_TXNS,
-            max_sending_block_unique_txns: MAX_SENDING_BLOCK_UNIQUE_TXNS,
+            max_sending_block_txns_after_filtering: MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING,
             max_sending_block_bytes: 3 * 1024 * 1024, // 3MB
             max_receiving_block_txns: *MAX_RECEIVING_BLOCK_TXNS,
             max_sending_inline_txns: 100,
@@ -216,66 +240,55 @@ impl Default for ConsensusConfig {
                     // pipeline once quorum on execution result among validators has been reached
                     // (so-(badly)-called "commit certificate"), meaning 2f+1 validators have finished execution.
                     back_pressure_pipeline_latency_limit_ms: 800,
-                    max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS,
+                    max_sending_block_txns_after_filtering_override:
+                        MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: 5 * 1024 * 1024,
                     backpressure_proposal_delay_ms: 100,
                     max_txns_from_block_to_execute: None,
                 },
                 PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 1100,
-                    max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS,
+                    back_pressure_pipeline_latency_limit_ms: 1200,
+                    max_sending_block_txns_after_filtering_override:
+                        MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: 5 * 1024 * 1024,
                     backpressure_proposal_delay_ms: 200,
                     max_txns_from_block_to_execute: None,
                 },
                 PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 1400,
-                    max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS,
+                    back_pressure_pipeline_latency_limit_ms: 1600,
+                    max_sending_block_txns_after_filtering_override:
+                        MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: 5 * 1024 * 1024,
                     backpressure_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: None,
                 },
+                // with execution backpressure, we start reducing block size only later
                 PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 1700,
-                    max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS,
-                    max_sending_block_bytes_override: 5 * 1024 * 1024,
-                    backpressure_proposal_delay_ms: 400,
-                    max_txns_from_block_to_execute: None,
-                },
-                PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 2000,
-                    max_sending_block_txns_override: MAX_SENDING_BLOCK_TXNS,
-                    max_sending_block_bytes_override: 5 * 1024 * 1024,
-                    backpressure_proposal_delay_ms: 500,
-                    max_txns_from_block_to_execute: None,
-                },
-                // with other pipeline backpressure, only later start reducing block size
-                PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 3000,
-                    max_sending_block_txns_override: 1000,
+                    back_pressure_pipeline_latency_limit_ms: 2500,
+                    max_sending_block_txns_after_filtering_override: 1000,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backpressure_proposal_delay_ms: 500,
+                    backpressure_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: None,
                 },
                 PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 4000,
-                    max_sending_block_txns_override: 1000,
+                    back_pressure_pipeline_latency_limit_ms: 3500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backpressure_proposal_delay_ms: 500,
+                    backpressure_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: Some(200),
                 },
                 PipelineBackpressureValues {
-                    back_pressure_pipeline_latency_limit_ms: 5000,
-                    max_sending_block_txns_override: 1000,
+                    back_pressure_pipeline_latency_limit_ms: 4500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backpressure_proposal_delay_ms: 500,
+                    backpressure_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: Some(30),
                 },
                 PipelineBackpressureValues {
                     back_pressure_pipeline_latency_limit_ms: 6000,
-                    max_sending_block_txns_override: 1000,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backpressure_proposal_delay_ms: 500,
+                    backpressure_proposal_delay_ms: 300,
                     // in practice, latencies and delay make it such that ~2 blocks/s is max,
                     // meaning that most aggressively we limit to ~10 TPS
                     // For transactions that are more expensive than that, we should
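Note: these levels are consumed through `PipelineBackpressureConfig::get_backoff` (see pipeline_health.rs further down). A simplified std-only sketch of the selection, assuming, as the sorted defaults suggest, that the most aggressive level whose latency threshold has been crossed wins:

```rust
// Illustrative types; not the crate's API.
struct Level {
    latency_limit_ms: u64,
    max_txns_after_filtering: u64,
}

/// Levels are sorted by threshold; return the last (most aggressive) one
/// whose threshold the current pending pipeline latency has reached.
fn get_backoff(levels: &[Level], pipeline_latency_ms: u64) -> Option<&Level> {
    levels
        .iter()
        .rev()
        .find(|l| pipeline_latency_ms >= l.latency_limit_ms)
}

fn main() {
    let levels = [
        Level { latency_limit_ms: 800, max_txns_after_filtering: 3000 },
        Level { latency_limit_ms: 2500, max_txns_after_filtering: 1000 },
        Level { latency_limit_ms: 3500, max_txns_after_filtering: 500 },
    ];
    // 3000 ms of pending latency has crossed the 2500 ms threshold but not
    // the 3500 ms one, so the 2500 ms overrides apply.
    assert_eq!(get_backoff(&levels, 3000).unwrap().max_txns_after_filtering, 1000);
    // Below the first threshold, no backpressure level applies.
    assert!(get_backoff(&levels, 500).is_none());
}
```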
@@ -287,44 +300,45 @@ impl Default for ConsensusConfig {
             chain_health_backoff: vec![
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 80,
-                    max_sending_block_txns_override: 10000,
+                    max_sending_block_txns_after_filtering_override:
+                        MAX_SENDING_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: 5 * 1024 * 1024,
                     backoff_proposal_delay_ms: 150,
                     max_txns_from_block_to_execute: None,
                 },
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 78,
-                    max_sending_block_txns_override: 2000,
+                    max_sending_block_txns_after_filtering_override: 2000,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
                     backoff_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: None,
                 },
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 76,
-                    max_sending_block_txns_override: 500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
                     backoff_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: None,
                 },
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 74,
-                    max_sending_block_txns_override: 500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backoff_proposal_delay_ms: 500,
+                    backoff_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: Some(100),
                 },
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 72,
-                    max_sending_block_txns_override: 500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backoff_proposal_delay_ms: 500,
+                    backoff_proposal_delay_ms: 300,
                     max_txns_from_block_to_execute: Some(25),
                 },
                 ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 70,
-                    max_sending_block_txns_override: 500,
+                    max_sending_block_txns_after_filtering_override: MIN_BLOCK_TXNS_AFTER_FILTERING,
                     max_sending_block_bytes_override: MIN_BLOCK_BYTES_OVERRIDE,
-                    backoff_proposal_delay_ms: 500,
+                    backoff_proposal_delay_ms: 300,
                     // in practice, latencies and delay make it such that ~2 blocks/s is max,
                     // meaning that most aggressively we limit to ~10 TPS
                     // For transactions that are more expensive than that, we should
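Note: the `MIN_BLOCK_TXNS_AFTER_FILTERING` floor used in these levels preserves the magnitude of the hard-coded 500 it replaces:

```rust
// Same definitions as in this patch, restated standalone to show the arithmetic.
const DEFEAULT_MAX_BATCH_TXNS: usize = 250; // as added in quorum_store_config.rs below
const MIN_BLOCK_TXNS_AFTER_FILTERING: u64 = DEFEAULT_MAX_BATCH_TXNS as u64 * 2;

fn main() {
    // Two default quorum-store batches: the same floor as the hard-coded
    // 500-txn overrides this constant replaces.
    assert_eq!(MIN_BLOCK_TXNS_AFTER_FILTERING, 500);
}
```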
@@ -424,7 +438,7 @@ impl ConsensusConfig {
         for backpressure_values in &config.pipeline_backpressure {
             recv_batch_send_block_pairs.push((
                 config.quorum_store.receiver_max_batch_txns as u64,
-                backpressure_values.max_sending_block_txns_override,
+                backpressure_values.max_sending_block_txns_after_filtering_override,
                 format!(
                     "backpressure {} ms: txns",
                     backpressure_values.back_pressure_pipeline_latency_limit_ms,
@@ -442,7 +456,7 @@ impl ConsensusConfig {
         for backoff_values in &config.chain_health_backoff {
             recv_batch_send_block_pairs.push((
                 config.quorum_store.receiver_max_batch_txns as u64,
-                backoff_values.max_sending_block_txns_override,
+                backoff_values.max_sending_block_txns_after_filtering_override,
                 format!(
                     "backoff {} %: txns",
                     backoff_values.backoff_if_below_participating_voting_power_percentage,
@@ -662,7 +676,7 @@ mod test {
             consensus: ConsensusConfig {
                 pipeline_backpressure: vec![PipelineBackpressureValues {
                     back_pressure_pipeline_latency_limit_ms: 0,
-                    max_sending_block_txns_override: 350,
+                    max_sending_block_txns_after_filtering_override: 350,
                     max_sending_block_bytes_override: 0,
                     backpressure_proposal_delay_ms: 0,
                     max_txns_from_block_to_execute: None,
@@ -693,7 +707,7 @@ mod test {
             consensus: ConsensusConfig {
                 pipeline_backpressure: vec![PipelineBackpressureValues {
                     back_pressure_pipeline_latency_limit_ms: 0,
-                    max_sending_block_txns_override: 251,
+                    max_sending_block_txns_after_filtering_override: 251,
                     max_sending_block_bytes_override: 100,
                     backpressure_proposal_delay_ms: 0,
                     max_txns_from_block_to_execute: None,
@@ -720,7 +734,7 @@ mod test {
             consensus: ConsensusConfig {
                 chain_health_backoff: vec![ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 0,
-                    max_sending_block_txns_override: 100,
+                    max_sending_block_txns_after_filtering_override: 100,
                     max_sending_block_bytes_override: 0,
                     backoff_proposal_delay_ms: 0,
                     max_txns_from_block_to_execute: None,
@@ -747,7 +761,7 @@ mod test {
             consensus: ConsensusConfig {
                 chain_health_backoff: vec![ChainHealthBackoffValues {
                     backoff_if_below_participating_voting_power_percentage: 0,
-                    max_sending_block_txns_override: 0,
+                    max_sending_block_txns_after_filtering_override: 0,
                     max_sending_block_bytes_override: 100,
                     backoff_proposal_delay_ms: 0,
                     max_txns_from_block_to_execute: None,
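Note: a sketch of the invariant the `recv_batch_send_block_pairs` built above appear to feed. Judging by the tests (which drive the send limit below the 250-txn receiver batch size), sanitization should reject configs where a block cannot hold one received quorum-store batch; this is an inferred, simplified std-only version with a `String` error in place of the crate's `Error`:

```rust
fn sanitize_batch_block_pairs(pairs: &[(u64, u64, String)]) -> Result<(), String> {
    for (recv_batch_txns, send_block_txns, label) in pairs {
        // A received quorum-store batch must fit into any block we may send.
        if send_block_txns < recv_batch_txns {
            return Err(format!(
                "{}: block limit {} is smaller than receiver batch limit {}",
                label, send_block_txns, recv_batch_txns
            ));
        }
    }
    Ok(())
}

fn main() {
    // Mirrors the `max_sending_block_txns_after_filtering_override: 100` test:
    // a 250-txn receiver batch cannot fit into a 100-txn block.
    let bad = vec![(250u64, 100u64, "backoff 0 %: txns".to_string())];
    assert!(sanitize_batch_block_pairs(&bad).is_err());
    let ok = vec![(250u64, 350u64, "backpressure 0 ms: txns".to_string())];
    assert!(sanitize_batch_block_pairs(&ok).is_ok());
}
```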
diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs
index 6e442023b1ffac..b1e2cbba2c44dc 100644
--- a/config/src/config/quorum_store_config.rs
+++ b/config/src/config/quorum_store_config.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
 use std::time::Duration;
 
 pub const BATCH_PADDING_BYTES: usize = 160;
+pub const DEFEAULT_MAX_BATCH_TXNS: usize = 250;
 const DEFAULT_MAX_NUM_BATCHES: usize = 20;
 
 #[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
@@ -101,7 +102,7 @@ impl Default for QuorumStoreConfig {
             batch_generation_poll_interval_ms: 25,
             batch_generation_min_non_empty_interval_ms: 200,
             batch_generation_max_interval_ms: 250,
-            sender_max_batch_txns: 250,
+            sender_max_batch_txns: DEFEAULT_MAX_BATCH_TXNS,
             // TODO: on next release, remove BATCH_PADDING_BYTES
             sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES,
             sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES,
diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs
index 595524d48ae1ee..bee98c827bdd16 100644
--- a/consensus/consensus-types/src/common.rs
+++ b/consensus/consensus-types/src/common.rs
@@ -312,15 +312,19 @@ impl Payload {
             Payload::InQuorumStoreWithLimit(proof_with_status) => {
                 // here we return the actual length of the payload; limit is considered at the stage
                 // where we prepare the block from the payload
-                (proof_with_status.proof_with_data.len() as u64).min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX))
-            },
-            Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, max_txns_to_execute) => {
-                ((proof_with_data.len()
-                    + inline_batches
-                        .iter()
-                        .map(|(_, txns)| txns.len())
-                        .sum::<usize>()) as u64).min(max_txns_to_execute.unwrap_or(u64::MAX))
+                (proof_with_status.proof_with_data.len() as u64)
+                    .min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX))
             },
+            Payload::QuorumStoreInlineHybrid(
+                inline_batches,
+                proof_with_data,
+                max_txns_to_execute,
+            ) => ((proof_with_data.len()
+                + inline_batches
+                    .iter()
+                    .map(|(_, txns)| txns.len())
+                    .sum::<usize>()) as u64)
+                .min(max_txns_to_execute.unwrap_or(u64::MAX)),
         }
     }
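Note: the capping in `len_for_execution` above reduces to the following, shown with hypothetical numbers:

```rust
// The payload reports the smaller of its real transaction count and the
// optional `max_txns_to_execute` cap, since the cap is applied at the stage
// where the block is prepared for execution.
fn len_for_execution(payload_len: u64, max_txns_to_execute: Option<u64>) -> u64 {
    payload_len.min(max_txns_to_execute.unwrap_or(u64::MAX))
}

fn main() {
    assert_eq!(len_for_execution(1200, Some(200)), 200); // capped by the limit
    assert_eq!(len_for_execution(150, Some(200)), 150); // already under the cap
    assert_eq!(len_for_execution(150, None), 150); // no cap configured
}
```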
diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs
index 833757b0e3eb05..88f356723452e3 100644
--- a/consensus/consensus-types/src/pipelined_block.rs
+++ b/consensus/consensus-types/src/pipelined_block.rs
@@ -12,8 +12,11 @@ use crate::{
 use aptos_crypto::hash::HashValue;
 use aptos_executor_types::StateComputeResult;
 use aptos_types::{
-    block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness,
-    transaction::{SignedTransaction, TransactionStatus}, validator_txn::ValidatorTransaction,
+    block_info::BlockInfo,
+    contract_event::ContractEvent,
+    randomness::Randomness,
+    transaction::{SignedTransaction, TransactionStatus},
+    validator_txn::ValidatorTransaction,
 };
 use once_cell::sync::OnceCell;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -117,16 +120,22 @@ impl PipelinedBlock {
             match txn {
                 TransactionStatus::Keep(_) => to_commit += 1,
                 TransactionStatus::Retry => to_retry += 1,
-                _ => {}
+                _ => {},
             }
         }
 
-        assert!(self.execution_summary.set(ExecutionSummary {
-            payload_len: self.block.payload().map_or(0, |payload| payload.len_for_execution()),
-            to_commit,
-            to_retry,
-            execution_time,
-        }).is_ok());
+        assert!(self
+            .execution_summary
+            .set(ExecutionSummary {
+                payload_len: self
+                    .block
+                    .payload()
+                    .map_or(0, |payload| payload.len_for_execution()),
+                to_commit,
+                to_retry,
+                execution_time,
+            })
+            .is_ok());
 
         self
     }
diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs
index 2927a7eb5a1972..1c099e69aedc8a 100644
--- a/consensus/src/block_storage/block_store.rs
+++ b/consensus/src/block_storage/block_store.rs
@@ -19,8 +19,12 @@ use crate::{
 };
 use anyhow::{bail, ensure, format_err, Context};
 use aptos_consensus_types::{
-    block::Block, common::Round, pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert,
-    sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate,
+    block::Block,
+    common::Round,
+    pipelined_block::{ExecutionSummary, PipelinedBlock},
+    quorum_cert::QuorumCert,
+    sync_info::SyncInfo,
+    timeout_2chain::TwoChainTimeoutCertificate,
     wrapped_ledger_info::WrappedLedgerInfo,
 };
 use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
diff --git a/consensus/src/block_storage/mod.rs b/consensus/src/block_storage/mod.rs
index bd7d1e303ab1ca..f11b2e2554f2a6 100644
--- a/consensus/src/block_storage/mod.rs
+++ b/consensus/src/block_storage/mod.rs
@@ -3,8 +3,11 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use aptos_consensus_types::{
-    pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert, sync_info::SyncInfo,
-    timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo,
+    pipelined_block::{ExecutionSummary, PipelinedBlock},
+    quorum_cert::QuorumCert,
+    sync_info::SyncInfo,
+    timeout_2chain::TwoChainTimeoutCertificate,
+    wrapped_ledger_info::WrappedLedgerInfo,
 };
 use aptos_crypto::HashValue;
 pub use block_store::{sync_manager::BlockRetriever, BlockStore};
diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs
index e5e245b48333ef..3410cbf2ee3f73 100644
--- a/consensus/src/counters.rs
+++ b/consensus/src/counters.rs
@@ -320,12 +320,11 @@ pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy<DurationHistogram> = Lazy::
 });
 
 /// Amount of time (in seconds) proposal is delayed due to backpressure/backoff
-pub static PROPOSER_DELAY_PROPOSAL: Lazy<Gauge> = Lazy::new(|| {
-    register_gauge!(
+pub static PROPOSER_DELAY_PROPOSAL: Lazy<Histogram> = Lazy::new(|| {
+    register_avg_counter(
        "aptos_proposer_delay_proposal",
        "Amount of time (in seconds) proposal is delayed due to backpressure/backoff",
    )
-    .unwrap()
 });
 
 /// Histogram for max number of transactions (after filtering for dedup, expirations, etc) proposer uses when creating block.
diff --git a/consensus/src/dag/health/chain_health.rs b/consensus/src/dag/health/chain_health.rs
index 26e5d9af256be6..6b9845407ae7b1 100644
--- a/consensus/src/dag/health/chain_health.rs
+++ b/consensus/src/dag/health/chain_health.rs
@@ -84,7 +84,7 @@ impl TChainHealth for ChainHealthBackoff {
 
         chain_health_backoff.map(|value| {
             (
-                value.max_sending_block_txns_override,
+                value.max_sending_block_txns_after_filtering_override,
                 value.max_sending_block_bytes_override,
             )
         })
diff --git a/consensus/src/dag/health/pipeline_health.rs b/consensus/src/dag/health/pipeline_health.rs
index 3668fd68eec37f..f3f5cf5b00fcd8 100644
--- a/consensus/src/dag/health/pipeline_health.rs
+++ b/consensus/src/dag/health/pipeline_health.rs
@@ -68,7 +68,7 @@ impl TPipelineHealth for PipelineLatencyBasedBackpressure {
         let latency = self.adapter.pipeline_pending_latency();
         self.pipeline_config.get_backoff(latency).map(|config| {
             (
-                config.max_sending_block_txns_override,
+                config.max_sending_block_txns_after_filtering_override,
                 config.max_sending_block_bytes_override,
             )
         })
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index e5f1fc453975a8..543799fe91b158 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -845,7 +845,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             self.time_service.clone(),
             Duration::from_millis(self.config.quorum_store_poll_time_ms),
             self.config.max_sending_block_txns,
-            self.config.max_sending_block_unique_txns,
+            self.config.max_sending_block_txns_after_filtering,
             self.config.max_sending_block_bytes,
             self.config.max_sending_inline_txns,
             self.config.max_sending_inline_bytes,
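Note on the counters.rs change above: a gauge's `set()` keeps only the most recent delay, while `observe()` on the avg-counter records every delayed proposal, feeding a sum and a count from which an average over the scrape window can be derived. A std-only sketch of that accumulation (the real `register_avg_counter` helper wires this into Prometheus metrics instead; the `AvgCounter` type here is illustrative):

```rust
#[derive(Default)]
struct AvgCounter {
    sum: f64,
    count: u64,
}

impl AvgCounter {
    fn observe(&mut self, value: f64) {
        // Each observation contributes to both the sum and the count.
        self.sum += value;
        self.count += 1;
    }

    fn average(&self) -> f64 {
        if self.count == 0 { 0.0 } else { self.sum / self.count as f64 }
    }
}

fn main() {
    let mut delay = AvgCounter::default();
    for v in [0.1, 0.3, 0.2] {
        delay.observe(v); // one observation per delayed proposal
    }
    assert!((delay.average() - 0.2).abs() < 1e-9);
}
```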
diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs
index af0bc1b9ffe406..b5ef3c2e1c5106 100644
--- a/consensus/src/liveness/proposal_generator.rs
+++ b/consensus/src/liveness/proposal_generator.rs
@@ -8,10 +8,10 @@ use super::{
 use crate::{
     block_storage::BlockReader,
     counters::{
-        CHAIN_HEALTH_BACKOFF_TRIGGERED, PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED,
-        PROPOSER_DELAY_PROPOSAL, PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING,
-        PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT,
-        PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
+        CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED,
+        PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL,
+        PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE,
+        PROPOSER_PENDING_BLOCKS_COUNT, PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
     },
     payload_client::PayloadClient,
     util::time_service::TimeService,
@@ -21,7 +21,11 @@ use aptos_config::config::{
     ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues,
 };
 use aptos_consensus_types::{
-    block::Block, block_data::BlockData, common::{Author, Payload, PayloadFilter, Round}, pipelined_block::ExecutionSummary, quorum_cert::QuorumCert
+    block::Block,
+    block_data::BlockData,
+    common::{Author, Payload, PayloadFilter, Round},
+    pipelined_block::ExecutionSummary,
+    quorum_cert::QuorumCert,
 };
 use aptos_crypto::{hash::CryptoHash, HashValue};
 use aptos_logger::{error, info, sample, sample::SampleRate, warn};
@@ -161,11 +165,27 @@ impl PipelineBackpressureConfig {
         let sizes = block_execution_times
             .iter()
             .flat_map(|summary| {
+                // for each block, compute the target (re-calibrated) block size
                 let execution_time_ms = summary.execution_time.as_millis();
+                // Only blocks above the time threshold are considered as giving enough
+                // signal to support calibration, so we filter out shorter blocks.
                 if execution_time_ms > config.min_block_time_ms_to_activate as u128 {
+                    // TODO: After the cost of "retries" is reduced with execution pool, we
+                    // should be computing the block gas limit here, simply as:
+                    // `config.target_block_time_ms / execution_time_ms * gas_consumed_by_block`
+                    //
+                    // Until then, we need to compute the wanted block size to create.
+                    // Unfortunately, there are multiple layers where transactions are filtered.
+                    // After deduping/reordering logic is applied, max_txns_to_execute limits the
+                    // transactions passed to the executor (`summary.payload_len` here), and then
+                    // some are discarded for various reasons, which we approximate as cheap to
+                    // ignore. Of the rest, only the `summary.to_commit` fraction of
+                    // `summary.to_commit + summary.to_retry` was executed. So, assuming the same
+                    // discard rate, we scale `summary.payload_len` by it.
                     Some(
                         ((config.target_block_time_ms as f64 / execution_time_ms as f64
-                            * (summary.to_commit as f64 / (summary.to_commit + summary.to_retry) as f64)
+                            * (summary.to_commit as f64
+                                / (summary.to_commit + summary.to_retry) as f64)
                             * summary.payload_len as f64)
                             .floor() as u64)
                             .max(1),
                     )
@@ -179,9 +199,11 @@ impl PipelineBackpressureConfig {
             .collect::<Vec<_>>();
         info!("Estimated block back-offs block sizes: {:?}", sizes);
         if sizes.len() >= config.min_blocks_to_activate {
-            Some(*sizes
-                .get((config.percentile * sizes.len() as f64) as usize)
-                .unwrap())
+            Some(
+                *sizes
+                    .get((config.percentile * sizes.len() as f64) as usize)
+                    .unwrap(),
+            )
         } else {
             None
         }
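Note: the rescaling above, run on hypothetical numbers:

```rust
fn main() {
    let target_block_time_ms = 200.0_f64;
    let execution_time_ms = 400.0_f64; // block ran 2x slower than target
    let (to_commit, to_retry) = (800.0_f64, 200.0_f64); // 80% of executed txns committed
    let payload_len = 2000.0_f64;

    let size = (target_block_time_ms / execution_time_ms // halve: block was 2x too slow
        * (to_commit / (to_commit + to_retry)) // scale by the commit fraction
        * payload_len)
        .floor()
        .max(1.0) as u64;
    // 0.5 (time ratio) * 0.8 (commit fraction) * 2000 = 800 txns.
    assert_eq!(size, 800);
}
```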
@@ -379,7 +401,7 @@ impl ProposalGenerator {
                 max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering) as f64,
             );
 
-            PROPOSER_DELAY_PROPOSAL.set(proposal_delay.as_secs_f64());
+            PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64());
             if !proposal_delay.is_zero() {
                 tokio::time::sleep(proposal_delay).await;
             }
@@ -476,7 +498,7 @@ impl ProposalGenerator {
         timestamp: Duration,
         round: Round,
     ) -> (u64, u64, Option<u64>, Duration) {
-        let mut values_max_block_txns = vec![self.max_block_txns_after_filtering];
+        let mut values_max_block_txns_after_filtering = vec![self.max_block_txns_after_filtering];
         let mut values_max_block_bytes = vec![self.max_block_bytes];
         let mut values_proposal_delay = vec![Duration::ZERO];
         let mut values_max_txns_from_block_to_execute = vec![];
@@ -485,7 +507,8 @@ impl ProposalGenerator {
             .chain_health_backoff_config
             .get_backoff(voting_power_ratio);
         if let Some(value) = chain_health_backoff {
-            values_max_block_txns.push(value.max_sending_block_txns_override);
+            values_max_block_txns_after_filtering
+                .push(value.max_sending_block_txns_after_filtering_override);
             values_max_block_bytes.push(value.max_sending_block_bytes_override);
             if let Some(val) = value.max_txns_from_block_to_execute {
                 values_max_txns_from_block_to_execute.push(val);
@@ -501,7 +524,8 @@ impl ProposalGenerator {
             .pipeline_backpressure_config
             .get_backoff(pipeline_pending_latency);
         if let Some(value) = pipeline_backpressure {
-            values_max_block_txns.push(value.max_sending_block_txns_override);
+            values_max_block_txns_after_filtering
+                .push(value.max_sending_block_txns_after_filtering_override);
             values_max_block_bytes.push(value.max_sending_block_bytes_override);
             if let Some(val) = value.max_txns_from_block_to_execute {
                 values_max_txns_from_block_to_execute.push(val);
@@ -525,7 +549,7 @@ impl ProposalGenerator {
                     .get_recent_block_execution_times(config.num_blocks_to_look_at),
             );
             if let Some(execution_backpressure_block_size) = execution_backpressure {
-                values_max_block_txns.push(
+                values_max_block_txns_after_filtering.push(
                     (execution_backpressure_block_size as f64
                         * config.reordering_ovarpacking_factor.max(1.0))
                         as u64,
@@ -543,11 +567,16 @@ impl ProposalGenerator {
             },
         );
 
-        let max_block_txns = values_max_block_txns.into_iter().min().unwrap();
+        let max_block_txns = values_max_block_txns_after_filtering
+            .into_iter()
+            .min()
+            .unwrap();
         let max_block_bytes = values_max_block_bytes.into_iter().min().unwrap();
         let proposal_delay = values_proposal_delay.into_iter().max().unwrap();
-        let max_txns_from_block_to_execute =
-            values_max_txns_from_block_to_execute.into_iter().min().filter(|v| *v < max_block_txns);
+        let max_txns_from_block_to_execute = values_max_txns_from_block_to_execute
+            .into_iter()
+            .min()
+            .filter(|v| *v < max_block_txns);
 
         warn!(
             pipeline_pending_latency = pipeline_pending_latency.as_millis(),
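Note: a std-only restatement of how the candidate values above are combined: the strictest (minimum) txn limit wins, the longest proposal delay wins, and the execute-cap survives only when it is stricter than the block cap itself.

```rust
fn combine(
    values_max_block_txns_after_filtering: Vec<u64>,
    values_proposal_delay_ms: Vec<u64>,
    values_max_txns_from_block_to_execute: Vec<u64>,
) -> (u64, u64, Option<u64>) {
    let max_block_txns = values_max_block_txns_after_filtering.into_iter().min().unwrap();
    let proposal_delay_ms = values_proposal_delay_ms.into_iter().max().unwrap();
    // Keep the execute-cap only if it is strictly below the block-size cap.
    let max_txns_from_block_to_execute = values_max_txns_from_block_to_execute
        .into_iter()
        .min()
        .filter(|v| *v < max_block_txns);
    (max_block_txns, proposal_delay_ms, max_txns_from_block_to_execute)
}

fn main() {
    // Hypothetical round: chain health allows 3000 txns, pipeline backpressure
    // allows 500 with a 200-txn execute-cap. The block carries up to 500 txns,
    // the proposal is delayed 300 ms, and only 200 txns are executed.
    let (txns, delay, to_execute) = combine(vec![3000, 500], vec![0, 300], vec![200]);
    assert_eq!((txns, delay, to_execute), (500, 300, Some(200)));
}
```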
diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs
index 58a503571b8904..27c01ca1494464 100644
--- a/execution/executor-types/src/lib.rs
+++ b/execution/executor-types/src/lib.rs
@@ -429,7 +429,7 @@ impl StateComputeResult {
     pub fn transactions_to_commit_len(&self) -> usize {
         self.compute_status_for_input_txns()
             .iter()
-            .filter(|status| if let TransactionStatus::Keep(_) = status { true } else { false })
+            .filter(|status| matches!(status, TransactionStatus::Keep(_)))
             .count()
             // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration
             + (if self.has_reconfiguration() { 0 } else { 1 })
diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs
index cb69c32b37e8a3..f043cd58e391a0 100644
--- a/testsuite/forge-cli/src/main.rs
+++ b/testsuite/forge-cli/src/main.rs
@@ -829,7 +829,7 @@ fn optimize_for_maximum_throughput(
 ) {
     mempool_config_practically_non_expiring(&mut config.mempool);
 
-    config.consensus.max_sending_block_unique_txns = max_txns_per_block as u64;
+    config.consensus.max_sending_block_txns_after_filtering = max_txns_per_block as u64;
     config.consensus.max_sending_block_txns = config
         .consensus
         .max_sending_block_txns
@@ -2253,9 +2253,10 @@ pub fn changing_working_quorum_test_helper(
     } else {
         for (i, item) in chain_health_backoff.iter_mut().enumerate() {
             // as we have lower TPS, make limits smaller
-            item.max_sending_block_txns_override =
+            item.max_sending_block_txns_after_filtering_override =
                 (block_size / 2_u64.pow(i as u32 + 1)).max(2);
-            min_block_txns = min_block_txns.min(item.max_sending_block_txns_override);
+            min_block_txns =
+                min_block_txns.min(item.max_sending_block_txns_after_filtering_override);
             // as we have fewer nodes, make backoff triggered earlier:
             item.backoff_if_below_participating_voting_power_percentage = 90 - i * 5;
         }
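Note: the per-level halving in `changing_working_quorum_test_helper` above, run with a hypothetical `block_size` of 64:

```rust
fn main() {
    let block_size = 64u64;
    // Each successive chain-health level gets half the previous override,
    // floored at 2: 32, 16, 8, 4, 2, 2, ...
    let overrides: Vec<u64> = (0..6)
        .map(|i| (block_size / 2_u64.pow(i as u32 + 1)).max(2))
        .collect();
    assert_eq!(overrides, vec![32, 16, 8, 4, 2, 2]);
}
```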