diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index b3a97589bbc024..43c9793ea4db67 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -128,8 +128,61 @@ impl Default for Config { } } +/// The maximum duration the retry thread may be configured to sleep before +/// processing the transactions that need to be retried. +pub const MAX_RETRY_SLEEP_MS: u64 = 1000; + +/// The leader info refresh rate. +pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; + +/// A struct responsible for holding up-to-date leader information +/// used for sending transactions. +pub struct CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// The last time the leader info was refreshed + last_leader_refresh: Option, + + /// The leader info + leader_info: Option, + + /// How often to refresh the leader info + refresh_rate: Duration, +} + +impl CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// Get the leader info, refresh if expired + pub fn get_leader_info(&mut self) -> Option<&T> { + if let Some(leader_info) = self.leader_info.as_mut() { + let now = Instant::now(); + let need_refresh = self + .last_leader_refresh + .map(|last| now.duration_since(last) >= self.refresh_rate) + .unwrap_or(true); + + if need_refresh { + leader_info.refresh_recent_peers(); + self.last_leader_refresh = Some(now); + } + } + self.leader_info.as_ref() + } + + pub fn new(leader_info: Option) -> Self { + Self { + last_leader_refresh: None, + leader_info, + refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS), + } + } +} + impl SendTransactionService { - pub fn new( + pub fn new( tpu_address: SocketAddr, bank_forks: &Arc>, leader_info: Option, @@ -147,7 +200,7 @@ impl SendTransactionService { Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) } - pub fn new_with_config( + pub fn new_with_config( tpu_address: SocketAddr, bank_forks: &Arc>, leader_info: Option, @@ -155,11 +208,14 @@ impl SendTransactionService { config: Config, ) -> Self { let retry_transactions = Arc::new(Mutex::new(HashMap::new())); + + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + let exit = Arc::new(AtomicBool::new(false)); let receive_txn_thread = Self::receive_txn_thread( tpu_address, receiver, - leader_info.clone(), + leader_info_provider.clone(), config.clone(), retry_transactions.clone(), exit.clone(), @@ -168,7 +224,7 @@ impl SendTransactionService { let retry_thread = Self::retry_thread( tpu_address, bank_forks.clone(), - leader_info, + leader_info_provider, config, retry_transactions, exit.clone(), @@ -184,28 +240,24 @@ impl SendTransactionService { fn receive_txn_thread( tpu_address: SocketAddr, receiver: Receiver, - mut leader_info: Option, + leader_info_provider: Arc>>, config: Config, retry_transactions: Arc>>, exit: Arc, ) -> JoinHandle<()> { let mut last_batch_sent = Instant::now(); - let mut last_leader_refresh = Instant::now(); let mut transactions = HashMap::new(); info!( "Starting send-transaction-service::receive_txn_thread with config {:?}", config ); - if let Some(leader_info) = leader_info.as_mut() { - leader_info.refresh_recent_peers(); - } connection_cache::set_use_quic(config.use_quic); Builder::new() .name("send-tx-receive".to_string()) .spawn(move || loop { let recv_timeout_ms = config.batch_send_rate_ms; - match receiver.recv_timeout(Duration::from_millis(1000.min(recv_timeout_ms))) { + match receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)) { Err(RecvTimeoutError::Disconnected) => { info!("Terminating send-transaction-service."); exit.store(true, Ordering::Relaxed); @@ -243,7 +295,7 @@ impl SendTransactionService { let _result = Self::send_transactions_in_batch( &tpu_address, &mut transactions, - &leader_info, + leader_info_provider.lock().unwrap().get_leader_info(), &config, ); let last_sent_time = Instant::now(); @@ -266,12 +318,6 @@ impl SendTransactionService { } last_batch_sent = Instant::now(); - if last_leader_refresh.elapsed().as_millis() > 1000 { - if let Some(leader_info) = leader_info.as_mut() { - leader_info.refresh_recent_peers(); - } - last_leader_refresh = Instant::now(); - } } }) .unwrap() @@ -281,26 +327,23 @@ impl SendTransactionService { fn retry_thread( tpu_address: SocketAddr, bank_forks: Arc>, - mut leader_info: Option, + leader_info_provider: Arc>>, config: Config, retry_transactions: Arc>>, exit: Arc, ) -> JoinHandle<()> { - let mut last_leader_refresh = Instant::now(); - info!( "Starting send-transaction-service::retry_thread with config {:?}", config ); - if let Some(leader_info) = leader_info.as_mut() { - leader_info.refresh_recent_peers(); - } connection_cache::set_use_quic(config.use_quic); Builder::new() .name("send-tx-retry".to_string()) .spawn(move || loop { let retry_interval_ms = config.retry_rate_ms; - sleep(Duration::from_millis(1000.min(retry_interval_ms))); + sleep(Duration::from_millis( + MAX_RETRY_SLEEP_MS.min(retry_interval_ms), + )); if exit.load(Ordering::Relaxed) { break; } @@ -323,16 +366,10 @@ impl SendTransactionService { &root_bank, &tpu_address, &mut transactions, - &leader_info, + &leader_info_provider, &config, ); } - if last_leader_refresh.elapsed().as_millis() > 1000 { - if let Some(leader_info) = leader_info.as_mut() { - leader_info.refresh_recent_peers(); - } - last_leader_refresh = Instant::now(); - } }) .unwrap() } @@ -341,7 +378,7 @@ impl SendTransactionService { fn send_transactions_in_batch( tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info: &Option, + leader_info: Option<&T>, config: &Config, ) { let mut measure = Measure::start("send_transactions_in_batch-us"); @@ -367,12 +404,12 @@ impl SendTransactionService { } /// Retry transactions sent before. - fn process_transactions( + fn process_transactions( working_bank: &Arc, root_bank: &Arc, tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info: &Option, + leader_info_provider: &Arc>>, config: &Config, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -469,17 +506,19 @@ impl SendTransactionService { if !batched_transactions.is_empty() { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); - let wire_transactions = transactions .iter() .filter(|(signature, _)| batched_transactions.contains(signature)) .map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref()) .collect::>(); - for address in &addresses { - let iter = wire_transactions.chunks(config.batch_size); - for chunk in iter { + let iter = wire_transactions.chunks(config.batch_size); + for chunk in iter { + let mut leader_info_provider = leader_info_provider.lock().unwrap(); + let leader_info = leader_info_provider.get_leader_info(); + let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); + + for address in &addresses { Self::send_transactions(address, chunk); } } @@ -532,7 +571,7 @@ impl SendTransactionService { fn get_tpu_addresses<'a, T: TpuInfo>( tpu_address: &'a SocketAddr, - leader_info: &'a Option, + leader_info: Option<&'a T>, config: &'a Config, ) -> Vec<&'a SocketAddr> { let addresses = leader_info @@ -630,6 +669,7 @@ mod test { let mut transactions = HashMap::new(); info!("Expired transactions are dropped..."); + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); transactions.insert( Signature::default(), TransactionInfo::new( @@ -646,7 +686,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -675,7 +715,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -704,7 +744,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -733,7 +773,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 1); @@ -764,7 +804,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 1); @@ -805,7 +845,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 1); @@ -822,7 +862,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -896,12 +936,13 @@ mod test { Some(Instant::now()), ), ); + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -929,7 +970,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -959,7 +1000,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -987,7 +1028,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -1016,7 +1057,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert!(transactions.is_empty()); @@ -1045,7 +1086,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 1); @@ -1076,7 +1117,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 1); @@ -1104,7 +1145,7 @@ mod test { &root_bank, &tpu_address, &mut transactions, - &None, + &leader_info_provider, &config, ); assert_eq!(transactions.len(), 0);