Skip to content

Commit

Permalink
Use batch send in bench-tps to send transactions (backport solana-lab…
Browse files Browse the repository at this point in the history
…s#27527) (solana-labs#27560)

* Use batch send in bench-tps to send transactions (solana-labs#27527)

* Use batch send in bench-tps to send transactions

* serialize using par iter

(cherry picked from commit 49df1c4)

# Conflicts:
#	tpu-client/src/tpu_client.rs

* fix merge issues

Co-authored-by: Pankaj Garg <[email protected]>
  • Loading branch information
2 people authored and lijunwangs committed Sep 13, 2022
1 parent a9c5b8c commit 3f8e095
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 3 deletions.
4 changes: 1 addition & 3 deletions bench-tps/src/bench_tps_client/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient {
Ok(signature)
}
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
for transaction in transactions {
BenchTpsClient::send_transaction(self, transaction)?;
}
self.try_send_transaction_batch(&transactions)?;
Ok(())
}
fn get_latest_blockhash(&self) -> Result<Hash> {
Expand Down
53 changes: 53 additions & 0 deletions client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ async fn send_wire_transaction_to_addr(
conn.send_wire_transaction(wire_transaction.clone()).await
}

async fn send_wire_transaction_batch_to_addr(
connection_cache: &ConnectionCache,
addr: &SocketAddr,
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
}

impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
Expand Down Expand Up @@ -140,6 +149,50 @@ impl TpuClient {
}
}

/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size
/// Returns the last error if all sends fail
pub async fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
let futures = leaders
.iter()
.map(|addr| {
send_wire_transaction_batch_to_addr(
&self.connection_cache,
addr,
&wire_transactions,
)
})
.collect::<Vec<_>>();
let results: Vec<TransportResult<()>> = join_all(futures).await;

let mut last_error: Option<TransportError> = None;
let mut some_success = false;
for result in results {
if let Err(e) = result {
if last_error.is_none() {
last_error = Some(e);
}
} else {
some_success = true;
}
}
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
})
} else {
Ok(())
}
}

/// Create a new client that disconnects when dropped
pub async fn new(
rpc_client: Arc<RpcClient>,
Expand Down
45 changes: 45 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
},
bincode::serialize,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
Expand Down Expand Up @@ -111,6 +112,17 @@ impl TpuClient {
self.try_send_wire_transaction(wire_transaction)
}

/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
/// to fanout size
/// Returns the last error if all sends fail
pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
let wire_transactions = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.try_send_wire_transaction_batch(wire_transactions)
}

/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail
fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
Expand Down Expand Up @@ -140,6 +152,39 @@ impl TpuClient {
}
}

/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size
/// Returns the last error if all sends fail
fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
let mut last_error: Option<TransportError> = None;
let mut some_success = false;

for tpu_address in self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
let conn = self.connection_cache.get_connection(&tpu_address);
let result = conn.send_wire_transaction_batch_async(wire_transactions.clone());
if let Err(err) = result {
last_error = Some(err);
} else {
some_success = true;
}
}
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
})
} else {
Ok(())
}
}

/// Create a new client that disconnects when dropped
pub fn new(
rpc_client: Arc<RpcClient>,
Expand Down

0 comments on commit 3f8e095

Please sign in to comment.