From 1b0fb831745ec8492b759c38900d895d42bb841c Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Mon, 2 Dec 2024 15:18:22 -0800 Subject: [PATCH] [pipeline] switch from broadcast channel to shared future --- .../consensus-types/src/pipelined_block.rs | 8 +- .../src/pipeline/execution_schedule_phase.rs | 2 +- consensus/src/pipeline/persisting_phase.rs | 4 +- consensus/src/pipeline/pipeline_builder.rs | 80 ++++++++++--------- 4 files changed, 51 insertions(+), 43 deletions(-) diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index f9c17d378104a..10fa28f9227ba 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -107,15 +107,15 @@ impl PipelineFutures { pub struct PipelineInputTx { pub rand_tx: Option>>, pub order_vote_tx: Option>, - pub order_proof_tx: tokio::sync::broadcast::Sender<()>, - pub commit_proof_tx: tokio::sync::broadcast::Sender, + pub order_proof_tx: Option>, + pub commit_proof_tx: Option>, } pub struct PipelineInputRx { pub rand_rx: oneshot::Receiver>, pub order_vote_rx: oneshot::Receiver<()>, - pub order_proof_rx: tokio::sync::broadcast::Receiver<()>, - pub commit_proof_rx: tokio::sync::broadcast::Receiver, + pub order_proof_fut: TaskFuture<()>, + pub commit_proof_fut: TaskFuture, } /// A representation of a block that has been added to the execution pipeline. It might either be in ordered diff --git a/consensus/src/pipeline/execution_schedule_phase.rs b/consensus/src/pipeline/execution_schedule_phase.rs index 7f5dbfefe72f7..42a673a0ea4a5 100644 --- a/consensus/src/pipeline/execution_schedule_phase.rs +++ b/consensus/src/pipeline/execution_schedule_phase.rs @@ -79,7 +79,7 @@ impl StatelessPipeline for ExecutionSchedulePhase { for b in &ordered_blocks { if let Some(tx) = b.pipeline_tx().lock().as_mut() { tx.rand_tx.take().map(|tx| tx.send(b.randomness().cloned())); - let _ = tx.order_proof_tx.send(()); + tx.order_proof_tx.take().map(|tx| tx.send(())); } } diff --git a/consensus/src/pipeline/persisting_phase.rs b/consensus/src/pipeline/persisting_phase.rs index 6c536f4ca0544..a038c05db1672 100644 --- a/consensus/src/pipeline/persisting_phase.rs +++ b/consensus/src/pipeline/persisting_phase.rs @@ -75,7 +75,9 @@ impl StatelessPipeline for PersistingPhase { { for b in &blocks { if let Some(tx) = b.pipeline_tx().lock().as_mut() { - let _ = tx.commit_proof_tx.send(commit_ledger_info.clone()); + tx.commit_proof_tx + .take() + .map(|tx| tx.send(commit_ledger_info.clone())); } b.wait_for_commit_ledger().await; } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 07cbf4673b49a..cb6a90d463498 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -165,23 +165,39 @@ impl PipelineBuilder { } } - fn channel() -> (PipelineInputTx, PipelineInputRx) { + fn channel(abort_handles: &mut Vec) -> (PipelineInputTx, PipelineInputRx) { let (rand_tx, rand_rx) = oneshot::channel(); let (order_vote_tx, order_vote_rx) = oneshot::channel(); - let (order_proof_tx, order_proof_rx) = tokio::sync::broadcast::channel(1); - let (commit_proof_tx, commit_proof_rx) = tokio::sync::broadcast::channel(1); + let (order_proof_tx, order_proof_fut) = oneshot::channel(); + let (commit_proof_tx, commit_proof_fut) = oneshot::channel(); + let order_proof_fut = spawn_shared_fut( + async move { + order_proof_fut + .await + .map_err(|_| TaskError::from(anyhow!("order proof tx cancelled"))) + }, + abort_handles, + ); + let commit_proof_fut = spawn_shared_fut( + async move { + commit_proof_fut + .await + .map_err(|_| TaskError::from(anyhow!("commit proof tx cancelled"))) + }, + abort_handles, + ); ( PipelineInputTx { rand_tx: Some(rand_tx), order_vote_tx: Some(order_vote_tx), - order_proof_tx, - commit_proof_tx, + order_proof_tx: Some(order_proof_tx), + commit_proof_tx: Some(commit_proof_tx), }, PipelineInputRx { rand_rx, order_vote_rx, - order_proof_rx, - commit_proof_rx, + order_proof_fut, + commit_proof_fut, }, ) } @@ -242,16 +258,15 @@ impl PipelineBuilder { block: Arc, block_store_callback: Box, ) -> (PipelineFutures, PipelineInputTx, Vec) { - let (tx, rx) = Self::channel(); + let mut abort_handles = vec![]; + let (tx, rx) = Self::channel(&mut abort_handles); let PipelineInputRx { rand_rx, order_vote_rx, - order_proof_rx, - commit_proof_rx, + order_proof_fut, + commit_proof_fut, } = rx; - let mut abort_handles = vec![]; - let prepare_fut = spawn_shared_fut( Self::prepare(self.block_preparer.clone(), block.clone()), &mut abort_handles, @@ -282,8 +297,8 @@ impl PipelineBuilder { Self::sign_commit_vote( ledger_update_fut.clone(), order_vote_rx, - order_proof_rx.resubscribe(), - commit_proof_rx.resubscribe(), + order_proof_fut.clone(), + commit_proof_fut.clone(), self.signer.clone(), block.clone(), ), @@ -293,8 +308,8 @@ impl PipelineBuilder { Self::pre_commit( ledger_update_fut.clone(), parent.pre_commit_fut.clone(), - order_proof_rx, - commit_proof_rx.resubscribe(), + order_proof_fut, + commit_proof_fut.clone(), self.executor.clone(), block.clone(), ), @@ -303,7 +318,7 @@ impl PipelineBuilder { let commit_ledger_fut = spawn_shared_fut( Self::commit_ledger( pre_commit_fut.clone(), - commit_proof_rx, + commit_proof_fut, parent.commit_ledger_fut.clone(), self.executor.clone(), block.clone(), @@ -530,19 +545,19 @@ impl PipelineBuilder { async fn sign_commit_vote( ledger_update_phase: TaskFuture, order_vote_rx: oneshot::Receiver<()>, - mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, - mut commit_proof_rx: tokio::sync::broadcast::Receiver, + order_proof_fut: TaskFuture<()>, + commit_proof_fut: TaskFuture, signer: Arc, block: Arc, ) -> TaskResult { let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?; - // either order_vote_rx or order_proof_rx can trigger the next phase + // either order_vote_rx or order_proof_fut can trigger the next phase select! { Ok(_) = order_vote_rx => { } - Ok(_) = order_proof_rx.recv() => { + Ok(_) = order_proof_fut => { } - Ok(_) = commit_proof_rx.recv() => { + Ok(_) = commit_proof_fut => { } else => { return Err(anyhow!("all receivers dropped"))?; @@ -580,24 +595,18 @@ impl PipelineBuilder { ledger_update_phase: TaskFuture, // TODO bound parent_commit_ledger too parent_block_pre_commit_phase: TaskFuture, - mut order_proof_rx: tokio::sync::broadcast::Receiver<()>, - mut commit_proof_rx: tokio::sync::broadcast::Receiver, + order_proof_fut: TaskFuture<()>, + commit_proof_fut: TaskFuture, executor: Arc, block: Arc, ) -> TaskResult { let (compute_result, _, _) = ledger_update_phase.await?; parent_block_pre_commit_phase.await?; - order_proof_rx - .recv() - .await - .map_err(|_| anyhow!("order proof tx cancelled"))?; + order_proof_fut.await?; if compute_result.has_reconfiguration() { - commit_proof_rx - .recv() - .await - .map_err(|_| anyhow!("commit proof tx cancelled"))?; + commit_proof_fut.await?; } let _tracker = Tracker::new("pre_commit", &block); @@ -644,17 +653,14 @@ impl PipelineBuilder { /// What it does: Commit the ledger info to storage, this makes the data visible for clients async fn commit_ledger( pre_commit_fut: TaskFuture, - mut commit_proof_rx: tokio::sync::broadcast::Receiver, + commit_proof_fut: TaskFuture, parent_block_commit_phase: TaskFuture, executor: Arc, block: Arc, ) -> TaskResult { parent_block_commit_phase.await?; pre_commit_fut.await?; - let ledger_info_with_sigs = commit_proof_rx - .recv() - .await - .map_err(|_| anyhow!("commit rx cancelled"))?; + let ledger_info_with_sigs = commit_proof_fut.await?; // it's committed as prefix if ledger_info_with_sigs.commit_info().id() != block.id() {