Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add analysis for bench-tps transactions #92

Merged
merged 25 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
91e9d92
save progress
KirillLykov Feb 27, 2024
99312e7
rename threads handler
KirillLykov Feb 29, 2024
d00bc58
added writer for txs
KirillLykov Mar 4, 2024
27bf672
after extracting structure to handle tx confirmations
KirillLykov Mar 4, 2024
7cac96c
extract LogWriter
KirillLykov Mar 5, 2024
43e538c
Replace pair TimestampedTransaction with struct
KirillLykov Mar 5, 2024
4e295a0
add compute_unit_price to TimestampedTransaction
KirillLykov Mar 5, 2024
afcfb24
add cu_price to LogWriter
KirillLykov Mar 5, 2024
c2ac294
add block time to the logs
KirillLykov Mar 5, 2024
e0f5515
Fix warnings
KirillLykov Mar 5, 2024
71a360d
add comments and restructure code
KirillLykov Mar 5, 2024
d67f7b4
some small improvements
KirillLykov Mar 5, 2024
15b44af
Renamed conformation_processing.rs to log_transaction_service.rs
KirillLykov Mar 5, 2024
f9740e7
address numerous PR comments
KirillLykov Mar 6, 2024
c4124ca
split LogWriter into two structs
KirillLykov Mar 6, 2024
e5aa95f
simplify code of LogWriters
KirillLykov Mar 6, 2024
1606b33
extract process_blocks
KirillLykov Mar 6, 2024
9a43ceb
specify commitment in LogTransactionService
KirillLykov Mar 7, 2024
2d7699a
break thread loop if receiver happens to be dropped
KirillLykov Mar 14, 2024
03b3315
update start_slot when processing blocks
KirillLykov Mar 19, 2024
304fcf2
address pr comments
KirillLykov Mar 22, 2024
4374f5a
fix clippy error
KirillLykov Mar 24, 2024
57db349
minor changes
KirillLykov Mar 24, 2024
54adc0c
fix ms problem
KirillLykov Mar 24, 2024
7cd0d67
fix bug with time in clear transaction map
KirillLykov Mar 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bench-tps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
chrono = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
csv = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-clap-utils = { workspace = true }
Expand Down
130 changes: 89 additions & 41 deletions bench-tps/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ use {
crate::{
bench_tps_client::*,
cli::{ComputeUnitPrice, Config, InstructionPaddingConfig},
log_transaction_service::{
create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch,
},
perf_utils::{sample_txs, SampleStats},
send_batch::*,
},
chrono::Utc,
log::*,
rand::distributions::{Distribution, Uniform},
rayon::prelude::*,
Expand Down Expand Up @@ -87,8 +91,14 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 {
}
}

pub type TimestampedTransaction = (Transaction, Option<u64>);
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;
#[derive(Debug, PartialEq, Default, Eq, Clone)]
pub(crate) struct TimestampedTransaction {
transaction: Transaction,
timestamp: Option<u64>,
compute_unit_price: Option<u64>,
}

pub(crate) type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;

/// Keypairs split into source and destination
/// used for transfer transactions
Expand Down Expand Up @@ -356,6 +366,7 @@ fn create_sender_threads<T>(
threads: usize,
exit_signal: Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
signatures_sender: Option<SignatureBatchSender>,
) -> Vec<JoinHandle<()>>
where
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
Expand All @@ -367,6 +378,7 @@ where
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
let signatures_sender = signatures_sender.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
Expand All @@ -377,6 +389,7 @@ where
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
signatures_sender,
);
})
.unwrap()
Expand Down Expand Up @@ -406,6 +419,8 @@ where
use_durable_nonce,
instruction_padding_config,
num_conflict_groups,
block_data_file,
transaction_data_file,
..
} = config;

Expand Down Expand Up @@ -464,14 +479,21 @@ where
None
};

let s_threads = create_sender_threads(
let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender(
&client,
block_data_file.as_deref(),
transaction_data_file.as_deref(),
);

let sender_threads = create_sender_threads(
&client,
&shared_txs,
thread_batch_sleep_ms,
&total_tx_sent_count,
threads,
exit_signal.clone(),
&shared_tx_active_thread_count,
signatures_sender,
);

wait_for_target_slots_per_epoch(target_slots_per_epoch, &client);
Expand Down Expand Up @@ -499,7 +521,7 @@ where

// join the tx send threads
info!("Waiting for transmit threads...");
for t in s_threads {
for t in sender_threads {
if let Err(err) = t.join() {
info!(" join() failed with: {:?}", err);
}
Expand All @@ -512,6 +534,13 @@ where
}
}

if let Some(log_transaction_service) = log_transaction_service {
info!("Waiting for log_transaction_service thread...");
if let Err(err) = log_transaction_service.join() {
info!(" join() failed with: {:?}", err);
}
}

if let Some(nonce_keypairs) = nonce_keypairs {
withdraw_durable_nonce_accounts(client.clone(), &gen_keypairs, &nonce_keypairs);
}
Expand Down Expand Up @@ -575,36 +604,37 @@ fn generate_system_txs(
pairs_with_compute_unit_prices
.par_iter()
.map(|((from, to), compute_unit_price)| {
(
transfer_with_compute_unit_price_and_padding(
let compute_unit_price = Some(**compute_unit_price);
TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
Some(**compute_unit_price),
compute_unit_price,
skip_tx_account_data_size,
),
Some(timestamp()),
)
timestamp: Some(timestamp()),
compute_unit_price,
}
})
.collect()
} else {
pairs
.par_iter()
.map(|(from, to)| {
(
transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
Some(timestamp()),
)
.map(|(from, to)| TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
timestamp: Some(timestamp()),
compute_unit_price: None,
})
.collect()
}
Expand Down Expand Up @@ -779,8 +809,8 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized

let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);
for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
source[i],
&dest[i].pubkey(),
1,
Expand All @@ -790,16 +820,17 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
} else {
let pubkeys: Vec<Pubkey> = dest_nonce.iter().map(|keypair| keypair.pubkey()).collect();
let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);

