Skip to content

Commit

Permalink
separate state sync notifications to pre-commit and commit (#14889)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Oct 9, 2024
1 parent 0cd5233 commit b96fa20
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 181 deletions.
8 changes: 8 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,14 @@ pub static PENDING_STATE_SYNC_NOTIFICATION: Lazy<IntGauge> = Lazy::new(|| {
.unwrap()
});

pub static PENDING_COMMIT_NOTIFICATION: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_consensus_pending_commit_notification",
"Count of the pending commit notification"
)
.unwrap()
});

/// Count of the pending quorum store commit notification.
pub static PENDING_QUORUM_STORE_COMMIT_NOTIFICATION: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
Expand Down
30 changes: 27 additions & 3 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExe
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorError,
ExecutorResult,
ExecutorResult, StateComputeResult,
};
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_logger::{debug, warn};
Expand All @@ -35,6 +35,12 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

pub type PreCommitHook = Box<
dyn 'static
+ FnOnce(&[SignedTransaction], &StateComputeResult) -> BoxFuture<'static, ()>
+ Send,
>;

#[allow(clippy::unwrap_used)]
pub static SIG_VERIFY_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Arc::new(
Expand Down Expand Up @@ -88,6 +94,7 @@ impl ExecutionPipeline {
parent_block_id: HashValue,
txn_generator: BlockPreparer,
block_executor_onchain_config: BlockExecutorConfigFromOnchain,
pre_commit_hook: PreCommitHook,
lifetime_guard: CountedRequest<()>,
) -> StateComputeResultFut {
let (result_tx, result_rx) = oneshot::channel();
Expand All @@ -101,6 +108,7 @@ impl ExecutionPipeline {
block_preparer: txn_generator,
result_tx,
command_creation_time: Instant::now(),
pre_commit_hook,
lifetime_guard,
})
.expect("Failed to send block to execution pipeline.");
Expand All @@ -127,6 +135,7 @@ impl ExecutionPipeline {
block_executor_onchain_config,
parent_block_id,
block_preparer,
pre_commit_hook,
result_tx,
command_creation_time,
lifetime_guard,
Expand Down Expand Up @@ -163,6 +172,7 @@ impl ExecutionPipeline {
block: (block.id(), sig_verified_txns).into(),
parent_block_id,
block_executor_onchain_config,
pre_commit_hook,
result_tx,
command_creation_time: Instant::now(),
lifetime_guard,
Expand Down Expand Up @@ -196,6 +206,7 @@ impl ExecutionPipeline {
block,
parent_block_id,
block_executor_onchain_config,
pre_commit_hook,
result_tx,
command_creation_time,
lifetime_guard,
Expand Down Expand Up @@ -232,6 +243,7 @@ impl ExecutionPipeline {
block_id,
parent_block_id,
state_checkpoint_output,
pre_commit_hook,
result_tx,
command_creation_time: Instant::now(),
lifetime_guard,
Expand All @@ -252,6 +264,7 @@ impl ExecutionPipeline {
block_id,
parent_block_id,
state_checkpoint_output: execution_result,
pre_commit_hook,
result_tx,
command_creation_time,
lifetime_guard,
Expand All @@ -274,6 +287,7 @@ impl ExecutionPipeline {
}
.await;
let pipeline_res = res.map(|(output, execution_duration)| {
let pre_commit_hook_fut = pre_commit_hook(&input_txns, &output);
let pre_commit_fut: BoxFuture<'static, ExecutorResult<()>> =
if output.epoch_state().is_some() || !enable_pre_commit {
// hack: it causes issue if pre-commit is finished at an epoch ending, and
Expand All @@ -285,7 +299,9 @@ impl ExecutionPipeline {
executor.pre_commit_block(block_id, parent_block_id)
})
.await
.expect("failed to spawn_blocking")
.expect("failed to spawn_blocking")?;
pre_commit_hook_fut.await;
Ok(())
})
} else {
// kick off pre-commit right away
Expand All @@ -295,6 +311,7 @@ impl ExecutionPipeline {
.send(PreCommitCommand {
block_id,
parent_block_id,
pre_commit_hook_fut,
result_tx: pre_commit_result_tx,
lifetime_guard,
})
Expand Down Expand Up @@ -322,6 +339,7 @@ impl ExecutionPipeline {
while let Some(PreCommitCommand {
block_id,
parent_block_id,
pre_commit_hook_fut,
result_tx,
lifetime_guard,
}) = block_rx.recv().await
Expand All @@ -336,7 +354,9 @@ impl ExecutionPipeline {
})
)
.await
.expect("Failed to spawn_blocking().")
.expect("Failed to spawn_blocking().")?;
pre_commit_hook_fut.await;
Ok(())
}
.await;
result_tx
Expand All @@ -355,6 +375,7 @@ struct PrepareBlockCommand {
// The parent block id.
parent_block_id: HashValue,
block_preparer: BlockPreparer,
pre_commit_hook: PreCommitHook,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
lifetime_guard: CountedRequest<()>,
Expand All @@ -365,6 +386,7 @@ struct ExecuteBlockCommand {
block: ExecutableBlock,
parent_block_id: HashValue,
block_executor_onchain_config: BlockExecutorConfigFromOnchain,
pre_commit_hook: PreCommitHook,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
lifetime_guard: CountedRequest<()>,
Expand All @@ -375,6 +397,7 @@ struct LedgerApplyCommand {
block_id: HashValue,
parent_block_id: HashValue,
state_checkpoint_output: ExecutorResult<(StateCheckpointOutput, Duration)>,
pre_commit_hook: PreCommitHook,
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
lifetime_guard: CountedRequest<()>,
Expand All @@ -383,6 +406,7 @@ struct LedgerApplyCommand {
struct PreCommitCommand {
block_id: HashValue,
parent_block_id: HashValue,
pre_commit_hook_fut: BoxFuture<'static, ()>,
result_tx: oneshot::Sender<ExecutorResult<()>>,
lifetime_guard: CountedRequest<()>,
}
Expand Down
Loading

0 comments on commit b96fa20

Please sign in to comment.