Skip to content

Commit

Permalink
use commit / retry ratio in estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Jul 11, 2024
1 parent b4bcd49 commit 01404b6
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 25 deletions.
43 changes: 33 additions & 10 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use aptos_crypto::hash::HashValue;
use aptos_executor_types::StateComputeResult;
use aptos_types::{
block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness,
transaction::SignedTransaction, validator_txn::ValidatorTransaction,
transaction::{SignedTransaction, TransactionStatus}, validator_txn::ValidatorTransaction,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand All @@ -38,7 +38,7 @@ pub struct PipelinedBlock {
state_compute_result: StateComputeResult,
randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
execution_time: Arc<OnceCell<Duration>>,
execution_summary: Arc<OnceCell<ExecutionSummary>>,
}

impl Serialize for PipelinedBlock {
Expand Down Expand Up @@ -92,7 +92,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
state_compute_result,
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_time: Arc::new(OnceCell::new()),
execution_summary: Arc::new(OnceCell::new()),
};
if let Some(r) = randomness {
block.set_randomness(r);
Expand All @@ -110,7 +110,24 @@ impl PipelinedBlock {
) -> Self {
self.state_compute_result = result;
self.input_transactions = input_transactions;
assert!(self.execution_time.set(execution_time).is_ok());

let mut to_commit = 0;
let mut to_retry = 0;
for txn in self.state_compute_result.compute_status_for_input_txns() {
match txn {
TransactionStatus::Keep(_) => to_commit += 1,
TransactionStatus::Retry => to_retry += 1,
_ => {}
}
}

assert!(self.execution_summary.set(ExecutionSummary {
payload_len: self.block.payload().map_or(0, |payload| payload.len_for_execution()),
to_commit,
to_retry,
execution_time,
}).is_ok());

self
}

Expand Down Expand Up @@ -147,7 +164,7 @@ impl PipelinedBlock {
state_compute_result,
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_time: Arc::new(OnceCell::new()),
execution_summary: Arc::new(OnceCell::new()),
}
}

Expand All @@ -158,7 +175,7 @@ impl PipelinedBlock {
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_time: Arc::new(OnceCell::new()),
execution_summary: Arc::new(OnceCell::new()),
}
}

Expand Down Expand Up @@ -257,9 +274,15 @@ impl PipelinedBlock {
self.pipeline_insertion_time.get().map(|t| t.elapsed())
}

pub fn get_execution_time_and_size(&self) -> Option<(u64, Duration)> {
self.execution_time
.get()
.map(|v| (self.block.payload().map_or(0, |payload| payload.len_for_execution()), *v))
pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
self.execution_summary.get().cloned()
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ExecutionSummary {
pub payload_len: u64,
pub to_commit: u64,
pub to_retry: u64,
pub execution_time: Duration,
}
6 changes: 3 additions & 3 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_consensus_types::{
block::Block, common::Round, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
block::Block, common::Round, pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert,
sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate,
wrapped_ledger_info::WrappedLedgerInfo,
};
Expand Down Expand Up @@ -613,13 +613,13 @@ impl BlockReader for BlockStore {
}
}

fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<(u64, Duration)> {
fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<ExecutionSummary> {
let mut res = vec![];
let mut cur_block = Some(self.ordered_root());
loop {
match cur_block {
Some(block) => {
if let Some(execution_time_and_size) = block.get_execution_time_and_size() {
if let Some(execution_time_and_size) = block.get_execution_summary() {
info!(
"Found execution time for {}, {:?}",
block.id(),
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/block_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::{
pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, sync_info::SyncInfo,
pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert, sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -65,5 +65,5 @@ pub trait BlockReader: Send + Sync {
// Return time difference between last committed block and new proposal
fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration;

fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<(u64, Duration)>;
fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<ExecutionSummary>;
}
14 changes: 6 additions & 8 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use aptos_config::config::{
ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues,
};
use aptos_consensus_types::{
block::Block,
block_data::BlockData,
common::{Author, Payload, PayloadFilter, Round},
quorum_cert::QuorumCert,
block::Block, block_data::BlockData, common::{Author, Payload, PayloadFilter, Round}, pipelined_block::ExecutionSummary, quorum_cert::QuorumCert
};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_logger::{error, info, sample, sample::SampleRate, warn};
Expand Down Expand Up @@ -153,7 +150,7 @@ impl PipelineBackpressureConfig {

pub fn get_execution_block_size_backoff(
&self,
block_execution_times: &[(u64, Duration)],
block_execution_times: &[ExecutionSummary],
) -> Option<u64> {
info!(
"Estimated block execution times: {:?}",
Expand All @@ -163,12 +160,13 @@ impl PipelineBackpressureConfig {
self.execution.as_ref().and_then(|config| {
let sizes = block_execution_times
.iter()
.flat_map(|(num_txns, execution_time)| {
let execution_time_ms = execution_time.as_millis();
.flat_map(|summary| {
let execution_time_ms = summary.execution_time.as_millis();
if execution_time_ms > config.min_block_time_ms_to_activate as u128 {
Some(
((config.target_block_time_ms as f64 / execution_time_ms as f64
* *num_txns as f64)
* (summary.to_commit as f64 / (summary.to_commit + summary.to_retry) as f64)
* summary.payload_len as f64)
.floor() as u64)
.max(1),
)
Expand Down
7 changes: 5 additions & 2 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,11 @@ impl StateComputeResult {
}

pub fn transactions_to_commit_len(&self) -> usize {
// StateCheckpoint/BlockEpilogue is added if there is no reconfiguration
self.compute_status_for_input_txns().len()
self.compute_status_for_input_txns()
.iter()
.filter(|status| if let TransactionStatus::Keep(_) = status { true } else { false })
.count()
// StateCheckpoint/BlockEpilogue is added if there is no reconfiguration
+ (if self.has_reconfiguration() { 0 } else { 1 })
}

Expand Down

0 comments on commit 01404b6

Please sign in to comment.