diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs index d9683381619d67..ace644d8b55feb 100644 --- a/bench-tps/src/bench_tps_client/tpu_client.rs +++ b/bench-tps/src/bench_tps_client/tpu_client.rs @@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient { Ok(signature) } fn send_batch(&self, transactions: Vec) -> Result<()> { - for transaction in transactions { - BenchTpsClient::send_transaction(self, transaction)?; - } + self.try_send_transaction_batch(&transactions)?; Ok(()) } fn get_latest_blockhash(&self) -> Result { diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index 4dd39c7685407b..deb34e97114b56 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -77,6 +77,15 @@ async fn send_wire_transaction_to_addr( conn.send_wire_transaction(wire_transaction.clone()).await } +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, + addr: &SocketAddr, + wire_transactions: &[Vec], +) -> TransportResult<()> { + let conn = connection_cache.get_nonblocking_connection(addr); + conn.send_wire_transaction_batch(wire_transactions).await +} + impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size @@ -140,6 +149,50 @@ impl TpuClient { } } + /// Send a batch of wire transactions to the current and upcoming leader TPUs according to + /// fanout size + /// Returns the last error if all sends fail + pub async fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + let leaders = self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots); + let futures = leaders + .iter() + .map(|addr| { + send_wire_transaction_batch_to_addr( + &self.connection_cache, + addr, + &wire_transactions, + ) + }) + .collect::>(); + let results: Vec> = join_all(futures).await; + + let mut last_error: Option = None; + let mut some_success = false; + for result in results { + if let Err(e) = result { + if last_error.is_none() { + last_error = Some(e); + } + } else { + some_success = true; + } + } + if !some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() + }) + } else { + Ok(()) + } + } + /// Create a new client that disconnects when dropped pub async fn new( rpc_client: Arc, diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 5e94098e2624b9..f47c29e609c5aa 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -11,6 +11,7 @@ use { }, bincode::serialize, log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_sdk::{ clock::Slot, commitment_config::CommitmentConfig, @@ -111,6 +112,17 @@ impl TpuClient { self.try_send_wire_transaction(wire_transaction) } + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according + /// to fanout size + /// Returns the last error if all sends fail + pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { + let wire_transactions = transactions + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.try_send_wire_transaction_batch(wire_transactions) + } + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { @@ -140,6 +152,39 @@ impl TpuClient { } } + /// Send a batch of wire transactions to the current and upcoming leader TPUs according to + /// fanout size + /// Returns the last error if all sends fail + fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + let mut last_error: Option = None; + let mut some_success = false; + + for tpu_address in self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots) + { + let conn = self.connection_cache.get_connection(&tpu_address); + let result = conn.send_wire_transaction_batch_async(wire_transactions.clone()); + if let Err(err) = result { + last_error = Some(err); + } else { + some_success = true; + } + } + if !some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() + }) + } else { + Ok(()) + } + } + /// Create a new client that disconnects when dropped pub fn new( rpc_client: Arc,