Skip to content

Commit

Permalink
Try fix forge stable (#14034)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos authored Jul 18, 2024
1 parent bb69950 commit a5dc814
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
20 changes: 16 additions & 4 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ use aptos_transaction_generator_lib::{
use aptos_types::account_config::aptos_test_root_address;
use futures::future::{try_join_all, FutureExt};
use once_cell::sync::Lazy;
use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
use rand::{
rngs::StdRng,
seq::{IteratorRandom, SliceRandom},
Rng,
};
use rand_core::SeedableRng;
use std::{
cmp::{max, min},
Expand Down Expand Up @@ -781,8 +785,10 @@ impl TxnEmitter {
// traffic pattern to be correct.
info!("Tx emitter creating workers");
let mut submission_workers = Vec::with_capacity(num_accounts);
let all_clients = Arc::new(req.rest_clients.clone());
for index in 0..num_accounts {
let client = &req.rest_clients[index % req.rest_clients.len()];
let main_client_index = index % all_clients.len();

let accounts = all_accounts.split_off(all_accounts.len() - 1);
let stop = stop.clone();
let stats = Arc::clone(&stats);
Expand All @@ -791,7 +797,8 @@ impl TxnEmitter {

let worker = SubmissionWorker::new(
accounts,
client.clone(),
all_clients.clone(),
main_client_index,
stop,
mode_params.clone(),
stats,
Expand Down Expand Up @@ -897,6 +904,10 @@ impl TxnEmitter {
}
}

/// Returns a randomly chosen client from `clients`.
///
/// Accepts any slice of clients, so callers holding a `Vec<RestClient>` (or an
/// `Arc<Vec<RestClient>>`) can keep passing a reference to it unchanged.
///
/// # Panics
///
/// Panics if `clients` is empty, since there is nothing to choose from.
fn pick_client(clients: &[RestClient]) -> &RestClient {
    clients
        .choose(&mut rand::thread_rng())
        .expect("pick_client requires at least one REST client")
}

/// This function waits for the submitted transactions to be committed, up to
/// a wait_timeout (counted from the start_time passed in, not from the function call).
/// It returns number of transactions that expired without being committed,
Expand All @@ -906,7 +917,7 @@ impl TxnEmitter {
/// we were able to fetch last.
async fn wait_for_accounts_sequence(
start_time: Instant,
client: &RestClient,
clients: &Vec<RestClient>,
account_seqs: &HashMap<AccountAddress, (u64, u64)>,
txn_expiration_ts_secs: u64,
sleep_between_cycles: Duration,
Expand All @@ -916,6 +927,7 @@ async fn wait_for_accounts_sequence(

let mut sum_of_completion_timestamps_millis = 0u128;
loop {
let client = pick_client(clients);
match query_sequence_numbers(client, pending_addresses.iter()).await {
Ok((sequence_numbers, ledger_timestamp_secs)) => {
let millis_elapsed = start_time.elapsed().as_millis();
Expand Down
26 changes: 17 additions & 9 deletions crates/transaction-emitter-lib/src/emitter/submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use tokio::time::sleep;

pub struct SubmissionWorker {
pub(crate) accounts: Vec<Arc<LocalAccount>>,
client: RestClient,
clients: Arc<Vec<RestClient>>,
/// Index into `clients` of the main client, which is used to submit requests; all clients are used for querying/latency
main_client_index: usize,
stop: Arc<AtomicBool>,
params: EmitModeParams,
stats: Arc<DynamicStatsTracking>,
Expand All @@ -47,7 +49,8 @@ pub struct SubmissionWorker {
impl SubmissionWorker {
pub fn new(
accounts: Vec<LocalAccount>,
client: RestClient,
clients: Arc<Vec<RestClient>>,
main_client_index: usize,
stop: Arc<AtomicBool>,
params: EmitModeParams,
stats: Arc<DynamicStatsTracking>,
Expand All @@ -59,7 +62,8 @@ impl SubmissionWorker {
let accounts = accounts.into_iter().map(Arc::new).collect();
Self {
accounts,
client,
clients,
main_client_index,
stop,
params,
stats,
Expand All @@ -70,6 +74,10 @@ impl SubmissionWorker {
}
}

/// Returns this worker's main REST client, i.e. the entry of `clients`
/// located at `main_client_index`.
fn client(&self) -> &RestClient {
    let main = self.main_client_index;
    &self.clients[main]
}

#[allow(clippy::collapsible_if)]
pub(crate) async fn run(mut self, start_instant: Instant) -> Vec<LocalAccount> {
let mut wait_until = start_instant + self.start_sleep_duration;
Expand All @@ -92,7 +100,7 @@ impl SubmissionWorker {
SampleRate::Duration(Duration::from_secs(120)),
warn!(
"[{:?}] txn_emitter worker drifted out of sync too much: {}s",
self.client.path_prefix_string(),
self.client().path_prefix_string(),
loop_start_time.duration_since(wait_until).as_secs()
)
);
Expand Down Expand Up @@ -123,7 +131,7 @@ impl SubmissionWorker {
SampleRate::Duration(Duration::from_secs(300)),
info!(
"[{:?}] txn_emitter worker: handling {} accounts, generated txns for: {}",
self.client.path_prefix_string(),
self.client().path_prefix_string(),
self.accounts.len(),
account_to_start_and_end_seq_num.len(),
)
Expand All @@ -142,7 +150,7 @@ impl SubmissionWorker {
.chunks(self.params.max_submit_batch_size)
.map(|reqs| {
submit_transactions(
&self.client,
self.client(),
reqs,
loop_start_time,
txn_offset_time.clone(),
Expand All @@ -158,7 +166,7 @@ impl SubmissionWorker {
SampleRate::Duration(Duration::from_secs(120)),
warn!(
"[{:?}] txn_emitter worker waited for more than 5s to submit transactions: {}s after loop start",
self.client.path_prefix_string(),
self.client().path_prefix_string(),
submitted_after.as_secs(),
)
);
Expand Down Expand Up @@ -241,7 +249,7 @@ impl SubmissionWorker {
let (latest_fetched_counts, sum_of_completion_timestamps_millis) =
wait_for_accounts_sequence(
start_time,
&self.client,
&self.clients,
&account_to_start_and_end_seq_num,
txn_expiration_ts_secs,
check_account_sleep_duration,
Expand All @@ -266,7 +274,7 @@ impl SubmissionWorker {
SampleRate::Duration(Duration::from_secs(120)),
warn!(
"[{:?}] Transactions were not committed before expiration: {:?}, for {:?}",
self.client.path_prefix_string(),
self.client().path_prefix_string(),
num_expired,
self.accounts
.iter()
Expand Down
8 changes: 4 additions & 4 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1451,10 +1451,10 @@ fn realistic_env_graceful_overload() -> ForgeConfig {
.add_wait_for_catchup_s(180) // 3 minutes
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// overload test uses more CPUs than others, so increase the limit
// Check that we don't use more than 18 CPU cores for 30% of the time.
MetricsThreshold::new(18.0, 40),
// Check that we don't use more than 6 GB of memory for more than 10% of the time.
MetricsThreshold::new_gb(6.0, 10),
// Check that we don't use more than 24 CPU cores for 20% of the time.
MetricsThreshold::new(24.0, 20),
// Check that we don't use more than 7.5 GB of memory for more than 10% of the time.
MetricsThreshold::new_gb(7.5, 10),
))
.add_latency_threshold(10.0, LatencyType::P50)
.add_latency_threshold(30.0, LatencyType::P90)
Expand Down

0 comments on commit a5dc814

Please sign in to comment.