diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs index f97761cba14fde..43196d05a8a519 100644 --- a/client/src/send_and_confirm_transactions_in_parallel.rs +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -5,7 +5,7 @@ use { }, bincode::serialize, dashmap::DashMap, - futures_util::future::{join_all, TryFutureExt}, + futures_util::future::{join_all, FutureExt}, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::{ @@ -188,9 +188,7 @@ async fn send_transaction_with_rpc_fallback( serialized_transaction: Vec, context: &SendingContext, index: usize, - counter: usize, ) -> Result<()> { - tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await; let send_over_rpc = if let Some(tpu_client) = tpu_client { !tpu_client .send_wire_transaction(serialized_transaction.clone()) @@ -261,44 +259,42 @@ async fn sign_all_messages_and_send( .expect("Transaction should be signable"); let serialized_transaction = serialize(&transaction).expect("Transaction should serialize"); let signature = transaction.signatures[0]; - futures.push( + futures.push(async move { + tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await; + // send to confirm the transaction + context.unconfirmed_transaction_map.insert( + signature, + TransactionData { + index: *index, + serialized_transaction: serialized_transaction.clone(), + last_valid_block_height: blockhashdata.last_valid_block_height, + message: message.clone(), + }, + ); + if let Some(progress_bar) = progress_bar { + let progress = progress_from_context_and_block_height( + context, + blockhashdata.last_valid_block_height, + ); + progress.set_message_for_confirmed_transactions( + progress_bar, + &format!( + "Sending {}/{} transactions", + counter + 1, + current_transaction_count, + ), + ); + } send_transaction_with_rpc_fallback( rpc_client, tpu_client, transaction, - serialized_transaction.clone(), + serialized_transaction, context, *index, - counter, ) - .and_then(move |_| async move { - // send to confirm the transaction - context.unconfirmed_transaction_map.insert( - signature, - TransactionData { - index: *index, - serialized_transaction, - last_valid_block_height: blockhashdata.last_valid_block_height, - message: message.clone(), - }, - ); - if let Some(progress_bar) = progress_bar { - let progress = progress_from_context_and_block_height( - context, - blockhashdata.last_valid_block_height, - ); - progress.set_message_for_confirmed_transactions( - progress_bar, - &format!( - "Sending {}/{} transactions", - counter + 1, - current_transaction_count, - ), - ); - } - Ok(()) - }), - ); + .await + }); } // collect to convert Vec> to Result> join_all(futures).await.into_iter().collect::>()?; @@ -477,23 +473,33 @@ pub async fn send_and_confirm_transactions_in_parallel( // clear the map so that we can start resending unconfirmed_transasction_map.clear(); - sign_all_messages_and_send( - &progress_bar, - &rpc_client, - &tpu_client, - messages_with_index, - signers, - &context, - ) - .await?; - - // wait until all the transactions are confirmed or expired - confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( - &progress_bar, - &tpu_client, - &context, - ) - .await; + let futures = [ + sign_all_messages_and_send( + &progress_bar, + &rpc_client, + &tpu_client, + messages_with_index, + signers, + &context, + ) + .boxed_local(), + async { + // Give the signing and sending a head start before trying to + // confirm and resend + tokio::time::sleep(TPU_RESEND_REFRESH_RATE).await; + confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( + &progress_bar, + &tpu_client, + &context, + ) + .await; + // Infallible, but required to have the same return type as + // `sign_all_messages_and_send` + Ok(()) + } + .boxed_local(), + ]; + join_all(futures).await.into_iter().collect::>()?; if unconfirmed_transasction_map.is_empty() { break;