Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use batch send in bench-tps to send transactions (backport #27527) #27560

Merged
merged 2 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions bench-tps/src/bench_tps_client/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient {
Ok(signature)
}
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
for transaction in transactions {
BenchTpsClient::send_transaction(self, transaction)?;
}
self.try_send_transaction_batch(&transactions)?;
Ok(())
}
fn get_latest_blockhash(&self) -> Result<Hash> {
Expand Down
53 changes: 53 additions & 0 deletions client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ async fn send_wire_transaction_to_addr(
conn.send_wire_transaction(wire_transaction.clone()).await
}

async fn send_wire_transaction_batch_to_addr(
connection_cache: &ConnectionCache,
addr: &SocketAddr,
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
}

impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
Expand Down Expand Up @@ -140,6 +149,50 @@ impl TpuClient {
}
}

/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size
/// Returns the last error if all sends fail
pub async fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
let futures = leaders
.iter()
.map(|addr| {
send_wire_transaction_batch_to_addr(
&self.connection_cache,
addr,
&wire_transactions,
)
})
.collect::<Vec<_>>();
let results: Vec<TransportResult<()>> = join_all(futures).await;

let mut last_error: Option<TransportError> = None;
let mut some_success = false;
for result in results {
if let Err(e) = result {
if last_error.is_none() {
last_error = Some(e);
}
} else {
some_success = true;
}
}
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
})
} else {
Ok(())
}
}

/// Create a new client that disconnects when dropped
pub async fn new(
rpc_client: Arc<RpcClient>,
Expand Down
45 changes: 45 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
},
bincode::serialize,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
Expand Down Expand Up @@ -111,6 +112,17 @@ impl TpuClient {
self.try_send_wire_transaction(wire_transaction)
}

/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
/// to fanout size
/// Returns the last error if all sends fail
pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
let wire_transactions = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.try_send_wire_transaction_batch(wire_transactions)
}

/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail
fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
Expand Down Expand Up @@ -140,6 +152,39 @@ impl TpuClient {
}
}

/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size
/// Returns the last error if all sends fail
fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
let mut last_error: Option<TransportError> = None;
let mut some_success = false;

for tpu_address in self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
let conn = self.connection_cache.get_connection(&tpu_address);
let result = conn.send_wire_transaction_batch_async(wire_transactions.clone());
if let Err(err) = result {
last_error = Some(err);
} else {
some_success = true;
}
}
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
})
} else {
Ok(())
}
}

/// Create a new client that disconnects when dropped
pub fn new(
rpc_client: Arc<RpcClient>,
Expand Down