This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

tpu-client: Speed up performance by awaiting all futures at once #32945

Merged (3 commits) on Aug 24, 2023
147 changes: 140 additions & 7 deletions tpu-client/src/nonblocking/tpu_client.rs
@@ -2,7 +2,10 @@ pub use crate::tpu_client::Result;
use {
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
futures_util::{future::join_all, stream::StreamExt},
futures_util::{
future::{join_all, FutureExt, TryFutureExt},
stream::StreamExt,
},
log::*,
solana_connection_cache::{
connection_cache::{
@@ -29,6 +32,8 @@ use {
},
std::{
collections::{HashMap, HashSet},
future::Future,
iter,
net::SocketAddr,
str::FromStr,
sync::{
@@ -45,6 +50,7 @@ use {
#[cfg(feature = "spinner")]
use {
crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
@@ -247,6 +253,100 @@ pub struct TpuClient<
connection_cache: Arc<ConnectionCache<P, M, C>>,
}

/// Helper function which generates futures to all be awaited together for maximum
/// throughput
#[cfg(feature = "spinner")]
fn send_wire_transaction_futures<'a, P, M, C>(
progress_bar: &'a ProgressBar,
progress: &'a SendTransactionProgress,
index: usize,
num_transactions: usize,
wire_transaction: Vec<u8>,
leaders: Vec<SocketAddr>,
connection_cache: &'a ConnectionCache<P, M, C>,
) -> Vec<impl Future<Output = TransportResult<()>> + 'a>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32);
let send_timeout = SEND_TIMEOUT_INTERVAL.saturating_add(sleep_duration);
leaders
.into_iter()
.map(|addr| {
timeout_future(
send_timeout,
sleep_and_send_wire_transaction_to_addr(
sleep_duration,
connection_cache,
addr,
wire_transaction.clone(),
),
)
.boxed_local() // required to make types work simply
})
.chain(iter::once(
timeout_future(
send_timeout,
sleep_and_set_message(
sleep_duration,
progress_bar,
progress,
index,
num_transactions,
),
)
.boxed_local(), // required to make types work simply
))
.collect::<Vec<_>>()
}
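
As a side note (not part of the diff): the stagger and deadline arithmetic above can be checked in isolation. Assuming SEND_TRANSACTION_INTERVAL is the 10 ms stagger mentioned in the review thread further down, the future for transaction `index` sleeps `index * 10 ms` and gets the 5-second SEND_TIMEOUT_INTERVAL budget added on top, so a future never times out merely because it is still waiting for its turn.

```rust
// Standalone sketch of the deadline math in send_wire_transaction_futures;
// the 10 ms stagger is an assumption taken from the review discussion below.
use std::time::Duration;

fn main() {
    let send_transaction_interval = Duration::from_millis(10); // assumed stagger
    let send_timeout_interval = Duration::from_secs(5); // SEND_TIMEOUT_INTERVAL
    for index in [0_usize, 250, 799] {
        let sleep_duration = send_transaction_interval.saturating_mul(index as u32);
        let send_timeout = send_timeout_interval.saturating_add(sleep_duration);
        println!("tx {index}: waits {sleep_duration:?}, deadline {send_timeout:?}");
    }
}
```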

// Wrap an existing future with a timeout.
//
// Useful for end-users who don't need a persistent connection to each validator,
// and want to abort more quickly.
fn timeout_future<'a, Fut: Future<Output = TransportResult<()>> + 'a>(
timeout_duration: Duration,
future: Fut,
) -> impl Future<Output = TransportResult<()>> + 'a {
timeout(timeout_duration, future)
.unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string())))
.boxed_local()
}
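
A minimal standalone sketch (not PR code) of the same wrapping trick with plain types: tokio's `timeout` yields `Result<_, Elapsed>`, and `TryFutureExt::unwrap_or_else` collapses the `Elapsed` case back into the inner error type, so every wrapped future exposes one uniform `Result` and can be awaited alongside the others.

```rust
// Minimal sketch of the timeout_future pattern with plain types (not PR code).
use futures_util::future::TryFutureExt;
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Stands in for a send that takes longer than we are willing to wait.
async fn slow_send() -> Result<(), String> {
    sleep(Duration::from_secs(10)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    // `timeout` produces Result<Result<(), String>, Elapsed>; `unwrap_or_else`
    // flattens the Elapsed case into the inner error type.
    let wrapped = timeout(Duration::from_millis(50), slow_send())
        .unwrap_or_else(|_elapsed| Err("Timed out".to_string()));
    assert_eq!(wrapped.await, Err("Timed out".to_string()));
}
```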

#[cfg(feature = "spinner")]
async fn sleep_and_set_message(
sleep_duration: Duration,
progress_bar: &ProgressBar,
progress: &SendTransactionProgress,
index: usize,
num_transactions: usize,
) -> TransportResult<()> {
sleep(sleep_duration).await;
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
Ok(())
}

#[cfg(feature = "spinner")]
async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
sleep_duration: Duration,
connection_cache: &ConnectionCache<P, M, C>,
addr: SocketAddr,
wire_transaction: Vec<u8>,
) -> TransportResult<()>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
sleep(sleep_duration).await;
let conn = connection_cache.get_nonblocking_connection(&addr);
conn.send_data(&wire_transaction).await
}

async fn send_wire_transaction_to_addr<P, M, C>(
connection_cache: &ConnectionCache<P, M, C>,
addr: &SocketAddr,
@@ -459,15 +559,48 @@ where

// Periodically re-send all pending transactions
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
// Prepare futures for all transactions
let mut futures = vec![];
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
if !self.send_transaction(transaction).await {
let wire_transaction = serialize(transaction).unwrap();
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
futures.extend(send_wire_transaction_futures(
&progress_bar,
&progress,
index,
num_transactions,
wire_transaction,
leaders,
&self.connection_cache,
));
}

// Start the process of sending them all
let results = join_all(futures).await;
Review thread on `let results = join_all(futures).await;`:

Contributor: Can we transpose all those timeout wrappers by just timeout-wrapping the JoinAll future here, or do we want the granularity?

@joncinque (author), Aug 22, 2023: We want the granularity, unfortunately. We're mimicking the old behavior by waiting 10 ms between each send, and that timeout encompasses the waiting too. For example, if we set a global timeout of 5 seconds and want to send 800 transactions, the last 300 transactions will never get sent, since they'll time out just waiting their turn.

Contributor: Since we send all the transactions at the same time, won't it create spiky traffic?

@joncinque (author): We don't actually send them all at once! We wait the proper amount of time before each round of sending, just like before. The gain is that we await all of the futures at once, so if one leader takes a long time for a send, it doesn't hold up the rest of the transactions.

@joncinque (author): ... but I can see why that's confusing given the comment, I'll update it.
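The exchange above is easy to check with a toy program. The sketch below (not PR code) uses the numbers from the thread, a 10 ms stagger, a 5-second budget, and 800 transactions, and models the send itself as instantaneous. Variant A wraps the whole join_all in one timeout and cancels the late sends; variant B mirrors the PR by giving each future its own stagger-adjusted deadline.

```rust
// Standalone illustration (not PR code) of why the timeout is applied per future
// rather than around join_all. The 10 ms stagger, 5 s budget, and 800 transactions
// are the numbers from the review thread; the send itself is modeled as instant.
use futures_util::future::join_all;
use std::time::Duration;
use tokio::time::{sleep, timeout};

const STAGGER: Duration = Duration::from_millis(10);
const BUDGET: Duration = Duration::from_secs(5);
const NUM_TRANSACTIONS: usize = 800;

// Wait for our stagger slot, then "send" (instant here).
async fn staggered_send(index: usize) {
    sleep(STAGGER.saturating_mul(index as u32)).await;
}

#[tokio::main]
async fn main() {
    // Variant A: one global timeout around join_all. The deadline also covers the
    // time spent waiting for a stagger slot, so sends scheduled after ~5 s
    // (index >= 500) are cancelled before they ever run.
    let everything = join_all((0..NUM_TRANSACTIONS).map(staggered_send));
    if timeout(BUDGET, everything).await.is_err() {
        println!("global timeout: the last ~300 sends never happened");
    }

    // Variant B (what the PR does): each future gets the budget plus its own
    // stagger as a deadline, so waiting for a slot never eats into the budget.
    let per_future = (0..NUM_TRANSACTIONS).map(|index| {
        timeout(BUDGET + STAGGER.saturating_mul(index as u32), staggered_send(index))
    });
    let completed = join_all(per_future)
        .await
        .iter()
        .filter(|r| r.is_ok())
        .count();
    println!("per-future timeouts: {completed}/{NUM_TRANSACTIONS} sends completed");
}
```
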
progress.set_message_for_confirmed_transactions(
&progress_bar,
"Checking sent transactions",
);
for (index, (tx_results, (_i, transaction))) in results
.chunks(self.fanout_slots as usize)
.zip(pending_transactions.values())
.enumerate()
{
// Only report an error if every future in the chunk errored
if tx_results.iter().all(|r| r.is_err()) {
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!(
"Resending failed transaction {} of {}",
index + 1,
num_transactions,
),
);
let _result = self.rpc_client.send_transaction(transaction).await.ok();
}
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
sleep(SEND_TRANSACTION_INTERVAL).await;
}
last_resend = Instant::now();
}
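
Finally, the "only report an error if every future in the chunk errored" check in the resend loop above relies on join_all preserving submission order. The simplified sketch below (not PR code) shows just the shape of that check, assuming exactly one result per (transaction, leader) pair, which is a simplification of the real futures list.

```rust
// Simplified, standalone sketch (not PR code) of the "every send failed" check:
// join_all returns results in submission order, so grouping the flat Vec into
// chunks of `fanout_slots` recovers a per-transaction view.
fn failed_transaction_indexes<T, E>(results: &[Result<T, E>], fanout_slots: usize) -> Vec<usize> {
    results
        .chunks(fanout_slots)
        .enumerate()
        // A transaction only counts as failed if every leader send errored.
        .filter(|(_, chunk)| chunk.iter().all(|r| r.is_err()))
        .map(|(index, _)| index)
        .collect()
}

fn main() {
    // Two transactions, fanout of 2: the first reached one of its two leaders,
    // the second reached neither, so only the second needs the RPC fallback.
    let results: Vec<Result<(), &str>> = vec![Ok(()), Err("slow"), Err("a"), Err("b")];
    assert_eq!(failed_transaction_indexes(&results, 2), vec![1]);
}
```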