Skip to content

Commit

Permalink
[pipeline] integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and Zekun Li committed Nov 4, 2024
1 parent 5ebd28c commit e03085f
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 232 deletions.
95 changes: 68 additions & 27 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use crate::{
block::Block,
common::{Payload, Round},
order_vote_proposal::OrderVoteProposal,
pipeline::commit_vote::CommitVote,
pipeline_execution_result::PipelineExecutionResult,
quorum_cert::QuorumCert,
vote_proposal::VoteProposal,
};
use anyhow::Error;
use aptos_crypto::hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH};
use aptos_crypto::{
bls12381,
hash::{HashValue, ACCUMULATOR_PLACEHOLDER_HASH},
};
use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorResult};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
Expand All @@ -36,9 +38,12 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::oneshot, task::JoinError};
use tokio::{
sync::oneshot,
task::{AbortHandle, JoinError},
};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum TaskError {
JoinError(Arc<JoinError>),
InternalError(Arc<Error>),
Expand All @@ -61,21 +66,22 @@ impl From<Error> for TaskError {
pub type TaskResult<T> = Result<T, TaskError>;
pub type TaskFuture<T> = Shared<BoxFuture<'static, TaskResult<T>>>;

#[derive(Clone)]
pub struct PipelineFutures {
pub prepare_fut: TaskFuture<Vec<SignatureVerifiedTransaction>>,
pub execute_fut: TaskFuture<()>,
pub ledger_update_fut: TaskFuture<StateComputeResult>,
pub post_ledger_update_fut: TaskFuture<()>,
pub commit_vote_fut: TaskFuture<CommitVote>,
pub commit_vote_fut: TaskFuture<bls12381::Signature>,
pub pre_commit_fut: TaskFuture<StateComputeResult>,
pub post_pre_commit_fut: TaskFuture<()>,
pub commit_ledger_fut: TaskFuture<LedgerInfoWithSignatures>,
pub commit_ledger_fut: TaskFuture<Option<LedgerInfoWithSignatures>>,
pub post_commit_fut: TaskFuture<()>,
}

pub struct PipelineTx {
pub rand_tx: oneshot::Sender<Option<Randomness>>,
pub order_vote_tx: oneshot::Sender<()>,
pub rand_tx: Option<oneshot::Sender<Option<Randomness>>>,
pub order_vote_tx: Option<oneshot::Sender<()>>,
pub order_proof_tx: tokio::sync::broadcast::Sender<()>,
pub commit_proof_tx: tokio::sync::broadcast::Sender<LedgerInfoWithSignatures>,
}
Expand Down Expand Up @@ -108,6 +114,12 @@ pub struct PipelinedBlock {
#[derivative(PartialEq = "ignore")]
pre_commit_fut: Arc<Mutex<Option<BoxFuture<'static, ExecutorResult<()>>>>>,
// pipeline related fields
#[derivative(PartialEq = "ignore")]
pipeline_futures: Option<PipelineFutures>,
#[derivative(PartialEq = "ignore")]
pipeline_tx: Option<Arc<Mutex<PipelineTx>>>,
#[derivative(PartialEq = "ignore")]
pipeline_abort_handle: Option<Vec<AbortHandle>>,
}

impl Serialize for PipelinedBlock {
Expand Down Expand Up @@ -151,15 +163,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
randomness,
} = SerializedBlock::deserialize(deserializer)?;

let block = PipelinedBlock {
block,
input_transactions,
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
};
let block = PipelinedBlock::new(block, input_transactions, StateComputeResult::new_dummy());
if let Some(r) = randomness {
block.set_randomness(r);
}
Expand All @@ -168,6 +172,10 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
}

impl PipelinedBlock {
pub fn set_compute_result(&mut self, compute_result: StateComputeResult) {
self.state_compute_result = compute_result;
}

pub fn set_execution_result(
mut self,
pipeline_execution_result: PipelineExecutionResult,
Expand Down Expand Up @@ -234,7 +242,7 @@ impl PipelinedBlock {
}

pub fn set_randomness(&self, randomness: Randomness) {
assert!(self.randomness.set(randomness).is_ok());
assert!(self.randomness.set(randomness.clone()).is_ok());
}

pub fn set_insertion_time(&self) {
Expand Down Expand Up @@ -275,19 +283,14 @@ impl PipelinedBlock {
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
pipeline_futures: None,
pipeline_tx: None,
pipeline_abort_handle: None,
}
}

pub fn new_ordered(block: Block) -> Self {
Self {
block,
input_transactions: vec![],
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
pre_commit_fut: Arc::new(Mutex::new(None)),
}
Self::new(block, vec![], StateComputeResult::new_dummy())
}

pub fn block(&self) -> &Block {
Expand Down Expand Up @@ -388,6 +391,44 @@ impl PipelinedBlock {
pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
self.execution_summary.get().cloned()
}

pub fn pipeline_fut(&self) -> Option<&PipelineFutures> {
self.pipeline_futures.as_ref()
}

pub fn set_pipeline_fut(&mut self, pipeline_futures: PipelineFutures) {
self.pipeline_futures = Some(pipeline_futures);
}

pub fn set_pipeline_tx(&mut self, pipeline_tx: PipelineTx) {
self.pipeline_tx = Some(Arc::new(Mutex::new(pipeline_tx)));
}

pub fn set_pipeline_abort_handles(&mut self, abort_handles: Vec<AbortHandle>) {
self.pipeline_abort_handle = Some(abort_handles);
}

pub fn pipeline_tx(&self) -> Option<&Arc<Mutex<PipelineTx>>> {
self.pipeline_tx.as_ref()
}

pub fn abort_pipeline(&self) {
if let Some(abort_handles) = &self.pipeline_abort_handle {
for handle in abort_handles {
handle.abort();
}
}
}

pub async fn wait_until_complete(&self) {
// for all stages that involve executor
if let Some(futs) = &self.pipeline_futures {
let _ = futs.execute_fut.clone().await;
let _ = futs.commit_ledger_fut.clone().await;
let _ = futs.pre_commit_fut.clone().await;
let _ = futs.commit_ledger_fut.clone().await;
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down
57 changes: 49 additions & 8 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
persistent_liveness_storage::{
PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
},
pipeline::execution_client::TExecutionClient,
pipeline::{execution_client::TExecutionClient, pipeline_builder::PipelineBuilder},
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
Expand Down Expand Up @@ -88,6 +88,7 @@ pub struct BlockStore {
back_pressure_for_test: AtomicBool,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
}

impl BlockStore {
Expand All @@ -101,6 +102,7 @@ impl BlockStore {
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
) -> Self {
let highest_2chain_tc = initial_data.highest_2chain_timeout_certificate();
let (root, root_metadata, blocks, quorum_certs) = initial_data.take();
Expand All @@ -118,6 +120,7 @@ impl BlockStore {
payload_manager,
order_vote_enabled,
pending_blocks,
pipeline_builder,
));
block_on(block_store.try_send_for_execution());
block_store
Expand Down Expand Up @@ -156,6 +159,7 @@ impl BlockStore {
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
pipeline_builder: PipelineBuilder,
) -> Self {
let RootInfo(root_block, root_qc, root_ordered_cert, root_commit_cert) = root;

Expand Down Expand Up @@ -186,13 +190,17 @@ impl BlockStore {
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
let mut pipelined_root_block = PipelinedBlock::new(
*root_block,
vec![],
// Create a dummy state_compute_result with necessary fields filled in.
result,
result.clone(),
);

let pipeline_fut =
pipeline_builder.build_root(result, root_commit_cert.ledger_info().clone());
pipelined_root_block.set_pipeline_fut(pipeline_fut);

let tree = BlockTree::new(
pipelined_root_block,
root_qc,
Expand All @@ -213,6 +221,7 @@ impl BlockStore {
back_pressure_for_test: AtomicBool::new(false),
order_vote_enabled,
pending_blocks,
pipeline_builder,
};

for block in blocks {
Expand Down Expand Up @@ -254,6 +263,12 @@ impl BlockStore {

assert!(!blocks_to_commit.is_empty());

// send order proof to pipeline
for block in &blocks_to_commit {
let pipeline_tx = block.pipeline_tx().unwrap().lock();
let _ = pipeline_tx.order_proof_tx.send(());
}

let block_tree = self.inner.clone();
let storage = self.storage.clone();
let finality_proof_clone = finality_proof.clone();
Expand Down Expand Up @@ -324,6 +339,7 @@ impl BlockStore {
self.payload_manager.clone(),
self.order_vote_enabled,
self.pending_blocks.clone(),
self.pipeline_builder.clone(),
)
.await;

Expand Down Expand Up @@ -351,7 +367,36 @@ impl BlockStore {
"Block with old round"
);

let pipelined_block = PipelinedBlock::new_ordered(block.clone());
if let Some(payload) = block.payload() {
self.payload_manager
.prefetch_payload_data(payload, block.timestamp_usecs());
}

let mut pipelined_block = PipelinedBlock::new_ordered(block.clone());

// build pipeline
let parent_block = self
.get_block(block.parent_id())
.ok_or_else(|| anyhow::anyhow!("Parent block not found"))?;

let block_tree = self.inner.clone();
let storage = self.storage.clone();
let id = block.id();
let round = block.round();
let callback = Box::new(move |commit_decision: LedgerInfoWithSignatures| {
block_tree
.write()
.commit_callback_v2(storage, id, round, commit_decision);
});
let (fut, tx, abort_handles) = self.pipeline_builder.build(
parent_block.pipeline_fut().unwrap(),
Arc::new(block),
callback,
);
pipelined_block.set_pipeline_fut(fut);
pipelined_block.set_pipeline_tx(tx);
pipelined_block.set_pipeline_abort_handles(abort_handles);

// ensure local time past the block time
let block_time = Duration::from_micros(pipelined_block.timestamp_usecs());
let current_timestamp = self.time_service.get_current_timestamp();
Expand All @@ -365,10 +410,6 @@ impl BlockStore {
}
self.time_service.wait_until(block_time).await;
}
if let Some(payload) = pipelined_block.block().payload() {
self.payload_manager
.prefetch_payload_data(payload, pipelined_block.block().timestamp_usecs());
}
self.storage
.save_tree(vec![pipelined_block.block().clone()], vec![])
.context("Insert block failed when saving block")?;
Expand Down
15 changes: 7 additions & 8 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{
use anyhow::bail;
use aptos_consensus_types::{
pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo,
timeout_2chain::TwoChainTimeoutCertificate, vote_data::VoteData,
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -485,17 +486,15 @@ impl BlockTree {
storage: Arc<dyn PersistentLivenessStorage>,
block_id: HashValue,
block_round: Round,
finality_proof: WrappedLedgerInfo,
commit_decision: LedgerInfoWithSignatures,
) {
let commit_proof = finality_proof
.create_merged_with_executed_state(commit_decision)
.expect("Inconsistent commit proof and evaluation decision, cannot commit block");

// let block_to_commit = blocks_to_commit.last().expect("pipeline is empty").clone();
// update_counters_for_committed_blocks(blocks_to_commit);
let current_round = self.commit_root().round();
let committed_round = block_round;
if current_round == committed_round {
return;
}
let commit_proof = WrappedLedgerInfo::new(VoteData::dummy(), commit_decision);

debug!(
LogSchema::new(LogEvent::CommitViaBlock).round(current_round),
committed_round = committed_round,
Expand Down
Loading

0 comments on commit e03085f

Please sign in to comment.