for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
dest[i],
&source[i].pubkey(),
1,
Expand All @@ -809,8 +840,9 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
}
transactions
Expand Down Expand Up @@ -916,29 +948,32 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
total_tx_sent_count: &Arc<AtomicUsize>,
thread_batch_sleep_ms: usize,
client: &Arc<T>,
signatures_sender: Option<SignatureBatchSender>,
) {
let mut last_sent_time = timestamp();
loop {
'thread_loop: loop {
if thread_batch_sleep_ms > 0 {
sleep(Duration::from_millis(thread_batch_sleep_ms as u64));
}
let txs = {
let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
shared_txs_wl.pop_front()
};
if let Some(txs0) = txs {
if let Some(txs) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
info!("Transferring 1 unit {} times...", txs0.len());
let tx_len = txs0.len();
let num_txs = txs.len();
info!("Transferring 1 unit {} times...", num_txs);
let transfer_start = Instant::now();
let mut old_transactions = false;
let mut transactions = Vec::<_>::new();
let mut min_timestamp = u64::MAX;
for tx in txs0 {
let mut transactions = Vec::<_>::with_capacity(num_txs);
let mut signatures = Vec::<_>::with_capacity(num_txs);
let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs);
for tx in txs {
let now = timestamp();
// Transactions without durable nonce that are too old will be rejected by the cluster Don't bother
// sending them.
if let Some(tx_timestamp) = tx.1 {
if let Some(tx_timestamp) = tx.timestamp {
if tx_timestamp < min_timestamp {
min_timestamp = tx_timestamp;
}
Expand All @@ -947,7 +982,9 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
continue;
}
}
transactions.push(tx.0);
signatures.push(tx.transaction.signatures[0]);
transactions.push(tx.transaction);
compute_unit_prices.push(tx.compute_unit_price);
}

if min_timestamp != u64::MAX {
Expand All @@ -957,6 +994,17 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
);
}

if let Some(signatures_sender) = &signatures_sender {
if let Err(error) = signatures_sender.send(TransactionInfoBatch {
signatures,
sent_at: Utc::now(),
compute_unit_prices,
}) {
error!("Receiver has been dropped with error `{error}`, stop sending transactions.");
break 'thread_loop;
}
}

if let Err(error) = client.send_batch(transactions) {
warn!("send_batch_sync in do_tx_transfers failed: {}", error);
}
Expand All @@ -977,16 +1025,16 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
shared_txs_wl.clear();
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed);
info!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
num_txs as f32 / duration_as_s(&transfer_start.elapsed()),
);
datapoint_info!(
"bench-tps-do_tx_transfers",
("duration", duration_as_us(&transfer_start.elapsed()), i64),
("count", tx_len, i64)
("count", num_txs, i64)
);
}
if exit_signal.load(Ordering::Relaxed) {
Expand Down
25 changes: 25 additions & 0 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct Config {
pub bind_address: IpAddr,
pub client_node_id: Option<Keypair>,
pub commitment_config: CommitmentConfig,
pub block_data_file: Option<String>,
pub transaction_data_file: Option<String>,
}

impl Eq for Config {}
Expand Down Expand Up @@ -109,6 +111,8 @@ impl Default for Config {
bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
client_node_id: None,
commitment_config: CommitmentConfig::confirmed(),
block_data_file: None,
transaction_data_file: None,
}
}
}
Expand Down Expand Up @@ -419,6 +423,23 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
.default_value("confirmed")
.help("Block commitment config for getting latest blockhash"),
)
.arg(
Arg::with_name("block_data_file")
.long("block-data-file")
.value_name("FILENAME")
.takes_value(true)
.help("File to save block statistics relevant to the submitted transactions."),
)
.arg(
Arg::with_name("transaction_data_file")
.long("transaction-data-file")
.value_name("FILENAME")
.takes_value(true)
.help(
"File to save details about all the submitted transactions.\
This option is useful for debug purposes."
),
)
}

/// Parses a clap `ArgMatches` structure into a `Config`
Expand Down Expand Up @@ -587,6 +608,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
}

args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig);
args.block_data_file = matches.value_of("block_data_file").map(|s| s.to_string());
args.transaction_data_file = matches
.value_of("transaction_data_file")
.map(|s| s.to_string());

Ok(args)
}
Expand Down
2 changes: 2 additions & 0 deletions bench-tps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ pub mod bench;
pub mod bench_tps_client;
pub mod cli;
pub mod keypairs;
mod log_transaction_service;
mod perf_utils;
mod rpc_with_retry_utils;
pub mod send_batch;
Loading
Loading