Skip to content

Commit

Permalink
[consensus] Proposal generator: fetch only to round-up over max_to_ex…
Browse files Browse the repository at this point in the history
…ecute (#14230)

Co-authored-by: Igor <[email protected]>
  • Loading branch information
igor-aptos and igor-aptos authored Aug 7, 2024
1 parent 9662ea3 commit 0690055
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 19 deletions.
5 changes: 5 additions & 0 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ pub struct ExecutionBackpressureConfig {
pub percentile: f64,
/// Recalibrating max block size, to target blocks taking this long.
pub target_block_time_ms: usize,
/// A minimal number of transactions per block, even if calibration suggests otherwise
/// To make sure backpressure doesn't become too aggressive.
pub min_calibrated_txns_per_block: u64,
// We compute re-calibrated block size, and use that for `max_txns_in_block`.
// But after execution pool and cost of overpacking being minimal - we should
// change so that backpressure sets `max_txns_to_execute` instead
Expand Down Expand Up @@ -228,6 +231,8 @@ impl Default for ConsensusConfig {
percentile: 0.5,
target_block_time_ms: 250,
min_block_time_ms_to_activate: 100,
// allow at least two spreading group from reordering in a single block, to utilize paralellism
min_calibrated_txns_per_block: 8,
}),
pipeline_backpressure: vec![
PipelineBackpressureValues {
Expand Down
9 changes: 6 additions & 3 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub enum GetPayloadCommand {
GetPayloadRequest(
// max number of transactions in the block
u64,
// max number of unique transactions in the block
// max number of transactions after filtering in the block
u64,
// soft max number of transactions after filtering in the block (i.e. include one that crosses it)
u64,
// max byte size
u64,
Expand All @@ -36,6 +38,7 @@ impl fmt::Display for GetPayloadCommand {
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
max_inline_txns,
max_inline_bytes,
Expand All @@ -46,8 +49,8 @@ impl fmt::Display for GetPayloadCommand {
) => {
write!(
f,
"GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
"GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {} (soft: {}), max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
max_txns, max_txns_after_filtering, soft_max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
)
},
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl DagDriver {
Duration::from_millis(self.payload_config.payload_pull_max_poll_time_ms),
max_txns,
max_txns,
max_txns,
max_size_bytes,
// TODO: Set max_inline_items and max_inline_bytes correctly
100,
Expand Down
7 changes: 5 additions & 2 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ impl PipelineBackpressureConfig {
.sorted()
.collect::<Vec<_>>();
if sizes.len() >= config.min_blocks_to_activate {
let calibrated_block_size = *sizes
let calibrated_block_size = (*sizes
.get(((config.percentile * sizes.len() as f64) as usize).min(sizes.len() - 1))
.expect("guaranteed to be within vector size");
.expect("guaranteed to be within vector size"))
.max(config.min_calibrated_txns_per_block);
PROPOSER_ESTIMATED_CALIBRATED_BLOCK_TXNS.observe(calibrated_block_size as f64);
// Check if calibrated block size is reduction in size, to turn on backpressure.
if max_block_txns > calibrated_block_size {
Expand Down Expand Up @@ -449,12 +450,14 @@ impl ProposalGenerator {
.collect();
let validator_txn_filter =
vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes);

let (validator_txns, mut payload) = self
.payload_client
.pull_payload(
self.quorum_store_poll_time.saturating_sub(proposal_delay),
self.max_block_txns,
max_block_txns_after_filtering,
max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering),
max_block_bytes,
// TODO: Set max_inline_txns and max_inline_bytes correctly
self.max_inline_txns,
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/payload_client/mixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl PayloadClient for MixedPayloadClient {
&self,
mut max_poll_time: Duration,
mut max_items: u64,
mut max_unique_items: u64,
mut max_items_after_filtering: u64,
mut soft_max_items_after_filtering: u64,
mut max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -103,7 +104,8 @@ impl PayloadClient for MixedPayloadClient {
debug!("num_validator_txns={}", validator_txns.len());
// Update constraints with validator txn pull results.
max_items -= validator_txns.len() as u64;
max_unique_items -= validator_txns.len() as u64;
max_items_after_filtering -= validator_txns.len() as u64;
soft_max_items_after_filtering -= validator_txns.len() as u64;
max_bytes -= validator_txns
.iter()
.map(|txn| txn.size_in_bytes())
Expand All @@ -116,7 +118,8 @@ impl PayloadClient for MixedPayloadClient {
.pull(
max_poll_time,
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand Down Expand Up @@ -158,6 +161,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand All @@ -183,6 +187,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_micros(500), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand All @@ -208,6 +213,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
2, // max_items
2, // max_unique_items
2, // soft max_unique_items
1048576, // size limit: 1MB
0,
0, // inline limit: 0
Expand All @@ -233,6 +239,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
all_validator_txns[0].size_in_bytes() as u64,
50,
all_validator_txns[0].size_in_bytes() as u64,
Expand Down Expand Up @@ -276,6 +283,7 @@ async fn mixed_payload_client_should_respect_validator_txn_feature_flag() {
Duration::from_millis(50), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/payload_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ pub mod validator;

#[async_trait::async_trait]
pub trait PayloadClient: Send + Sync {
#[allow(clippy::too_many_arguments)]
async fn pull_payload(
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down
12 changes: 8 additions & 4 deletions consensus/src/payload_client/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub trait UserPayloadClient: Send + Sync {
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -51,7 +52,8 @@ impl UserPayloadClient for DummyClient {
&self,
max_poll_time: Duration,
mut max_items: u64,
mut max_unique_items: u64,
mut max_items_after_filtering: u64,
mut soft_max_items_after_filtering: u64,
mut max_bytes: u64,
_max_inline_items: u64,
_max_inline_bytes: u64,
Expand All @@ -67,7 +69,8 @@ impl UserPayloadClient for DummyClient {
let mut txns = vec![];
while timer.elapsed() < max_poll_time
&& max_items >= 1
&& max_unique_items >= 1
&& max_items_after_filtering >= 1
&& soft_max_items_after_filtering >= 1
&& max_bytes >= 1
&& nxt_txn_idx < self.txns.len()
{
Expand All @@ -78,7 +81,8 @@ impl UserPayloadClient for DummyClient {
break;
}
max_items -= 1;
max_unique_items -= 1;
max_items_after_filtering -= 1;
soft_max_items_after_filtering -= 1;
max_bytes -= txn_size;
nxt_txn_idx += 1;
txns.push(txn);
Expand Down
15 changes: 10 additions & 5 deletions consensus/src/payload_client/user/quorum_store_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ impl QuorumStoreClient {
async fn pull_internal(
&self,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand All @@ -57,7 +58,8 @@ impl QuorumStoreClient {
let (callback, callback_rcv) = oneshot::channel();
let req = GetPayloadCommand::GetPayloadRequest(
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand Down Expand Up @@ -92,7 +94,8 @@ impl UserPayloadClient for QuorumStoreClient {
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -123,7 +126,8 @@ impl UserPayloadClient for QuorumStoreClient {
let payload = self
.pull_internal(
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand All @@ -146,7 +150,8 @@ impl UserPayloadClient for QuorumStoreClient {
max_poll_time_ms = max_poll_time.as_millis() as u64,
payload_len = payload.len(),
max_items = max_items,
max_unique_items = max_unique_items,
max_items_after_filtering = max_items_after_filtering,
soft_max_items_after_filtering = soft_max_items_after_filtering,
max_bytes = max_bytes,
max_inline_items = max_inline_items,
max_inline_bytes = max_inline_bytes,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/direct_mempool_quorum_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl DirectMempoolQuorumStore {
GetPayloadCommand::GetPayloadRequest(
_max_txns,
max_txns_after_filtering,
_soft_max_txns_after_filtering,
max_bytes,
_max_inline_txns,
_max_inline_bytes,
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/quorum_store/proof_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl ProofManager {
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
max_inline_txns,
max_inline_bytes,
Expand All @@ -228,6 +229,7 @@ impl ProofManager {
&excluded_batches,
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
return_non_full,
block_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn test_block_request_no_txns() {
let (consensus_callback, consensus_callback_rcv) = oneshot::channel();
consensus_to_quorum_store_sender
.try_send(GetPayloadCommand::GetPayloadRequest(
100,
100,
100,
1000,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/tests/proof_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn get_proposal(
let (callback_tx, callback_rx) = oneshot::channel();
let filter_set = HashSet::from_iter(filter.iter().cloned());
let req = GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns,
max_txns,
1000000,
Expand Down
Loading

0 comments on commit 0690055

Please sign in to comment.