Execution speed backpressure to handle consistent gas miscalibration #13829

Merged — 5 commits, Jul 24, 2024
144 changes: 90 additions & 54 deletions config/src/config/consensus_config.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion config/src/config/quorum_store_config.rs
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;

pub const BATCH_PADDING_BYTES: usize = 160;
pub const DEFAULT_MAX_BATCH_TXNS: usize = 250;
const DEFAULT_MAX_NUM_BATCHES: usize = 20;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
@@ -101,7 +102,7 @@ impl Default for QuorumStoreConfig {
batch_generation_poll_interval_ms: 25,
batch_generation_min_non_empty_interval_ms: 200,
batch_generation_max_interval_ms: 250,
sender_max_batch_txns: 250,
sender_max_batch_txns: DEFAULT_MAX_BATCH_TXNS,
// TODO: on next release, remove BATCH_PADDING_BYTES
sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES,
sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES,
23 changes: 23 additions & 0 deletions consensus/consensus-types/src/common.rs
@@ -305,6 +305,29 @@ impl Payload {
}
}

pub fn len_for_execution(&self) -> u64 {
match self {
Payload::DirectMempool(txns) => txns.len() as u64,
Payload::InQuorumStore(proof_with_status) => proof_with_status.len() as u64,
Payload::InQuorumStoreWithLimit(proof_with_status) => {
// cap the reported length at the execution limit, since at most
// max_txns_to_execute transactions are prepared for execution from this payload
(proof_with_status.proof_with_data.len() as u64)
.min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX))
},
Payload::QuorumStoreInlineHybrid(
inline_batches,
proof_with_data,
max_txns_to_execute,
) => ((proof_with_data.len()
+ inline_batches
.iter()
.map(|(_, txns)| txns.len())
.sum::<usize>()) as u64)
.min(max_txns_to_execute.unwrap_or(u64::MAX)),
}
}

pub fn is_empty(&self) -> bool {
match self {
Payload::DirectMempool(txns) => txns.is_empty(),
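The `len_for_execution` hunk above caps the reported payload length with an optional per-block execution limit, using the `unwrap_or(u64::MAX).min(...)` pattern so that a `None` limit means "no cap". A minimal standalone sketch of that behavior (simplified types, not the PR's actual `Payload` enum):

```rust
// Sketch of the capping logic in `Payload::len_for_execution`:
// an absent limit falls back to u64::MAX, so `min` leaves the length unchanged.
fn effective_len(actual_len: u64, max_txns_to_execute: Option<u64>) -> u64 {
    actual_len.min(max_txns_to_execute.unwrap_or(u64::MAX))
}

fn main() {
    assert_eq!(effective_len(300, Some(250)), 250); // limit applies
    assert_eq!(effective_len(100, Some(250)), 100); // payload smaller than limit
    assert_eq!(effective_len(100, None), 100);      // no limit configured
}
```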
48 changes: 46 additions & 2 deletions consensus/consensus-types/src/pipelined_block.rs
@@ -12,8 +12,11 @@ use crate::{
use aptos_crypto::hash::HashValue;
use aptos_executor_types::StateComputeResult;
use aptos_types::{
block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness,
transaction::SignedTransaction, validator_txn::ValidatorTransaction,
block_info::BlockInfo,
contract_event::ContractEvent,
randomness::Randomness,
transaction::{SignedTransaction, TransactionStatus},
validator_txn::ValidatorTransaction,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -38,6 +41,7 @@ pub struct PipelinedBlock {
state_compute_result: StateComputeResult,
randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
execution_summary: Arc<OnceCell<ExecutionSummary>>,
}

impl Serialize for PipelinedBlock {
@@ -91,6 +95,7 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
state_compute_result,
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
};
if let Some(r) = randomness {
block.set_randomness(r);
@@ -104,9 +109,34 @@ impl PipelinedBlock {
mut self,
input_transactions: Vec<SignedTransaction>,
result: StateComputeResult,
execution_time: Duration,

Reviewer comment:

I suppose a validator can write any value she wishes here. Can this value be verified to be true? The validator might have an incentive to lie due to a "free-riding" argument: let the other validators slow down (and receive fewer rewards); their slowdown also "solves the problem for me" without slowing down my rewards.

Contributor Author:

This is a local value from the validator's own computation, so there is no need to lie here; a validator can already adjust its block creation computation in any way it wants.

That is why, long term, getting the incentives/protocol right is needed here for best results.

) -> Self {
self.state_compute_result = result;
self.input_transactions = input_transactions;

let mut to_commit = 0;
let mut to_retry = 0;
for txn in self.state_compute_result.compute_status_for_input_txns() {
match txn {
TransactionStatus::Keep(_) => to_commit += 1,
TransactionStatus::Retry => to_retry += 1,
_ => {},
}
}

assert!(self
.execution_summary
.set(ExecutionSummary {
payload_len: self
.block
.payload()
.map_or(0, |payload| payload.len_for_execution()),
to_commit,
to_retry,
execution_time,
})
.is_ok());

self
}

@@ -143,6 +173,7 @@ impl PipelinedBlock {
state_compute_result,
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
}
}

@@ -153,6 +184,7 @@ impl PipelinedBlock {
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
}
}

@@ -250,4 +282,16 @@ impl PipelinedBlock {
pub fn elapsed_in_pipeline(&self) -> Option<Duration> {
self.pipeline_insertion_time.get().map(|t| t.elapsed())
}

pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
self.execution_summary.get().cloned()
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ExecutionSummary {
pub payload_len: u64,
pub to_commit: u64,
pub to_retry: u64,
pub execution_time: Duration,
}
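The new `ExecutionSummary` records how many transactions a block committed or retried and how long execution took. A plausible way such summaries could feed the execution-speed backpressure signal (a hypothetical aggregation for illustration, not the PR's exact calibration formula) is to estimate observed throughput in committed txns/sec over recent blocks; backpressure logic could then shrink proposed block sizes when this falls below a configured target:

```rust
use std::time::Duration;

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ExecutionSummary {
    pub payload_len: u64,
    pub to_commit: u64,
    pub to_retry: u64,
    pub execution_time: Duration,
}

// Hypothetical aggregation: committed transactions divided by total
// execution time across the sampled summaries. Returns None when no
// execution time has been measured, to avoid dividing by zero.
fn observed_txns_per_sec(summaries: &[ExecutionSummary]) -> Option<f64> {
    let total_txns: u64 = summaries.iter().map(|s| s.to_commit).sum();
    let total_time: Duration = summaries.iter().map(|s| s.execution_time).sum();
    if total_time.is_zero() {
        return None;
    }
    Some(total_txns as f64 / total_time.as_secs_f64())
}

fn main() {
    let summaries = vec![
        ExecutionSummary { payload_len: 100, to_commit: 90, to_retry: 10, execution_time: Duration::from_millis(500) },
        ExecutionSummary { payload_len: 100, to_commit: 110, to_retry: 0, execution_time: Duration::from_millis(500) },
    ];
    // 200 committed txns over 1 second of execution time.
    assert_eq!(observed_txns_per_sec(&summaries), Some(200.0));
}
```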
168 changes: 95 additions & 73 deletions consensus/src/block_storage/block_store.rs
@@ -19,8 +19,12 @@ use crate::{
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_consensus_types::{
block::Block, common::Round, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate,
block::Block,
common::Round,
pipelined_block::{ExecutionSummary, PipelinedBlock},
quorum_cert::QuorumCert,
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate,
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
@@ -460,6 +464,70 @@ impl BlockStore {
.store(back_pressure, Ordering::Relaxed)
}

pub fn pending_blocks(&self) -> Arc<Mutex<PendingBlocks>> {
self.pending_blocks.clone()
}
}

impl BlockReader for BlockStore {
fn block_exists(&self, block_id: HashValue) -> bool {
self.inner.read().block_exists(&block_id)
}

fn get_block(&self, block_id: HashValue) -> Option<Arc<PipelinedBlock>> {
self.inner.read().get_block(&block_id)
}

fn ordered_root(&self) -> Arc<PipelinedBlock> {
self.inner.read().ordered_root()
}

fn commit_root(&self) -> Arc<PipelinedBlock> {
self.inner.read().commit_root()
}

fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option<Arc<QuorumCert>> {
self.inner.read().get_quorum_cert_for_block(&block_id)
}

fn path_from_ordered_root(&self, block_id: HashValue) -> Option<Vec<Arc<PipelinedBlock>>> {
self.inner.read().path_from_ordered_root(block_id)
}

fn path_from_commit_root(&self, block_id: HashValue) -> Option<Vec<Arc<PipelinedBlock>>> {
self.inner.read().path_from_commit_root(block_id)
}

fn highest_certified_block(&self) -> Arc<PipelinedBlock> {
self.inner.read().highest_certified_block()
}

fn highest_quorum_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_quorum_cert()
}

fn highest_ordered_cert(&self) -> Arc<WrappedLedgerInfo> {
self.inner.read().highest_ordered_cert()
}

fn highest_commit_cert(&self) -> Arc<WrappedLedgerInfo> {
self.inner.read().highest_commit_cert()
}

fn highest_2chain_timeout_cert(&self) -> Option<Arc<TwoChainTimeoutCertificate>> {
self.inner.read().highest_2chain_timeout_cert()
}

fn sync_info(&self) -> SyncInfo {
SyncInfo::new_decoupled(
self.highest_quorum_cert().as_ref().clone(),
self.highest_ordered_cert().as_ref().clone(),
self.highest_commit_cert().as_ref().clone(),
self.highest_2chain_timeout_cert()
.map(|tc| tc.as_ref().clone()),
)
}

/// Returns whether consensus is backpressured
fn vote_back_pressure(&self) -> bool {
#[cfg(any(test, feature = "fuzzing"))]
@@ -476,11 +544,7 @@ impl BlockStore {
ordered_round > self.vote_back_pressure_limit + commit_round
}

pub fn pending_blocks(&self) -> Arc<Mutex<PendingBlocks>> {
self.pending_blocks.clone()
}

pub fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration {
fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration {
let ordered_root = self.ordered_root();
let commit_root = self.commit_root();
let pending_path = self
@@ -551,73 +615,31 @@ impl BlockStore {
Duration::ZERO
}
}
}

impl BlockReader for BlockStore {
fn block_exists(&self, block_id: HashValue) -> bool {
self.inner.read().block_exists(&block_id)
}

fn get_block(&self, block_id: HashValue) -> Option<Arc<PipelinedBlock>> {
self.inner.read().get_block(&block_id)
}

fn ordered_root(&self) -> Arc<PipelinedBlock> {
self.inner.read().ordered_root()
}

fn commit_root(&self) -> Arc<PipelinedBlock> {
self.inner.read().commit_root()
}

fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option<Arc<QuorumCert>> {
self.inner.read().get_quorum_cert_for_block(&block_id)
}

fn path_from_ordered_root(&self, block_id: HashValue) -> Option<Vec<Arc<PipelinedBlock>>> {
self.inner.read().path_from_ordered_root(block_id)
}

fn path_from_commit_root(&self, block_id: HashValue) -> Option<Vec<Arc<PipelinedBlock>>> {
self.inner.read().path_from_commit_root(block_id)
}

fn highest_certified_block(&self) -> Arc<PipelinedBlock> {
self.inner.read().highest_certified_block()
}

fn highest_quorum_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_quorum_cert()
}

fn highest_ordered_cert(&self) -> Arc<WrappedLedgerInfo> {
self.inner.read().highest_ordered_cert()
}

fn highest_commit_cert(&self) -> Arc<WrappedLedgerInfo> {
self.inner.read().highest_commit_cert()
}

fn highest_2chain_timeout_cert(&self) -> Option<Arc<TwoChainTimeoutCertificate>> {
self.inner.read().highest_2chain_timeout_cert()
}

fn sync_info(&self) -> SyncInfo {
SyncInfo::new_decoupled(
self.highest_quorum_cert().as_ref().clone(),
self.highest_ordered_cert().as_ref().clone(),
self.highest_commit_cert().as_ref().clone(),
self.highest_2chain_timeout_cert()
.map(|tc| tc.as_ref().clone()),
)
}

fn vote_back_pressure(&self) -> bool {
self.vote_back_pressure()
}

fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration {
self.pipeline_pending_latency(proposal_timestamp)
fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<ExecutionSummary> {
let mut res = vec![];
let mut cur_block = Some(self.ordered_root());
loop {
match cur_block {
Some(block) => {
if let Some(execution_time_and_size) = block.get_execution_summary() {
info!(
"Found execution time for {}, {:?}",
block.id(),
execution_time_and_size
);
res.push(execution_time_and_size);
if res.len() >= num_blocks {
return res;
}
} else {
info!("Couldn't find execution time for {}", block.id());
}
cur_block = self.get_block(block.parent_id());
},
None => return res,
}
}
}
}

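`get_recent_block_execution_times` walks backward from the ordered root along parent links, collecting up to `num_blocks` summaries and skipping blocks that have not recorded one yet. The same traversal pattern, sketched over a toy parent-linked store (assumed simplified types, not the PR's `BlockStore`):

```rust
use std::collections::HashMap;

// Toy block: an optional parent id plus an optional recorded summary
// (a string stands in for ExecutionSummary here).
struct Block {
    parent: Option<u64>,
    summary: Option<&'static str>,
}

// Walk parent links from `root`, collecting up to `num_blocks` summaries
// and skipping blocks whose execution summary is not yet set.
fn recent_summaries(
    blocks: &HashMap<u64, Block>,
    root: u64,
    num_blocks: usize,
) -> Vec<&'static str> {
    let mut res = Vec::new();
    let mut cur = blocks.get(&root);
    while let Some(block) = cur {
        if let Some(summary) = block.summary {
            res.push(summary);
            if res.len() >= num_blocks {
                break;
            }
        }
        cur = block.parent.and_then(|id| blocks.get(&id));
    }
    res
}

fn main() {
    let mut blocks = HashMap::new();
    blocks.insert(1, Block { parent: None, summary: Some("s1") });
    blocks.insert(2, Block { parent: Some(1), summary: None }); // not executed yet
    blocks.insert(3, Block { parent: Some(2), summary: Some("s3") });
    // Walking back from block 3 skips block 2 and stops after two summaries.
    assert_eq!(recent_summaries(&blocks, 3, 2), vec!["s3", "s1"]);
}
```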
9 changes: 7 additions & 2 deletions consensus/src/block_storage/mod.rs
@@ -3,8 +3,11 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::{
pipelined_block::PipelinedBlock, quorum_cert::QuorumCert, sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo,
pipelined_block::{ExecutionSummary, PipelinedBlock},
quorum_cert::QuorumCert,
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate,
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::HashValue;
pub use block_store::{sync_manager::BlockRetriever, BlockStore};
@@ -64,4 +67,6 @@ pub trait BlockReader: Send + Sync {

// Returns the time difference between the last committed block and the new proposal
fn pipeline_pending_latency(&self, proposal_timestamp: Duration) -> Duration;

fn get_recent_block_execution_times(&self, num_blocks: usize) -> Vec<ExecutionSummary>;
}