Skip to content

Commit

Permalink
review comments.
Browse files Browse the repository at this point in the history
increase block size
  • Loading branch information
igor-aptos committed Jul 24, 2024
1 parent fc87213 commit 651c559
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 102 deletions.
120 changes: 67 additions & 53 deletions config/src/config/consensus_config.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;

pub const BATCH_PADDING_BYTES: usize = 160;
pub const DEFEAULT_MAX_BATCH_TXNS: usize = 250;
const DEFAULT_MAX_NUM_BATCHES: usize = 20;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -101,7 +102,7 @@ impl Default for QuorumStoreConfig {
batch_generation_poll_interval_ms: 25,
batch_generation_min_non_empty_interval_ms: 200,
batch_generation_max_interval_ms: 250,
sender_max_batch_txns: 250,
sender_max_batch_txns: DEFEAULT_MAX_BATCH_TXNS,
// TODO: on next release, remove BATCH_PADDING_BYTES
sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES,
sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES,
Expand Down
20 changes: 12 additions & 8 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,19 @@ impl Payload {
Payload::InQuorumStoreWithLimit(proof_with_status) => {
// here we return the actual length of the payload; limit is considered at the stage
// where we prepare the block from the payload
(proof_with_status.proof_with_data.len() as u64).min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX))
},
Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, max_txns_to_execute) => {
((proof_with_data.len()
+ inline_batches
.iter()
.map(|(_, txns)| txns.len())
.sum::<usize>()) as u64).min(max_txns_to_execute.unwrap_or(u64::MAX))
(proof_with_status.proof_with_data.len() as u64)
.min(proof_with_status.max_txns_to_execute.unwrap_or(u64::MAX))
},
Payload::QuorumStoreInlineHybrid(
inline_batches,
proof_with_data,
max_txns_to_execute,
) => ((proof_with_data.len()
+ inline_batches
.iter()
.map(|(_, txns)| txns.len())
.sum::<usize>()) as u64)
.min(max_txns_to_execute.unwrap_or(u64::MAX)),
}
}

