diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index a16abf48cd795..b10b431edd9fc 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -155,6 +155,9 @@ pub struct ExecutionBackpressureConfig { pub percentile: f64, /// Recalibrating max block size, to target blocks taking this long. pub target_block_time_ms: usize, + /// A minimal number of transactions per block, even if calibration suggests otherwise + /// To make sure backpressure doesn't become too aggressive. + pub min_calibrated_txns_per_block: u64, // We compute re-calibrated block size, and use that for `max_txns_in_block`. // But after execution pool and cost of overpacking being minimal - we should // change so that backpressure sets `max_txns_to_execute` instead @@ -228,6 +231,8 @@ impl Default for ConsensusConfig { percentile: 0.5, target_block_time_ms: 250, min_block_time_ms_to_activate: 100, + // allow at least two spreading group from reordering in a single block, to utilize paralellism + min_calibrated_txns_per_block: 8, }), pipeline_backpressure: vec![ PipelineBackpressureValues { diff --git a/consensus/consensus-types/src/request_response.rs b/consensus/consensus-types/src/request_response.rs index ef9e9485a6a6b..d65c548689f5f 100644 --- a/consensus/consensus-types/src/request_response.rs +++ b/consensus/consensus-types/src/request_response.rs @@ -11,7 +11,9 @@ pub enum GetPayloadCommand { GetPayloadRequest( // max number of transactions in the block u64, - // max number of unique transactions in the block + // max number of transactions after filtering in the block + u64, + // soft max number of transactions after filtering in the block (i.e. include one that crosses it) u64, // max byte size u64, @@ -36,6 +38,7 @@ impl fmt::Display for GetPayloadCommand { GetPayloadCommand::GetPayloadRequest( max_txns, max_txns_after_filtering, + soft_max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, @@ -46,8 +49,8 @@ impl fmt::Display for GetPayloadCommand { ) => { write!( f, - "GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]", - max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp + "GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {} (soft: {}), max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]", + max_txns, max_txns_after_filtering, soft_max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp ) }, } diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 2a81472d7cf14..28fb11a222c8a 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -258,6 +258,7 @@ impl DagDriver { Duration::from_millis(self.payload_config.payload_pull_max_poll_time_ms), max_txns, max_txns, + max_txns, max_size_bytes, // TODO: Set max_inline_items and max_inline_bytes correctly 100, diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index f70e4a154f1e3..d2933bbd35ed2 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -195,9 +195,10 @@ impl PipelineBackpressureConfig { .sorted() .collect::>(); if sizes.len() >= config.min_blocks_to_activate { - let calibrated_block_size = *sizes + let calibrated_block_size = (*sizes .get(((config.percentile * sizes.len() as f64) as usize).min(sizes.len() - 1)) - .expect("guaranteed to be within vector size"); + .expect("guaranteed to be within vector size")) + .max(config.min_calibrated_txns_per_block); PROPOSER_ESTIMATED_CALIBRATED_BLOCK_TXNS.observe(calibrated_block_size as f64); // Check if calibrated block size is reduction in size, to turn on backpressure. if max_block_txns > calibrated_block_size { @@ -449,12 +450,14 @@ impl ProposalGenerator { .collect(); let validator_txn_filter = vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes); + let (validator_txns, mut payload) = self .payload_client .pull_payload( self.quorum_store_poll_time.saturating_sub(proposal_delay), self.max_block_txns, max_block_txns_after_filtering, + max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering), max_block_bytes, // TODO: Set max_inline_txns and max_inline_bytes correctly self.max_inline_txns, diff --git a/consensus/src/payload_client/mixed.rs b/consensus/src/payload_client/mixed.rs index 3b7ede4d813f0..5b36aac85f342 100644 --- a/consensus/src/payload_client/mixed.rs +++ b/consensus/src/payload_client/mixed.rs @@ -68,7 +68,8 @@ impl PayloadClient for MixedPayloadClient { &self, mut max_poll_time: Duration, mut max_items: u64, - mut max_unique_items: u64, + mut max_items_after_filtering: u64, + mut soft_max_items_after_filtering: u64, mut max_bytes: u64, max_inline_items: u64, max_inline_bytes: u64, @@ -103,7 +104,8 @@ impl PayloadClient for MixedPayloadClient { debug!("num_validator_txns={}", validator_txns.len()); // Update constraints with validator txn pull results. max_items -= validator_txns.len() as u64; - max_unique_items -= validator_txns.len() as u64; + max_items_after_filtering -= validator_txns.len() as u64; + soft_max_items_after_filtering -= validator_txns.len() as u64; max_bytes -= validator_txns .iter() .map(|txn| txn.size_in_bytes()) @@ -116,7 +118,8 @@ impl PayloadClient for MixedPayloadClient { .pull( max_poll_time, max_items, - max_unique_items, + max_items_after_filtering, + soft_max_items_after_filtering, max_bytes, max_inline_items, max_inline_bytes, @@ -158,6 +161,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() { Duration::from_secs(1), // max_poll_time 120, // max_items 99, // max_unique_items + 99, // soft max_unique_items 1048576, // size limit: 1MB 50, 500000, // inline limit: 500KB @@ -183,6 +187,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() { Duration::from_micros(500), // max_poll_time 120, // max_items 99, // max_unique_items + 99, // soft max_unique_items 1048576, // size limit: 1MB 50, 500000, // inline limit: 500KB @@ -208,6 +213,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() { Duration::from_secs(1), // max_poll_time 2, // max_items 2, // max_unique_items + 2, // soft max_unique_items 1048576, // size limit: 1MB 0, 0, // inline limit: 0 @@ -233,6 +239,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() { Duration::from_secs(1), // max_poll_time 120, // max_items 99, // max_unique_items + 99, // soft max_unique_items all_validator_txns[0].size_in_bytes() as u64, 50, all_validator_txns[0].size_in_bytes() as u64, @@ -276,6 +283,7 @@ async fn mixed_payload_client_should_respect_validator_txn_feature_flag() { Duration::from_millis(50), // max_poll_time 120, // max_items 99, // max_unique_items + 99, // soft max_unique_items 1048576, // size limit: 1MB 50, 500000, // inline limit: 500KB diff --git a/consensus/src/payload_client/mod.rs b/consensus/src/payload_client/mod.rs index c91ee0cb5fed8..d7a87abea01f1 100644 --- a/consensus/src/payload_client/mod.rs +++ b/consensus/src/payload_client/mod.rs @@ -14,11 +14,13 @@ pub mod validator; #[async_trait::async_trait] pub trait PayloadClient: Send + Sync { + #[allow(clippy::too_many_arguments)] async fn pull_payload( &self, max_poll_time: Duration, max_items: u64, - max_unique_items: u64, + max_items_after_filtering: u64, + soft_max_items_after_filtering: u64, max_bytes: u64, max_inline_items: u64, max_inline_bytes: u64, diff --git a/consensus/src/payload_client/user/mod.rs b/consensus/src/payload_client/user/mod.rs index 3f5c4ffde1a47..caac9652f7e32 100644 --- a/consensus/src/payload_client/user/mod.rs +++ b/consensus/src/payload_client/user/mod.rs @@ -18,7 +18,8 @@ pub trait UserPayloadClient: Send + Sync { &self, max_poll_time: Duration, max_items: u64, - max_unique_items: u64, + max_items_after_filtering: u64, + soft_max_items_after_filtering: u64, max_bytes: u64, max_inline_items: u64, max_inline_bytes: u64, @@ -51,7 +52,8 @@ impl UserPayloadClient for DummyClient { &self, max_poll_time: Duration, mut max_items: u64, - mut max_unique_items: u64, + mut max_items_after_filtering: u64, + mut soft_max_items_after_filtering: u64, mut max_bytes: u64, _max_inline_items: u64, _max_inline_bytes: u64, @@ -67,7 +69,8 @@ impl UserPayloadClient for DummyClient { let mut txns = vec![]; while timer.elapsed() < max_poll_time && max_items >= 1 - && max_unique_items >= 1 + && max_items_after_filtering >= 1 + && soft_max_items_after_filtering >= 1 && max_bytes >= 1 && nxt_txn_idx < self.txns.len() { @@ -78,7 +81,8 @@ impl UserPayloadClient for DummyClient { break; } max_items -= 1; - max_unique_items -= 1; + max_items_after_filtering -= 1; + soft_max_items_after_filtering -= 1; max_bytes -= txn_size; nxt_txn_idx += 1; txns.push(txn); diff --git a/consensus/src/payload_client/user/quorum_store_client.rs b/consensus/src/payload_client/user/quorum_store_client.rs index 576dfea4c7259..8e0b6fb134cef 100644 --- a/consensus/src/payload_client/user/quorum_store_client.rs +++ b/consensus/src/payload_client/user/quorum_store_client.rs @@ -46,7 +46,8 @@ impl QuorumStoreClient { async fn pull_internal( &self, max_items: u64, - max_unique_items: u64, + max_items_after_filtering: u64, + soft_max_items_after_filtering: u64, max_bytes: u64, max_inline_items: u64, max_inline_bytes: u64, @@ -57,7 +58,8 @@ impl QuorumStoreClient { let (callback, callback_rcv) = oneshot::channel(); let req = GetPayloadCommand::GetPayloadRequest( max_items, - max_unique_items, + max_items_after_filtering, + soft_max_items_after_filtering, max_bytes, max_inline_items, max_inline_bytes, @@ -92,7 +94,8 @@ impl UserPayloadClient for QuorumStoreClient { &self, max_poll_time: Duration, max_items: u64, - max_unique_items: u64, + max_items_after_filtering: u64, + soft_max_items_after_filtering: u64, max_bytes: u64, max_inline_items: u64, max_inline_bytes: u64, @@ -123,7 +126,8 @@ impl UserPayloadClient for QuorumStoreClient { let payload = self .pull_internal( max_items, - max_unique_items, + max_items_after_filtering, + soft_max_items_after_filtering, max_bytes, max_inline_items, max_inline_bytes, @@ -146,7 +150,8 @@ impl UserPayloadClient for QuorumStoreClient { max_poll_time_ms = max_poll_time.as_millis() as u64, payload_len = payload.len(), max_items = max_items, - max_unique_items = max_unique_items, + max_items_after_filtering = max_items_after_filtering, + soft_max_items_after_filtering = soft_max_items_after_filtering, max_bytes = max_bytes, max_inline_items = max_inline_items, max_inline_bytes = max_inline_bytes, diff --git a/consensus/src/quorum_store/direct_mempool_quorum_store.rs b/consensus/src/quorum_store/direct_mempool_quorum_store.rs index 3c7cd599da73a..82af6b534e9a8 100644 --- a/consensus/src/quorum_store/direct_mempool_quorum_store.rs +++ b/consensus/src/quorum_store/direct_mempool_quorum_store.rs @@ -140,6 +140,7 @@ impl DirectMempoolQuorumStore { GetPayloadCommand::GetPayloadRequest( _max_txns, max_txns_after_filtering, + _soft_max_txns_after_filtering, max_bytes, _max_inline_txns, _max_inline_bytes, diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index 997ee89789e4d..659ad433d9195 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -207,6 +207,7 @@ impl ProofManager { GetPayloadCommand::GetPayloadRequest( max_txns, max_txns_after_filtering, + soft_max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, @@ -228,6 +229,7 @@ impl ProofManager { &excluded_batches, max_txns, max_txns_after_filtering, + soft_max_txns_after_filtering, max_bytes, return_non_full, block_timestamp, diff --git a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs index 03d9aa45cae37..cb6e56d1d8e5a 100644 --- a/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs +++ b/consensus/src/quorum_store/tests/direct_mempool_quorum_store_test.rs @@ -30,6 +30,7 @@ async fn test_block_request_no_txns() { let (consensus_callback, consensus_callback_rcv) = oneshot::channel(); consensus_to_quorum_store_sender .try_send(GetPayloadCommand::GetPayloadRequest( + 100, 100, 100, 1000, diff --git a/consensus/src/quorum_store/tests/proof_manager_test.rs b/consensus/src/quorum_store/tests/proof_manager_test.rs index 3424bccb45441..1899423d9f042 100644 --- a/consensus/src/quorum_store/tests/proof_manager_test.rs +++ b/consensus/src/quorum_store/tests/proof_manager_test.rs @@ -54,6 +54,7 @@ async fn get_proposal( let (callback_tx, callback_rx) = oneshot::channel(); let filter_set = HashSet::from_iter(filter.iter().cloned()); let req = GetPayloadCommand::GetPayloadRequest( + max_txns, max_txns, max_txns, 1000000, diff --git a/consensus/src/quorum_store/tests/utils.rs b/consensus/src/quorum_store/tests/utils.rs index 6214bb7db6603..6b6f765d3080d 100644 --- a/consensus/src/quorum_store/tests/utils.rs +++ b/consensus/src/quorum_store/tests/utils.rs @@ -33,6 +33,28 @@ fn proof_of_store( ) } +fn proof_of_store_with_size( + author: PeerId, + batch_id: BatchId, + gas_bucket_start: u64, + expiration: u64, + num_txns: u64, +) -> ProofOfStore { + ProofOfStore::new( + BatchInfo::new( + author, + batch_id, + 0, + expiration, + HashValue::random(), + num_txns, + num_txns, + gas_bucket_start, + ), + AggregateSignature::empty(), + ) +} + #[test] fn test_proof_queue_sorting() { let my_peer_id = PeerId::random(); @@ -66,6 +88,7 @@ fn test_proof_queue_sorting() { 4, 2, 2, + 2, true, aptos_infallible::duration_since_epoch(), ); @@ -95,6 +118,7 @@ fn test_proof_queue_sorting() { 6, 4, 4, + 4, true, aptos_infallible::duration_since_epoch(), ); @@ -474,6 +498,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs), @@ -500,6 +525,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![info_0.clone()], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs), @@ -514,6 +540,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 5, + 5, 400, true, Duration::from_micros(now_in_usecs + 500_100), @@ -526,6 +553,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 5, + 5, 400, true, Duration::from_micros(now_in_usecs + 1_000_100), @@ -539,6 +567,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 1_200_100), @@ -552,6 +581,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 2_000_100), @@ -565,6 +595,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 2_500_100), @@ -576,6 +607,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![info_7], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 2_500_100), @@ -589,6 +621,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 3_000_100), @@ -602,6 +635,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 3_500_100), @@ -615,6 +649,7 @@ fn test_proof_pull_proofs_with_duplicates() { &hashset![], 8, 4, + 4, 400, true, Duration::from_micros(now_in_usecs + 4_000_100), @@ -624,3 +659,46 @@ fn test_proof_pull_proofs_with_duplicates() { assert_eq!(result.0.len(), 2); assert_eq!(result.1, 0); } + +#[test] +fn test_proof_queue_soft_limit() { + let my_peer_id = PeerId::random(); + let mut proof_queue = ProofQueue::new(my_peer_id); + + let author = PeerId::random(); + + let author_batches = vec![ + proof_of_store_with_size(author, BatchId::new_for_test(0), 100, 1, 10), + proof_of_store_with_size(author, BatchId::new_for_test(1), 200, 1, 10), + proof_of_store_with_size(author, BatchId::new_for_test(2), 200, 1, 10), + ]; + for batch in author_batches { + proof_queue.push(batch); + } + + let (pulled, num_unique_txns, _) = proof_queue.pull_proofs( + &hashset![], + 100, + 12, + 12, + 100, + true, + aptos_infallible::duration_since_epoch(), + ); + + assert_eq!(pulled.len(), 1); + assert_eq!(num_unique_txns, 10); + + let (pulled, num_unique_txns, _) = proof_queue.pull_proofs( + &hashset![], + 100, + 30, + 12, + 100, + true, + aptos_infallible::duration_since_epoch(), + ); + + assert_eq!(pulled.len(), 2); + assert_eq!(num_unique_txns, 20); +} diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs index 6e631e06afe2a..a293140913446 100644 --- a/consensus/src/quorum_store/utils.rs +++ b/consensus/src/quorum_store/utils.rs @@ -462,6 +462,7 @@ impl ProofQueue { excluded_batches: &HashSet, max_txns: u64, max_txns_after_filtering: u64, + soft_max_txns_after_filtering: u64, max_bytes: u64, return_non_full: bool, block_timestamp: Duration, @@ -544,6 +545,7 @@ impl ProofQueue { if cur_bytes == max_bytes || cur_all_txns == max_txns || cur_unique_txns == max_txns_after_filtering + || cur_unique_txns >= soft_max_txns_after_filtering { full = true; return false; @@ -562,6 +564,7 @@ impl ProofQueue { block_unique_txns = cur_unique_txns, max_txns = max_txns, max_txns_after_filtering = max_txns_after_filtering, + soft_max_txns_after_filtering = soft_max_txns_after_filtering, max_bytes = max_bytes, batch_count = ret.len(), full = full, diff --git a/consensus/src/test_utils/mock_payload_manager.rs b/consensus/src/test_utils/mock_payload_manager.rs index fec52c63599b2..0902df7109fba 100644 --- a/consensus/src/test_utils/mock_payload_manager.rs +++ b/consensus/src/test_utils/mock_payload_manager.rs @@ -58,7 +58,8 @@ impl PayloadClient for MockPayloadManager { &self, _max_poll_time: Duration, _max_size: u64, - _max_unique_size: u64, + _max_size_after_filtering: u64, + _soft_max_size_after_filtering: u64, _max_bytes: u64, _max_inline_size: u64, _max_inline_bytes: u64,