From 48e1defbfd0b7d0290bf2670fc2b23cdf3e6e1ff Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 26 Apr 2024 19:28:46 +0200 Subject: [PATCH] v1.18: client: Resend transactions using same mechanism as initial send (backport of #915) (#1072) client: Resend transactions using same mechanism as initial send (#915) * client: Timeout tpu sends * Resend transactions one at a time, same as the first send * Simplify resends to happen right after initial sends * Skip already processed transaction errors (cherry picked from commit ed11b72c3ed8d1c20018b16acb0350dbba6ce382) Co-authored-by: Jon C --- ...nd_and_confirm_transactions_in_parallel.rs | 137 ++++++++++-------- 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs index 976539c4a48b5d..b9ec1ef08b8355 100644 --- a/client/src/send_and_confirm_transactions_in_parallel.rs +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -5,7 +5,7 @@ use { }, bincode::serialize, dashmap::DashMap, - futures_util::future::{join_all, FutureExt}, + futures_util::future::join_all, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::{ @@ -28,12 +28,16 @@ use { }, time::Duration, }, - tokio::{sync::RwLock, task::JoinHandle, time::Instant}, + tokio::{sync::RwLock, task::JoinHandle}, }; const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5); -const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2); const SEND_INTERVAL: Duration = Duration::from_millis(10); +// This is a "reasonable" constant for how long it should +// take to fan the transactions out, taken from +// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures` +const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); + type QuicTpuClient = TpuClient; #[derive(Clone, Debug)] @@ -141,8 +145,11 @@ fn create_transaction_confirmation_task( }) { num_confirmed_transactions.fetch_add(1, Ordering::Relaxed); - if let Some(error) = status.err { - errors_map.insert(data.index, error); + match status.err { + Some(TransactionError::AlreadyProcessed) | None => {} + Some(error) => { + errors_map.insert(data.index, error); + } } }; } @@ -190,9 +197,12 @@ async fn send_transaction_with_rpc_fallback( index: usize, ) -> Result<()> { let send_over_rpc = if let Some(tpu_client) = tpu_client { - !tpu_client - .send_wire_transaction(serialized_transaction.clone()) - .await + !tokio::time::timeout( + SEND_TIMEOUT_INTERVAL, + tpu_client.send_wire_transaction(serialized_transaction.clone()), + ) + .await + .unwrap_or(false) } else { true }; @@ -341,7 +351,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction let block_height = current_block_height.load(Ordering::Relaxed); if let Some(tpu_client) = tpu_client { - let instant = Instant::now(); // retry sending transaction only over TPU port // any transactions sent over RPC will be automatically rebroadcast by the RPC server let txs_to_resend_over_tpu = unconfirmed_transaction_map @@ -349,33 +358,14 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction .filter(|x| block_height < x.last_valid_block_height) .map(|x| x.serialized_transaction.clone()) .collect::>(); - let num_txs_to_resend = txs_to_resend_over_tpu.len(); - // This is a "reasonable" constant for how long it should - // take to fan the transactions out, taken from - // `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures` - const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); - let message = if tokio::time::timeout( - SEND_TIMEOUT_INTERVAL, - tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu), + send_staggered_transactions( + progress_bar, + tpu_client, + txs_to_resend_over_tpu, + max_valid_block_height, + context, ) - .await - .is_err() - { - format!("Timed out resending {num_txs_to_resend} transactions...") - } else { - format!("Resent {num_txs_to_resend} transactions...") - }; - - if let Some(progress_bar) = progress_bar { - let progress = - progress_from_context_and_block_height(context, max_valid_block_height); - progress.set_message_for_confirmed_transactions(progress_bar, &message); - } - - let elapsed = instant.elapsed(); - if elapsed < TPU_RESEND_REFRESH_RATE { - tokio::time::sleep(TPU_RESEND_REFRESH_RATE - elapsed).await; - } + .await; } else { tokio::time::sleep(Duration::from_millis(100)).await; } @@ -391,6 +381,41 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction } } +async fn send_staggered_transactions( + progress_bar: &Option, + tpu_client: &QuicTpuClient, + wire_transactions: Vec>, + last_valid_block_height: u64, + context: &SendingContext, +) { + let current_transaction_count = wire_transactions.len(); + let futures = wire_transactions + .into_iter() + .enumerate() + .map(|(counter, transaction)| async move { + tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await; + if let Some(progress_bar) = progress_bar { + let progress = + progress_from_context_and_block_height(context, last_valid_block_height); + progress.set_message_for_confirmed_transactions( + progress_bar, + &format!( + "Resending {}/{} transactions", + counter + 1, + current_transaction_count, + ), + ); + } + tokio::time::timeout( + SEND_TIMEOUT_INTERVAL, + tpu_client.send_wire_transaction(transaction), + ) + .await + }) + .collect::>(); + join_all(futures).await; +} + /// Sends and confirms transactions concurrently /// /// The sending and confirmation of transactions is done in parallel tasks @@ -483,33 +508,21 @@ pub async fn send_and_confirm_transactions_in_parallel( // clear the map so that we can start resending unconfirmed_transasction_map.clear(); - let futures = [ - sign_all_messages_and_send( - &progress_bar, - &rpc_client, - &tpu_client, - messages_with_index, - signers, - &context, - ) - .boxed_local(), - async { - // Give the signing and sending a head start before trying to - // confirm and resend - tokio::time::sleep(TPU_RESEND_REFRESH_RATE).await; - confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( - &progress_bar, - &tpu_client, - &context, - ) - .await; - // Infallible, but required to have the same return type as - // `sign_all_messages_and_send` - Ok(()) - } - .boxed_local(), - ]; - join_all(futures).await.into_iter().collect::>()?; + sign_all_messages_and_send( + &progress_bar, + &rpc_client, + &tpu_client, + messages_with_index, + signers, + &context, + ) + .await?; + confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( + &progress_bar, + &tpu_client, + &context, + ) + .await; if unconfirmed_transasction_map.is_empty() { break;