Expand Down
27 changes: 18 additions & 9 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use crate::{
use aptos_crypto::hash::HashValue;
use aptos_executor_types::StateComputeResult;
use aptos_types::{
block_info::BlockInfo, contract_event::ContractEvent, randomness::Randomness,
transaction::{SignedTransaction, TransactionStatus}, validator_txn::ValidatorTransaction,
block_info::BlockInfo,
contract_event::ContractEvent,
randomness::Randomness,
transaction::{SignedTransaction, TransactionStatus},
validator_txn::ValidatorTransaction,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand Down Expand Up @@ -117,16 +120,22 @@ impl PipelinedBlock {
match txn {
TransactionStatus::Keep(_) => to_commit += 1,
TransactionStatus::Retry => to_retry += 1,
_ => {}
_ => {},
}
}

assert!(self.execution_summary.set(ExecutionSummary {
payload_len: self.block.payload().map_or(0, |payload| payload.len_for_execution()),
to_commit,
to_retry,
execution_time,
}).is_ok());
assert!(self
.execution_summary
.set(ExecutionSummary {
payload_len: self
.block
.payload()
.map_or(0, |payload| payload.len_for_execution()),
to_commit,
to_retry,
execution_time,
})
.is_ok());

self
}
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ use crate::{
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_consensus_types::{
block::Block, common::Round, pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert,
sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate,
block::Block,
common::Round,
pipelined_block::{ExecutionSummary, PipelinedBlock},
quorum_cert::QuorumCert,
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate,
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
Expand Down
7 changes: 5 additions & 2 deletions consensus/src/block_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_consensus_types::{
pipelined_block::{ExecutionSummary, PipelinedBlock}, quorum_cert::QuorumCert, sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate, wrapped_ledger_info::WrappedLedgerInfo,
pipelined_block::{ExecutionSummary, PipelinedBlock},
quorum_cert::QuorumCert,
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate,
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::HashValue;
pub use block_store::{sync_manager::BlockRetriever, BlockStore};
Expand Down
5 changes: 2 additions & 3 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,11 @@ pub static CONSENSUS_PROPOSAL_PENDING_DURATION: Lazy<DurationHistogram> = Lazy::
});

/// Amount of time (in seconds) proposal is delayed due to backpressure/backoff
pub static PROPOSER_DELAY_PROPOSAL: Lazy<Gauge> = Lazy::new(|| {
register_gauge!(
pub static PROPOSER_DELAY_PROPOSAL: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"aptos_proposer_delay_proposal",
"Amount of time (in seconds) proposal is delayed due to backpressure/backoff",
)
.unwrap()
});

/// Histogram for max number of transactions (after filtering for dedup, expirations, etc) proposer uses when creating block.
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/health/chain_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl TChainHealth for ChainHealthBackoff {

chain_health_backoff.map(|value| {
(
value.max_sending_block_txns_override,
value.max_sending_block_txns_after_filtering_override,
value.max_sending_block_bytes_override,
)
})
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/health/pipeline_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl TPipelineHealth for PipelineLatencyBasedBackpressure {
let latency = self.adapter.pipeline_pending_latency();
self.pipeline_config.get_backoff(latency).map(|config| {
(
config.max_sending_block_txns_override,
config.max_sending_block_txns_after_filtering_override,
config.max_sending_block_bytes_override,
)
})
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.time_service.clone(),
Duration::from_millis(self.config.quorum_store_poll_time_ms),
self.config.max_sending_block_txns,
self.config.max_sending_block_unique_txns,
self.config.max_sending_block_txns_after_filtering,
self.config.max_sending_block_bytes,
self.config.max_sending_inline_txns,
self.config.max_sending_inline_bytes,
Expand Down
63 changes: 46 additions & 17 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use super::{
use crate::{
block_storage::BlockReader,
counters::{
CHAIN_HEALTH_BACKOFF_TRIGGERED, PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED,
PROPOSER_DELAY_PROPOSAL, PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING,
PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT,
PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED,
PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL,
PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING, PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE,
PROPOSER_PENDING_BLOCKS_COUNT, PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
},
payload_client::PayloadClient,
util::time_service::TimeService,
Expand All @@ -21,7 +21,11 @@ use aptos_config::config::{
ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues,
};
use aptos_consensus_types::{
block::Block, block_data::BlockData, common::{Author, Payload, PayloadFilter, Round}, pipelined_block::ExecutionSummary, quorum_cert::QuorumCert
block::Block,
block_data::BlockData,
common::{Author, Payload, PayloadFilter, Round},
pipelined_block::ExecutionSummary,
quorum_cert::QuorumCert,
};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_logger::{error, info, sample, sample::SampleRate, warn};
Expand Down Expand Up @@ -161,11 +165,27 @@ impl PipelineBackpressureConfig {
let sizes = block_execution_times
.iter()
.flat_map(|summary| {
// for each block, compute target (re-calibrated) block size

let execution_time_ms = summary.execution_time.as_millis();
// Only blocks above the time threshold are considered to give enough signal to support calibration,
// so we filter out shorter blocks
if execution_time_ms > config.min_block_time_ms_to_activate as u128 {
// TODO: After cost of "retries" is reduced with execution pool, we
// should be computing block gas limit here, simply as:
// `config.target_block_time_ms / execution_time_ms * gas_consumed_by_block`
//
// Until then, we need to compute wanted block size to create.
// Unfortunately, there are multiple layers where transactions are filtered.
// After deduping/reordering logic is applied, max_txns_to_execute limits the transactions
// passed to the executor (`summary.payload_len` here), and then some are discarded for various
// reasons, which we approximate as being cheaply ignored.
// For the rest, only `summary.to_commit` fraction of `summary.to_commit + summary.to_retry`
// was executed. And so assuming same discard rate, we scale `summary.payload_len` with it.
Some(
((config.target_block_time_ms as f64 / execution_time_ms as f64
* (summary.to_commit as f64 / (summary.to_commit + summary.to_retry) as f64)
* (summary.to_commit as f64
/ (summary.to_commit + summary.to_retry) as f64)
* summary.payload_len as f64)
.floor() as u64)
.max(1),
Expand All @@ -179,9 +199,11 @@ impl PipelineBackpressureConfig {
.collect::<Vec<_>>();
info!("Estimated block back-offs block sizes: {:?}", sizes);
if sizes.len() >= config.min_blocks_to_activate {
Some(*sizes
.get((config.percentile * sizes.len() as f64) as usize)
.unwrap())
Some(
*sizes
.get((config.percentile * sizes.len() as f64) as usize)
.unwrap(),
)
} else {
None
}
Expand Down Expand Up @@ -379,7 +401,7 @@ impl ProposalGenerator {
max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering) as f64,
);

PROPOSER_DELAY_PROPOSAL.set(proposal_delay.as_secs_f64());
PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64());
if !proposal_delay.is_zero() {
tokio::time::sleep(proposal_delay).await;
}
Expand Down Expand Up @@ -476,7 +498,7 @@ impl ProposalGenerator {
timestamp: Duration,
round: Round,
) -> (u64, u64, Option<u64>, Duration) {
let mut values_max_block_txns = vec![self.max_block_txns_after_filtering];
let mut values_max_block_txns_after_filtering = vec![self.max_block_txns_after_filtering];
let mut values_max_block_bytes = vec![self.max_block_bytes];
let mut values_proposal_delay = vec![Duration::ZERO];
let mut values_max_txns_from_block_to_execute = vec![];
Expand All @@ -485,7 +507,8 @@ impl ProposalGenerator {
.chain_health_backoff_config
.get_backoff(voting_power_ratio);
if let Some(value) = chain_health_backoff {
values_max_block_txns.push(value.max_sending_block_txns_override);
values_max_block_txns_after_filtering
.push(value.max_sending_block_txns_after_filtering_override);
values_max_block_bytes.push(value.max_sending_block_bytes_override);
if let Some(val) = value.max_txns_from_block_to_execute {
values_max_txns_from_block_to_execute.push(val);
Expand All @@ -501,7 +524,8 @@ impl ProposalGenerator {
.pipeline_backpressure_config
.get_backoff(pipeline_pending_latency);
if let Some(value) = pipeline_backpressure {
values_max_block_txns.push(value.max_sending_block_txns_override);
values_max_block_txns_after_filtering
.push(value.max_sending_block_txns_after_filtering_override);
values_max_block_bytes.push(value.max_sending_block_bytes_override);
if let Some(val) = value.max_txns_from_block_to_execute {
values_max_txns_from_block_to_execute.push(val);
Expand All @@ -525,7 +549,7 @@ impl ProposalGenerator {
.get_recent_block_execution_times(config.num_blocks_to_look_at),
);
if let Some(execution_backpressure_block_size) = execution_backpressure {
values_max_block_txns.push(
values_max_block_txns_after_filtering.push(
(execution_backpressure_block_size as f64
* config.reordering_ovarpacking_factor.max(1.0))
as u64,
Expand All @@ -543,11 +567,16 @@ impl ProposalGenerator {
},
);

let max_block_txns = values_max_block_txns.into_iter().min().unwrap();
let max_block_txns = values_max_block_txns_after_filtering
.into_iter()
.min()
.unwrap();
let max_block_bytes = values_max_block_bytes.into_iter().min().unwrap();
let proposal_delay = values_proposal_delay.into_iter().max().unwrap();
let max_txns_from_block_to_execute =
values_max_txns_from_block_to_execute.into_iter().min().filter(|v| *v < max_block_txns);
let max_txns_from_block_to_execute = values_max_txns_from_block_to_execute
.into_iter()
.min()
.filter(|v| *v < max_block_txns);

warn!(
pipeline_pending_latency = pipeline_pending_latency.as_millis(),
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ impl StateComputeResult {
pub fn transactions_to_commit_len(&self) -> usize {
self.compute_status_for_input_txns()
.iter()
.filter(|status| if let TransactionStatus::Keep(_) = status { true } else { false })
.filter(|status| matches!(status, TransactionStatus::Keep(_)))
.count()
// StateCheckpoint/BlockEpilogue is added if there is no reconfiguration
+ (if self.has_reconfiguration() { 0 } else { 1 })
Expand Down
7 changes: 4 additions & 3 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ fn optimize_for_maximum_throughput(
) {
mempool_config_practically_non_expiring(&mut config.mempool);

config.consensus.max_sending_block_unique_txns = max_txns_per_block as u64;
config.consensus.max_sending_block_txns_after_filtering = max_txns_per_block as u64;
config.consensus.max_sending_block_txns = config
.consensus
.max_sending_block_txns
Expand Down Expand Up @@ -2253,9 +2253,10 @@ pub fn changing_working_quorum_test_helper(
} else {
for (i, item) in chain_health_backoff.iter_mut().enumerate() {
// as we have lower TPS, make limits smaller
item.max_sending_block_txns_override =
item.max_sending_block_txns_after_filtering_override =
(block_size / 2_u64.pow(i as u32 + 1)).max(2);
min_block_txns = min_block_txns.min(item.max_sending_block_txns_override);
min_block_txns =
min_block_txns.min(item.max_sending_block_txns_after_filtering_override);
// as we have fewer nodes, make backoff triggered earlier:
item.backoff_if_below_participating_voting_power_percentage = 90 - i * 5;
}
Expand Down

0 comments on commit 651c559

Please sign in to comment.