Skip to content

Commit

Permalink
Add analysis for bench-tps transactions (solana-labs#92)
Browse files Browse the repository at this point in the history
* save progress

* rename threads handler

* added writer for txs

* after extracting structure to handle tx confirmations

* extract LogWriter

* Replace pair TimestampedTransaction with struct

* add compute_unit_price to TimestampedTransaction

* add cu_price to LogWriter

* add block time to the logs

* Fix warnings

* add comments and restructure code

* some small improvements

* Renamed conformation_processing.rs to log_transaction_service.rs

* address numerous PR comments

* split LogWriter into two structs

* simplify code of LogWriters

* extract process_blocks

* specify commitment in LogTransactionService

* break thread loop if receiver happens to be dropped

* update start_slot when processing blocks

* address pr comments

* fix clippy error

* minor changes

* fix ms problem

* fix bug with time in clear transaction map
  • Loading branch information
KirillLykov authored Mar 26, 2024
1 parent 30eecd6 commit 1261f1f
Show file tree
Hide file tree
Showing 7 changed files with 679 additions and 41 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bench-tps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
chrono = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
csv = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-clap-utils = { workspace = true }
Expand Down
130 changes: 89 additions & 41 deletions bench-tps/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ use {
crate::{
bench_tps_client::*,
cli::{ComputeUnitPrice, Config, InstructionPaddingConfig},
log_transaction_service::{
create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch,
},
perf_utils::{sample_txs, SampleStats},
send_batch::*,
},
chrono::Utc,
log::*,
rand::distributions::{Distribution, Uniform},
rayon::prelude::*,
Expand Down Expand Up @@ -87,8 +91,14 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 {
}
}

pub type TimestampedTransaction = (Transaction, Option<u64>);
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;
#[derive(Debug, PartialEq, Default, Eq, Clone)]
pub(crate) struct TimestampedTransaction {
transaction: Transaction,
timestamp: Option<u64>,
compute_unit_price: Option<u64>,
}

pub(crate) type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;

/// Keypairs split into source and destination
/// used for transfer transactions
Expand Down Expand Up @@ -356,6 +366,7 @@ fn create_sender_threads<T>(
threads: usize,
exit_signal: Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
signatures_sender: Option<SignatureBatchSender>,
) -> Vec<JoinHandle<()>>
where
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
Expand All @@ -367,6 +378,7 @@ where
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
let signatures_sender = signatures_sender.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
Expand All @@ -377,6 +389,7 @@ where
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
signatures_sender,
);
})
.unwrap()
Expand Down Expand Up @@ -406,6 +419,8 @@ where
use_durable_nonce,
instruction_padding_config,
num_conflict_groups,
block_data_file,
transaction_data_file,
..
} = config;

Expand Down Expand Up @@ -464,14 +479,21 @@ where
None
};

let s_threads = create_sender_threads(
let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender(
&client,
block_data_file.as_deref(),
transaction_data_file.as_deref(),
);

let sender_threads = create_sender_threads(
&client,
&shared_txs,
thread_batch_sleep_ms,
&total_tx_sent_count,
threads,
exit_signal.clone(),
&shared_tx_active_thread_count,
signatures_sender,
);

wait_for_target_slots_per_epoch(target_slots_per_epoch, &client);
Expand Down Expand Up @@ -499,7 +521,7 @@ where

// join the tx send threads
info!("Waiting for transmit threads...");
for t in s_threads {
for t in sender_threads {
if let Err(err) = t.join() {
info!(" join() failed with: {:?}", err);
}
Expand All @@ -512,6 +534,13 @@ where
}
}

if let Some(log_transaction_service) = log_transaction_service {
info!("Waiting for log_transaction_service thread...");
if let Err(err) = log_transaction_service.join() {
info!(" join() failed with: {:?}", err);
}
}

if let Some(nonce_keypairs) = nonce_keypairs {
withdraw_durable_nonce_accounts(client.clone(), &gen_keypairs, &nonce_keypairs);
}
Expand Down Expand Up @@ -575,36 +604,37 @@ fn generate_system_txs(
pairs_with_compute_unit_prices
.par_iter()
.map(|((from, to), compute_unit_price)| {
(
transfer_with_compute_unit_price_and_padding(
let compute_unit_price = Some(**compute_unit_price);
TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
Some(**compute_unit_price),
compute_unit_price,
skip_tx_account_data_size,
),
Some(timestamp()),
)
timestamp: Some(timestamp()),
compute_unit_price,
}
})
.collect()
} else {
pairs
.par_iter()
.map(|(from, to)| {
(
transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
Some(timestamp()),
)
.map(|(from, to)| TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
timestamp: Some(timestamp()),
compute_unit_price: None,
})
.collect()
}
Expand Down Expand Up @@ -779,8 +809,8 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized

let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);
for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
source[i],
&dest[i].pubkey(),
1,
Expand All @@ -790,16 +820,17 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
} else {
let pubkeys: Vec<Pubkey> = dest_nonce.iter().map(|keypair| keypair.pubkey()).collect();
let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);

