High tps test #13573

Closed
wants to merge 74 commits into main from high_tps_test
74 commits
9836994
Quorum store config
sitalkedia Jun 5, 2024
1829a96
Block size and gas limit tuning
sitalkedia Jun 5, 2024
daeed8b
100 validator test
sitalkedia Jun 5, 2024
ac0e4e1
enable consensus observer and mempool backlog tuning
sitalkedia Jun 5, 2024
82d4f71
Use crazy machines
sitalkedia Jun 5, 2024
89e2e66
Optimized state sync throughput
sitalkedia Jun 5, 2024
3dca531
100 node test
sitalkedia Jun 5, 2024
9b53d88
remove VFNs
sitalkedia Jun 5, 2024
bb85d11
Increase the duration to 20m
sitalkedia Jun 6, 2024
66f5846
Add increase fraction to qs backpressure config
vusirikala Jun 6, 2024
ec34e5d
Start with high dynamic_pull_txn_per_s
vusirikala Jun 6, 2024
e5860f0
Increase batch expiration time to 3 seconds
vusirikala Jun 7, 2024
942e2d6
increase duration
vusirikala Jun 7, 2024
2dd6fd9
Fixed the typo in batch generator
vusirikala Jun 7, 2024
635d541
Add counters
vusirikala Jun 7, 2024
63f13e6
Pull txns more frequently
vusirikala Jun 7, 2024
bc5eb70
Merge branch 'main' into high_tps_test
vusirikala Jun 7, 2024
a9c951c
Reduced the limits for pulling
vusirikala Jun 7, 2024
2d7a174
Add more counters
vusirikala Jun 8, 2024
336dc35
Add counters
vusirikala Jun 8, 2024
8db0ed0
Change priority order when pulling from mempool
vusirikala Jun 8, 2024
cbb2e7c
Update sorting order:
vusirikala Jun 8, 2024
bf71df1
Fixing a counter
vusirikala Jun 8, 2024
b43b466
Add more counters
vusirikala Jun 8, 2024
022ed02
Merge branch 'main' into high_tps_test
vusirikala Jun 11, 2024
d24337f
Remove inserted txns from skipped
vusirikala Jun 11, 2024
c033e27
Add a counter
vusirikala Jun 11, 2024
93d1898
Added more counters for sent batch requests and implemented increase …
vusirikala Jun 12, 2024
cc87685
Add more counters
vusirikala Jun 12, 2024
00b54d0
Merge branch 'main' into high_tps_test
vusirikala Jun 12, 2024
69b830e
remove some counters
vusirikala Jun 12, 2024
d8f7188
remove some counters
vusirikala Jun 12, 2024
52bb2b6
Update counters
vusirikala Jun 12, 2024
c7dbd14
Merge branch 'main' into high_tps_test
vusirikala Jun 12, 2024
b08df0a
Add counters
vusirikala Jun 12, 2024
f6409bd
Add latency counters
vusirikala Jun 12, 2024
1460da1
Add latency counters
vusirikala Jun 12, 2024
dbbf7c3
Use the old get_batch code
vusirikala Jun 12, 2024
83c9b92
Use durationhistogram
vusirikala Jun 13, 2024
dca1972
Fix
vusirikala Jun 13, 2024
c0efbd9
Add more counters
vusirikala Jun 13, 2024
5c70d3d
Add counters in mempool
vusirikala Jun 13, 2024
2728f93
Add transactions pulled total count counter
vusirikala Jun 14, 2024
bb281ed
Add transactions pulled total count counter
vusirikala Jun 14, 2024
76c6ad8
Add transactions pulled total count counter
vusirikala Jun 14, 2024
94e32a2
Merge branch 'main' into high_tps_test
vusirikala Jun 18, 2024
bc0ca52
Switch with load_sweep_env
vusirikala Jun 18, 2024
7f32471
Add some latency counters
vusirikala Jun 22, 2024
7d53ab1
Adding more counters
vusirikala Jun 24, 2024
2227114
Removing total_num_txns
vusirikala Jun 24, 2024
fbdeba9
Remove call to actual num transactions
vusirikala Jun 24, 2024
8e23dc9
Merge branch 'main' into high_tps_test
vusirikala Jun 24, 2024
4fa1d43
Remove subtraction
vusirikala Jun 24, 2024
489ae74
Add some counters in info
vusirikala Jun 24, 2024
7e9241b
Add info for skipped txns
vusirikala Jun 24, 2024
f9ab336
Check for account sequence number in excluded
vusirikala Jun 24, 2024
081a348
bypass if the previous seq number is excluded
vusirikala Jun 24, 2024
73631ee
Add more print statements
vusirikala Jun 25, 2024
e95a2cb
Add more counters
vusirikala Jun 25, 2024
a04b89c
Add more info statements
vusirikala Jun 25, 2024
d38c880
Replace btreeset with hashset for priority index
vusirikala Jun 25, 2024
e977065
Add info statements in transaction store
vusirikala Jun 26, 2024
7c384da
Print result length
vusirikala Jun 26, 2024
0db0057
Removing ordering for orderedkey
vusirikala Jun 26, 2024
f9a6c36
Print max txns as well
vusirikala Jun 27, 2024
d64e7d5
More info statements
vusirikala Jun 27, 2024
3336a64
Use pull txn window in batch generator
vusirikala Jun 27, 2024
236f86b
Add info statements in batch generator
vusirikala Jun 27, 2024
7244f40
Increasing to 20 validators
vusirikala Jun 27, 2024
5a87f1c
using btreeset again
vusirikala Jun 27, 2024
d5cfd8d
Use pull window
vusirikala Jun 27, 2024
eed9035
Merge branch 'main' into high_tps_test
vusirikala Jun 27, 2024
8b3f438
Change the ordering for priority index
vusirikala Jun 27, 2024
d6ebc54
Merge branch 'main' into high_tps_test
vusirikala Jun 28, 2024
4 changes: 2 additions & 2 deletions config/src/config/consensus_config.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;

// NOTE: when changing, make sure to update QuorumStoreBackPressureConfig::backlog_txn_limit_count as well.
const MAX_SENDING_BLOCK_TXNS: u64 = 1900;
const MAX_SENDING_BLOCK_TXNS: u64 = 4000;
pub(crate) static MAX_RECEIVING_BLOCK_TXNS: Lazy<u64> =
Lazy::new(|| 10000.max(2 * MAX_SENDING_BLOCK_TXNS));
// stop reducing size at this point, so 1MB transactions can still go through
@@ -153,7 +153,7 @@ impl Default for ConsensusConfig {
ConsensusConfig {
max_network_channel_size: 1024,
max_sending_block_txns: MAX_SENDING_BLOCK_TXNS,
max_sending_block_bytes: 3 * 1024 * 1024, // 3MB
max_sending_block_bytes: 6 * 1024 * 1024, // 6MB
max_receiving_block_txns: *MAX_RECEIVING_BLOCK_TXNS,
max_sending_inline_txns: 100,
max_sending_inline_bytes: 200 * 1024, // 200 KB
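Note that MAX_RECEIVING_BLOCK_TXNS is derived from MAX_SENDING_BLOCK_TXNS, so raising the sending limit from 1900 to 4000 leaves the receiving limit untouched. A minimal standalone sketch of that arithmetic (using once_cell, as the file itself does):

    use once_cell::sync::Lazy;

    const MAX_SENDING_BLOCK_TXNS: u64 = 4000;

    // 2 * 4000 = 8000, which is still below the 10000 floor, so the
    // receiving-side limit stays at 10000 both before and after this change.
    static MAX_RECEIVING_BLOCK_TXNS: Lazy<u64> =
        Lazy::new(|| 10000.max(2 * MAX_SENDING_BLOCK_TXNS));

    fn main() {
        assert_eq!(*MAX_RECEIVING_BLOCK_TXNS, 10000);
    }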
4 changes: 3 additions & 1 deletion config/src/config/quorum_store_config.rs
@@ -20,6 +20,7 @@ pub struct QuorumStoreBackPressureConfig {
pub decrease_duration_ms: u64,
pub increase_duration_ms: u64,
pub decrease_fraction: f64,
pub increase_fraction: f64,
pub dynamic_min_txn_per_s: u64,
pub dynamic_max_txn_per_s: u64,
}
@@ -35,6 +36,7 @@ impl Default for QuorumStoreBackPressureConfig {
decrease_duration_ms: 1000,
increase_duration_ms: 1000,
decrease_fraction: 0.5,
increase_fraction: 1.5,
dynamic_min_txn_per_s: 160,
dynamic_max_txn_per_s: 4000,
}
@@ -120,7 +122,7 @@ impl Default for QuorumStoreConfig {
batch_request_retry_interval_ms: 1000,
batch_request_rpc_timeout_ms: 5000,
batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64,
remote_batch_expiry_gap_when_init_usecs: Duration::from_millis(500).as_micros() as u64,
remote_batch_expiry_gap_when_init_usecs: Duration::from_millis(3000).as_micros() as u64,
memory_quota: 120_000_000,
db_quota: 300_000_000,
batch_quota: 300_000,
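The new increase_fraction is consumed in batch_generator.rs further down: the dynamic pull rate now grows multiplicatively (capped at dynamic_max_txn_per_s) instead of by a fixed additive step. A standalone sketch of one adjustment step, assuming the decrease path keeps its existing multiplicative decrease_fraction behaviour (that path is not shown in this diff):

    struct BackPressure {
        decrease_fraction: f64,
        increase_fraction: f64,
        dynamic_min_txn_per_s: u64,
        dynamic_max_txn_per_s: u64,
    }

    // One adjustment step of the dynamic per-second pull rate.
    fn adjust_pull_rate(current: u64, under_back_pressure: bool, cfg: &BackPressure) -> u64 {
        if under_back_pressure {
            // Multiplicative decrease, floored at the configured minimum.
            std::cmp::max(
                (current as f64 * cfg.decrease_fraction) as u64,
                cfg.dynamic_min_txn_per_s,
            )
        } else {
            // Multiplicative increase (this PR), capped at the configured maximum.
            std::cmp::min(
                (current as f64 * cfg.increase_fraction) as u64,
                cfg.dynamic_max_txn_per_s,
            )
        }
    }

    fn main() {
        let cfg = BackPressure {
            decrease_fraction: 0.5,
            increase_fraction: 1.5,
            dynamic_min_txn_per_s: 160,
            dynamic_max_txn_per_s: 4000,
        };
        // The PR also starts the rate at the maximum rather than at the midpoint.
        let mut rate = cfg.dynamic_max_txn_per_s;
        rate = adjust_pull_rate(rate, true, &cfg); // 4000 -> 2000
        rate = adjust_pull_rate(rate, false, &cfg); // 2000 -> 3000
        println!("rate = {}", rate);
    }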
73 changes: 72 additions & 1 deletion consensus/src/counters.rs
@@ -224,6 +224,25 @@ pub static PROPOSER_COLLECTED_TIMEOUT_VOTING_POWER: Lazy<Counter> = Lazy::new(||
.unwrap()
});

/// Histogram for the number of txns per (committed) blocks.
pub static NUM_INPUT_TXNS_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_consensus_num_input_txns_per_block",
"Histogram for the number of input txns per (committed) blocks.",
NUM_CONSENSUS_TRANSACTIONS_BUCKETS.to_vec()
)
.unwrap()
});

pub static NUM_BYTES_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_consensus_num_bytes_per_block",
"Histogram for the number of bytes per (committed) blocks.",
exponential_buckets(/*start=*/ 500.0, /*factor=*/ 1.4, /*count=*/ 32).unwrap()
)
.unwrap()
});

/// Committed proposals map when using LeaderReputation as the ProposerElection
pub static COMMITTED_PROPOSALS_IN_WINDOW: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
@@ -1064,7 +1083,10 @@ pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlo
COMMITTED_BLOCKS_COUNT.inc();
LAST_COMMITTED_ROUND.set(block.round() as i64);
LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);

NUM_INPUT_TXNS_PER_BLOCK
.observe(block.block().payload().map_or(0, |payload| payload.len()) as f64);
NUM_BYTES_PER_BLOCK
.observe(block.block().payload().map_or(0, |payload| payload.size()) as f64);
let failed_rounds = block
.block()
.block_data()
@@ -1133,3 +1155,52 @@ pub static RAND_QUEUE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});

pub static PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_DURATION: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_consensus_payload_manager_request_transactions_duration",
"Histogram of the time it takes to request transactions from the payload manager.",
[
0.005, 0.010, 0.015, 0.020, 0.025, 0.030, 0.035, 0.040, 0.045, 0.050, 0.055, 0.060,
0.065, 0.070, 0.075, 0.080, 0.085, 0.090, 0.095, 0.100, 0.110, 0.120, 0.130, 0.140,
0.150, 0.160, 0.170, 0.180, 0.190, 0.200, 0.225, 0.250
]
.to_vec()
)
.unwrap()
});

pub static PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"aptos_consensus_payload_manager_request_transactions_proof_count",
"Count of the number of times a proof is requested for transactions."
)
.unwrap()
});

pub static PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_1: Lazy<IntCounter> =
Lazy::new(|| {
register_int_counter!(
"aptos_consensus_payload_manager_request_transactions_proof_count_purpose_1",
"Count of the number of times a proof is requested for transactions for purpose 1."
)
.unwrap()
});

pub static PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_2: Lazy<IntCounter> =
Lazy::new(|| {
register_int_counter!(
"aptos_consensus_payload_manager_request_transactions_proof_count_purpose_2",
"Count of the number of times a proof is requested for transactions for purpose 2."
)
.unwrap()
});

pub static PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_3: Lazy<IntCounter> =
Lazy::new(|| {
register_int_counter!(
"aptos_consensus_payload_manager_request_transactions_proof_count_purpose_3",
"Count of the number of times a proof is requested for transactions for purpose 3."
)
.unwrap()
});
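For a sense of what the exponential bucket layout above covers, a small sketch using the prometheus crate directly (the real code goes through Aptos' metrics wrappers, so this is illustrative only):

    use prometheus::exponential_buckets;

    fn main() {
        // Same parameters as aptos_consensus_num_bytes_per_block above.
        let buckets = exponential_buckets(500.0, 1.4, 32).unwrap();
        // The first boundary is 500 bytes; the last is 500 * 1.4^31, roughly 17 MB,
        // which comfortably covers the 6 MB max_sending_block_bytes set in this PR.
        println!("first = {:.0} B, last = {:.0} B", buckets[0], buckets[31]);
    }

The flat bucket list used for PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_DURATION tops out at 250 ms, so any slower request lands in the implicit +Inf overflow bucket.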
19 changes: 19 additions & 0 deletions consensus/src/payload_manager.rs
@@ -58,11 +58,25 @@ impl PayloadManager {
proofs: Vec<ProofOfStore>,
block_timestamp: u64,
batch_reader: Arc<dyn BatchReader>,
purpose: u64,
) -> Vec<(
HashValue,
oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>>,
)> {
let mut receivers = Vec::new();
counters::PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT.inc_by(proofs.len() as u64);
if purpose == 1 {
counters::PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_1
.inc_by(proofs.len() as u64);
} else if purpose == 2 {
counters::PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_2
.inc_by(proofs.len() as u64);
} else if purpose == 3 {
counters::PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_PROOF_COUNT_PURPOSE_3
.inc_by(proofs.len() as u64);
}

let start_time = std::time::Instant::now();
for pos in proofs {
trace!(
"QSE: requesting pos {:?}, digest {}, time = {}",
@@ -76,6 +90,8 @@ impl PayloadManager {
debug!("QSE: skipped expired pos {}", pos.digest());
}
}
counters::PAYLOAD_MANAGER_REQUEST_TRANSACTIONS_DURATION
.observe(start_time.elapsed().as_secs_f64());
receivers
}

@@ -144,6 +160,7 @@ impl PayloadManager {
proof_with_status.proofs.clone(),
timestamp,
batch_reader.clone(),
1,
);
proof_with_status
.status
@@ -260,6 +277,7 @@ impl PayloadManager {
proof_with_data.proofs.clone(),
block.timestamp_usecs(),
batch_reader.clone(),
2,
);
// Could not get all data so requested again
proof_with_data
@@ -276,6 +294,7 @@ impl PayloadManager {
proof_with_data.proofs.clone(),
block.timestamp_usecs(),
batch_reader.clone(),
3,
);
// Could not get all data so requested again
proof_with_data
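The purpose argument added above is a bare u64 routed through an if/else chain into three near-identical counters. As an alternative sketch only (not what this PR does), the same breakdown could be expressed with a single labelled counter vector from the prometheus crate; the metric name here is made up for illustration:

    use once_cell::sync::Lazy;
    use prometheus::{register_int_counter_vec, IntCounterVec};

    // Hypothetical stand-in for the three ..._PROOF_COUNT_PURPOSE_{1,2,3} counters.
    static REQUEST_TXNS_PROOF_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
        register_int_counter_vec!(
            "example_payload_manager_request_transactions_proof_count",
            "Number of proofs requested, broken down by caller purpose.",
            &["purpose"]
        )
        .unwrap()
    });

    // Callers pass the same 1/2/3 purpose tag; it becomes a label value instead
    // of selecting one of three separately registered counters.
    fn record_request(purpose: u64, num_proofs: u64) {
        let label = purpose.to_string();
        REQUEST_TXNS_PROOF_COUNT
            .with_label_values(&[label.as_str()])
            .inc_by(num_proofs);
    }

    fn main() {
        record_request(1, 8);
        record_request(3, 2);
    }

This keeps the per-call-site breakdown queryable (e.g. sum by (purpose)) without registering a new static per purpose.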
50 changes: 40 additions & 10 deletions consensus/src/quorum_store/batch_generator.rs
@@ -23,7 +23,7 @@ use aptos_types::{transaction::SignedTransaction, PeerId};
use futures_channel::mpsc::Sender;
use rayon::prelude::*;
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap},
collections::{btree_map::Entry, BTreeMap, HashMap, VecDeque},
sync::Arc,
time::{Duration, Instant},
};
@@ -231,6 +231,7 @@ impl BatchGenerator {
if num_batch_txns > 0 {
let batch_txns: Vec<_> = txns.drain(0..num_batch_txns).collect();
let batch = self.create_new_batch(batch_txns, expiry_time, bucket_start);
counters::BATCH_SIZE_IN_BYTES.observe(batch.batch_info().num_bytes() as f64);
batches.push(batch);
*total_batches_remaining = total_batches_remaining.saturating_sub(1);
txns_remaining -= num_batch_txns;
@@ -326,12 +327,15 @@ impl BatchGenerator {
}

pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec<Batch> {
counters::SCHEDULED_BATCH_PULLS.inc();
counters::QS_MAX_PULL_TXNS.observe(max_count as f64);
counters::BATCH_PULL_EXCLUDED_TXNS.observe(self.txns_in_progress_sorted.len() as f64);
trace!(
"QS: excluding txs len: {:?}",
self.txns_in_progress_sorted.len()
);

let pull_start_time = Instant::now();
let mut pulled_txns = self
.mempool_proxy
.pull_internal(
@@ -341,6 +345,7 @@
)
.await
.unwrap_or_default();
counters::MEMPOOL_PULL_DURATION.observe_duration(pull_start_time.elapsed());

trace!("QS: pulled_txns len: {:?}", pulled_txns.len());

@@ -356,6 +361,7 @@
} else {
counters::PULLED_TXNS_COUNT.inc();
counters::PULLED_TXNS_NUM.observe(pulled_txns.len() as f64);
counters::PULLED_TXNS_TOTAL_COUNT.inc_by(pulled_txns.len() as u64);
if pulled_txns.len() as u64 == max_count {
counters::BATCH_PULL_FULL_TXNS.observe(max_count as f64)
}
@@ -391,6 +397,8 @@ impl BatchGenerator {
mut interval: Interval,
) {
let start = Instant::now();
let mut last_pulled_num_txns = 0;
let mut last_pulled_max_txn = 0;

let mut last_non_empty_pull = start;
let back_pressure_decrease_duration =
@@ -399,10 +407,8 @@
Duration::from_millis(self.config.back_pressure.increase_duration_ms);
let mut back_pressure_decrease_latest = start;
let mut back_pressure_increase_latest = start;
let mut dynamic_pull_txn_per_s = (self.config.back_pressure.dynamic_min_txn_per_s
+ self.config.back_pressure.dynamic_max_txn_per_s)
/ 2;

let mut dynamic_pull_txn_per_s = self.config.back_pressure.dynamic_max_txn_per_s;
let mut pulled_txns_window: VecDeque<(Instant, u64)> = VecDeque::new();
loop {
let _timer = counters::BATCH_GENERATOR_MAIN_LOOP.start_timer();

@@ -431,7 +437,7 @@ impl BatchGenerator {
if back_pressure_increase_latest.elapsed() >= back_pressure_increase_duration {
back_pressure_increase_latest = tick_start;
dynamic_pull_txn_per_s = std::cmp::min(
dynamic_pull_txn_per_s + self.config.back_pressure.dynamic_min_txn_per_s,
(dynamic_pull_txn_per_s as f64 * self.config.back_pressure.increase_fraction) as u64,
self.config.back_pressure.dynamic_max_txn_per_s,
);
trace!("QS: dynamic_max_pull_txn_per_s: {}", dynamic_pull_txn_per_s);
@@ -450,15 +456,39 @@
) as usize;
if (!self.back_pressure.proof_count
&& since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms)
|| since_last_non_empty_pull_ms == self.config.batch_generation_max_interval_ms {

let dynamic_pull_max_txn = std::cmp::max(
(since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1);
|| since_last_non_empty_pull_ms == self.config.batch_generation_max_interval_ms
|| last_pulled_num_txns >= 50_u64
|| last_pulled_num_txns > (last_pulled_max_txn as f64 / 2.0) as u64 {

while let Some(&(tick_start, _)) = pulled_txns_window.front() {
if tick_start.elapsed() > Duration::from_secs(1) {
pulled_txns_window.pop_front();
} else {
break;
}
}
info!("pulled_txns_window: {:?}, current_time: {:?}", pulled_txns_window, Instant::now());
let pulled_in_last_sec = pulled_txns_window.iter().map(|(_, txns)| txns).sum();
info!("pulled_in_last_sec: {}", pulled_in_last_sec);
let dynamic_pull_max_txn = dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec);
// let dynamic_pull_max_txn = std::cmp::max(
// (since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1);
counters::QS_SINCE_LAST_NON_EMPTY_PULL_TIME.observe_duration(Duration::from_millis(since_last_non_empty_pull_ms as u64));
counters::QS_DYNAMIC_MAX_PULL_TXNS.observe(dynamic_pull_max_txn as f64);
let pull_max_txn = std::cmp::min(
dynamic_pull_max_txn,
self.config.sender_max_total_txns as u64,
);
info!("pull_max_txn: {}", pull_max_txn);
last_pulled_max_txn = pull_max_txn;
let pull_start_time = Instant::now();
let batches = self.handle_scheduled_pull(pull_max_txn).await;
counters::MEMPOOL_PULL_AND_CREATE_BATCHES_DURATION.observe_duration(pull_start_time.elapsed());

last_pulled_num_txns = batches.iter().map(|b| b.batch_info().num_txns()).sum();
pulled_txns_window.push_back((Instant::now(), last_pulled_num_txns));
counters::LAST_PULLED_NUM_TXNS.observe(last_pulled_num_txns as f64);

if !batches.is_empty() {
last_non_empty_pull = tick_start;

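The pull budget introduced above is effectively a one-second sliding window: each pull appends (timestamp, txns pulled) to a deque, stale entries are evicted, and the next pull is capped at dynamic_pull_txn_per_s minus whatever was already pulled inside the window. A standalone sketch of just that bookkeeping (names mirror the PR; the mempool plumbing is stubbed out):

    use std::collections::VecDeque;
    use std::time::{Duration, Instant};

    struct PullWindow {
        window: VecDeque<(Instant, u64)>,
    }

    impl PullWindow {
        fn new() -> Self {
            Self { window: VecDeque::new() }
        }

        // Drop entries older than one second, then sum what remains.
        fn pulled_in_last_sec(&mut self) -> u64 {
            while let Some(&(t, _)) = self.window.front() {
                if t.elapsed() > Duration::from_secs(1) {
                    self.window.pop_front();
                } else {
                    break;
                }
            }
            self.window.iter().map(|&(_, n)| n).sum()
        }

        // Budget for the next pull: the per-second target minus what was already
        // pulled inside the current window, saturating at zero.
        fn next_pull_budget(&mut self, dynamic_pull_txn_per_s: u64) -> u64 {
            dynamic_pull_txn_per_s.saturating_sub(self.pulled_in_last_sec())
        }

        fn record_pull(&mut self, num_txns: u64) {
            self.window.push_back((Instant::now(), num_txns));
        }
    }

    fn main() {
        let mut w = PullWindow::new();
        let target = 4000; // dynamic_pull_txn_per_s
        let budget = w.next_pull_budget(target); // 4000: nothing pulled yet
        // ... pull up to `budget` txns from mempool (stubbed out here) ...
        w.record_pull(budget.min(1500));
        assert_eq!(w.next_pull_budget(target), 2500);
    }

Compared to the previous formula (time since the last non-empty pull times the rate), this caps the amount pulled in any rolling one-second window at roughly the configured rate, even when pulls fire back to back.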
1 change: 1 addition & 0 deletions consensus/src/quorum_store/batch_requester.rs
@@ -48,6 +48,7 @@ impl BatchRequesterState {
// make sure nodes request from the different set of nodes
self.next_index = rng.gen::<usize>() % self.signers.len();
counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64);
counters::SENT_INDIVIDUAL_BATCH_REQUEST_COUNT.inc();
} else {
counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64);
}
1 change: 1 addition & 0 deletions consensus/src/quorum_store/batch_store.rs
@@ -428,6 +428,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
let batch_requester = self.batch_requester.clone();
tokio::spawn(async move {
if let Ok(mut value) = batch_store.get_batch_from_local(proof.digest()) {
counters::FOUND_BATCHES_LOCALLY_COUNT.inc();
if tx
.send(Ok(value.take_payload().expect("Must have payload")))
.is_err()