From 3312f4c6355d004e33ccc289220c5ec213cb403c Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Wed, 4 Dec 2024 16:15:25 -0800 Subject: [PATCH] [pipeline] fixes [pipeline] expect cancellation error [pipeline] block callback until notification finishes --- consensus/consensus-types/src/pipelined_block.rs | 11 +++++++---- consensus/src/pipeline/pipeline_builder.rs | 5 ++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 10fa28f9227ba4..ad0f62beda04fc 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -461,7 +461,9 @@ impl PipelinedBlock { pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> { self.pipeline_futs() - .expect("Pipeline needs to be enabled") + .ok_or(ExecutorError::InternalError { + error: "Pipeline aborted".to_string(), + })? .ledger_update_fut .await .map(|(compute_result, execution_time, _)| (compute_result, execution_time)) @@ -471,11 +473,12 @@ impl PipelinedBlock { } pub async fn wait_for_commit_ledger(&self) { - self.pipeline_futs() + // this may be cancelled + let _ = self + .pipeline_futs() .expect("Pipeline needs to be enabled") .commit_ledger_fut - .await - .expect("Commit ledger should succeed"); + .await; } } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index cb6a90d4634981..3d61dd50f146d6 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -349,6 +349,7 @@ impl PipelineBuilder { pre_commit_fut.clone(), commit_ledger_fut.clone(), parent.post_commit_fut.clone(), + parent.post_pre_commit_fut.clone(), self.payload_manager.clone(), block_store_callback, block.clone(), @@ -679,11 +680,12 @@ impl PipelineBuilder { Ok(Some(ledger_info_with_sigs)) } - /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes + /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes 3. post pre commit finishes /// What it does: Update counters for the block, and notify block tree about the commit async fn post_commit_ledger( pre_commit_fut: TaskFuture, commit_ledger_fut: TaskFuture, + post_pre_commit_fut: TaskFuture, parent_post_commit: TaskFuture, payload_manager: Arc, block_store_callback: Box, @@ -692,6 +694,7 @@ impl PipelineBuilder { parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; + post_pre_commit_fut.await?; let _tracker = Tracker::new("post_commit_ledger", &block); update_counters_for_block(&block);