From fe763391b1049e388e5abea80f4f4e3ba089a59c Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Wed, 4 Dec 2024 16:15:25 -0800 Subject: [PATCH 1/2] [pipeline] fixes [pipeline] expect cancellation error [pipeline] block callback until notification finishes [pipeline] avoid panic when parent fut doesn't exist during epoch ending --- .../consensus-types/src/pipelined_block.rs | 26 +++++++++++++------ consensus/src/block_storage/block_store.rs | 6 ++--- consensus/src/pipeline/pipeline_builder.rs | 5 +++- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 10fa28f9227ba..b0cc069aff768 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -17,7 +17,7 @@ use aptos_executor_types::{ state_compute_result::StateComputeResult, ExecutorError, ExecutorResult, }; use aptos_infallible::Mutex; -use aptos_logger::{error, warn}; +use aptos_logger::{error, info, warn}; use aptos_types::{ block_info::BlockInfo, contract_event::ContractEvent, @@ -427,7 +427,9 @@ impl PipelinedBlock { /// Pipeline related functions impl PipelinedBlock { pub fn pipeline_enabled(&self) -> bool { - self.pipeline_futs.lock().is_some() + // if the pipeline_tx is set, the pipeline is enabled, + // we don't use pipeline fut here because it can't be taken when abort + self.pipeline_tx.lock().is_some() } pub fn pipeline_futs(&self) -> Option { @@ -451,6 +453,12 @@ impl PipelinedBlock { } pub fn abort_pipeline(&self) -> Option { + info!( + "[Pipeline] Aborting pipeline for block {} {} {}", + self.id(), + self.epoch(), + self.round() + ); if let Some(abort_handles) = self.pipeline_abort_handle.lock().take() { for handle in abort_handles { handle.abort(); @@ -461,7 +469,9 @@ impl PipelinedBlock { pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> { self.pipeline_futs() - .expect("Pipeline needs to be enabled") + .ok_or(ExecutorError::InternalError { + error: "Pipeline aborted".to_string(), + })? .ledger_update_fut .await .map(|(compute_result, execution_time, _)| (compute_result, execution_time)) @@ -471,11 +481,11 @@ impl PipelinedBlock { } pub async fn wait_for_commit_ledger(&self) { - self.pipeline_futs() - .expect("Pipeline needs to be enabled") - .commit_ledger_fut - .await - .expect("Commit ledger should succeed"); + // may be aborted (e.g. by reset) + if let Some(fut) = self.pipeline_futs() { + // this may be cancelled + let _ = fut.commit_ledger_fut.await; + } } } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index ae20e0a5b01cc..b8c8146121e0a 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -400,9 +400,9 @@ impl BlockStore { }); pipeline_builder.build( &pipelined_block, - parent_block - .pipeline_futs() - .expect("Futures should exist when pipeline enabled"), + parent_block.pipeline_futs().ok_or_else(|| { + anyhow::anyhow!("Parent future doesn't exist, potentially epoch ended") + })?, callback, ); } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index cb6a90d463498..94577fad0511b 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -348,6 +348,7 @@ impl PipelineBuilder { Self::post_commit_ledger( pre_commit_fut.clone(), commit_ledger_fut.clone(), + post_pre_commit_fut.clone(), parent.post_commit_fut.clone(), self.payload_manager.clone(), block_store_callback, @@ -679,11 +680,12 @@ impl PipelineBuilder { Ok(Some(ledger_info_with_sigs)) } - /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes + /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes 3. post pre commit finishes /// What it does: Update counters for the block, and notify block tree about the commit async fn post_commit_ledger( pre_commit_fut: TaskFuture, commit_ledger_fut: TaskFuture, + post_pre_commit_fut: TaskFuture, parent_post_commit: TaskFuture, payload_manager: Arc, block_store_callback: Box, @@ -692,6 +694,7 @@ impl PipelineBuilder { parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; + post_pre_commit_fut.await?; let _tracker = Tracker::new("post_commit_ledger", &block); update_counters_for_block(&block); From 02e57fa4f0a6ef969e05099606f74dc6bddcbde3 Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Thu, 5 Dec 2024 14:31:06 -0800 Subject: [PATCH 2/2] [Pipeline] add counters --- consensus/src/counters.rs | 15 ++++- consensus/src/lib.rs | 2 +- consensus/src/pipeline/pipeline_builder.rs | 74 ++++++++++++++++------ 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index f55d9d4ff9f37..8c901c7bbd599 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -836,7 +836,7 @@ pub static NUM_BYTES_PER_BLOCK: Lazy = Lazy::new(|| { // * 0.3 to 2.0: step 0.1 // * 2.0 to 4.0: step 0.2 // * 4.0 to 7.5: step 0.5 -const BLOCK_TRACING_BUCKETS: &[f64] = &[ +const TRACING_BUCKETS: &[f64] = &[ 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.2, 2.4, 2.6, 2.8, 3.0, 3.2, 3.4, 3.6, 3.8, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 10.0, @@ -848,7 +848,18 @@ pub static BLOCK_TRACING: Lazy = Lazy::new(|| { "aptos_consensus_block_tracing", "Histogram for different stages of a block", &["stage"], - BLOCK_TRACING_BUCKETS.to_vec() + TRACING_BUCKETS.to_vec() + ) + .unwrap() +}); + +/// Traces pipeline stages +pub static PIPELINE_TRACING: Lazy = Lazy::new(|| { + register_histogram_vec!( + "aptos_consensus_pipeline_tracing", + "Histogram for different stages of a block's pipeline", + &["stage", "type"], + TRACING_BUCKETS.to_vec() ) .unwrap() }); diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index aa1bc3234558e..a38b66912e191 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -75,7 +75,7 @@ pub use quorum_store::quorum_store_db::QUORUM_STORE_DB_NAME; #[cfg(feature = "fuzzing")] pub use round_manager::round_manager_fuzzing; -struct IntGaugeGuard { +pub struct IntGaugeGuard { gauge: IntGauge, } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 94577fad0511b..a71f9927fed3c 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -10,6 +10,7 @@ use crate::{ monitor, payload_manager::TPayloadManager, txn_notifier::TxnNotifier, + IntGaugeGuard, }; use anyhow::anyhow; use aptos_consensus_notifications::ConsensusNotificationSender; @@ -104,18 +105,30 @@ struct Tracker { block_id: HashValue, epoch: u64, round: Round, + created_at: Instant, + started_at: Option, + running_guard: Option, } impl Tracker { - pub fn new(name: &'static str, block: &Block) -> Self { - let ret = Self { + fn start_waiting(name: &'static str, block: &Block) -> Self { + Self { name, block_id: block.id(), epoch: block.epoch(), round: block.round(), - }; - ret.log_start(); - ret + created_at: Instant::now(), + started_at: None, + running_guard: None, + } + } + + fn start_working(&mut self) { + self.started_at = Some(Instant::now()); + self.running_guard = Some(IntGaugeGuard::new( + counters::OP_COUNTERS.gauge(&format!("{}_running", self.name)), + )); + self.log_start(); } fn log_start(&self) { @@ -126,9 +139,25 @@ impl Tracker { } fn log_end(&self) { + let Some(started_at) = self.started_at else { + return; + }; + let wait_time = started_at.duration_since(self.created_at); + let work_time = Instant::now().duration_since(started_at); + counters::PIPELINE_TRACING + .with_label_values(&[self.name, "wait_time"]) + .observe(wait_time.as_secs_f64()); + counters::PIPELINE_TRACING + .with_label_values(&[self.name, "work_time"]) + .observe(work_time.as_secs_f64()); info!( - "[Pipeline] Block {} {} {} finishes {}", - self.block_id, self.epoch, self.round, self.name + "[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}", + self.block_id, + self.epoch, + self.round, + self.name, + wait_time.as_millis(), + work_time.as_millis() ); } } @@ -139,7 +168,6 @@ impl Drop for Tracker { } } -// TODO: add counters for each phase impl PipelineBuilder { pub fn new( block_preparer: Arc, @@ -379,7 +407,8 @@ impl PipelineBuilder { /// Precondition: Block is inserted into block tree (all ancestors are available) /// What it does: Wait for all data becomes available and verify transaction signatures async fn prepare(preparer: Arc, block: Arc) -> TaskResult { - let _tracker = Tracker::new("prepare", &block); + let mut tracker = Tracker::start_waiting("prepare", &block); + tracker.start_working(); // the loop can only be abort by the caller let input_txns = loop { match preparer.prepare_block(&block).await { @@ -420,13 +449,14 @@ impl PipelineBuilder { validator: Arc<[AccountAddress]>, onchain_execution_config: BlockExecutorConfigFromOnchain, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("execute", &block); parent_block_execute_phase.await?; let user_txns = prepare_phase.await?; let maybe_rand = randomness_rx .await .map_err(|_| anyhow!("randomness tx cancelled"))?; - let _tracker = Tracker::new("execute", &block); + tracker.start_working(); let metadata_txn = if is_randomness_enabled { block.new_metadata_with_randomness(&validator, maybe_rand) } else { @@ -471,9 +501,11 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("ledger_update", &block); let (_, _, prev_epoch_end_timestamp) = parent_block_ledger_update_phase.await?; let execution_time = execute_phase.await?; - let _tracker = Tracker::new("ledger_update", &block); + + tracker.start_working(); let timestamp = block.timestamp_usecs(); let result = tokio::task::spawn_blocking(move || { executor @@ -501,10 +533,11 @@ impl PipelineBuilder { mempool_notifier: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_ledger_update", &block); let user_txns = prepare_fut.await?; let (compute_result, _, _) = ledger_update.await?; - let _tracker = Tracker::new("post_ledger_update", &block); + tracker.start_working(); let compute_status = compute_result.compute_status_for_input_txns(); // the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata if user_txns.len() >= compute_status.len() { @@ -551,6 +584,7 @@ impl PipelineBuilder { signer: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("sign_commit_vote", &block); let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?; // either order_vote_rx or order_proof_fut can trigger the next phase select! { @@ -564,8 +598,8 @@ impl PipelineBuilder { return Err(anyhow!("all receivers dropped"))?; } } + tracker.start_working(); - let _tracker = Tracker::new("sign_commit_vote", &block); let mut block_info = block.gen_block_info( compute_result.root_hash(), compute_result.last_version_or_0(), @@ -601,6 +635,7 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("pre_commit", &block); let (compute_result, _, _) = ledger_update_phase.await?; parent_block_pre_commit_phase.await?; @@ -610,7 +645,7 @@ impl PipelineBuilder { commit_proof_fut.await?; } - let _tracker = Tracker::new("pre_commit", &block); + tracker.start_working(); tokio::task::spawn_blocking(move || { executor .pre_commit_block(block.id()) @@ -630,12 +665,11 @@ impl PipelineBuilder { state_sync_notifier: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_pre_commit", &block); let compute_result = pre_commit.await?; parent_post_pre_commit.await?; - let _tracker = Tracker::new("post_pre_commit", &block); - let _timer = counters::OP_COUNTERS.timer("pre_commit_notify"); - + tracker.start_working(); let txns = compute_result.transactions_to_commit().to_vec(); let subscribable_events = compute_result.subscribable_events().to_vec(); if let Err(e) = monitor!( @@ -659,6 +693,7 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("commit_ledger", &block); parent_block_commit_phase.await?; pre_commit_fut.await?; let ledger_info_with_sigs = commit_proof_fut.await?; @@ -668,7 +703,7 @@ impl PipelineBuilder { return Ok(None); } - let _tracker = Tracker::new("commit_ledger", &block); + tracker.start_working(); let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone(); tokio::task::spawn_blocking(move || { executor @@ -691,12 +726,13 @@ impl PipelineBuilder { block_store_callback: Box, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_commit_ledger", &block); parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; post_pre_commit_fut.await?; - let _tracker = Tracker::new("post_commit_ledger", &block); + tracker.start_working(); update_counters_for_block(&block); update_counters_for_compute_result(&compute_result);