From 91e9d9219d0a80e08da907821125bd8b63172625 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 27 Feb 2024 10:06:17 +0000 Subject: [PATCH 01/25] save progress --- Cargo.lock | 2 + bench-tps/Cargo.toml | 2 + bench-tps/src/bench.rs | 17 ++ bench-tps/src/confirmations_processing.rs | 286 ++++++++++++++++++++++ bench-tps/src/lib.rs | 1 + 5 files changed, 308 insertions(+) create mode 100644 bench-tps/src/confirmations_processing.rs diff --git a/Cargo.lock b/Cargo.lock index 6a2abab7f46afc..a22e9f204c7cfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5560,11 +5560,13 @@ dependencies = [ name = "solana-bench-tps" version = "2.0.0" dependencies = [ + "chrono", "clap 2.33.3", "crossbeam-channel", "log", "rand 0.8.5", "rayon", + "serde", "serde_json", "serde_yaml 0.9.32", "serial_test", diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index 3693f70e4ed9b8..800dd55e16484c 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -11,9 +11,11 @@ edition = { workspace = true } [dependencies] clap = { workspace = true } crossbeam-channel = { workspace = true } +chrono = { workspace = true } log = { workspace = true } rand = { workspace = true } rayon = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } solana-clap-utils = { workspace = true } diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 8b370786861cea..0e90f1bf99e21a 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -2,9 +2,12 @@ use { crate::{ bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, + confirmations_processing::{SignatureBatch, SignatureBatchSender}, perf_utils::{sample_txs, SampleStats}, send_batch::*, }, + chrono::Utc, + crossbeam_channel::unbounded, log::*, rand::distributions::{Distribution, Uniform}, rayon::prelude::*, @@ -356,6 +359,7 @@ fn create_sender_threads( threads: usize, exit_signal: Arc, shared_tx_active_thread_count: &Arc, + signatures_sender: SignatureBatchSender, ) -> Vec> where T: 'static + BenchTpsClient + Send + Sync + ?Sized, @@ -367,6 +371,7 @@ where let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone(); let client = client.clone(); + let signatures_sender = signatures_sender.clone(); Builder::new() .name("solana-client-sender".to_string()) .spawn(move || { @@ -377,6 +382,7 @@ where &total_tx_sent_count, thread_batch_sleep_ms, &client, + signatures_sender, ); }) .unwrap() @@ -464,6 +470,8 @@ where None }; + let (signatures_sender, signatures_receiver) = unbounded(); + let s_threads = create_sender_threads( &client, &shared_txs, @@ -472,6 +480,7 @@ where threads, exit_signal.clone(), &shared_tx_active_thread_count, + signatures_sender, ); wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); @@ -916,6 +925,7 @@ fn do_tx_transfers( total_tx_sent_count: &Arc, thread_batch_sleep_ms: usize, client: &Arc, + signatures_sender: SignatureBatchSender, ) { let mut last_sent_time = timestamp(); loop { @@ -957,6 +967,12 @@ fn do_tx_transfers( ); } + let signatures = transactions.iter().map(|tx| tx.signatures[0]).collect(); + signatures_sender.send(SignatureBatch { + signatures, + sent_at: Utc::now(), + }); + if let Err(error) = client.send_batch(transactions) { warn!("send_batch_sync in do_tx_transfers failed: {}", error); } @@ -990,6 +1006,7 @@ fn do_tx_transfers( ); } if exit_signal.load(Ordering::Relaxed) { + drop(signatures_sender); break; } } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs new file mode 100644 index 00000000000000..4f11f26b293a30 --- /dev/null +++ b/bench-tps/src/confirmations_processing.rs @@ -0,0 +1,286 @@ +use { + crate::bench_tps_client::BenchTpsClient, + chrono::{DateTime, Utc}, + crossbeam_channel::{select, tick, Receiver, Sender}, + log::*, + serde::Serialize, + solana_client::rpc_config::RpcBlockConfig, + solana_measure::measure::Measure, + solana_rpc_client::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, commitment_config::CommitmentLevel, + signature::Signature, slot_history::Slot, + }, + solana_transaction_status::{ + option_serializer::OptionSerializer, RewardType, TransactionDetails, UiConfirmedBlock, + UiTransactionEncoding, + }, + std::{ + collections::HashMap, + sync::Arc, + thread::{Builder, JoinHandle}, + time::Duration, + }, +}; + +const BLOCK_PROCESSING_PERIOD_MS: u64 = 400; + +//TODO(klykov) extract some method retry +fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result, ()> +where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + const N_TRY_REQUEST_BLOCKS: u64 = 4; + for _ in 0..N_TRY_REQUEST_BLOCKS { + let block_slots = client.get_blocks(start_block, None); + + match block_slots { + Ok(slots) => { + return Ok(slots); + } + Err(error) => { + warn!("Failed to download blocks: {}, retry", error); + } + } + } + Err(()) +} + +#[derive(Clone)] +pub struct SignatureBatch { + pub signatures: Vec, + pub sent_at: DateTime, + // pub sent_slot: Slot, I think it can be calculated from time +} + +//TODO(klykov) If there will be no other data, rename to transaction time or something lile that +#[derive(Clone)] +pub struct TransactionSendInfo { + pub sent_at: DateTime, + //pub sent_slot: Slot, + //TODO add priority fee + //pub priority_fee: u64, +} + +#[derive(Clone, Serialize)] +pub struct BlockData { + pub block_hash: String, + pub block_slot: Slot, + pub block_leader: String, + pub block_time: u64, + pub total_num_transactions: usize, + pub num_bench_tps_transactions: usize, + pub total_cu_consumed: u64, + pub bench_tps_cu_consumed: u64, +} + +#[derive(Clone, Serialize)] +pub struct TransactionData { + pub signature: String, + //pub sent_slot: Slot, + pub sent_at: String, + pub confirmed_slot: Option, + //pub confirmed_at: Option, + pub successful: bool, + pub slot_leader: Option, + pub error: Option, + pub block_hash: Option, + pub slot_processed: Option, + pub timed_out: bool, + //TODO add priority fee + //pub priority_fee: u64, +} + +pub type SignatureBatchReceiver = Receiver; +pub type SignatureBatchSender = Sender; + +// TODO(klykov): extract to TxConfirmationService +fn create_confirmation_handler_thread( + client: &Arc, + sign_receiver: SignatureBatchReceiver, +) -> JoinHandle<()> +where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + let block_processing_timer_receiver = tick(Duration::from_millis(BLOCK_PROCESSING_PERIOD_MS)); + + // TODO(klykov): wrap with retry + let from_slot = client.get_slot().expect("get_slot succeed"); + + let mut start_block = from_slot; + //TODO(klykov) use commitment setup globally + let commitment = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + let rpc_block_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + transaction_details: Some(TransactionDetails::Full), + rewards: Some(true), + commitment: Some(commitment), + max_supported_transaction_version: Some(0), + }; + let client = client.clone(); + + Builder::new().name("ConfirmationHandler".to_string()).spawn(move || { + let mut signature_to_tx_info = HashMap::::new(); + loop { + select! { + recv(sign_receiver) -> msg => { + match msg { + Ok(SignatureBatch { + signatures, + sent_at, + //sent_slot + }) => { + let mut measure_send_txs = Measure::start("measure_send_txs"); + + signatures.iter().for_each( |sign| {signature_to_tx_info.insert(*sign, TransactionSendInfo { + sent_at, //sent_slot + });}); + + measure_send_txs.stop(); + let time_send_ns = measure_send_txs.as_ns(); + info!("TIME: {time_send_ns}") + } + _ => panic!("Sender panics"), + } + }, + recv(block_processing_timer_receiver) -> _ => { + info!("sign_receiver queue len: {}", sign_receiver.len()); + // TODO(klykov) Move to process_blocks(); + let block_slots = get_blocks_with_retry(&client, start_block); + let Ok(block_slots) = block_slots else { + error!("Failed to get blocks"); + //TODO(klykov) shall I drop receiver? + break; + }; + if block_slots.is_empty() { + continue; + } + start_block = *block_slots.last().unwrap() + 1; + let blocks = block_slots.iter().map(|slot| { + client.get_block_with_config( + *slot, + rpc_block_config + ) + }); + for block_slot in blocks.zip(&block_slots) { + let block = match block_slot.0 { + Ok(x) => x, + Err(_) => continue, + }; + process_blocks( + block, + &mut signature_to_tx_info, + *block_slot.1, + ) + } + + }, + } + } + }).unwrap() +} + +fn process_blocks( + block: UiConfirmedBlock, + signature_to_tx_info: &mut HashMap, + slot: u64, +) { + let rewards = block.rewards.as_ref().unwrap(); + let slot_leader = match rewards + .iter() + .find(|r| r.reward_type == Some(RewardType::Fee)) + { + Some(x) => x.pubkey.clone(), + None => "".to_string(), + }; + + let Some(transactions) = &block.transactions else { + warn!("Empty block: {slot}"); + return; + }; + + let mut num_bench_tps_transactions: usize = 0; + let mut total_cu_consumed: u64 = 0; + let mut bench_tps_cu_consumed: u64 = 0; + for solana_transaction_status::EncodedTransactionWithStatusMeta { + transaction, meta, .. + } in transactions + { + let Some(transaction) = transaction.decode() else { + continue; + }; + let cu_consumed = meta + .as_ref() + .map_or(0, |meta| match meta.compute_units_consumed { + OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well + _ => 0, + }); + let signature = &transaction.signatures[0]; + + total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); + // TODO(klykov): rename variable + if let Some(transaction_record) = signature_to_tx_info.remove(signature) { + num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); + bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); + + let tx_confirm = TransactionData { + signature: signature.to_string(), + confirmed_slot: Some(slot), + //confirmed_at: Some(Utc::now().to_string()), + // TODO use sent_slot instead of sent_at by using map + sent_at: transaction_record.sent_at.to_string(), + //sent_slot: transaction_record.sent_slot, + successful: if let Some(meta) = &meta { + meta.status.is_ok() + } else { + false + }, + error: if let Some(meta) = &meta { + meta.err.as_ref().map(|x| x.to_string()) + } else { + None + }, + block_hash: Some(block.blockhash.clone()), + slot_processed: Some(slot), + slot_leader: Some(slot_leader.clone()), + timed_out: false, + //priority_fees: transaction_record.priority_fees, + }; + let ss = serde_json::to_value(&tx_confirm).unwrap(); + info!("TransactionData: {}", ss.to_string()); + } + } + // push block data + { + let blockData = BlockData { + block_hash: block.blockhash.clone(), + block_leader: slot_leader, + block_slot: slot, + block_time: if let Some(time) = block.block_time { + time as u64 + } else { + 0 + }, + num_bench_tps_transactions, + total_num_transactions: transactions.len(), + bench_tps_cu_consumed, + total_cu_consumed, + }; + let ss = serde_json::to_value(&blockData).unwrap(); + info!("BlockData: {}", ss.to_string()); + //writer.serialize(record).await.unwrap(); + } +} + +/* writing to csv +let mut writer = csv_async::AsyncSerializer::from_writer( + File::create(block_data_save_file).await.unwrap(), +); +let mut block_data = block_data; +while let Ok(record) = block_data.recv().await { + writer.serialize(record).await.unwrap(); +} +writer.flush().await.unwrap(); +*/ diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index 7da3979a30d72c..a089656e198e7b 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -2,6 +2,7 @@ pub mod bench; pub mod bench_tps_client; pub mod cli; +mod confirmations_processing; pub mod keypairs; mod perf_utils; pub mod send_batch; From 99312e7d0dc271edc663af0591f1fdcf176b242c Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 29 Feb 2024 08:58:12 +0000 Subject: [PATCH 02/25] rename threads handler --- bench-tps/src/bench.rs | 23 ++++++++++++++++++++--- bench-tps/src/confirmations_processing.rs | 7 +++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 0e90f1bf99e21a..5b037a763b75ec 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -2,7 +2,9 @@ use { crate::{ bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, - confirmations_processing::{SignatureBatch, SignatureBatchSender}, + confirmations_processing::{ + create_confirmation_thread, SignatureBatch, SignatureBatchSender, + }, perf_utils::{sample_txs, SampleStats}, send_batch::*, }, @@ -472,7 +474,7 @@ where let (signatures_sender, signatures_receiver) = unbounded(); - let s_threads = create_sender_threads( + let sender_threads = create_sender_threads( &client, &shared_txs, thread_batch_sleep_ms, @@ -483,6 +485,14 @@ where signatures_sender, ); + // TODO maybe move the logic inside the create_xxx? + let check_confirmations = true; + let confirmation_thread = if check_confirmations { + Some(create_confirmation_thread(&client, signatures_receiver)) + } else { + None + }; + wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); let start = Instant::now(); @@ -508,7 +518,7 @@ where // join the tx send threads info!("Waiting for transmit threads..."); - for t in s_threads { + for t in sender_threads { if let Err(err) = t.join() { info!(" join() failed with: {:?}", err); } @@ -521,6 +531,13 @@ where } } + if let Some(confirmation_thread) = confirmation_thread { + info!("Waiting for confirmation thread..."); + if let Err(err) = confirmation_thread.join() { + info!(" join() failed with: {:?}", err); + } + } + if let Some(nonce_keypairs) = nonce_keypairs { withdraw_durable_nonce_accounts(client.clone(), &gen_keypairs, &nonce_keypairs); } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 4f11f26b293a30..9d722ea564f870 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -6,7 +6,6 @@ use { serde::Serialize, solana_client::rpc_config::RpcBlockConfig, solana_measure::measure::Measure, - solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, commitment_config::CommitmentLevel, signature::Signature, slot_history::Slot, @@ -95,7 +94,7 @@ pub type SignatureBatchReceiver = Receiver; pub type SignatureBatchSender = Sender; // TODO(klykov): extract to TxConfirmationService -fn create_confirmation_handler_thread( +pub fn create_confirmation_thread( client: &Arc, sign_receiver: SignatureBatchReceiver, ) -> JoinHandle<()> @@ -254,7 +253,7 @@ fn process_blocks( } // push block data { - let blockData = BlockData { + let block_data = BlockData { block_hash: block.blockhash.clone(), block_leader: slot_leader, block_slot: slot, @@ -268,7 +267,7 @@ fn process_blocks( bench_tps_cu_consumed, total_cu_consumed, }; - let ss = serde_json::to_value(&blockData).unwrap(); + let ss = serde_json::to_value(&block_data).unwrap(); info!("BlockData: {}", ss.to_string()); //writer.serialize(record).await.unwrap(); } From d00bc58d7a72e83572fcc1d0bde7053f0aa559fb Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Mar 2024 14:45:46 +0000 Subject: [PATCH 03/25] added writer for txs --- Cargo.lock | 1 + bench-tps/Cargo.toml | 3 +- bench-tps/src/bench.rs | 36 ++--- bench-tps/src/cli.rs | 25 ++++ bench-tps/src/confirmations_processing.rs | 157 ++++++++++++++-------- bench-tps/src/main.rs | 2 + 6 files changed, 152 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a22e9f204c7cfd..b0390f9a2d926c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5563,6 +5563,7 @@ dependencies = [ "chrono", "clap 2.33.3", "crossbeam-channel", + "csv", "log", "rand 0.8.5", "rayon", diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index 800dd55e16484c..80a09fc8048ccd 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -9,9 +9,10 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +chrono = { workspace = true } clap = { workspace = true } crossbeam-channel = { workspace = true } -chrono = { workspace = true } +csv = { workspace = true } log = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 5b037a763b75ec..83d1f5b80bd539 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -3,13 +3,13 @@ use { bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, confirmations_processing::{ - create_confirmation_thread, SignatureBatch, SignatureBatchSender, + create_confirmation_channel, create_confirmation_thread, SignatureBatch, + SignatureBatchSender, }, perf_utils::{sample_txs, SampleStats}, send_batch::*, }, chrono::Utc, - crossbeam_channel::unbounded, log::*, rand::distributions::{Distribution, Uniform}, rayon::prelude::*, @@ -361,7 +361,7 @@ fn create_sender_threads( threads: usize, exit_signal: Arc, shared_tx_active_thread_count: &Arc, - signatures_sender: SignatureBatchSender, + signatures_sender: Option, ) -> Vec> where T: 'static + BenchTpsClient + Send + Sync + ?Sized, @@ -414,6 +414,8 @@ where use_durable_nonce, instruction_padding_config, num_conflict_groups, + block_data_file, + transaction_data_file, .. } = config; @@ -472,7 +474,8 @@ where None }; - let (signatures_sender, signatures_receiver) = unbounded(); + let (signatures_sender, signatures_receiver) = + create_confirmation_channel(block_data_file.as_ref(), transaction_data_file.as_ref()); let sender_threads = create_sender_threads( &client, @@ -485,13 +488,12 @@ where signatures_sender, ); - // TODO maybe move the logic inside the create_xxx? - let check_confirmations = true; - let confirmation_thread = if check_confirmations { - Some(create_confirmation_thread(&client, signatures_receiver)) - } else { - None - }; + let confirmation_thread = create_confirmation_thread( + &client, + signatures_receiver, + block_data_file.as_ref(), + transaction_data_file.as_ref(), + ); wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); @@ -942,7 +944,7 @@ fn do_tx_transfers( total_tx_sent_count: &Arc, thread_batch_sleep_ms: usize, client: &Arc, - signatures_sender: SignatureBatchSender, + signatures_sender: Option, ) { let mut last_sent_time = timestamp(); loop { @@ -985,10 +987,12 @@ fn do_tx_transfers( } let signatures = transactions.iter().map(|tx| tx.signatures[0]).collect(); - signatures_sender.send(SignatureBatch { - signatures, - sent_at: Utc::now(), - }); + if let Some(signatures_sender) = &signatures_sender { + signatures_sender.send(SignatureBatch { + signatures, + sent_at: Utc::now(), + }); + } if let Err(error) = client.send_batch(transactions) { warn!("send_batch_sync in do_tx_transfers failed: {}", error); diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 1804dbbc454e02..04bb869c2626bb 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -76,6 +76,8 @@ pub struct Config { pub bind_address: IpAddr, pub client_node_id: Option, pub commitment_config: CommitmentConfig, + pub block_data_file: Option, + pub transaction_data_file: Option, } impl Eq for Config {} @@ -109,6 +111,8 @@ impl Default for Config { bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), client_node_id: None, commitment_config: CommitmentConfig::confirmed(), + block_data_file: None, + transaction_data_file: None, } } } @@ -419,6 +423,23 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> { .default_value("confirmed") .help("Block commitment config for getting latest blockhash"), ) + .arg( + Arg::with_name("block_data_file") + .long("block-data-file") + .value_name("FILENAME") + .takes_value(true) + .help("File to save block statistics relevant to the submitted transactions."), + ) + .arg( + Arg::with_name("transaction_data_file") + .long("transaction-data-file") + .value_name("FILENAME") + .takes_value(true) + .help( + "File to save details about all the submitted transactions.\ + This option is useful for debug purposes." + ), + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -587,6 +608,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result { } args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig); + args.block_data_file = matches.value_of("block_data_file").map(|s| s.to_string()); + args.transaction_data_file = matches + .value_of("transaction_data_file") + .map(|s| s.to_string()); Ok(args) } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 9d722ea564f870..95d33777cc0ca5 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -1,14 +1,16 @@ use { crate::bench_tps_client::BenchTpsClient, chrono::{DateTime, Utc}, - crossbeam_channel::{select, tick, Receiver, Sender}, + crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, + csv, log::*, serde::Serialize, solana_client::rpc_config::RpcBlockConfig, solana_measure::measure::Measure, solana_sdk::{ - commitment_config::CommitmentConfig, commitment_config::CommitmentLevel, - signature::Signature, slot_history::Slot, + commitment_config::{CommitmentConfig, CommitmentLevel}, + signature::{self, Signature}, + slot_history::Slot, }, solana_transaction_status::{ option_serializer::OptionSerializer, RewardType, TransactionDetails, UiConfirmedBlock, @@ -16,6 +18,7 @@ use { }, std::{ collections::HashMap, + fs::File, sync::Arc, thread::{Builder, JoinHandle}, time::Duration, @@ -93,15 +96,41 @@ pub struct TransactionData { pub type SignatureBatchReceiver = Receiver; pub type SignatureBatchSender = Sender; +fn check_confirmations( + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, +) -> bool { + block_data_file.is_some() || transaction_data_file.is_some() +} + +pub fn create_confirmation_channel( + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, +) -> (Option, Option) { + if check_confirmations(block_data_file, transaction_data_file) { + let (sender, receiver) = unbounded(); + (Some(sender), Some(receiver)) + } else { + (None, None) + } +} // TODO(klykov): extract to TxConfirmationService pub fn create_confirmation_thread( client: &Arc, - sign_receiver: SignatureBatchReceiver, -) -> JoinHandle<()> + signature_receiver: Option, + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, +) -> Option> where T: 'static + BenchTpsClient + Send + Sync + ?Sized, { - let block_processing_timer_receiver = tick(Duration::from_millis(BLOCK_PROCESSING_PERIOD_MS)); + if !check_confirmations(block_data_file, transaction_data_file) { + return None; + } + let signature_receiver = signature_receiver.expect("signature_receiver is Some."); + + let block_processing_timer_receiver = + tick(Duration::from_millis(16 * BLOCK_PROCESSING_PERIOD_MS)); // TODO(klykov): wrap with retry let from_slot = client.get_slot().expect("get_slot succeed"); @@ -119,12 +148,18 @@ where max_supported_transaction_version: Some(0), }; let client = client.clone(); + let mut block_log_writer = block_data_file.map(|block_data_file| { + csv::Writer::from_writer(File::create(block_data_file).expect("File can be created.")) + }); + let mut transaction_log_writer = block_data_file.map(|block_data_file| { + csv::Writer::from_writer(File::create(block_data_file).expect("File can be created.")) + }); Builder::new().name("ConfirmationHandler".to_string()).spawn(move || { let mut signature_to_tx_info = HashMap::::new(); loop { select! { - recv(sign_receiver) -> msg => { + recv(signature_receiver) -> msg => { match msg { Ok(SignatureBatch { signatures, @@ -138,14 +173,21 @@ where });}); measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ns(); + let time_send_ns = measure_send_txs.as_ms(); info!("TIME: {time_send_ns}") } - _ => panic!("Sender panics"), + Err(e) => { + if let Some(block_log_writer) = &mut block_log_writer { + block_log_writer.flush(); + } + if let Some(transaction_log_writer) = &mut transaction_log_writer { + transaction_log_writer.flush(); + } + } } }, recv(block_processing_timer_receiver) -> _ => { - info!("sign_receiver queue len: {}", sign_receiver.len()); + info!("sign_receiver queue len: {}", signature_receiver.len()); // TODO(klykov) Move to process_blocks(); let block_slots = get_blocks_with_retry(&client, start_block); let Ok(block_slots) = block_slots else { @@ -163,28 +205,45 @@ where rpc_block_config ) }); - for block_slot in blocks.zip(&block_slots) { - let block = match block_slot.0 { + for (block, slot) in blocks.zip(&block_slots) { + let block = match block { Ok(x) => x, Err(_) => continue, }; process_blocks( block, &mut signature_to_tx_info, - *block_slot.1, + *slot, + &mut block_log_writer, + &mut transaction_log_writer ) } - + //TODO extract clean_transaction_map + let now: DateTime = Utc::now(); + signature_to_tx_info.retain(|_key, value| { + let duration_since_past_time = now.signed_duration_since(value.sent_at); + info!("Remove stale tx"); + duration_since_past_time.num_seconds() < 120 + }); + // maybe ok to write every time here? Or create a separate timer + if let Some(block_log_writer) = &mut block_log_writer { + block_log_writer.flush(); + } + if let Some(transaction_log_writer) = &mut transaction_log_writer { + transaction_log_writer.flush(); + } }, } } - }).unwrap() + }).map_err(|error| error!("Failed to create signatures thread: {error}")).ok() } fn process_blocks( block: UiConfirmedBlock, signature_to_tx_info: &mut HashMap, slot: u64, + block_log_writer: &mut Option>, + transaction_log_writer: &mut Option>, ) { let rewards = block.rewards.as_ref().unwrap(); let slot_leader = match rewards @@ -224,35 +283,36 @@ fn process_blocks( num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); - let tx_confirm = TransactionData { - signature: signature.to_string(), - confirmed_slot: Some(slot), - //confirmed_at: Some(Utc::now().to_string()), - // TODO use sent_slot instead of sent_at by using map - sent_at: transaction_record.sent_at.to_string(), - //sent_slot: transaction_record.sent_slot, - successful: if let Some(meta) = &meta { - meta.status.is_ok() - } else { - false - }, - error: if let Some(meta) = &meta { - meta.err.as_ref().map(|x| x.to_string()) - } else { - None - }, - block_hash: Some(block.blockhash.clone()), - slot_processed: Some(slot), - slot_leader: Some(slot_leader.clone()), - timed_out: false, - //priority_fees: transaction_record.priority_fees, - }; - let ss = serde_json::to_value(&tx_confirm).unwrap(); - info!("TransactionData: {}", ss.to_string()); + if let Some(transaction_log_writer) = transaction_log_writer { + let tx_data = TransactionData { + signature: signature.to_string(), + confirmed_slot: Some(slot), + //confirmed_at: Some(Utc::now().to_string()), + // TODO use sent_slot instead of sent_at by using map + sent_at: transaction_record.sent_at.to_string(), + //sent_slot: transaction_record.sent_slot, + successful: if let Some(meta) = &meta { + meta.status.is_ok() + } else { + false + }, + error: if let Some(meta) = &meta { + meta.err.as_ref().map(|x| x.to_string()) + } else { + None + }, + block_hash: Some(block.blockhash.clone()), + slot_processed: Some(slot), + slot_leader: Some(slot_leader.clone()), + timed_out: false, + //priority_fees: transaction_record.priority_fees, + }; + transaction_log_writer.serialize(tx_data); + } } } // push block data - { + if let Some(block_log_writer) = block_log_writer { let block_data = BlockData { block_hash: block.blockhash.clone(), block_leader: slot_leader, @@ -267,19 +327,6 @@ fn process_blocks( bench_tps_cu_consumed, total_cu_consumed, }; - let ss = serde_json::to_value(&block_data).unwrap(); - info!("BlockData: {}", ss.to_string()); - //writer.serialize(record).await.unwrap(); + block_log_writer.serialize(block_data); } } - -/* writing to csv -let mut writer = csv_async::AsyncSerializer::from_writer( - File::create(block_data_save_file).await.unwrap(), -); -let mut block_data = block_data; -while let Ok(record) = block_data.recv().await { - writer.serialize(record).await.unwrap(); -} -writer.flush().await.unwrap(); -*/ diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index a5da5a515703c5..9c0e89ccc7c817 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -199,6 +199,8 @@ fn main() { bind_address, client_node_id, commitment_config, + block_data_file, + transaction_data_file, .. } = &cli_config; From 27bf672cce8f70e47f8f49e790858cd3806d8277 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Mar 2024 15:56:09 +0000 Subject: [PATCH 04/25] after extracting structure to handle tx confirmations --- bench-tps/src/bench.rs | 21 +- bench-tps/src/confirmations_processing.rs | 307 ++++++++++++---------- 2 files changed, 180 insertions(+), 148 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 83d1f5b80bd539..952ac05dfcb437 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -3,8 +3,7 @@ use { bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, confirmations_processing::{ - create_confirmation_channel, create_confirmation_thread, SignatureBatch, - SignatureBatchSender, + create_log_transactions_service_and_sender, SignatureBatch, SignatureBatchSender, }, perf_utils::{sample_txs, SampleStats}, send_batch::*, @@ -474,8 +473,11 @@ where None }; - let (signatures_sender, signatures_receiver) = - create_confirmation_channel(block_data_file.as_ref(), transaction_data_file.as_ref()); + let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender( + &client, + block_data_file.as_ref(), + transaction_data_file.as_ref(), + ); let sender_threads = create_sender_threads( &client, @@ -488,13 +490,6 @@ where signatures_sender, ); - let confirmation_thread = create_confirmation_thread( - &client, - signatures_receiver, - block_data_file.as_ref(), - transaction_data_file.as_ref(), - ); - wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); let start = Instant::now(); @@ -533,9 +528,9 @@ where } } - if let Some(confirmation_thread) = confirmation_thread { + if let Some(log_transaction_service) = log_transaction_service { info!("Waiting for confirmation thread..."); - if let Err(err) = confirmation_thread.join() { + if let Err(err) = log_transaction_service.join() { info!(" join() failed with: {:?}", err); } } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 95d33777cc0ca5..c0736a0b560258 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -9,7 +9,7 @@ use { solana_measure::measure::Measure, solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, - signature::{self, Signature}, + signature::Signature, slot_history::Slot, }, solana_transaction_status::{ @@ -20,7 +20,7 @@ use { collections::HashMap, fs::File, sync::Arc, - thread::{Builder, JoinHandle}, + thread::{self, Builder, JoinHandle}, time::Duration, }, }; @@ -49,7 +49,7 @@ where } #[derive(Clone)] -pub struct SignatureBatch { +pub(crate) struct SignatureBatch { pub signatures: Vec, pub sent_at: DateTime, // pub sent_slot: Slot, I think it can be calculated from time @@ -57,7 +57,7 @@ pub struct SignatureBatch { //TODO(klykov) If there will be no other data, rename to transaction time or something lile that #[derive(Clone)] -pub struct TransactionSendInfo { +pub(crate) struct TransactionSendInfo { pub sent_at: DateTime, //pub sent_slot: Slot, //TODO add priority fee @@ -65,7 +65,7 @@ pub struct TransactionSendInfo { } #[derive(Clone, Serialize)] -pub struct BlockData { +struct BlockData { pub block_hash: String, pub block_slot: Slot, pub block_leader: String, @@ -77,7 +77,7 @@ pub struct BlockData { } #[derive(Clone, Serialize)] -pub struct TransactionData { +struct TransactionData { pub signature: String, //pub sent_slot: Slot, pub sent_at: String, @@ -93,8 +93,8 @@ pub struct TransactionData { //pub priority_fee: u64, } -pub type SignatureBatchReceiver = Receiver; -pub type SignatureBatchSender = Sender; +pub(crate) type SignatureBatchReceiver = Receiver; +pub(crate) type SignatureBatchSender = Sender; fn check_confirmations( block_data_file: Option<&String>, @@ -103,59 +103,90 @@ fn check_confirmations( block_data_file.is_some() || transaction_data_file.is_some() } -pub fn create_confirmation_channel( +pub(crate) fn create_log_transactions_service_and_sender( + client: &Arc, block_data_file: Option<&String>, transaction_data_file: Option<&String>, -) -> (Option, Option) { +) -> (Option, Option) +where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ if check_confirmations(block_data_file, transaction_data_file) { let (sender, receiver) = unbounded(); - (Some(sender), Some(receiver)) + let log_tx_service = + LogTransactionService::new(client, receiver, block_data_file, transaction_data_file); + (Some(log_tx_service), Some(sender)) } else { (None, None) } } -// TODO(klykov): extract to TxConfirmationService -pub fn create_confirmation_thread( - client: &Arc, - signature_receiver: Option, - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, -) -> Option> -where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - if !check_confirmations(block_data_file, transaction_data_file) { - return None; - } - let signature_receiver = signature_receiver.expect("signature_receiver is Some."); - let block_processing_timer_receiver = - tick(Duration::from_millis(16 * BLOCK_PROCESSING_PERIOD_MS)); +type CsvFileWriter = csv::Writer; + +pub(crate) struct LogTransactionService { + thread_handler: JoinHandle<()>, +} + +impl LogTransactionService { + fn new( + client: &Arc, + signature_receiver: SignatureBatchReceiver, + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, + ) -> Self + where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + if !check_confirmations(block_data_file, transaction_data_file) { + panic!("Expect block-data-file or transaction-data-file is specified."); + } + + let client = client.clone(); + let mut block_log_writer = block_data_file.map(|block_data_file| { + CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + }); + let mut transaction_log_writer = block_data_file.map(|block_data_file| { + CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + }); - // TODO(klykov): wrap with retry - let from_slot = client.get_slot().expect("get_slot succeed"); + let thread_handler = Builder::new() + .name("LogTransactionService".to_string()) + .spawn(move || { + Self::run( + client, + signature_receiver, + block_log_writer, + transaction_log_writer, + ); + }) + .expect("LogTransaction service is up."); + return Self { thread_handler }; + } + + fn run( + client: Arc, + signature_receiver: SignatureBatchReceiver, + mut block_log_writer: Option, + mut transaction_log_writer: Option, + ) where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + let commitment: CommitmentConfig = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + let rpc_block_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + transaction_details: Some(TransactionDetails::Full), + rewards: Some(true), + commitment: Some(commitment), + max_supported_transaction_version: Some(0), + }; + let block_processing_timer_receiver = + tick(Duration::from_millis(16 * BLOCK_PROCESSING_PERIOD_MS)); - let mut start_block = from_slot; - //TODO(klykov) use commitment setup globally - let commitment = CommitmentConfig { - commitment: CommitmentLevel::Confirmed, - }; - let rpc_block_config = RpcBlockConfig { - encoding: Some(UiTransactionEncoding::Base64), - transaction_details: Some(TransactionDetails::Full), - rewards: Some(true), - commitment: Some(commitment), - max_supported_transaction_version: Some(0), - }; - let client = client.clone(); - let mut block_log_writer = block_data_file.map(|block_data_file| { - csv::Writer::from_writer(File::create(block_data_file).expect("File can be created.")) - }); - let mut transaction_log_writer = block_data_file.map(|block_data_file| { - csv::Writer::from_writer(File::create(block_data_file).expect("File can be created.")) - }); + // TODO(klykov): wrap with retry + let mut start_block = client.get_slot().expect("get_slot succeed"); - Builder::new().name("ConfirmationHandler".to_string()).spawn(move || { let mut signature_to_tx_info = HashMap::::new(); loop { select! { @@ -177,12 +208,14 @@ where info!("TIME: {time_send_ns}") } Err(e) => { + info!("Stop LogTransactionService, error message received {e}"); if let Some(block_log_writer) = &mut block_log_writer { block_log_writer.flush(); } if let Some(transaction_log_writer) = &mut transaction_log_writer { transaction_log_writer.flush(); } + break; } } }, @@ -210,7 +243,7 @@ where Ok(x) => x, Err(_) => continue, }; - process_blocks( + Self::process_blocks( block, &mut signature_to_tx_info, *slot, @@ -235,98 +268,102 @@ where }, } } - }).map_err(|error| error!("Failed to create signatures thread: {error}")).ok() -} - -fn process_blocks( - block: UiConfirmedBlock, - signature_to_tx_info: &mut HashMap, - slot: u64, - block_log_writer: &mut Option>, - transaction_log_writer: &mut Option>, -) { - let rewards = block.rewards.as_ref().unwrap(); - let slot_leader = match rewards - .iter() - .find(|r| r.reward_type == Some(RewardType::Fee)) - { - Some(x) => x.pubkey.clone(), - None => "".to_string(), - }; + } - let Some(transactions) = &block.transactions else { - warn!("Empty block: {slot}"); - return; - }; + fn process_blocks( + block: UiConfirmedBlock, + signature_to_tx_info: &mut HashMap, + slot: u64, + block_log_writer: &mut Option>, + transaction_log_writer: &mut Option>, + ) { + let rewards = block.rewards.as_ref().unwrap(); + let slot_leader = match rewards + .iter() + .find(|r| r.reward_type == Some(RewardType::Fee)) + { + Some(x) => x.pubkey.clone(), + None => "".to_string(), + }; - let mut num_bench_tps_transactions: usize = 0; - let mut total_cu_consumed: u64 = 0; - let mut bench_tps_cu_consumed: u64 = 0; - for solana_transaction_status::EncodedTransactionWithStatusMeta { - transaction, meta, .. - } in transactions - { - let Some(transaction) = transaction.decode() else { - continue; + let Some(transactions) = &block.transactions else { + warn!("Empty block: {slot}"); + return; }; - let cu_consumed = meta - .as_ref() - .map_or(0, |meta| match meta.compute_units_consumed { - OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well - _ => 0, - }); - let signature = &transaction.signatures[0]; - total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); - // TODO(klykov): rename variable - if let Some(transaction_record) = signature_to_tx_info.remove(signature) { - num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); - bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); + let mut num_bench_tps_transactions: usize = 0; + let mut total_cu_consumed: u64 = 0; + let mut bench_tps_cu_consumed: u64 = 0; + for solana_transaction_status::EncodedTransactionWithStatusMeta { + transaction, meta, .. + } in transactions + { + let Some(transaction) = transaction.decode() else { + continue; + }; + let cu_consumed = meta + .as_ref() + .map_or(0, |meta| match meta.compute_units_consumed { + OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well + _ => 0, + }); + let signature = &transaction.signatures[0]; - if let Some(transaction_log_writer) = transaction_log_writer { - let tx_data = TransactionData { - signature: signature.to_string(), - confirmed_slot: Some(slot), - //confirmed_at: Some(Utc::now().to_string()), - // TODO use sent_slot instead of sent_at by using map - sent_at: transaction_record.sent_at.to_string(), - //sent_slot: transaction_record.sent_slot, - successful: if let Some(meta) = &meta { - meta.status.is_ok() - } else { - false - }, - error: if let Some(meta) = &meta { - meta.err.as_ref().map(|x| x.to_string()) - } else { - None - }, - block_hash: Some(block.blockhash.clone()), - slot_processed: Some(slot), - slot_leader: Some(slot_leader.clone()), - timed_out: false, - //priority_fees: transaction_record.priority_fees, - }; - transaction_log_writer.serialize(tx_data); + total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); + // TODO(klykov): rename variable + if let Some(transaction_record) = signature_to_tx_info.remove(signature) { + num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); + bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); + + if let Some(transaction_log_writer) = transaction_log_writer { + let tx_data = TransactionData { + signature: signature.to_string(), + confirmed_slot: Some(slot), + //confirmed_at: Some(Utc::now().to_string()), + // TODO use sent_slot instead of sent_at by using map + sent_at: transaction_record.sent_at.to_string(), + //sent_slot: transaction_record.sent_slot, + successful: if let Some(meta) = &meta { + meta.status.is_ok() + } else { + false + }, + error: if let Some(meta) = &meta { + meta.err.as_ref().map(|x| x.to_string()) + } else { + None + }, + block_hash: Some(block.blockhash.clone()), + slot_processed: Some(slot), + slot_leader: Some(slot_leader.clone()), + timed_out: false, + //priority_fees: transaction_record.priority_fees, + }; + transaction_log_writer.serialize(tx_data); + } } } + // push block data + if let Some(block_log_writer) = block_log_writer { + let block_data = BlockData { + block_hash: block.blockhash.clone(), + block_leader: slot_leader, + block_slot: slot, + block_time: if let Some(time) = block.block_time { + time as u64 + } else { + 0 + }, + num_bench_tps_transactions, + total_num_transactions: transactions.len(), + bench_tps_cu_consumed, + total_cu_consumed, + }; + block_log_writer.serialize(block_data); + } } - // push block data - if let Some(block_log_writer) = block_log_writer { - let block_data = BlockData { - block_hash: block.blockhash.clone(), - block_leader: slot_leader, - block_slot: slot, - block_time: if let Some(time) = block.block_time { - time as u64 - } else { - 0 - }, - num_bench_tps_transactions, - total_num_transactions: transactions.len(), - bench_tps_cu_consumed, - total_cu_consumed, - }; - block_log_writer.serialize(block_data); + + pub fn join(self) -> thread::Result<()> { + self.thread_handler.join() } } From 7cac96c25b11011821d5a45126cf34c5a739843c Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 08:57:23 +0000 Subject: [PATCH 05/25] extract LogWriter --- bench-tps/src/bench.rs | 14 +- bench-tps/src/confirmations_processing.rs | 344 +++++++++++++--------- 2 files changed, 217 insertions(+), 141 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 952ac05dfcb437..df7e70a822055b 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -983,10 +983,16 @@ fn do_tx_transfers( let signatures = transactions.iter().map(|tx| tx.signatures[0]).collect(); if let Some(signatures_sender) = &signatures_sender { - signatures_sender.send(SignatureBatch { - signatures, - sent_at: Utc::now(), - }); + if signatures_sender + .send(SignatureBatch { + signatures, + sent_at: Utc::now(), + }) + .is_err() + { + info!("Receiver has been dropped, stop sending transactions."); + break; + } } if let Err(error) = client.send_batch(transactions) { diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index c0736a0b560258..b4d9a7eb126796 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -1,5 +1,5 @@ use { - crate::bench_tps_client::BenchTpsClient, + crate::bench_tps_client::{BenchTpsClient, Result}, chrono::{DateTime, Utc}, crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, csv, @@ -13,8 +13,8 @@ use { slot_history::Slot, }, solana_transaction_status::{ - option_serializer::OptionSerializer, RewardType, TransactionDetails, UiConfirmedBlock, - UiTransactionEncoding, + option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, RewardType, + TransactionDetails, UiConfirmedBlock, UiTransactionEncoding, UiTransactionStatusMeta, }, std::{ collections::HashMap, @@ -26,27 +26,8 @@ use { }; const BLOCK_PROCESSING_PERIOD_MS: u64 = 400; - -//TODO(klykov) extract some method retry -fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result, ()> -where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - const N_TRY_REQUEST_BLOCKS: u64 = 4; - for _ in 0..N_TRY_REQUEST_BLOCKS { - let block_slots = client.get_blocks(start_block, None); - - match block_slots { - Ok(slots) => { - return Ok(slots); - } - Err(error) => { - warn!("Failed to download blocks: {}, retry", error); - } - } - } - Err(()) -} +const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * BLOCK_PROCESSING_PERIOD_MS; +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = 120; #[derive(Clone)] pub(crate) struct SignatureBatch { @@ -86,16 +67,12 @@ struct TransactionData { pub successful: bool, pub slot_leader: Option, pub error: Option, - pub block_hash: Option, - pub slot_processed: Option, + pub blockhash: Option, pub timed_out: bool, //TODO add priority fee //pub priority_fee: u64, } -pub(crate) type SignatureBatchReceiver = Receiver; -pub(crate) type SignatureBatchSender = Sender; - fn check_confirmations( block_data_file: Option<&String>, transaction_data_file: Option<&String>, @@ -103,6 +80,9 @@ fn check_confirmations( block_data_file.is_some() || transaction_data_file.is_some() } +pub(crate) type SignatureBatchSender = Sender; +type SignatureBatchReceiver = Receiver; + pub(crate) fn create_log_transactions_service_and_sender( client: &Arc, block_data_file: Option<&String>, @@ -121,13 +101,15 @@ where } } -type CsvFileWriter = csv::Writer; - pub(crate) struct LogTransactionService { thread_handler: JoinHandle<()>, } impl LogTransactionService { + pub fn join(self) -> thread::Result<()> { + self.thread_handler.join() + } + fn new( client: &Arc, signature_receiver: SignatureBatchReceiver, @@ -142,33 +124,19 @@ impl LogTransactionService { } let client = client.clone(); - let mut block_log_writer = block_data_file.map(|block_data_file| { - CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) - }); - let mut transaction_log_writer = block_data_file.map(|block_data_file| { - CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) - }); + let mut log_writer = LogWriter::new(block_data_file, transaction_data_file); let thread_handler = Builder::new() .name("LogTransactionService".to_string()) .spawn(move || { - Self::run( - client, - signature_receiver, - block_log_writer, - transaction_log_writer, - ); + Self::run(client, signature_receiver, log_writer); }) - .expect("LogTransaction service is up."); + .expect("LogTransactionService is up."); return Self { thread_handler }; } - fn run( - client: Arc, - signature_receiver: SignatureBatchReceiver, - mut block_log_writer: Option, - mut transaction_log_writer: Option, - ) where + fn run(client: Arc, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter) + where T: 'static + BenchTpsClient + Send + Sync + ?Sized, { let commitment: CommitmentConfig = CommitmentConfig { @@ -181,11 +149,9 @@ impl LogTransactionService { commitment: Some(commitment), max_supported_transaction_version: Some(0), }; - let block_processing_timer_receiver = - tick(Duration::from_millis(16 * BLOCK_PROCESSING_PERIOD_MS)); + let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - // TODO(klykov): wrap with retry - let mut start_block = client.get_slot().expect("get_slot succeed"); + let mut start_block = get_slot_with_retry(&client).expect("get_slot_with_retry succeed"); let mut signature_to_tx_info = HashMap::::new(); loop { @@ -195,41 +161,34 @@ impl LogTransactionService { Ok(SignatureBatch { signatures, sent_at, - //sent_slot }) => { - let mut measure_send_txs = Measure::start("measure_send_txs"); - + let mut measure_send_txs = Measure::start("measure_update_map"); signatures.iter().for_each( |sign| {signature_to_tx_info.insert(*sign, TransactionSendInfo { - sent_at, //sent_slot + sent_at, });}); measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ms(); - info!("TIME: {time_send_ns}") + let time_send_ns = measure_send_txs.as_ns(); + info!("@@@ Time to add signatures to map: {time_send_ns}") } Err(e) => { info!("Stop LogTransactionService, error message received {e}"); - if let Some(block_log_writer) = &mut block_log_writer { - block_log_writer.flush(); - } - if let Some(transaction_log_writer) = &mut transaction_log_writer { - transaction_log_writer.flush(); - } + log_writer.flush(); break; } } }, recv(block_processing_timer_receiver) -> _ => { + let mut measure_send_txs = Measure::start("measure_update_map"); info!("sign_receiver queue len: {}", signature_receiver.len()); - // TODO(klykov) Move to process_blocks(); let block_slots = get_blocks_with_retry(&client, start_block); let Ok(block_slots) = block_slots else { error!("Failed to get blocks"); - //TODO(klykov) shall I drop receiver? + drop(signature_receiver); break; }; if block_slots.is_empty() { - continue; + continue; } start_block = *block_slots.last().unwrap() + 1; let blocks = block_slots.iter().map(|slot| { @@ -239,52 +198,40 @@ impl LogTransactionService { ) }); for (block, slot) in blocks.zip(&block_slots) { - let block = match block { - Ok(x) => x, - Err(_) => continue, + let Ok(block) = block else { + continue; }; - Self::process_blocks( + Self::process_block( block, &mut signature_to_tx_info, *slot, - &mut block_log_writer, - &mut transaction_log_writer + &mut log_writer, ) } - //TODO extract clean_transaction_map - let now: DateTime = Utc::now(); - signature_to_tx_info.retain(|_key, value| { - let duration_since_past_time = now.signed_duration_since(value.sent_at); - info!("Remove stale tx"); - duration_since_past_time.num_seconds() < 120 - }); + Self::clean_transaction_map(&mut log_writer, &mut signature_to_tx_info); + // maybe ok to write every time here? Or create a separate timer - if let Some(block_log_writer) = &mut block_log_writer { - block_log_writer.flush(); - } - if let Some(transaction_log_writer) = &mut transaction_log_writer { - transaction_log_writer.flush(); - } + log_writer.flush(); + measure_send_txs.stop(); + let time_send_ns = measure_send_txs.as_ns(); + info!("@@@ Time to process blocks: {time_send_ns}") + }, } } } - fn process_blocks( + fn process_block( block: UiConfirmedBlock, signature_to_tx_info: &mut HashMap, slot: u64, - block_log_writer: &mut Option>, - transaction_log_writer: &mut Option>, + log_writer: &mut LogWriter, ) { - let rewards = block.rewards.as_ref().unwrap(); - let slot_leader = match rewards + let rewards = block.rewards.as_ref().expect("Rewards are present."); + let slot_leader = rewards .iter() .find(|r| r.reward_type == Some(RewardType::Fee)) - { - Some(x) => x.pubkey.clone(), - None => "".to_string(), - }; + .map_or("".to_string(), |x| x.pubkey.clone()); let Some(transactions) = &block.transactions else { warn!("Empty block: {slot}"); @@ -294,7 +241,7 @@ impl LogTransactionService { let mut num_bench_tps_transactions: usize = 0; let mut total_cu_consumed: u64 = 0; let mut bench_tps_cu_consumed: u64 = 0; - for solana_transaction_status::EncodedTransactionWithStatusMeta { + for EncodedTransactionWithStatusMeta { transaction, meta, .. } in transactions { @@ -310,52 +257,130 @@ impl LogTransactionService { let signature = &transaction.signatures[0]; total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); - // TODO(klykov): rename variable - if let Some(transaction_record) = signature_to_tx_info.remove(signature) { + if let Some(TransactionSendInfo { sent_at }) = signature_to_tx_info.remove(signature) { num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); - if let Some(transaction_log_writer) = transaction_log_writer { - let tx_data = TransactionData { - signature: signature.to_string(), - confirmed_slot: Some(slot), - //confirmed_at: Some(Utc::now().to_string()), - // TODO use sent_slot instead of sent_at by using map - sent_at: transaction_record.sent_at.to_string(), - //sent_slot: transaction_record.sent_slot, - successful: if let Some(meta) = &meta { - meta.status.is_ok() - } else { - false - }, - error: if let Some(meta) = &meta { - meta.err.as_ref().map(|x| x.to_string()) - } else { - None - }, - block_hash: Some(block.blockhash.clone()), - slot_processed: Some(slot), - slot_leader: Some(slot_leader.clone()), - timed_out: false, - //priority_fees: transaction_record.priority_fees, - }; - transaction_log_writer.serialize(tx_data); - } + log_writer.write_to_transaction_log( + signature, + Some(slot), + sent_at, + meta.as_ref(), + Some(block.blockhash.clone()), + Some(slot_leader.clone()), + true, + ); } } - // push block data - if let Some(block_log_writer) = block_log_writer { + log_writer.write_to_block_log( + block.blockhash.clone(), + slot_leader, + slot, + block.block_time, + num_bench_tps_transactions, + transactions.len(), + bench_tps_cu_consumed, + total_cu_consumed, + ) + } + + fn clean_transaction_map( + log_writer: &mut LogWriter, + signature_to_tx_info: &mut HashMap, + ) { + let now: DateTime = Utc::now(); + signature_to_tx_info.retain(|signature, tx_info| { + let duration_since_past_time = now.signed_duration_since(tx_info.sent_at); + let is_not_timeout_tx = + duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC; + if !is_not_timeout_tx { + log_writer.write_to_transaction_log( + signature, + None, + tx_info.sent_at, + None, + None, + None, + true, + ); + } + is_not_timeout_tx + }); + } +} + +type CsvFileWriter = csv::Writer; +struct LogWriter { + block_log_writer: Option, + transaction_log_writer: Option, +} + +impl LogWriter { + fn new(block_data_file: Option<&String>, transaction_data_file: Option<&String>) -> Self { + let block_log_writer = block_data_file.map(|block_data_file| { + CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + }); + let transaction_log_writer = transaction_data_file.map(|transaction_data_file| { + CsvFileWriter::from_writer( + File::create(transaction_data_file).expect("File can be created."), + ) + }); + Self { + block_log_writer, + transaction_log_writer, + } + } + + fn write_to_transaction_log( + &mut self, + signature: &Signature, + confirmed_slot: Option, + sent_at: DateTime, + meta: Option<&UiTransactionStatusMeta>, + blockhash: Option, + slot_leader: Option, + timed_out: bool, + ) { + if let Some(transaction_log_writer) = &mut self.transaction_log_writer { + let tx_data = TransactionData { + signature: signature.to_string(), + confirmed_slot, + //confirmed_at: Some(Utc::now().to_string()), + // TODO use sent_slot instead of sent_at by using map + sent_at: sent_at.to_string(), + //sent_slot: transaction_record.sent_slot, + successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), + error: meta + .as_ref() + .and_then(|m| m.err.as_ref().map(|x| x.to_string())), + blockhash, + slot_leader: slot_leader, + timed_out, + //priority_fees: transaction_record.priority_fees, + }; + transaction_log_writer.serialize(tx_data); + } + } + + fn write_to_block_log( + &mut self, + blockhash: String, + slot_leader: String, + slot: Slot, + block_time: Option, + num_bench_tps_transactions: usize, + total_num_transactions: usize, + bench_tps_cu_consumed: u64, + total_cu_consumed: u64, + ) { + if let Some(block_log_writer) = &mut self.block_log_writer { let block_data = BlockData { - block_hash: block.blockhash.clone(), + block_hash: blockhash, block_leader: slot_leader, block_slot: slot, - block_time: if let Some(time) = block.block_time { - time as u64 - } else { - 0 - }, + block_time: block_time.map_or(0, |time| time as u64), num_bench_tps_transactions, - total_num_transactions: transactions.len(), + total_num_transactions, bench_tps_cu_consumed, total_cu_consumed, }; @@ -363,7 +388,52 @@ impl LogTransactionService { } } - pub fn join(self) -> thread::Result<()> { - self.thread_handler.join() + fn flush(&mut self) { + if let Some(block_log_writer) = &mut self.block_log_writer { + let _ = block_log_writer.flush(); + } + if let Some(transaction_log_writer) = &mut self.transaction_log_writer { + let _ = transaction_log_writer.flush(); + } + } +} + +const NUM_RETRY: u64 = 5; + +fn get_slot_with_retry(client: &Arc) -> Result +where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + for _ in 1..NUM_RETRY { + let current_slot = client.get_slot(); + + match current_slot { + Ok(slot) => { + return Ok(slot); + } + Err(error) => { + warn!("Failed to get slot: {error}, retry."); + } + } + } + client.get_slot() +} + +fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result> +where + T: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + for _ in 1..NUM_RETRY { + let block_slots = client.get_blocks(start_block, None); + + match block_slots { + Ok(slots) => { + return Ok(slots); + } + Err(error) => { + warn!("Failed to download blocks: {error}, retry."); + } + } } + client.get_blocks(start_block, None) } From 43e538c1e556fd3303dfd33358d4ad77a6580147 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 09:55:16 +0000 Subject: [PATCH 06/25] Replace pair TimestampedTransaction with struct --- bench-tps/src/bench.rs | 77 +++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index df7e70a822055b..ab762b73f78f2c 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -91,8 +91,13 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 { } } -pub type TimestampedTransaction = (Transaction, Option); -pub type SharedTransactions = Arc>>>; +#[derive(Debug, PartialEq, Default, Eq, Clone)] +pub(crate) struct TimestampedTransaction { + transaction: Transaction, + timestamp: Option, +} + +pub(crate) type SharedTransactions = Arc>>>; /// Keypairs split into source and destination /// used for transfer transactions @@ -597,37 +602,33 @@ fn generate_system_txs( pairs_with_compute_unit_prices .par_iter() - .map(|((from, to), compute_unit_price)| { - ( - transfer_with_compute_unit_price_and_padding( - from, - &to.pubkey(), - 1, - *blockhash, - instruction_padding_config, - Some(**compute_unit_price), - skip_tx_account_data_size, - ), - Some(timestamp()), - ) + .map(|((from, to), compute_unit_price)| TimestampedTransaction { + transaction: transfer_with_compute_unit_price_and_padding( + from, + &to.pubkey(), + 1, + *blockhash, + instruction_padding_config, + Some(**compute_unit_price), + skip_tx_account_data_size, + ), + timestamp: Some(timestamp()), }) .collect() } else { pairs .par_iter() - .map(|(from, to)| { - ( - transfer_with_compute_unit_price_and_padding( - from, - &to.pubkey(), - 1, - *blockhash, - instruction_padding_config, - None, - skip_tx_account_data_size, - ), - Some(timestamp()), - ) + .map(|(from, to)| TimestampedTransaction { + transaction: transfer_with_compute_unit_price_and_padding( + from, + &to.pubkey(), + 1, + *blockhash, + instruction_padding_config, + None, + skip_tx_account_data_size, + ), + timestamp: Some(timestamp()), }) .collect() } @@ -802,8 +803,8 @@ fn generate_nonced_system_txs = get_nonce_blockhashes(&client, &pubkeys); for i in 0..length { - transactions.push(( - nonced_transfer_with_padding( + transactions.push(TimestampedTransaction { + transaction: nonced_transfer_with_padding( source[i], &dest[i].pubkey(), 1, @@ -813,16 +814,16 @@ fn generate_nonced_system_txs = dest_nonce.iter().map(|keypair| keypair.pubkey()).collect(); let blockhashes: Vec = get_nonce_blockhashes(&client, &pubkeys); for i in 0..length { - transactions.push(( - nonced_transfer_with_padding( + transactions.push(TimestampedTransaction { + transaction: nonced_transfer_with_padding( dest[i], &source[i].pubkey(), 1, @@ -832,8 +833,8 @@ fn generate_nonced_system_txs( let now = timestamp(); // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother // sending them. - if let Some(tx_timestamp) = tx.1 { + if let Some(tx_timestamp) = tx.timestamp { if tx_timestamp < min_timestamp { min_timestamp = tx_timestamp; } @@ -971,7 +972,7 @@ fn do_tx_transfers( continue; } } - transactions.push(tx.0); + transactions.push(tx.transaction); } if min_timestamp != u64::MAX { From 4e295a0ec435033bf97c2b96047c6ec84dab663c Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 10:12:13 +0000 Subject: [PATCH 07/25] add compute_unit_price to TimestampedTransaction --- bench-tps/src/bench.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index ab762b73f78f2c..c7178fc70a799b 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -95,6 +95,7 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 { pub(crate) struct TimestampedTransaction { transaction: Transaction, timestamp: Option, + compute_unit_price: Option, } pub(crate) type SharedTransactions = Arc>>>; @@ -602,17 +603,21 @@ fn generate_system_txs( pairs_with_compute_unit_prices .par_iter() - .map(|((from, to), compute_unit_price)| TimestampedTransaction { - transaction: transfer_with_compute_unit_price_and_padding( - from, - &to.pubkey(), - 1, - *blockhash, - instruction_padding_config, - Some(**compute_unit_price), - skip_tx_account_data_size, - ), - timestamp: Some(timestamp()), + .map(|((from, to), compute_unit_price)| { + let compute_unit_price = Some(**compute_unit_price); + TimestampedTransaction { + transaction: transfer_with_compute_unit_price_and_padding( + from, + &to.pubkey(), + 1, + *blockhash, + instruction_padding_config, + compute_unit_price, + skip_tx_account_data_size, + ), + timestamp: Some(timestamp()), + compute_unit_price, + } }) .collect() } else { @@ -629,6 +634,7 @@ fn generate_system_txs( skip_tx_account_data_size, ), timestamp: Some(timestamp()), + compute_unit_price: None, }) .collect() } @@ -815,6 +821,7 @@ fn generate_nonced_system_txs Date: Tue, 5 Mar 2024 12:49:46 +0000 Subject: [PATCH 08/25] add cu_price to LogWriter --- bench-tps/src/bench.rs | 8 ++++-- bench-tps/src/confirmations_processing.rs | 31 +++++++++++++---------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index c7178fc70a799b..5cb9c798e44012 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -965,8 +965,10 @@ fn do_tx_transfers( let tx_len = txs0.len(); let transfer_start = Instant::now(); let mut old_transactions = false; - let mut transactions = Vec::<_>::new(); let mut min_timestamp = u64::MAX; + let mut transactions = Vec::<_>::with_capacity(txs0.len()); + let mut signatures = Vec::<_>::with_capacity(txs0.len()); + let mut compute_unit_prices = Vec::<_>::with_capacity(txs0.len()); for tx in txs0 { let now = timestamp(); // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother @@ -980,7 +982,9 @@ fn do_tx_transfers( continue; } } + signatures.push(tx.transaction.signatures[0]); transactions.push(tx.transaction); + compute_unit_prices.push(tx.compute_unit_price); } if min_timestamp != u64::MAX { @@ -990,12 +994,12 @@ fn do_tx_transfers( ); } - let signatures = transactions.iter().map(|tx| tx.signatures[0]).collect(); if let Some(signatures_sender) = &signatures_sender { if signatures_sender .send(SignatureBatch { signatures, sent_at: Utc::now(), + compute_unit_prices, }) .is_err() { diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index b4d9a7eb126796..1104a717478b6f 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -8,6 +8,7 @@ use { solana_client::rpc_config::RpcBlockConfig, solana_measure::measure::Measure, solana_sdk::{ + clock::{DEFAULT_MS_PER_SLOT, DEFAULT_S_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::{CommitmentConfig, CommitmentLevel}, signature::Signature, slot_history::Slot, @@ -25,24 +26,20 @@ use { }, }; -const BLOCK_PROCESSING_PERIOD_MS: u64 = 400; -const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * BLOCK_PROCESSING_PERIOD_MS; -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = 120; +const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; #[derive(Clone)] pub(crate) struct SignatureBatch { pub signatures: Vec, pub sent_at: DateTime, - // pub sent_slot: Slot, I think it can be calculated from time + pub compute_unit_prices: Vec>, // TODO(klykov) pub sent_slot: Slot, I think it can be calculated from time } -//TODO(klykov) If there will be no other data, rename to transaction time or something lile that #[derive(Clone)] pub(crate) struct TransactionSendInfo { pub sent_at: DateTime, - //pub sent_slot: Slot, - //TODO add priority fee - //pub priority_fee: u64, + pub compute_unit_price: Option, } #[derive(Clone, Serialize)] @@ -69,8 +66,7 @@ struct TransactionData { pub error: Option, pub blockhash: Option, pub timed_out: bool, - //TODO add priority fee - //pub priority_fee: u64, + pub compute_unit_price: u64, } fn check_confirmations( @@ -161,10 +157,12 @@ impl LogTransactionService { Ok(SignatureBatch { signatures, sent_at, + compute_unit_prices }) => { let mut measure_send_txs = Measure::start("measure_update_map"); - signatures.iter().for_each( |sign| {signature_to_tx_info.insert(*sign, TransactionSendInfo { + signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo { sent_at, + compute_unit_price });}); measure_send_txs.stop(); @@ -257,7 +255,11 @@ impl LogTransactionService { let signature = &transaction.signatures[0]; total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed); - if let Some(TransactionSendInfo { sent_at }) = signature_to_tx_info.remove(signature) { + if let Some(TransactionSendInfo { + sent_at, + compute_unit_price, + }) = signature_to_tx_info.remove(signature) + { num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); @@ -268,6 +270,7 @@ impl LogTransactionService { meta.as_ref(), Some(block.blockhash.clone()), Some(slot_leader.clone()), + compute_unit_price, true, ); } @@ -301,6 +304,7 @@ impl LogTransactionService { None, None, None, + tx_info.compute_unit_price, true, ); } @@ -339,6 +343,7 @@ impl LogWriter { meta: Option<&UiTransactionStatusMeta>, blockhash: Option, slot_leader: Option, + compute_unit_price: Option, timed_out: bool, ) { if let Some(transaction_log_writer) = &mut self.transaction_log_writer { @@ -356,7 +361,7 @@ impl LogWriter { blockhash, slot_leader: slot_leader, timed_out, - //priority_fees: transaction_record.priority_fees, + compute_unit_price: compute_unit_price.unwrap_or(0), }; transaction_log_writer.serialize(tx_data); } From c2ac294f8902e3808ba48776aeb66f2350e82fc4 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 13:16:31 +0000 Subject: [PATCH 09/25] add block time to the logs --- bench-tps/src/confirmations_processing.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 1104a717478b6f..3b51a5bc42e5c4 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -1,6 +1,6 @@ use { crate::bench_tps_client::{BenchTpsClient, Result}, - chrono::{DateTime, Utc}, + chrono::{DateTime, TimeZone, Utc}, crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, csv, log::*, @@ -33,7 +33,7 @@ const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_ pub(crate) struct SignatureBatch { pub signatures: Vec, pub sent_at: DateTime, - pub compute_unit_prices: Vec>, // TODO(klykov) pub sent_slot: Slot, I think it can be calculated from time + pub compute_unit_prices: Vec>, } #[derive(Clone)] @@ -47,7 +47,7 @@ struct BlockData { pub block_hash: String, pub block_slot: Slot, pub block_leader: String, - pub block_time: u64, + pub block_time: Option>, pub total_num_transactions: usize, pub num_bench_tps_transactions: usize, pub total_cu_consumed: u64, @@ -57,10 +57,9 @@ struct BlockData { #[derive(Clone, Serialize)] struct TransactionData { pub signature: String, - //pub sent_slot: Slot, - pub sent_at: String, + pub sent_at: String, //TODO(klykov) use consistently DateTime? I think it cane be serialized pub confirmed_slot: Option, - //pub confirmed_at: Option, + pub confirmed_at: Option>, pub successful: bool, pub slot_leader: Option, pub error: Option, @@ -120,7 +119,7 @@ impl LogTransactionService { } let client = client.clone(); - let mut log_writer = LogWriter::new(block_data_file, transaction_data_file); + let log_writer = LogWriter::new(block_data_file, transaction_data_file); let thread_handler = Builder::new() .name("LogTransactionService".to_string()) @@ -266,6 +265,7 @@ impl LogTransactionService { log_writer.write_to_transaction_log( signature, Some(slot), + block.block_time, sent_at, meta.as_ref(), Some(block.blockhash.clone()), @@ -300,6 +300,7 @@ impl LogTransactionService { log_writer.write_to_transaction_log( signature, None, + None, tx_info.sent_at, None, None, @@ -339,6 +340,7 @@ impl LogWriter { &mut self, signature: &Signature, confirmed_slot: Option, + block_time: Option, sent_at: DateTime, meta: Option<&UiTransactionStatusMeta>, blockhash: Option, @@ -350,7 +352,7 @@ impl LogWriter { let tx_data = TransactionData { signature: signature.to_string(), confirmed_slot, - //confirmed_at: Some(Utc::now().to_string()), + confirmed_at: block_time.map(|time| Utc.timestamp(time, 0)), // TODO use sent_slot instead of sent_at by using map sent_at: sent_at.to_string(), //sent_slot: transaction_record.sent_slot, @@ -383,7 +385,7 @@ impl LogWriter { block_hash: blockhash, block_leader: slot_leader, block_slot: slot, - block_time: block_time.map_or(0, |time| time as u64), + block_time: block_time.map(|time| Utc.timestamp(time, 0)), num_bench_tps_transactions, total_num_transactions, bench_tps_cu_consumed, From e0f5515a83d4786cc69981ba62add2a3fdc4edc4 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 13:41:40 +0000 Subject: [PATCH 10/25] Fix warnings --- bench-tps/src/confirmations_processing.rs | 25 +++++++++++++++-------- bench-tps/src/main.rs | 2 -- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 3b51a5bc42e5c4..25dbde066c431d 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -2,7 +2,6 @@ use { crate::bench_tps_client::{BenchTpsClient, Result}, chrono::{DateTime, TimeZone, Utc}, crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, - csv, log::*, serde::Serialize, solana_client::rpc_config::RpcBlockConfig, @@ -127,7 +126,7 @@ impl LogTransactionService { Self::run(client, signature_receiver, log_writer); }) .expect("LogTransactionService is up."); - return Self { thread_handler }; + Self { thread_handler } } fn run(client: Arc, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter) @@ -336,6 +335,7 @@ impl LogWriter { } } + #[allow(clippy::too_many_arguments)] fn write_to_transaction_log( &mut self, signature: &Signature, @@ -352,23 +352,26 @@ impl LogWriter { let tx_data = TransactionData { signature: signature.to_string(), confirmed_slot, - confirmed_at: block_time.map(|time| Utc.timestamp(time, 0)), - // TODO use sent_slot instead of sent_at by using map + confirmed_at: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), sent_at: sent_at.to_string(), - //sent_slot: transaction_record.sent_slot, successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), error: meta .as_ref() .and_then(|m| m.err.as_ref().map(|x| x.to_string())), blockhash, - slot_leader: slot_leader, + slot_leader, timed_out, compute_unit_price: compute_unit_price.unwrap_or(0), }; - transaction_log_writer.serialize(tx_data); + let _ = transaction_log_writer.serialize(tx_data); } } + #[allow(clippy::too_many_arguments)] fn write_to_block_log( &mut self, blockhash: String, @@ -385,13 +388,17 @@ impl LogWriter { block_hash: blockhash, block_leader: slot_leader, block_slot: slot, - block_time: block_time.map(|time| Utc.timestamp(time, 0)), + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), num_bench_tps_transactions, total_num_transactions, bench_tps_cu_consumed, total_cu_consumed, }; - block_log_writer.serialize(block_data); + let _ = block_log_writer.serialize(block_data); } } diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 9c0e89ccc7c817..a5da5a515703c5 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -199,8 +199,6 @@ fn main() { bind_address, client_node_id, commitment_config, - block_data_file, - transaction_data_file, .. } = &cli_config; From 71a360dee0446ed9bbb9b98933461dfa3337ab40 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 14:23:46 +0000 Subject: [PATCH 11/25] add comments and restructure code --- bench-tps/src/bench.rs | 4 +- bench-tps/src/confirmations_processing.rs | 120 ++++++++++++---------- 2 files changed, 67 insertions(+), 57 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 5cb9c798e44012..558b65fa0becb3 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -3,7 +3,7 @@ use { bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, confirmations_processing::{ - create_log_transactions_service_and_sender, SignatureBatch, SignatureBatchSender, + create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch, }, perf_utils::{sample_txs, SampleStats}, send_batch::*, @@ -996,7 +996,7 @@ fn do_tx_transfers( if let Some(signatures_sender) = &signatures_sender { if signatures_sender - .send(SignatureBatch { + .send(TransactionInfoBatch { signatures, sent_at: Utc::now(), compute_unit_prices, diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 25dbde066c431d..1da917379236a5 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -1,3 +1,6 @@ +//! `LogTransactionService` requests confirmed blocks, analyses transactions submitted by bench-tps, +//! and saves log files in csv format. + use { crate::bench_tps_client::{BenchTpsClient, Result}, chrono::{DateTime, TimeZone, Utc}, @@ -25,58 +28,21 @@ use { }, }; -const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; - +// Data to establish communication between sender thread and +// LogTransactionService. #[derive(Clone)] -pub(crate) struct SignatureBatch { +pub(crate) struct TransactionInfoBatch { pub signatures: Vec, pub sent_at: DateTime, pub compute_unit_prices: Vec>, } -#[derive(Clone)] -pub(crate) struct TransactionSendInfo { - pub sent_at: DateTime, - pub compute_unit_price: Option, -} +pub(crate) type SignatureBatchSender = Sender; -#[derive(Clone, Serialize)] -struct BlockData { - pub block_hash: String, - pub block_slot: Slot, - pub block_leader: String, - pub block_time: Option>, - pub total_num_transactions: usize, - pub num_bench_tps_transactions: usize, - pub total_cu_consumed: u64, - pub bench_tps_cu_consumed: u64, -} - -#[derive(Clone, Serialize)] -struct TransactionData { - pub signature: String, - pub sent_at: String, //TODO(klykov) use consistently DateTime? I think it cane be serialized - pub confirmed_slot: Option, - pub confirmed_at: Option>, - pub successful: bool, - pub slot_leader: Option, - pub error: Option, - pub blockhash: Option, - pub timed_out: bool, - pub compute_unit_price: u64, -} - -fn check_confirmations( - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, -) -> bool { - block_data_file.is_some() || transaction_data_file.is_some() +pub(crate) struct LogTransactionService { + thread_handler: JoinHandle<()>, } -pub(crate) type SignatureBatchSender = Sender; -type SignatureBatchReceiver = Receiver; - pub(crate) fn create_log_transactions_service_and_sender( client: &Arc, block_data_file: Option<&String>, @@ -95,15 +61,22 @@ where } } -pub(crate) struct LogTransactionService { - thread_handler: JoinHandle<()>, +// How often process blocks. +const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; +// Max age for transaction in the transaction map. +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; + +// Map used to filter submitted transactions. +#[derive(Clone)] +struct TransactionSendInfo { + pub sent_at: DateTime, + pub compute_unit_price: Option, } +type MapSignatureToTxInfo = HashMap; -impl LogTransactionService { - pub fn join(self) -> thread::Result<()> { - self.thread_handler.join() - } +type SignatureBatchReceiver = Receiver; +impl LogTransactionService { fn new( client: &Arc, signature_receiver: SignatureBatchReceiver, @@ -129,6 +102,10 @@ impl LogTransactionService { Self { thread_handler } } + pub fn join(self) -> thread::Result<()> { + self.thread_handler.join() + } + fn run(client: Arc, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter) where T: 'static + BenchTpsClient + Send + Sync + ?Sized, @@ -147,12 +124,12 @@ impl LogTransactionService { let mut start_block = get_slot_with_retry(&client).expect("get_slot_with_retry succeed"); - let mut signature_to_tx_info = HashMap::::new(); + let mut signature_to_tx_info = MapSignatureToTxInfo::new(); loop { select! { recv(signature_receiver) -> msg => { match msg { - Ok(SignatureBatch { + Ok(TransactionInfoBatch { signatures, sent_at, compute_unit_prices @@ -219,7 +196,7 @@ impl LogTransactionService { fn process_block( block: UiConfirmedBlock, - signature_to_tx_info: &mut HashMap, + signature_to_tx_info: &mut MapSignatureToTxInfo, slot: u64, log_writer: &mut LogWriter, ) { @@ -247,7 +224,7 @@ impl LogTransactionService { let cu_consumed = meta .as_ref() .map_or(0, |meta| match meta.compute_units_consumed { - OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well + OptionSerializer::Some(cu_consumed) => cu_consumed, _ => 0, }); let signature = &transaction.signatures[0]; @@ -288,7 +265,7 @@ impl LogTransactionService { fn clean_transaction_map( log_writer: &mut LogWriter, - signature_to_tx_info: &mut HashMap, + signature_to_tx_info: &mut MapSignatureToTxInfo, ) { let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { @@ -313,6 +290,39 @@ impl LogTransactionService { } } +fn check_confirmations( + block_data_file: Option<&String>, + transaction_data_file: Option<&String>, +) -> bool { + block_data_file.is_some() || transaction_data_file.is_some() +} + +#[derive(Clone, Serialize)] +struct BlockData { + pub block_hash: String, + pub block_slot: Slot, + pub block_leader: String, + pub block_time: Option>, + pub total_num_transactions: usize, + pub num_bench_tps_transactions: usize, + pub total_cu_consumed: u64, + pub bench_tps_cu_consumed: u64, +} + +#[derive(Clone, Serialize)] +struct TransactionData { + pub signature: String, + pub sent_at: Option>, + pub confirmed_slot: Option, + pub confirmed_at: Option>, + pub successful: bool, + pub slot_leader: Option, + pub error: Option, + pub blockhash: Option, + pub timed_out: bool, + pub compute_unit_price: u64, +} + type CsvFileWriter = csv::Writer; struct LogWriter { block_log_writer: Option, @@ -357,7 +367,7 @@ impl LogWriter { .latest() .expect("valid timestamp") }), - sent_at: sent_at.to_string(), + sent_at: Some(sent_at), successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), error: meta .as_ref() From d67f7b454cff815eb2d8eac9a1782832e7b05022 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 16:35:56 +0000 Subject: [PATCH 12/25] some small improvements --- bench-tps/src/bench.rs | 2 +- bench-tps/src/confirmations_processing.rs | 27 ++++++++++------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 558b65fa0becb3..261e5b38b9ce98 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -1003,7 +1003,7 @@ fn do_tx_transfers( }) .is_err() { - info!("Receiver has been dropped, stop sending transactions."); + error!("Receiver has been dropped, stop sending transactions."); break; } } diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/confirmations_processing.rs index 1da917379236a5..72b936b62a06f3 100644 --- a/bench-tps/src/confirmations_processing.rs +++ b/bench-tps/src/confirmations_processing.rs @@ -23,7 +23,7 @@ use { collections::HashMap, fs::File, sync::Arc, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, }; @@ -134,29 +134,24 @@ impl LogTransactionService { sent_at, compute_unit_prices }) => { - let mut measure_send_txs = Measure::start("measure_update_map"); signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo { sent_at, compute_unit_price });}); - - measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ns(); - info!("@@@ Time to add signatures to map: {time_send_ns}") } Err(e) => { - info!("Stop LogTransactionService, error message received {e}"); + info!("Stop LogTransactionService, message received: {e}"); log_writer.flush(); break; } } }, recv(block_processing_timer_receiver) -> _ => { - let mut measure_send_txs = Measure::start("measure_update_map"); + let mut measure_process_blocks = Measure::start("measure_process_blocks"); info!("sign_receiver queue len: {}", signature_receiver.len()); let block_slots = get_blocks_with_retry(&client, start_block); let Ok(block_slots) = block_slots else { - error!("Failed to get blocks"); + error!("Failed to get blocks, stop LogWriterService."); drop(signature_receiver); break; }; @@ -170,6 +165,7 @@ impl LogTransactionService { rpc_block_config ) }); + let num_blocks = blocks.len(); for (block, slot) in blocks.zip(&block_slots) { let Ok(block) = block else { continue; @@ -182,13 +178,11 @@ impl LogTransactionService { ) } Self::clean_transaction_map(&mut log_writer, &mut signature_to_tx_info); + measure_process_blocks.stop(); - // maybe ok to write every time here? Or create a separate timer + let time_send_us = measure_process_blocks.as_us(); + info!("Time to process {num_blocks} blocks: {time_send_us}"); log_writer.flush(); - measure_send_txs.stop(); - let time_send_ns = measure_send_txs.as_ns(); - info!("@@@ Time to process blocks: {time_send_ns}") - }, } } @@ -247,7 +241,7 @@ impl LogTransactionService { Some(block.blockhash.clone()), Some(slot_leader.clone()), compute_unit_price, - true, + false, ); } } @@ -423,6 +417,7 @@ impl LogWriter { } const NUM_RETRY: u64 = 5; +const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; fn get_slot_with_retry(client: &Arc) -> Result where @@ -437,6 +432,7 @@ where } Err(error) => { warn!("Failed to get slot: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); } } } @@ -456,6 +452,7 @@ where } Err(error) => { warn!("Failed to download blocks: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); } } } From 15b44af87b8c7750edb7589c17e0ea2036f756fd Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 5 Mar 2024 17:12:28 +0000 Subject: [PATCH 13/25] Renamed conformation_processing.rs to log_transaction_service.rs --- bench-tps/src/bench.rs | 2 +- bench-tps/src/lib.rs | 2 +- .../{confirmations_processing.rs => log_transaction_service.rs} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename bench-tps/src/{confirmations_processing.rs => log_transaction_service.rs} (100%) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 261e5b38b9ce98..f67080a1cd6d92 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -2,7 +2,7 @@ use { crate::{ bench_tps_client::*, cli::{ComputeUnitPrice, Config, InstructionPaddingConfig}, - confirmations_processing::{ + log_transaction_service::{ create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch, }, perf_utils::{sample_txs, SampleStats}, diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index a089656e198e7b..62d185431cc318 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -2,7 +2,7 @@ pub mod bench; pub mod bench_tps_client; pub mod cli; -mod confirmations_processing; pub mod keypairs; +mod log_transaction_service; mod perf_utils; pub mod send_batch; diff --git a/bench-tps/src/confirmations_processing.rs b/bench-tps/src/log_transaction_service.rs similarity index 100% rename from bench-tps/src/confirmations_processing.rs rename to bench-tps/src/log_transaction_service.rs From f9740e707820e44d95dca8ff7c9582b9b6cb1c00 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Mar 2024 11:54:38 +0000 Subject: [PATCH 14/25] address numerous PR comments --- bench-tps/src/bench.rs | 4 +- bench-tps/src/log_transaction_service.rs | 91 ++++++++++++------------ 2 files changed, 47 insertions(+), 48 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index f67080a1cd6d92..58930b21224ac9 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -481,8 +481,8 @@ where let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender( &client, - block_data_file.as_ref(), - transaction_data_file.as_ref(), + block_data_file.as_deref(), + transaction_data_file.as_deref(), ); let sender_threads = create_sender_threads( diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 72b936b62a06f3..646b26bb346f80 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -43,15 +43,15 @@ pub(crate) struct LogTransactionService { thread_handler: JoinHandle<()>, } -pub(crate) fn create_log_transactions_service_and_sender( - client: &Arc, - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, +pub(crate) fn create_log_transactions_service_and_sender( + client: &Arc, + block_data_file: Option<&str>, + transaction_data_file: Option<&str>, ) -> (Option, Option) where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if check_confirmations(block_data_file, transaction_data_file) { + if verify_data_files(block_data_file, transaction_data_file) { let (sender, receiver) = unbounded(); let log_tx_service = LogTransactionService::new(client, receiver, block_data_file, transaction_data_file); @@ -77,17 +77,17 @@ type MapSignatureToTxInfo = HashMap; type SignatureBatchReceiver = Receiver; impl LogTransactionService { - fn new( - client: &Arc, + fn new( + client: &Arc, signature_receiver: SignatureBatchReceiver, - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, + block_data_file: Option<&str>, + transaction_data_file: Option<&str>, ) -> Self where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if !check_confirmations(block_data_file, transaction_data_file) { - panic!("Expect block-data-file or transaction-data-file is specified."); + if !verify_data_files(block_data_file, transaction_data_file) { + panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee."); } let client = client.clone(); @@ -106,9 +106,12 @@ impl LogTransactionService { self.thread_handler.join() } - fn run(client: Arc, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter) - where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, + fn run( + client: Arc, + signature_receiver: SignatureBatchReceiver, + mut log_writer: LogWriter, + ) where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { let commitment: CommitmentConfig = CommitmentConfig { commitment: CommitmentLevel::Confirmed, @@ -122,7 +125,8 @@ impl LogTransactionService { }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let mut start_block = get_slot_with_retry(&client).expect("get_slot_with_retry succeed"); + let mut start_block = get_slot_with_retry(&client) + .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut signature_to_tx_info = MapSignatureToTxInfo::new(); loop { @@ -152,7 +156,6 @@ impl LogTransactionService { let block_slots = get_blocks_with_retry(&client, start_block); let Ok(block_slots) = block_slots else { error!("Failed to get blocks, stop LogWriterService."); - drop(signature_receiver); break; }; if block_slots.is_empty() { @@ -284,10 +287,7 @@ impl LogTransactionService { } } -fn check_confirmations( - block_data_file: Option<&String>, - transaction_data_file: Option<&String>, -) -> bool { +fn verify_data_files(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { block_data_file.is_some() || transaction_data_file.is_some() } @@ -324,7 +324,7 @@ struct LogWriter { } impl LogWriter { - fn new(block_data_file: Option<&String>, transaction_data_file: Option<&String>) -> Self { + fn new(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> Self { let block_log_writer = block_data_file.map(|block_data_file| { CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) }); @@ -419,42 +419,41 @@ impl LogWriter { const NUM_RETRY: u64 = 5; const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; -fn get_slot_with_retry(client: &Arc) -> Result +fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, + Func: Fn() -> Result, { - for _ in 1..NUM_RETRY { - let current_slot = client.get_slot(); - - match current_slot { + let mut iretry = 0; + loop { + match f() { Ok(slot) => { return Ok(slot); } Err(error) => { - warn!("Failed to get slot: {error}, retry."); + if iretry == NUM_RETRY { + return Err(error); + } + warn!("{retry_warning}: {error}, retry."); sleep(Duration::from_millis(RETRY_EVERY_MS)); } } + iretry += 1; } - client.get_slot() } -fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result> +fn get_slot_with_retry(client: &Arc) -> Result where - T: 'static + BenchTpsClient + Send + Sync + ?Sized, + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - for _ in 1..NUM_RETRY { - let block_slots = client.get_blocks(start_block, None); + call_rpc_with_retry(|| client.get_slot(), "Failed to get slot") +} - match block_slots { - Ok(slots) => { - return Ok(slots); - } - Err(error) => { - warn!("Failed to download blocks: {error}, retry."); - sleep(Duration::from_millis(RETRY_EVERY_MS)); - } - } - } - client.get_blocks(start_block, None) +fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result> +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_blocks(start_block, None), + "Failed to download blocks", + ) } From c4124ca541b433ec90d9b7ccd734735a7ce49da2 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Mar 2024 12:09:10 +0000 Subject: [PATCH 15/25] split LogWriter into two structs --- bench-tps/src/log_transaction_service.rs | 148 +++++++++++++---------- 1 file changed, 84 insertions(+), 64 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 646b26bb346f80..1e170a75eed3d1 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -91,12 +91,13 @@ impl LogTransactionService { } let client = client.clone(); - let log_writer = LogWriter::new(block_data_file, transaction_data_file); + let tx_log_writer = TransactionLogWriter::new(transaction_data_file); + let block_log_writer = BlockLogWriter::new(block_data_file); let thread_handler = Builder::new() .name("LogTransactionService".to_string()) .spawn(move || { - Self::run(client, signature_receiver, log_writer); + Self::run(client, signature_receiver, tx_log_writer, block_log_writer); }) .expect("LogTransactionService is up."); Self { thread_handler } @@ -109,7 +110,8 @@ impl LogTransactionService { fn run( client: Arc, signature_receiver: SignatureBatchReceiver, - mut log_writer: LogWriter, + mut tx_log_writer: TransactionLogWriter, + mut block_log_writer: BlockLogWriter, ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { @@ -145,7 +147,8 @@ impl LogTransactionService { } Err(e) => { info!("Stop LogTransactionService, message received: {e}"); - log_writer.flush(); + tx_log_writer.flush(); + block_log_writer.flush(); break; } } @@ -163,10 +166,10 @@ impl LogTransactionService { } start_block = *block_slots.last().unwrap() + 1; let blocks = block_slots.iter().map(|slot| { - client.get_block_with_config( - *slot, - rpc_block_config - ) + client.get_block_with_config( + *slot, + rpc_block_config + ) }); let num_blocks = blocks.len(); for (block, slot) in blocks.zip(&block_slots) { @@ -177,15 +180,17 @@ impl LogTransactionService { block, &mut signature_to_tx_info, *slot, - &mut log_writer, + &mut tx_log_writer, + &mut block_log_writer ) } - Self::clean_transaction_map(&mut log_writer, &mut signature_to_tx_info); + Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); measure_process_blocks.stop(); let time_send_us = measure_process_blocks.as_us(); info!("Time to process {num_blocks} blocks: {time_send_us}"); - log_writer.flush(); + tx_log_writer.flush(); + block_log_writer.flush(); }, } } @@ -195,7 +200,8 @@ impl LogTransactionService { block: UiConfirmedBlock, signature_to_tx_info: &mut MapSignatureToTxInfo, slot: u64, - log_writer: &mut LogWriter, + tx_log_writer: &mut TransactionLogWriter, + block_log_writer: &mut BlockLogWriter, ) { let rewards = block.rewards.as_ref().expect("Rewards are present."); let slot_leader = rewards @@ -235,7 +241,7 @@ impl LogTransactionService { num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1); bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); - log_writer.write_to_transaction_log( + tx_log_writer.write( signature, Some(slot), block.block_time, @@ -248,7 +254,7 @@ impl LogTransactionService { ); } } - log_writer.write_to_block_log( + block_log_writer.write( block.blockhash.clone(), slot_leader, slot, @@ -261,7 +267,7 @@ impl LogTransactionService { } fn clean_transaction_map( - log_writer: &mut LogWriter, + tx_log_writer: &mut TransactionLogWriter, signature_to_tx_info: &mut MapSignatureToTxInfo, ) { let now: DateTime = Utc::now(); @@ -270,7 +276,7 @@ impl LogTransactionService { let is_not_timeout_tx = duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC; if !is_not_timeout_tx { - log_writer.write_to_transaction_log( + tx_log_writer.write( signature, None, None, @@ -291,6 +297,8 @@ fn verify_data_files(block_data_file: Option<&str>, transaction_data_file: Optio block_data_file.is_some() || transaction_data_file.is_some() } +type CsvFileWriter = csv::Writer; + #[derive(Clone, Serialize)] struct BlockData { pub block_hash: String, @@ -303,6 +311,58 @@ struct BlockData { pub bench_tps_cu_consumed: u64, } +struct BlockLogWriter { + log_writer: Option, +} + +impl BlockLogWriter { + fn new(block_data_file: Option<&str>) -> Self { + let block_log_writer = block_data_file.map(|block_data_file| { + CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + }); + Self { + log_writer: block_log_writer, + } + } + + #[allow(clippy::too_many_arguments)] + fn write( + &mut self, + blockhash: String, + slot_leader: String, + slot: Slot, + block_time: Option, + num_bench_tps_transactions: usize, + total_num_transactions: usize, + bench_tps_cu_consumed: u64, + total_cu_consumed: u64, + ) { + if let Some(block_log_writer) = &mut self.log_writer { + let block_data = BlockData { + block_hash: blockhash, + block_leader: slot_leader, + block_slot: slot, + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), + num_bench_tps_transactions, + total_num_transactions, + bench_tps_cu_consumed, + total_cu_consumed, + }; + let _ = block_log_writer.serialize(block_data); + } + } + + fn flush(&mut self) { + if let Some(block_log_writer) = &mut self.log_writer { + let _ = block_log_writer.flush(); + } + } +} + #[derive(Clone, Serialize)] struct TransactionData { pub signature: String, @@ -317,30 +377,24 @@ struct TransactionData { pub compute_unit_price: u64, } -type CsvFileWriter = csv::Writer; -struct LogWriter { - block_log_writer: Option, - transaction_log_writer: Option, +struct TransactionLogWriter { + log_writer: Option, } -impl LogWriter { - fn new(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> Self { - let block_log_writer = block_data_file.map(|block_data_file| { - CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) - }); +impl TransactionLogWriter { + fn new(transaction_data_file: Option<&str>) -> Self { let transaction_log_writer = transaction_data_file.map(|transaction_data_file| { CsvFileWriter::from_writer( File::create(transaction_data_file).expect("File can be created."), ) }); Self { - block_log_writer, - transaction_log_writer, + log_writer: transaction_log_writer, } } #[allow(clippy::too_many_arguments)] - fn write_to_transaction_log( + fn write( &mut self, signature: &Signature, confirmed_slot: Option, @@ -352,7 +406,7 @@ impl LogWriter { compute_unit_price: Option, timed_out: bool, ) { - if let Some(transaction_log_writer) = &mut self.transaction_log_writer { + if let Some(transaction_log_writer) = &mut self.log_writer { let tx_data = TransactionData { signature: signature.to_string(), confirmed_slot, @@ -375,42 +429,8 @@ impl LogWriter { } } - #[allow(clippy::too_many_arguments)] - fn write_to_block_log( - &mut self, - blockhash: String, - slot_leader: String, - slot: Slot, - block_time: Option, - num_bench_tps_transactions: usize, - total_num_transactions: usize, - bench_tps_cu_consumed: u64, - total_cu_consumed: u64, - ) { - if let Some(block_log_writer) = &mut self.block_log_writer { - let block_data = BlockData { - block_hash: blockhash, - block_leader: slot_leader, - block_slot: slot, - block_time: block_time.map(|time| { - Utc.timestamp_opt(time, 0) - .latest() - .expect("valid timestamp") - }), - num_bench_tps_transactions, - total_num_transactions, - bench_tps_cu_consumed, - total_cu_consumed, - }; - let _ = block_log_writer.serialize(block_data); - } - } - fn flush(&mut self) { - if let Some(block_log_writer) = &mut self.block_log_writer { - let _ = block_log_writer.flush(); - } - if let Some(transaction_log_writer) = &mut self.transaction_log_writer { + if let Some(transaction_log_writer) = &mut self.log_writer { let _ = transaction_log_writer.flush(); } } From e5aa95f1452ef47c5ce8a31ff3ebb830fa2af029 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Mar 2024 13:37:54 +0000 Subject: [PATCH 16/25] simplify code of LogWriters --- bench-tps/src/log_transaction_service.rs | 108 ++++++++++++----------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 1e170a75eed3d1..fef411ada6c6cf 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -242,15 +242,15 @@ impl LogTransactionService { bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed); tx_log_writer.write( + Some(block.blockhash.clone()), + Some(slot_leader.clone()), signature, + sent_at, Some(slot), block.block_time, - sent_at, meta.as_ref(), - Some(block.blockhash.clone()), - Some(slot_leader.clone()), - compute_unit_price, false, + compute_unit_price, ); } } @@ -277,15 +277,15 @@ impl LogTransactionService { duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC; if !is_not_timeout_tx { tx_log_writer.write( - signature, None, None, + signature, tx_info.sent_at, None, None, None, - tx_info.compute_unit_price, true, + tx_info.compute_unit_price, ); } is_not_timeout_tx @@ -301,9 +301,9 @@ type CsvFileWriter = csv::Writer; #[derive(Clone, Serialize)] struct BlockData { - pub block_hash: String, + pub blockhash: String, pub block_slot: Slot, - pub block_leader: String, + pub slot_leader: String, pub block_time: Option>, pub total_num_transactions: usize, pub num_bench_tps_transactions: usize, @@ -337,23 +337,24 @@ impl BlockLogWriter { bench_tps_cu_consumed: u64, total_cu_consumed: u64, ) { - if let Some(block_log_writer) = &mut self.log_writer { - let block_data = BlockData { - block_hash: blockhash, - block_leader: slot_leader, - block_slot: slot, - block_time: block_time.map(|time| { - Utc.timestamp_opt(time, 0) - .latest() - .expect("valid timestamp") - }), - num_bench_tps_transactions, - total_num_transactions, - bench_tps_cu_consumed, - total_cu_consumed, - }; - let _ = block_log_writer.serialize(block_data); - } + let Some(block_log_writer) = &mut self.log_writer else { + return; + }; + let block_data = BlockData { + blockhash, + slot_leader, + block_slot: slot, + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), + num_bench_tps_transactions, + total_num_transactions, + bench_tps_cu_consumed, + total_cu_consumed, + }; + let _ = block_log_writer.serialize(block_data); } fn flush(&mut self) { @@ -365,14 +366,14 @@ impl BlockLogWriter { #[derive(Clone, Serialize)] struct TransactionData { + pub blockhash: Option, + pub slot_leader: Option, pub signature: String, pub sent_at: Option>, pub confirmed_slot: Option, - pub confirmed_at: Option>, + pub block_time: Option>, pub successful: bool, - pub slot_leader: Option, pub error: Option, - pub blockhash: Option, pub timed_out: bool, pub compute_unit_price: u64, } @@ -396,37 +397,38 @@ impl TransactionLogWriter { #[allow(clippy::too_many_arguments)] fn write( &mut self, + blockhash: Option, + slot_leader: Option, signature: &Signature, + sent_at: DateTime, confirmed_slot: Option, block_time: Option, - sent_at: DateTime, meta: Option<&UiTransactionStatusMeta>, - blockhash: Option, - slot_leader: Option, - compute_unit_price: Option, timed_out: bool, + compute_unit_price: Option, ) { - if let Some(transaction_log_writer) = &mut self.log_writer { - let tx_data = TransactionData { - signature: signature.to_string(), - confirmed_slot, - confirmed_at: block_time.map(|time| { - Utc.timestamp_opt(time, 0) - .latest() - .expect("valid timestamp") - }), - sent_at: Some(sent_at), - successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), - error: meta - .as_ref() - .and_then(|m| m.err.as_ref().map(|x| x.to_string())), - blockhash, - slot_leader, - timed_out, - compute_unit_price: compute_unit_price.unwrap_or(0), - }; - let _ = transaction_log_writer.serialize(tx_data); - } + let Some(transaction_log_writer) = &mut self.log_writer else { + return; + }; + let tx_data = TransactionData { + blockhash, + slot_leader, + signature: signature.to_string(), + sent_at: Some(sent_at), + confirmed_slot, + block_time: block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }), + successful: meta.as_ref().map_or(false, |m| m.status.is_ok()), + error: meta + .as_ref() + .and_then(|m| m.err.as_ref().map(|x| x.to_string())), + timed_out, + compute_unit_price: compute_unit_price.unwrap_or(0), + }; + let _ = transaction_log_writer.serialize(tx_data); } fn flush(&mut self) { From 1606b33f17ec713debe005a044509006ccaa6370 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Wed, 6 Mar 2024 14:34:20 +0000 Subject: [PATCH 17/25] extract process_blocks --- bench-tps/src/log_transaction_service.rs | 87 ++++++++++++++---------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index fef411ada6c6cf..5f5a057e020694 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -115,19 +115,9 @@ impl LogTransactionService { ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - let commitment: CommitmentConfig = CommitmentConfig { - commitment: CommitmentLevel::Confirmed, - }; - let rpc_block_config = RpcBlockConfig { - encoding: Some(UiTransactionEncoding::Base64), - transaction_details: Some(TransactionDetails::Full), - rewards: Some(true), - commitment: Some(commitment), - max_supported_transaction_version: Some(0), - }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let mut start_block = get_slot_with_retry(&client) + let start_block = get_slot_with_retry(&client) .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut signature_to_tx_info = MapSignatureToTxInfo::new(); @@ -154,9 +144,12 @@ impl LogTransactionService { } }, recv(block_processing_timer_receiver) -> _ => { - let mut measure_process_blocks = Measure::start("measure_process_blocks"); info!("sign_receiver queue len: {}", signature_receiver.len()); + let mut measure_get_blocks = Measure::start("measure_get_blocks"); let block_slots = get_blocks_with_retry(&client, start_block); + measure_get_blocks.stop(); + let time_get_blocks_us = measure_get_blocks.as_us(); + info!("Time to get_blocks : {time_get_blocks_us}us."); let Ok(block_slots) = block_slots else { error!("Failed to get blocks, stop LogWriterService."); break; @@ -164,31 +157,14 @@ impl LogTransactionService { if block_slots.is_empty() { continue; } - start_block = *block_slots.last().unwrap() + 1; - let blocks = block_slots.iter().map(|slot| { - client.get_block_with_config( - *slot, - rpc_block_config - ) - }); - let num_blocks = blocks.len(); - for (block, slot) in blocks.zip(&block_slots) { - let Ok(block) = block else { - continue; - }; - Self::process_block( - block, - &mut signature_to_tx_info, - *slot, - &mut tx_log_writer, - &mut block_log_writer - ) - } + Self::process_blocks( + &client, + block_slots, + &mut signature_to_tx_info, + &mut tx_log_writer, + &mut block_log_writer); Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); - measure_process_blocks.stop(); - let time_send_us = measure_process_blocks.as_us(); - info!("Time to process {num_blocks} blocks: {time_send_us}"); tx_log_writer.flush(); block_log_writer.flush(); }, @@ -196,6 +172,47 @@ impl LogTransactionService { } } + fn process_blocks( + client: &Arc, + block_slots: Vec, + signature_to_tx_info: &mut MapSignatureToTxInfo, + tx_log_writer: &mut TransactionLogWriter, + block_log_writer: &mut BlockLogWriter, + ) where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, + { + let commitment: CommitmentConfig = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + let rpc_block_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + transaction_details: Some(TransactionDetails::Full), + rewards: Some(true), + commitment: Some(commitment), + max_supported_transaction_version: Some(0), + }; + let mut measure_process_blocks = Measure::start("measure_process_blocks"); + let blocks = block_slots + .iter() + .map(|slot| client.get_block_with_config(*slot, rpc_block_config)); + let num_blocks = blocks.len(); + for (block, slot) in blocks.zip(&block_slots) { + let Ok(block) = block else { + continue; + }; + Self::process_block( + block, + signature_to_tx_info, + *slot, + tx_log_writer, + block_log_writer, + ) + } + measure_process_blocks.stop(); + let time_process_blocks_us = measure_process_blocks.as_us(); + info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us."); + } + fn process_block( block: UiConfirmedBlock, signature_to_tx_info: &mut MapSignatureToTxInfo, From 9a43ceb54ee4a3e4ceb04aed08cdf896d8e47471 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 7 Mar 2024 16:37:25 +0000 Subject: [PATCH 18/25] specify commitment in LogTransactionService --- bench-tps/src/log_transaction_service.rs | 30 ++++++++++++++++-------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 5f5a057e020694..3f03161420ad35 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -115,9 +115,12 @@ impl LogTransactionService { ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { + let commitment: CommitmentConfig = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let start_block = get_slot_with_retry(&client) + let start_block = get_slot_with_retry(&client, commitment) .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut signature_to_tx_info = MapSignatureToTxInfo::new(); @@ -146,7 +149,7 @@ impl LogTransactionService { recv(block_processing_timer_receiver) -> _ => { info!("sign_receiver queue len: {}", signature_receiver.len()); let mut measure_get_blocks = Measure::start("measure_get_blocks"); - let block_slots = get_blocks_with_retry(&client, start_block); + let block_slots = get_blocks_with_retry(&client, start_block, commitment); measure_get_blocks.stop(); let time_get_blocks_us = measure_get_blocks.as_us(); info!("Time to get_blocks : {time_get_blocks_us}us."); @@ -162,7 +165,9 @@ impl LogTransactionService { block_slots, &mut signature_to_tx_info, &mut tx_log_writer, - &mut block_log_writer); + &mut block_log_writer, + commitment, + ); Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); tx_log_writer.flush(); @@ -178,12 +183,10 @@ impl LogTransactionService { signature_to_tx_info: &mut MapSignatureToTxInfo, tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, + commitment: CommitmentConfig, ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - let commitment: CommitmentConfig = CommitmentConfig { - commitment: CommitmentLevel::Confirmed, - }; let rpc_block_config = RpcBlockConfig { encoding: Some(UiTransactionEncoding::Base64), transaction_details: Some(TransactionDetails::Full), @@ -480,19 +483,26 @@ where } } -fn get_slot_with_retry(client: &Arc) -> Result +fn get_slot_with_retry(client: &Arc, commitment: CommitmentConfig) -> Result where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - call_rpc_with_retry(|| client.get_slot(), "Failed to get slot") + call_rpc_with_retry( + || client.get_slot_with_commitment(commitment), + "Failed to get slot", + ) } -fn get_blocks_with_retry(client: &Arc, start_block: u64) -> Result> +fn get_blocks_with_retry( + client: &Arc, + start_block: u64, + commitment: CommitmentConfig, +) -> Result> where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { call_rpc_with_retry( - || client.get_blocks(start_block, None), + || client.get_blocks_with_commitment(start_block, None, commitment), "Failed to download blocks", ) } From 2d7699a97489450b704047e43a23ce19e5e4f2b7 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Thu, 14 Mar 2024 13:50:45 +0000 Subject: [PATCH 19/25] break thread loop if receiver happens to be dropped --- bench-tps/src/bench.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 58930b21224ac9..76a1f686bb48d1 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -951,7 +951,7 @@ fn do_tx_transfers( signatures_sender: Option, ) { let mut last_sent_time = timestamp(); - loop { + 'thread_loop: loop { if thread_batch_sleep_ms > 0 { sleep(Duration::from_millis(thread_batch_sleep_ms as u64)); } @@ -995,16 +995,13 @@ fn do_tx_transfers( } if let Some(signatures_sender) = &signatures_sender { - if signatures_sender - .send(TransactionInfoBatch { - signatures, - sent_at: Utc::now(), - compute_unit_prices, - }) - .is_err() - { - error!("Receiver has been dropped, stop sending transactions."); - break; + if let Err(error) = signatures_sender.send(TransactionInfoBatch { + signatures, + sent_at: Utc::now(), + compute_unit_prices, + }) { + error!("Receiver has been dropped with error `{error}`, stop sending transactions."); + break 'thread_loop; } } From 03b33158b8e8f15f2e5df1064fd03c0e81636379 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 19 Mar 2024 15:57:44 +0000 Subject: [PATCH 20/25] update start_slot when processing blocks --- bench-tps/src/log_transaction_service.rs | 48 +++++++++++++++--------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 3f03161420ad35..dec9353fc038a5 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -61,10 +61,18 @@ where } } +// How many blocks to process during one iteration. +// The time to process blocks is dominated by get_block calls. +// Each call takes slightly less time than slot. +const NUM_SLOTS_PER_ITERATION: u64 = 16; // How often process blocks. -const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT; +const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT; +// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions +// that still might be added to the block. +const AGE_EPSILON: usize = 50; // Max age for transaction in the transaction map. -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64; +const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = + ((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64; // Map used to filter submitted transactions. #[derive(Clone)] @@ -120,9 +128,10 @@ impl LogTransactionService { }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); - let start_block = get_slot_with_retry(&client, commitment) + let mut start_slot = get_slot_with_retry(&client, commitment) .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); + let mut sender_stopped = false; let mut signature_to_tx_info = MapSignatureToTxInfo::new(); loop { select! { @@ -138,18 +147,18 @@ impl LogTransactionService { compute_unit_price });}); } - Err(e) => { - info!("Stop LogTransactionService, message received: {e}"); - tx_log_writer.flush(); - block_log_writer.flush(); - break; + Err(_) => { + sender_stopped = true; } } }, recv(block_processing_timer_receiver) -> _ => { info!("sign_receiver queue len: {}", signature_receiver.len()); + if signature_receiver.len() != 0 { + continue; + } let mut measure_get_blocks = Measure::start("measure_get_blocks"); - let block_slots = get_blocks_with_retry(&client, start_block, commitment); + let block_slots = get_blocks_with_retry(&client, start_slot, Some(start_slot + NUM_SLOTS_PER_ITERATION - 1), commitment); measure_get_blocks.stop(); let time_get_blocks_us = measure_get_blocks.as_us(); info!("Time to get_blocks : {time_get_blocks_us}us."); @@ -170,8 +179,13 @@ impl LogTransactionService { ); Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); + start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); tx_log_writer.flush(); block_log_writer.flush(); + if sender_stopped && signature_to_tx_info.len() == 0 { + info!("Stop LogTransactionService"); + break; + } }, } } @@ -292,10 +306,9 @@ impl LogTransactionService { ) { let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { - let duration_since_past_time = now.signed_duration_since(tx_info.sent_at); - let is_not_timeout_tx = - duration_since_past_time.num_seconds() < REMOVE_TIMEOUT_TX_EVERY_SEC; - if !is_not_timeout_tx { + let duration_since_sent = now.signed_duration_since(tx_info.sent_at); + let is_timeout_tx = duration_since_sent.num_seconds() > REMOVE_TIMEOUT_TX_EVERY_SEC; + if is_timeout_tx { tx_log_writer.write( None, None, @@ -308,7 +321,7 @@ impl LogTransactionService { tx_info.compute_unit_price, ); } - is_not_timeout_tx + !is_timeout_tx }); } } @@ -367,7 +380,7 @@ impl BlockLogWriter { block_time: block_time.map(|time| { Utc.timestamp_opt(time, 0) .latest() - .expect("valid timestamp") + .expect("timestamp should be valid") }), num_bench_tps_transactions, total_num_transactions, @@ -495,14 +508,15 @@ where fn get_blocks_with_retry( client: &Arc, - start_block: u64, + start_slot: Slot, + end_slot: Option, commitment: CommitmentConfig, ) -> Result> where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { call_rpc_with_retry( - || client.get_blocks_with_commitment(start_block, None, commitment), + || client.get_blocks_with_commitment(start_slot, end_slot, commitment), "Failed to download blocks", ) } From 304fcf2eca569c6d4e231211ae1acfb081c96fe8 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 22 Mar 2024 17:19:12 +0000 Subject: [PATCH 21/25] address pr comments --- bench-tps/src/bench.rs | 20 +++--- bench-tps/src/lib.rs | 1 + bench-tps/src/log_transaction_service.rs | 83 +++++++----------------- bench-tps/src/rpc_with_retry_utils.rs | 61 +++++++++++++++++ 4 files changed, 94 insertions(+), 71 deletions(-) create mode 100644 bench-tps/src/rpc_with_retry_utils.rs diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 76a1f686bb48d1..06e054cd7f6f72 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -959,17 +959,17 @@ fn do_tx_transfers( let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers"); shared_txs_wl.pop_front() }; - if let Some(txs0) = txs { + if let Some(txs) = txs { shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); - info!("Transferring 1 unit {} times...", txs0.len()); - let tx_len = txs0.len(); + let num_txs = txs.len(); + info!("Transferring 1 unit {} times...", num_txs); let transfer_start = Instant::now(); let mut old_transactions = false; let mut min_timestamp = u64::MAX; - let mut transactions = Vec::<_>::with_capacity(txs0.len()); - let mut signatures = Vec::<_>::with_capacity(txs0.len()); - let mut compute_unit_prices = Vec::<_>::with_capacity(txs0.len()); - for tx in txs0 { + let mut transactions = Vec::<_>::with_capacity(num_txs); + let mut signatures = Vec::<_>::with_capacity(num_txs); + let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs); + for tx in txs { let now = timestamp(); // Transactions without durable nonce that are too old will be rejected by the cluster Don't bother // sending them. @@ -1025,16 +1025,16 @@ fn do_tx_transfers( shared_txs_wl.clear(); } shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); - total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); + total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed); info!( "Tx send done. {} ms {} tps", duration_as_ms(&transfer_start.elapsed()), - tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + num_txs as f32 / duration_as_s(&transfer_start.elapsed()), ); datapoint_info!( "bench-tps-do_tx_transfers", ("duration", duration_as_us(&transfer_start.elapsed()), i64), - ("count", tx_len, i64) + ("count", num_txs, i64) ); } if exit_signal.load(Ordering::Relaxed) { diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index 62d185431cc318..6f55a4122e4c0b 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -5,4 +5,5 @@ pub mod cli; pub mod keypairs; mod log_transaction_service; mod perf_utils; +mod rpc_with_retry_utils; pub mod send_batch; diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index dec9353fc038a5..da9f344255c4c3 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -2,7 +2,10 @@ //! and saves log files in csv format. use { - crate::bench_tps_client::{BenchTpsClient, Result}, + crate::{ + bench_tps_client::BenchTpsClient, + rpc_with_retry_utils::{get_blocks_with_retry, get_slot_with_retry}, + }, chrono::{DateTime, TimeZone, Utc}, crossbeam_channel::{select, tick, unbounded, Receiver, Sender}, log::*, @@ -23,7 +26,7 @@ use { collections::HashMap, fs::File, sync::Arc, - thread::{self, sleep, Builder, JoinHandle}, + thread::{self, Builder, JoinHandle}, time::Duration, }, }; @@ -51,7 +54,7 @@ pub(crate) fn create_log_transactions_service_and_sender( where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if verify_data_files(block_data_file, transaction_data_file) { + if data_file_provided(block_data_file, transaction_data_file) { let (sender, receiver) = unbounded(); let log_tx_service = LogTransactionService::new(client, receiver, block_data_file, transaction_data_file); @@ -70,7 +73,7 @@ const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SL // Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions // that still might be added to the block. const AGE_EPSILON: usize = 50; -// Max age for transaction in the transaction map. +// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = ((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64; @@ -94,7 +97,7 @@ impl LogTransactionService { where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { - if !verify_data_files(block_data_file, transaction_data_file) { + if !data_file_provided(block_data_file, transaction_data_file) { panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee."); } @@ -107,7 +110,7 @@ impl LogTransactionService { .spawn(move || { Self::run(client, signature_receiver, tx_log_writer, block_log_writer); }) - .expect("LogTransactionService is up."); + .expect("LogTransactionService should have started successfully."); Self { thread_handler } } @@ -123,13 +126,14 @@ impl LogTransactionService { ) where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { + // used to request blocks data and only confirmed makes sense in this context. let commitment: CommitmentConfig = CommitmentConfig { commitment: CommitmentLevel::Confirmed, }; let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS)); let mut start_slot = get_slot_with_retry(&client, commitment) - .expect("get_slot_with_retry succeed, cannot proceed without having slot. Must be a problem with RPC."); + .expect("get_slot_with_retry should have succeed, cannot proceed without having slot. Must be a problem with RPC."); let mut sender_stopped = false; let mut signature_to_tx_info = MapSignatureToTxInfo::new(); @@ -237,7 +241,10 @@ impl LogTransactionService { tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, ) { - let rewards = block.rewards.as_ref().expect("Rewards are present."); + let rewards = block + .rewards + .as_ref() + .expect("Rewards should be part of the block information."); let slot_leader = rewards .iter() .find(|r| r.reward_type == Some(RewardType::Fee)) @@ -326,7 +333,7 @@ impl LogTransactionService { } } -fn verify_data_files(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { +fn data_file_provided(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool { block_data_file.is_some() || transaction_data_file.is_some() } @@ -351,7 +358,10 @@ struct BlockLogWriter { impl BlockLogWriter { fn new(block_data_file: Option<&str>) -> Self { let block_log_writer = block_data_file.map(|block_data_file| { - CsvFileWriter::from_writer(File::create(block_data_file).expect("File can be created.")) + CsvFileWriter::from_writer( + File::create(block_data_file) + .expect("Application should be able to create a file."), + ) }); Self { log_writer: block_log_writer, @@ -419,7 +429,8 @@ impl TransactionLogWriter { fn new(transaction_data_file: Option<&str>) -> Self { let transaction_log_writer = transaction_data_file.map(|transaction_data_file| { CsvFileWriter::from_writer( - File::create(transaction_data_file).expect("File can be created."), + File::create(transaction_data_file) + .expect("Application should be able to create a file."), ) }); Self { @@ -470,53 +481,3 @@ impl TransactionLogWriter { } } } - -const NUM_RETRY: u64 = 5; -const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; - -fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result -where - Func: Fn() -> Result, -{ - let mut iretry = 0; - loop { - match f() { - Ok(slot) => { - return Ok(slot); - } - Err(error) => { - if iretry == NUM_RETRY { - return Err(error); - } - warn!("{retry_warning}: {error}, retry."); - sleep(Duration::from_millis(RETRY_EVERY_MS)); - } - } - iretry += 1; - } -} - -fn get_slot_with_retry(client: &Arc, commitment: CommitmentConfig) -> Result -where - Client: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - call_rpc_with_retry( - || client.get_slot_with_commitment(commitment), - "Failed to get slot", - ) -} - -fn get_blocks_with_retry( - client: &Arc, - start_slot: Slot, - end_slot: Option, - commitment: CommitmentConfig, -) -> Result> -where - Client: 'static + BenchTpsClient + Send + Sync + ?Sized, -{ - call_rpc_with_retry( - || client.get_blocks_with_commitment(start_slot, end_slot, commitment), - "Failed to download blocks", - ) -} diff --git a/bench-tps/src/rpc_with_retry_utils.rs b/bench-tps/src/rpc_with_retry_utils.rs new file mode 100644 index 00000000000000..57af3923f0aeda --- /dev/null +++ b/bench-tps/src/rpc_with_retry_utils.rs @@ -0,0 +1,61 @@ +use { + crate::bench_tps_client::{BenchTpsClient, Result}, + log::*, + solana_sdk::{ + clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, slot_history::Slot, + }, + std::{sync::Arc, thread::sleep, time::Duration}, +}; + +const NUM_RETRY: u64 = 5; +const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT; + +fn call_rpc_with_retry(f: Func, retry_warning: &str) -> Result +where + Func: Fn() -> Result, +{ + let mut iretry = 0; + loop { + match f() { + Ok(slot) => { + return Ok(slot); + } + Err(error) => { + if iretry == NUM_RETRY { + return Err(error); + } + warn!("{retry_warning}: {error}, retry."); + sleep(Duration::from_millis(RETRY_EVERY_MS)); + } + } + iretry += 1; + } +} + +pub(crate) fn get_slot_with_retry( + client: &Arc, + commitment: CommitmentConfig, +) -> Result +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_slot_with_commitment(commitment), + "Failed to get slot", + ) +} + +pub(crate) fn get_blocks_with_retry( + client: &Arc, + start_slot: Slot, + end_slot: Option, + commitment: CommitmentConfig, +) -> Result> +where + Client: 'static + BenchTpsClient + Send + Sync + ?Sized, +{ + call_rpc_with_retry( + || client.get_blocks_with_commitment(start_slot, end_slot, commitment), + "Failed to download blocks", + ) +} From 4374f5a6d29482d7897c9dc7f44399928a2399ca Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sun, 24 Mar 2024 13:52:48 +0000 Subject: [PATCH 22/25] fix clippy error --- bench-tps/src/log_transaction_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index da9f344255c4c3..a3005396ddb1a2 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -158,7 +158,7 @@ impl LogTransactionService { }, recv(block_processing_timer_receiver) -> _ => { info!("sign_receiver queue len: {}", signature_receiver.len()); - if signature_receiver.len() != 0 { + if !signature_receiver.is_empty() { continue; } let mut measure_get_blocks = Measure::start("measure_get_blocks"); @@ -186,7 +186,7 @@ impl LogTransactionService { start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); tx_log_writer.flush(); block_log_writer.flush(); - if sender_stopped && signature_to_tx_info.len() == 0 { + if sender_stopped && signature_to_tx_info.is_empty() { info!("Stop LogTransactionService"); break; } From 57db34954d7868714afb789cc961a9abd3732510 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sun, 24 Mar 2024 14:16:30 +0000 Subject: [PATCH 23/25] minor changes --- bench-tps/src/bench.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 06e054cd7f6f72..f01745e6ce8c9e 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -535,7 +535,7 @@ where } if let Some(log_transaction_service) = log_transaction_service { - info!("Waiting for confirmation thread..."); + info!("Waiting for log_transaction_service thread..."); if let Err(err) = log_transaction_service.join() { info!(" join() failed with: {:?}", err); } @@ -1038,7 +1038,6 @@ fn do_tx_transfers( ); } if exit_signal.load(Ordering::Relaxed) { - drop(signatures_sender); break; } } From 54adc0cebf8e8ddbf4829fd3cd32e8765658eeb9 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sun, 24 Mar 2024 19:28:26 +0000 Subject: [PATCH 24/25] fix ms problem --- bench-tps/src/log_transaction_service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index a3005396ddb1a2..9af285a6a46408 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -13,7 +13,7 @@ use { solana_client::rpc_config::RpcBlockConfig, solana_measure::measure::Measure, solana_sdk::{ - clock::{DEFAULT_MS_PER_SLOT, DEFAULT_S_PER_SLOT, MAX_PROCESSING_AGE}, + clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::{CommitmentConfig, CommitmentLevel}, signature::Signature, slot_history::Slot, @@ -74,8 +74,8 @@ const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SL // that still might be added to the block. const AGE_EPSILON: usize = 50; // Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. -const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = - ((MAX_PROCESSING_AGE + AGE_EPSILON) as f64 * DEFAULT_S_PER_SLOT) as i64; +const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = + (MAX_PROCESSING_AGE + AGE_EPSILON) as i64 * (DEFAULT_MS_PER_SLOT as i64); // Map used to filter submitted transactions. #[derive(Clone)] @@ -314,7 +314,7 @@ impl LogTransactionService { let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { let duration_since_sent = now.signed_duration_since(tx_info.sent_at); - let is_timeout_tx = duration_since_sent.num_seconds() > REMOVE_TIMEOUT_TX_EVERY_SEC; + let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS; if is_timeout_tx { tx_log_writer.write( None, From 7cd0d6795d219a859106ac5610aa5c431e3b40d4 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 25 Mar 2024 13:17:15 +0000 Subject: [PATCH 25/25] fix bug with time in clear transaction map --- bench-tps/src/log_transaction_service.rs | 43 +++++++++++++++--------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/bench-tps/src/log_transaction_service.rs b/bench-tps/src/log_transaction_service.rs index 9af285a6a46408..6363ff59914c83 100644 --- a/bench-tps/src/log_transaction_service.rs +++ b/bench-tps/src/log_transaction_service.rs @@ -70,12 +70,8 @@ where const NUM_SLOTS_PER_ITERATION: u64 = 16; // How often process blocks. const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT; -// Empirically calculated constant added to MAX_PROCESSING_AGE to avoid cleaning some transactions -// that still might be added to the block. -const AGE_EPSILON: usize = 50; // Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout. -const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = - (MAX_PROCESSING_AGE + AGE_EPSILON) as i64 * (DEFAULT_MS_PER_SLOT as i64); +const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = MAX_PROCESSING_AGE as i64 * DEFAULT_MS_PER_SLOT as i64; // Map used to filter submitted transactions. #[derive(Clone)] @@ -173,7 +169,7 @@ impl LogTransactionService { if block_slots.is_empty() { continue; } - Self::process_blocks( + let last_block_time = Self::process_blocks( &client, block_slots, &mut signature_to_tx_info, @@ -181,7 +177,7 @@ impl LogTransactionService { &mut block_log_writer, commitment, ); - Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info); + Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info, last_block_time); start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION); tx_log_writer.flush(); @@ -195,6 +191,8 @@ impl LogTransactionService { } } + /// Download and process the blocks. + /// Returns the time when the last processed block has been confirmed or now(). fn process_blocks( client: &Arc, block_slots: Vec, @@ -202,7 +200,8 @@ impl LogTransactionService { tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, commitment: CommitmentConfig, - ) where + ) -> DateTime + where Client: 'static + BenchTpsClient + Send + Sync + ?Sized, { let rpc_block_config = RpcBlockConfig { @@ -217,21 +216,27 @@ impl LogTransactionService { .iter() .map(|slot| client.get_block_with_config(*slot, rpc_block_config)); let num_blocks = blocks.len(); + let mut last_block_time = None; for (block, slot) in blocks.zip(&block_slots) { let Ok(block) = block else { continue; }; - Self::process_block( + let block_time = Self::process_block( block, signature_to_tx_info, *slot, tx_log_writer, block_log_writer, - ) + ); + // if last_time is some, it means that the there is at least one valid block + if block_time.is_some() { + last_block_time = block_time; + } } measure_process_blocks.stop(); let time_process_blocks_us = measure_process_blocks.as_us(); info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us."); + last_block_time.unwrap_or_else(Utc::now) } fn process_block( @@ -240,7 +245,7 @@ impl LogTransactionService { slot: u64, tx_log_writer: &mut TransactionLogWriter, block_log_writer: &mut BlockLogWriter, - ) { + ) -> Option> { let rewards = block .rewards .as_ref() @@ -252,7 +257,7 @@ impl LogTransactionService { let Some(transactions) = &block.transactions else { warn!("Empty block: {slot}"); - return; + return None; }; let mut num_bench_tps_transactions: usize = 0; @@ -304,16 +309,24 @@ impl LogTransactionService { transactions.len(), bench_tps_cu_consumed, total_cu_consumed, - ) + ); + + block.block_time.map(|time| { + Utc.timestamp_opt(time, 0) + .latest() + .expect("valid timestamp") + }) } + /// Remove from map all the signatures which we haven't processed before and they are + /// older than the the timestamp of the last processed block plus max blockhash age. fn clean_transaction_map( tx_log_writer: &mut TransactionLogWriter, signature_to_tx_info: &mut MapSignatureToTxInfo, + last_block_time: DateTime, ) { - let now: DateTime = Utc::now(); signature_to_tx_info.retain(|signature, tx_info| { - let duration_since_sent = now.signed_duration_since(tx_info.sent_at); + let duration_since_sent = last_block_time.signed_duration_since(tx_info.sent_at); let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS; if is_timeout_tx { tx_log_writer.write(