Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipeline] fixes and counters #15505

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use aptos_executor_types::{
state_compute_result::StateComputeResult, ExecutorError, ExecutorResult,
};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
use aptos_logger::{error, info, warn};
use aptos_types::{
block_info::BlockInfo,
contract_event::ContractEvent,
Expand Down Expand Up @@ -427,7 +427,9 @@ impl PipelinedBlock {
/// Pipeline related functions
impl PipelinedBlock {
pub fn pipeline_enabled(&self) -> bool {
self.pipeline_futs.lock().is_some()
// if the pipeline_tx is set, the pipeline is enabled,
// we don't use pipeline fut here because it can't be taken when abort
self.pipeline_tx.lock().is_some()
}

pub fn pipeline_futs(&self) -> Option<PipelineFutures> {
Expand All @@ -451,6 +453,12 @@ impl PipelinedBlock {
}

pub fn abort_pipeline(&self) -> Option<PipelineFutures> {
info!(
"[Pipeline] Aborting pipeline for block {} {} {}",
self.id(),
self.epoch(),
self.round()
);
if let Some(abort_handles) = self.pipeline_abort_handle.lock().take() {
for handle in abort_handles {
handle.abort();
Expand All @@ -461,7 +469,9 @@ impl PipelinedBlock {

pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> {
self.pipeline_futs()
.expect("Pipeline needs to be enabled")
.ok_or(ExecutorError::InternalError {
error: "Pipeline aborted".to_string(),
})?
.ledger_update_fut
.await
.map(|(compute_result, execution_time, _)| (compute_result, execution_time))
Expand All @@ -471,11 +481,11 @@ impl PipelinedBlock {
}

pub async fn wait_for_commit_ledger(&self) {
self.pipeline_futs()
.expect("Pipeline needs to be enabled")
.commit_ledger_fut
.await
.expect("Commit ledger should succeed");
// may be aborted (e.g. by reset)
if let Some(fut) = self.pipeline_futs() {
// this may be cancelled
let _ = fut.commit_ledger_fut.await;
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ impl BlockStore {
});
pipeline_builder.build(
&pipelined_block,
parent_block
.pipeline_futs()
.expect("Futures should exist when pipeline enabled"),
parent_block.pipeline_futs().ok_or_else(|| {
anyhow::anyhow!("Parent future doesn't exist, potentially epoch ended")
})?,
callback,
);
}
Expand Down
15 changes: 13 additions & 2 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ pub static NUM_BYTES_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
// * 0.3 to 2.0: step 0.1
// * 2.0 to 4.0: step 0.2
// * 4.0 to 7.5: step 0.5
const BLOCK_TRACING_BUCKETS: &[f64] = &[
const TRACING_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1,
1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.2, 2.4, 2.6, 2.8, 3.0, 3.2, 3.4, 3.6, 3.8, 4.0,
4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 10.0,
Expand All @@ -848,7 +848,18 @@ pub static BLOCK_TRACING: Lazy<HistogramVec> = Lazy::new(|| {
"aptos_consensus_block_tracing",
"Histogram for different stages of a block",
&["stage"],
BLOCK_TRACING_BUCKETS.to_vec()
TRACING_BUCKETS.to_vec()
)
.unwrap()
});

/// Traces pipeline stages.
///
/// Histogram vector registered under the name `aptos_consensus_pipeline_tracing`,
/// labelled by `stage` and `type`, and using the `TRACING_BUCKETS` boundaries.
/// `Tracker::log_end` observes it in seconds with the tracker's name as `stage`
/// and either `wait_time` or `work_time` as `type`.
///
/// The trailing `unwrap()` panics if the registration macro returns an error.
pub static PIPELINE_TRACING: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_consensus_pipeline_tracing",
"Histogram for different stages of a block's pipeline",
&["stage", "type"],
TRACING_BUCKETS.to_vec()
)
.unwrap()
});
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use quorum_store::quorum_store_db::QUORUM_STORE_DB_NAME;
#[cfg(feature = "fuzzing")]
pub use round_manager::round_manager_fuzzing;

struct IntGaugeGuard {
pub struct IntGaugeGuard {
gauge: IntGauge,
}

Expand Down
79 changes: 59 additions & 20 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
monitor,
payload_manager::TPayloadManager,
txn_notifier::TxnNotifier,
IntGaugeGuard,
};
use anyhow::anyhow;
use aptos_consensus_notifications::ConsensusNotificationSender;
Expand Down Expand Up @@ -104,18 +105,30 @@ struct Tracker {
block_id: HashValue,
epoch: u64,
round: Round,
created_at: Instant,
started_at: Option<Instant>,
running_guard: Option<IntGaugeGuard>,
}

impl Tracker {
pub fn new(name: &'static str, block: &Block) -> Self {
let ret = Self {
fn start_waiting(name: &'static str, block: &Block) -> Self {
Self {
name,
block_id: block.id(),
epoch: block.epoch(),
round: block.round(),
};
ret.log_start();
ret
created_at: Instant::now(),
started_at: None,
running_guard: None,
}
}

/// Marks the point where the stage stops waiting on its inputs and begins its own work.
///
/// Records the current instant in `started_at`, stores an `IntGaugeGuard` built from
/// the `<name>_running` gauge of `counters::OP_COUNTERS`, and then calls `log_start`.
fn start_working(&mut self) {
// `log_end` uses this timestamp to split elapsed time into wait time
// (`created_at` to `started_at`) and work time (`started_at` to now); if it is
// never set, `log_end` returns without recording anything.
self.started_at = Some(Instant::now());
// The guard is kept in the tracker rather than dropped immediately.
// NOTE(review): presumably it raises the gauge while held and lowers it on drop —
// confirm against `IntGaugeGuard`.
self.running_guard = Some(IntGaugeGuard::new(
counters::OP_COUNTERS.gauge(&format!("{}_running", self.name)),
));
self.log_start();
}

fn log_start(&self) {
Expand All @@ -126,9 +139,25 @@ impl Tracker {
}

fn log_end(&self) {
let Some(started_at) = self.started_at else {
return;
};
let wait_time = started_at.duration_since(self.created_at);
let work_time = Instant::now().duration_since(started_at);
counters::PIPELINE_TRACING
.with_label_values(&[self.name, "wait_time"])
.observe(wait_time.as_secs_f64());
counters::PIPELINE_TRACING
.with_label_values(&[self.name, "work_time"])
.observe(work_time.as_secs_f64());
info!(
"[Pipeline] Block {} {} {} finishes {}",
self.block_id, self.epoch, self.round, self.name
"[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}",
self.block_id,
self.epoch,
self.round,
self.name,
wait_time.as_millis(),
work_time.as_millis()
);
}
}
Expand All @@ -139,7 +168,6 @@ impl Drop for Tracker {
}
}

// TODO: add counters for each phase
impl PipelineBuilder {
pub fn new(
block_preparer: Arc<BlockPreparer>,
Expand Down Expand Up @@ -348,6 +376,7 @@ impl PipelineBuilder {
Self::post_commit_ledger(
pre_commit_fut.clone(),
commit_ledger_fut.clone(),
post_pre_commit_fut.clone(),
parent.post_commit_fut.clone(),
self.payload_manager.clone(),
block_store_callback,
Expand Down Expand Up @@ -378,7 +407,8 @@ impl PipelineBuilder {
/// Precondition: Block is inserted into block tree (all ancestors are available)
/// What it does: Wait for all data becomes available and verify transaction signatures
async fn prepare(preparer: Arc<BlockPreparer>, block: Arc<Block>) -> TaskResult<PrepareResult> {
let _tracker = Tracker::new("prepare", &block);
let mut tracker = Tracker::start_waiting("prepare", &block);
tracker.start_working();
// the loop can only be abort by the caller
let input_txns = loop {
match preparer.prepare_block(&block).await {
Expand Down Expand Up @@ -419,13 +449,14 @@ impl PipelineBuilder {
validator: Arc<[AccountAddress]>,
onchain_execution_config: BlockExecutorConfigFromOnchain,
) -> TaskResult<ExecuteResult> {
let mut tracker = Tracker::start_waiting("execute", &block);
parent_block_execute_phase.await?;
let user_txns = prepare_phase.await?;
let maybe_rand = randomness_rx
.await
.map_err(|_| anyhow!("randomness tx cancelled"))?;

let _tracker = Tracker::new("execute", &block);
tracker.start_working();
let metadata_txn = if is_randomness_enabled {
block.new_metadata_with_randomness(&validator, maybe_rand)
} else {
Expand Down Expand Up @@ -470,9 +501,11 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<LedgerUpdateResult> {
let mut tracker = Tracker::start_waiting("ledger_update", &block);
let (_, _, prev_epoch_end_timestamp) = parent_block_ledger_update_phase.await?;
let execution_time = execute_phase.await?;
let _tracker = Tracker::new("ledger_update", &block);

tracker.start_working();
let timestamp = block.timestamp_usecs();
let result = tokio::task::spawn_blocking(move || {
executor
Expand Down Expand Up @@ -500,10 +533,11 @@ impl PipelineBuilder {
mempool_notifier: Arc<dyn TxnNotifier>,
block: Arc<Block>,
) -> TaskResult<PostLedgerUpdateResult> {
let mut tracker = Tracker::start_waiting("post_ledger_update", &block);
let user_txns = prepare_fut.await?;
let (compute_result, _, _) = ledger_update.await?;

let _tracker = Tracker::new("post_ledger_update", &block);
tracker.start_working();
let compute_status = compute_result.compute_status_for_input_txns();
// the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata
if user_txns.len() >= compute_status.len() {
Expand Down Expand Up @@ -550,6 +584,7 @@ impl PipelineBuilder {
signer: Arc<ValidatorSigner>,
block: Arc<Block>,
) -> TaskResult<CommitVoteResult> {
let mut tracker = Tracker::start_waiting("sign_commit_vote", &block);
let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?;
// either order_vote_rx or order_proof_fut can trigger the next phase
select! {
Expand All @@ -563,8 +598,8 @@ impl PipelineBuilder {
return Err(anyhow!("all receivers dropped"))?;
}
}
tracker.start_working();

let _tracker = Tracker::new("sign_commit_vote", &block);
let mut block_info = block.gen_block_info(
compute_result.root_hash(),
compute_result.last_version_or_0(),
Expand Down Expand Up @@ -600,6 +635,7 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<PreCommitResult> {
let mut tracker = Tracker::start_waiting("pre_commit", &block);
let (compute_result, _, _) = ledger_update_phase.await?;
parent_block_pre_commit_phase.await?;

Expand All @@ -609,7 +645,7 @@ impl PipelineBuilder {
commit_proof_fut.await?;
}

let _tracker = Tracker::new("pre_commit", &block);
tracker.start_working();
tokio::task::spawn_blocking(move || {
executor
.pre_commit_block(block.id())
Expand All @@ -629,12 +665,11 @@ impl PipelineBuilder {
state_sync_notifier: Arc<dyn ConsensusNotificationSender>,
block: Arc<Block>,
) -> TaskResult<PostPreCommitResult> {
let mut tracker = Tracker::start_waiting("post_pre_commit", &block);
let compute_result = pre_commit.await?;
parent_post_pre_commit.await?;

let _tracker = Tracker::new("post_pre_commit", &block);
let _timer = counters::OP_COUNTERS.timer("pre_commit_notify");

tracker.start_working();
let txns = compute_result.transactions_to_commit().to_vec();
let subscribable_events = compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand All @@ -658,6 +693,7 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<CommitLedgerResult> {
let mut tracker = Tracker::start_waiting("commit_ledger", &block);
parent_block_commit_phase.await?;
pre_commit_fut.await?;
let ledger_info_with_sigs = commit_proof_fut.await?;
Expand All @@ -667,7 +703,7 @@ impl PipelineBuilder {
return Ok(None);
}

let _tracker = Tracker::new("commit_ledger", &block);
tracker.start_working();
let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone();
tokio::task::spawn_blocking(move || {
executor
Expand All @@ -679,21 +715,24 @@ impl PipelineBuilder {
Ok(Some(ledger_info_with_sigs))
}

/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes
/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes, 3. post pre commit finishes
/// What it does: Update counters for the block, and notify block tree about the commit
async fn post_commit_ledger(
pre_commit_fut: TaskFuture<PreCommitResult>,
commit_ledger_fut: TaskFuture<CommitLedgerResult>,
post_pre_commit_fut: TaskFuture<PostPreCommitResult>,
parent_post_commit: TaskFuture<PostCommitResult>,
payload_manager: Arc<dyn TPayloadManager>,
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
block: Arc<Block>,
) -> TaskResult<PostCommitResult> {
let mut tracker = Tracker::start_waiting("post_commit_ledger", &block);
parent_post_commit.await?;
let maybe_ledger_info_with_sigs = commit_ledger_fut.await?;
let compute_result = pre_commit_fut.await?;
post_pre_commit_fut.await?;

let _tracker = Tracker::new("post_commit_ledger", &block);
tracker.start_working();
update_counters_for_block(&block);
update_counters_for_compute_result(&compute_result);

Expand Down
Loading