Skip to content

Commit

Permalink
[pipeline] fixes
Browse files Browse the repository at this point in the history
[pipeline] expect cancellation error
[pipeline] block callback until notification finishes
  • Loading branch information
Zekun Li authored and Zekun Li committed Dec 5, 2024
1 parent c2969f0 commit 3312f4c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
11 changes: 7 additions & 4 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@ impl PipelinedBlock {

pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> {
self.pipeline_futs()
.expect("Pipeline needs to be enabled")
.ok_or(ExecutorError::InternalError {
error: "Pipeline aborted".to_string(),
})?
.ledger_update_fut
.await
.map(|(compute_result, execution_time, _)| (compute_result, execution_time))
Expand All @@ -471,11 +473,12 @@ impl PipelinedBlock {
}

pub async fn wait_for_commit_ledger(&self) {
self.pipeline_futs()
// this may be cancelled
let _ = self
.pipeline_futs()
.expect("Pipeline needs to be enabled")
.commit_ledger_fut
.await
.expect("Commit ledger should succeed");
.await;
}
}

Expand Down
5 changes: 4 additions & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ impl PipelineBuilder {
pre_commit_fut.clone(),
commit_ledger_fut.clone(),
parent.post_commit_fut.clone(),
parent.post_pre_commit_fut.clone(),
self.payload_manager.clone(),
block_store_callback,
block.clone(),
Expand Down Expand Up @@ -679,11 +680,12 @@ impl PipelineBuilder {
Ok(Some(ledger_info_with_sigs))
}

/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes
/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes 3. post pre commit finishes
/// What it does: Update counters for the block, and notify block tree about the commit
async fn post_commit_ledger(
pre_commit_fut: TaskFuture<PreCommitResult>,
commit_ledger_fut: TaskFuture<CommitLedgerResult>,
post_pre_commit_fut: TaskFuture<PostPreCommitResult>,
parent_post_commit: TaskFuture<PostCommitResult>,
payload_manager: Arc<dyn TPayloadManager>,
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
Expand All @@ -692,6 +694,7 @@ impl PipelineBuilder {
parent_post_commit.await?;
let maybe_ledger_info_with_sigs = commit_ledger_fut.await?;
let compute_result = pre_commit_fut.await?;
post_pre_commit_fut.await?;

let _tracker = Tracker::new("post_commit_ledger", &block);
update_counters_for_block(&block);
Expand Down

0 comments on commit 3312f4c

Please sign in to comment.