Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consensus] Proposal generator: fetch only to round-up over max_to_execute #14230

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ pub struct ExecutionBackpressureConfig {
pub percentile: f64,
/// Recalibrating max block size, to target blocks taking this long.
pub target_block_time_ms: usize,
/// A minimal number of transactions per block, even if calibration suggests otherwise
/// To make sure backpressure doesn't become too aggressive.
pub min_calibrated_txns_per_block: u64,
// We compute re-calibrated block size, and use that for `max_txns_in_block`.
// But after execution pool and cost of overpacking being minimal - we should
// change so that backpressure sets `max_txns_to_execute` instead
Expand Down Expand Up @@ -228,6 +231,8 @@ impl Default for ConsensusConfig {
percentile: 0.5,
target_block_time_ms: 250,
min_block_time_ms_to_activate: 100,
// allow at least two spreading group from reordering in a single block, to utilize paralellism
min_calibrated_txns_per_block: 8,
}),
pipeline_backpressure: vec![
PipelineBackpressureValues {
Expand Down
9 changes: 6 additions & 3 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pub enum GetPayloadCommand {
GetPayloadRequest(
// max number of transactions in the block
u64,
// max number of unique transactions in the block
// max number of transactions after filtering in the block
u64,
// soft max number of transactions after filtering in the block (i.e. include one that crosses it)
u64,
// max byte size
u64,
Expand All @@ -36,6 +38,7 @@ impl fmt::Display for GetPayloadCommand {
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
max_inline_txns,
max_inline_bytes,
Expand All @@ -46,8 +49,8 @@ impl fmt::Display for GetPayloadCommand {
) => {
write!(
f,
"GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
"GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {} (soft: {}), max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
max_txns, max_txns_after_filtering, soft_max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
)
},
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl DagDriver {
Duration::from_millis(self.payload_config.payload_pull_max_poll_time_ms),
max_txns,
max_txns,
max_txns,
max_size_bytes,
// TODO: Set max_inline_items and max_inline_bytes correctly
100,
Expand Down
7 changes: 5 additions & 2 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ impl PipelineBackpressureConfig {
.sorted()
.collect::<Vec<_>>();
if sizes.len() >= config.min_blocks_to_activate {
let calibrated_block_size = *sizes
let calibrated_block_size = (*sizes
.get(((config.percentile * sizes.len() as f64) as usize).min(sizes.len() - 1))
.expect("guaranteed to be within vector size");
.expect("guaranteed to be within vector size"))
.max(config.min_calibrated_txns_per_block);
PROPOSER_ESTIMATED_CALIBRATED_BLOCK_TXNS.observe(calibrated_block_size as f64);
// Check if calibrated block size is reduction in size, to turn on backpressure.
if max_block_txns > calibrated_block_size {
Expand Down Expand Up @@ -449,12 +450,14 @@ impl ProposalGenerator {
.collect();
let validator_txn_filter =
vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes);

let (validator_txns, mut payload) = self
.payload_client
.pull_payload(
self.quorum_store_poll_time.saturating_sub(proposal_delay),
self.max_block_txns,
max_block_txns_after_filtering,
max_txns_from_block_to_execute.unwrap_or(max_block_txns_after_filtering),
max_block_bytes,
// TODO: Set max_inline_txns and max_inline_bytes correctly
self.max_inline_txns,
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/payload_client/mixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl PayloadClient for MixedPayloadClient {
&self,
mut max_poll_time: Duration,
mut max_items: u64,
mut max_unique_items: u64,
mut max_items_after_filtering: u64,
mut soft_max_items_after_filtering: u64,
mut max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -103,7 +104,8 @@ impl PayloadClient for MixedPayloadClient {
debug!("num_validator_txns={}", validator_txns.len());
// Update constraints with validator txn pull results.
max_items -= validator_txns.len() as u64;
max_unique_items -= validator_txns.len() as u64;
max_items_after_filtering -= validator_txns.len() as u64;
soft_max_items_after_filtering -= validator_txns.len() as u64;
max_bytes -= validator_txns
.iter()
.map(|txn| txn.size_in_bytes())
Expand All @@ -116,7 +118,8 @@ impl PayloadClient for MixedPayloadClient {
.pull(
max_poll_time,
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand Down Expand Up @@ -158,6 +161,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand All @@ -183,6 +187,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_micros(500), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand All @@ -208,6 +213,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
2, // max_items
2, // max_unique_items
2, // soft max_unique_items
1048576, // size limit: 1MB
0,
0, // inline limit: 0
Expand All @@ -233,6 +239,7 @@ async fn mixed_payload_client_should_prioritize_validator_txns() {
Duration::from_secs(1), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
all_validator_txns[0].size_in_bytes() as u64,
50,
all_validator_txns[0].size_in_bytes() as u64,
Expand Down Expand Up @@ -276,6 +283,7 @@ async fn mixed_payload_client_should_respect_validator_txn_feature_flag() {
Duration::from_millis(50), // max_poll_time
120, // max_items
99, // max_unique_items
99, // soft max_unique_items
1048576, // size limit: 1MB
50,
500000, // inline limit: 500KB
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/payload_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ pub mod validator;

#[async_trait::async_trait]
pub trait PayloadClient: Send + Sync {
#[allow(clippy::too_many_arguments)]
async fn pull_payload(
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down
12 changes: 8 additions & 4 deletions consensus/src/payload_client/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub trait UserPayloadClient: Send + Sync {
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -51,7 +52,8 @@ impl UserPayloadClient for DummyClient {
&self,
max_poll_time: Duration,
mut max_items: u64,
mut max_unique_items: u64,
mut max_items_after_filtering: u64,
mut soft_max_items_after_filtering: u64,
mut max_bytes: u64,
_max_inline_items: u64,
_max_inline_bytes: u64,
Expand All @@ -67,7 +69,8 @@ impl UserPayloadClient for DummyClient {
let mut txns = vec![];
while timer.elapsed() < max_poll_time
&& max_items >= 1
&& max_unique_items >= 1
&& max_items_after_filtering >= 1
&& soft_max_items_after_filtering >= 1
&& max_bytes >= 1
&& nxt_txn_idx < self.txns.len()
{
Expand All @@ -78,7 +81,8 @@ impl UserPayloadClient for DummyClient {
break;
}
max_items -= 1;
max_unique_items -= 1;
max_items_after_filtering -= 1;
soft_max_items_after_filtering -= 1;
max_bytes -= txn_size;
nxt_txn_idx += 1;
txns.push(txn);
Expand Down
15 changes: 10 additions & 5 deletions consensus/src/payload_client/user/quorum_store_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ impl QuorumStoreClient {
async fn pull_internal(
&self,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand All @@ -57,7 +58,8 @@ impl QuorumStoreClient {
let (callback, callback_rcv) = oneshot::channel();
let req = GetPayloadCommand::GetPayloadRequest(
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand Down Expand Up @@ -92,7 +94,8 @@ impl UserPayloadClient for QuorumStoreClient {
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_items_after_filtering: u64,
soft_max_items_after_filtering: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
Expand Down Expand Up @@ -123,7 +126,8 @@ impl UserPayloadClient for QuorumStoreClient {
let payload = self
.pull_internal(
max_items,
max_unique_items,
max_items_after_filtering,
soft_max_items_after_filtering,
max_bytes,
max_inline_items,
max_inline_bytes,
Expand All @@ -146,7 +150,8 @@ impl UserPayloadClient for QuorumStoreClient {
max_poll_time_ms = max_poll_time.as_millis() as u64,
payload_len = payload.len(),
max_items = max_items,
max_unique_items = max_unique_items,
max_items_after_filtering = max_items_after_filtering,
soft_max_items_after_filtering = soft_max_items_after_filtering,
max_bytes = max_bytes,
max_inline_items = max_inline_items,
max_inline_bytes = max_inline_bytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl DirectMempoolQuorumStore {
GetPayloadCommand::GetPayloadRequest(
_max_txns,
max_txns_after_filtering,
_soft_max_txns_after_filtering,
max_bytes,
_max_inline_txns,
_max_inline_bytes,
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/quorum_store/proof_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl ProofManager {
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
max_inline_txns,
max_inline_bytes,
Expand All @@ -228,6 +229,7 @@ impl ProofManager {
&excluded_batches,
max_txns,
max_txns_after_filtering,
soft_max_txns_after_filtering,
max_bytes,
return_non_full,
block_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn test_block_request_no_txns() {
let (consensus_callback, consensus_callback_rcv) = oneshot::channel();
consensus_to_quorum_store_sender
.try_send(GetPayloadCommand::GetPayloadRequest(
100,
100,
100,
1000,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/quorum_store/tests/proof_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn get_proposal(
let (callback_tx, callback_rx) = oneshot::channel();
let filter_set = HashSet::from_iter(filter.iter().cloned());
let req = GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns,
max_txns,
1000000,
Expand Down
Loading
Loading