for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
dest[i],
&source[i].pubkey(),
1,
Expand All @@ -809,8 +840,9 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
}
transactions
Expand Down Expand Up @@ -916,29 +948,32 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
total_tx_sent_count: &Arc<AtomicUsize>,
thread_batch_sleep_ms: usize,
client: &Arc<T>,
signatures_sender: Option<SignatureBatchSender>,
) {
let mut last_sent_time = timestamp();
loop {
'thread_loop: loop {
if thread_batch_sleep_ms > 0 {
sleep(Duration::from_millis(thread_batch_sleep_ms as u64));
}
let txs = {
let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
shared_txs_wl.pop_front()
};
if let Some(txs0) = txs {
if let Some(txs) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
info!("Transferring 1 unit {} times...", txs0.len());
let tx_len = txs0.len();
let num_txs = txs.len();
info!("Transferring 1 unit {} times...", num_txs);
let transfer_start = Instant::now();
let mut old_transactions = false;
let mut transactions = Vec::<_>::new();
let mut min_timestamp = u64::MAX;
for tx in txs0 {
let mut transactions = Vec::<_>::with_capacity(num_txs);
let mut signatures = Vec::<_>::with_capacity(num_txs);
let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs);
for tx in txs {
let now = timestamp();
// Transactions without durable nonce that are too old will be rejected by the cluster Don't bother
// sending them.
if let Some(tx_timestamp) = tx.1 {
if let Some(tx_timestamp) = tx.timestamp {
if tx_timestamp < min_timestamp {
min_timestamp = tx_timestamp;
}
Expand All @@ -947,7 +982,9 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
continue;
}
}
transactions.push(tx.0);
signatures.push(tx.transaction.signatures[0]);
transactions.push(tx.transaction);
compute_unit_prices.push(tx.compute_unit_price);
}

if min_timestamp != u64::MAX {
Expand All @@ -957,6 +994,17 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
);
}

if let Some(signatures_sender) = &signatures_sender {
if let Err(error) = signatures_sender.send(TransactionInfoBatch {
signatures,
sent_at: Utc::now(),
compute_unit_prices,
}) {
error!("Receiver has been dropped with error `{error}`, stop sending transactions.");
break 'thread_loop;
}
}

if let Err(error) = client.send_batch(transactions) {
warn!("send_batch_sync in do_tx_transfers failed: {}", error);
}
Expand All @@ -977,16 +1025,16 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
shared_txs_wl.clear();
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed);
info!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
num_txs as f32 / duration_as_s(&transfer_start.elapsed()),
);
datapoint_info!(
"bench-tps-do_tx_transfers",
("duration", duration_as_us(&transfer_start.elapsed()), i64),
("count", tx_len, i64)
("count", num_txs, i64)
);
}
if exit_signal.load(Ordering::Relaxed) {
Expand Down
25 changes: 25 additions & 0 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct Config {
pub bind_address: IpAddr,
pub client_node_id: Option<Keypair>,
pub commitment_config: CommitmentConfig,
pub block_data_file: Option<String>,
pub transaction_data_file: Option<String>,
}

impl Eq for Config {}
Expand Down Expand Up @@ -109,6 +111,8 @@ impl Default for Config {
bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
client_node_id: None,
commitment_config: CommitmentConfig::confirmed(),
block_data_file: None,
transaction_data_file: None,
}
}
}
Expand Down Expand Up @@ -419,6 +423,23 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
.default_value("confirmed")
.help("Block commitment config for getting latest blockhash"),
)
.arg(
Arg::with_name("block_data_file")
.long("block-data-file")
.value_name("FILENAME")
.takes_value(true)
.help("File to save block statistics relevant to the submitted transactions."),
)
.arg(
Arg::with_name("transaction_data_file")
.long("transaction-data-file")
.value_name("FILENAME")
.takes_value(true)
.help(
"File to save details about all the submitted transactions.\
This option is useful for debug purposes."
),
)
}

/// Parses a clap `ArgMatches` structure into a `Config`
Expand Down Expand Up @@ -587,6 +608,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
}

args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig);
args.block_data_file = matches.value_of("block_data_file").map(|s| s.to_string());
args.transaction_data_file = matches
.value_of("transaction_data_file")
.map(|s| s.to_string());

Ok(args)
}
Expand Down
2 changes: 2 additions & 0 deletions bench-tps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ pub mod bench;
pub mod bench_tps_client;
pub mod cli;
pub mod keypairs;
mod log_transaction_service;
mod perf_utils;
mod rpc_with_retry_utils;
pub mod send_batch;
Loading

0 comments on commit 1261f1f

Please sign in to comment.