diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs index 0b027303c34eaf..4fa6455f4c0cd7 100644 --- a/bench-tps/src/bench_tps_client/tpu_client.rs +++ b/bench-tps/src/bench_tps_client/tpu_client.rs @@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient { Ok(signature) } fn send_batch(&self, transactions: Vec) -> Result<()> { - for transaction in transactions { - BenchTpsClient::send_transaction(self, transaction)?; - } + self.try_send_transaction_batch(&transactions)?; Ok(()) } fn get_latest_blockhash(&self) -> Result { diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 3fc087031944bb..2b271f8397218c 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -231,6 +231,15 @@ async fn send_wire_transaction_to_addr( conn.send_wire_transaction(wire_transaction.clone()).await } +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, + addr: &SocketAddr, + wire_transactions: &[Vec], +) -> TransportResult<()> { + let conn = connection_cache.get_nonblocking_connection(addr); + conn.send_wire_transaction_batch(wire_transactions).await +} + impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size @@ -297,6 +306,50 @@ impl TpuClient { } } + /// Send a batch of wire transactions to the current and upcoming leader TPUs according to + /// fanout size + /// Returns the last error if all sends fail + pub async fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + let leaders = self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots); + let futures = leaders + .iter() + .map(|addr| { + send_wire_transaction_batch_to_addr( + &self.connection_cache, + addr, + &wire_transactions, + ) + }) + .collect::>(); + let results: Vec> = join_all(futures).await; + + let mut last_error: Option = None; + let mut some_success = false; + for result in results { + if let Err(e) = result { + if last_error.is_none() { + last_error = Some(e); + } + } else { + some_success = true; + } + } + if !some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() + }) + } else { + Ok(()) + } + } + /// Create a new client that disconnects when dropped pub async fn new( rpc_client: Arc, diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 92a6011109c703..7ce01f671d7e6e 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -4,6 +4,7 @@ use { connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient as NonblockingTpuClient, }, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, std::{ @@ -77,6 +78,20 @@ impl TpuClient { self.invoke(self.tpu_client.try_send_transaction(transaction)) } + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according + /// to fanout size + /// Returns the last error if all sends fail + pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { + let wire_transactions = transactions + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.invoke( + self.tpu_client + .try_send_wire_transaction_batch(wire_transactions), + ) + } + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> {