From 7f2beb21a1251723c0095896425ddc24199c1424 Mon Sep 17 00:00:00 2001 From: Greg Cusack Date: Mon, 17 Jun 2024 09:20:17 -0700 Subject: [PATCH] add retries to transaction sending in LocalCluster (#1747) --- local-cluster/src/cluster_tests.rs | 53 ++++++++++++----- local-cluster/src/local_cluster.rs | 85 +++++++++++++++++++++++----- local-cluster/tests/local_cluster.rs | 27 ++++++--- 3 files changed, 129 insertions(+), 36 deletions(-) diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index b31c874b9e6db3..e297457dcc3bd5 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -4,7 +4,7 @@ /// discover the rest of the network. use log::*; use { - crate::cluster::QuicTpuClient, + crate::{cluster::QuicTpuClient, local_cluster::LocalCluster}, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, solana_client::connection_cache::{ConnectionCache, Protocol}, @@ -101,12 +101,17 @@ pub fn spend_and_verify_all_nodes( .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .unwrap(); - let transaction = + let mut transaction = system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; - client - .send_transaction_to_upcoming_leaders(&transaction) - .unwrap(); + LocalCluster::send_transaction_with_retries( + &client, + &[funding_keypair], + &mut transaction, + 10, + confs, + ) + .unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(validator.pubkey()) { continue; @@ -160,16 +165,21 @@ pub fn send_many_transactions( .unwrap(); let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer); - let transaction = system_transaction::transfer( + let mut transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), transfer_amount, blockhash, ); - client - .send_transaction_to_upcoming_leaders(&transaction) - .unwrap(); + LocalCluster::send_transaction_with_retries( + &client, + &[funding_keypair], + &mut transaction, + 5, + 0, + ) + .unwrap(); expected_balances.insert(random_keypair.pubkey(), transfer_amount); } @@ -292,7 +302,7 @@ pub fn kill_entry_and_spend_and_verify_rest( .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let transaction = system_transaction::transfer( + let mut transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), 1, @@ -300,16 +310,29 @@ pub fn kill_entry_and_spend_and_verify_rest( ); let confs = VOTE_THRESHOLD_DEPTH + 1; - if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) { - result = Err(e); - continue; - } + let sig = { + let sig = LocalCluster::send_transaction_with_retries( + &client, + &[funding_keypair], + &mut transaction, + 5, + confs, + ); + match sig { + Err(e) => { + result = Err(e); + continue; + } + + Ok(sig) => sig, + } + }; info!("poll_all_nodes_for_signature()"); match poll_all_nodes_for_signature( entry_point_info, &cluster_nodes, connection_cache, - &transaction.signatures[0], + &sig, confs, ) { Err(e) => { diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 64fefb1114f99e..5849398468203a 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -28,7 +28,7 @@ use { }, solana_sdk::{ account::{Account, AccountSharedData}, - clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, + clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, feature_set, @@ -36,13 +36,15 @@ use { message::Message, poh_config::PohConfig, pubkey::Pubkey, - signature::{Keypair, Signer}, + signature::{Keypair, Signature, Signer}, + signers::Signers, stake::{ instruction as stake_instruction, state::{Authorized, Lockup}, }, system_transaction, transaction::Transaction, + transport::TransportError, }, solana_stake_program::stake_state, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, @@ -61,6 +63,7 @@ use { net::{IpAddr, Ipv4Addr, UdpSocket}, path::{Path, PathBuf}, sync::{Arc, RwLock}, + time::Instant, }, }; @@ -665,6 +668,53 @@ impl LocalCluster { info!("{} done waiting for roots", test_name); } + /// Attempt to send and confirm tx "attempts" times + /// Wait for signature confirmation before returning + /// Return the transaction signature + pub fn send_transaction_with_retries( + client: &QuicTpuClient, + keypairs: &T, + transaction: &mut Transaction, + attempts: usize, + pending_confirmations: usize, + ) -> std::result::Result { + for attempt in 0..attempts { + let now = Instant::now(); + let mut num_confirmed = 0; + let mut wait_time = MAX_PROCESSING_AGE; + + while now.elapsed().as_secs() < wait_time as u64 { + if num_confirmed == 0 { + client.send_transaction_to_upcoming_leaders(transaction)?; + } + + if let Ok(confirmed_blocks) = client.rpc_client().poll_for_signature_confirmation( + &transaction.signatures[0], + pending_confirmations, + ) { + num_confirmed = confirmed_blocks; + if confirmed_blocks >= pending_confirmations { + return Ok(transaction.signatures[0]); + } + // Since network has seen the transaction, wait longer to receive + // all pending confirmations. Resending the transaction could result into + // extra transaction fees + wait_time = wait_time.max( + MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), + ); + } + } + info!("{attempt} tries failed transfer"); + let blockhash = client.rpc_client().get_latest_blockhash()?; + transaction.sign(keypairs, blockhash); + } + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to confirm transaction".to_string(), + ) + .into()) + } + fn transfer_with_client( client: &QuicTpuClient, source_keypair: &Keypair, @@ -676,7 +726,7 @@ impl LocalCluster { .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); + let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); info!( "executing transfer of {} from {} to {}", lamports, @@ -684,8 +734,7 @@ impl LocalCluster { *dest_pubkey ); - client - .send_transaction_to_upcoming_leaders(&tx) + LocalCluster::send_transaction_with_retries(client, &[source_keypair], &mut tx, 10, 0) .expect("client transfer should succeed"); client .rpc_client() @@ -749,7 +798,7 @@ impl LocalCluster { }, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let transaction = Transaction::new( + let mut transaction = Transaction::new( &[from_account.as_ref(), vote_account], message, client @@ -758,9 +807,14 @@ impl LocalCluster { .unwrap() .0, ); - client - .send_transaction_to_upcoming_leaders(&transaction) - .expect("fund vote"); + LocalCluster::send_transaction_with_retries( + client, + &[from_account], + &mut transaction, + 10, + 0, + ) + .expect("should fund vote"); client .rpc_client() .wait_for_balance_with_commitment( @@ -779,7 +833,7 @@ impl LocalCluster { amount, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let transaction = Transaction::new( + let mut transaction = Transaction::new( &[from_account.as_ref(), &stake_account_keypair], message, client @@ -789,9 +843,14 @@ impl LocalCluster { .0, ); - client - .send_transaction_to_upcoming_leaders(&transaction) - .expect("delegate stake"); + LocalCluster::send_transaction_with_retries( + client, + &[from_account.as_ref(), &stake_account_keypair], + &mut transaction, + 5, + 0, + ) + .expect("should delegate stake"); client .rpc_client() .wait_for_balance_with_commitment( diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index f0613a0d0e5469..f4e1a927c3f00e 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() { .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let transaction = system_transaction::transfer( + let mut transaction = system_transaction::transfer( &cluster.funding_keypair, &solana_sdk::pubkey::new_rand(), 10, @@ -239,9 +239,14 @@ fn test_local_cluster_signature_subscribe() { ) .unwrap(); - tx_client - .send_transaction_to_upcoming_leaders(&transaction) - .unwrap(); + LocalCluster::send_transaction_with_retries( + &tx_client, + &[&cluster.funding_keypair], + &mut transaction, + 5, + 0, + ) + .unwrap(); let mut got_received_notification = false; loop { @@ -2669,6 +2674,7 @@ fn test_oc_bad_signatures() { // 3) Start up a spy to listen for and push votes to leader TPU let client = cluster.build_tpu_quic_client().unwrap(); + let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); let gossip_voter = cluster_tests::start_gossip_voter( @@ -2703,7 +2709,7 @@ fn test_oc_bad_signatures() { let vote_slots: Vec = vec![vote_slot]; let bad_authorized_signer_keypair = Keypair::new(); - let vote_tx = vote_transaction::new_vote_transaction( + let mut vote_tx = vote_transaction::new_vote_transaction( vote_slots, vote_hash, leader_vote_tx.message.recent_blockhash, @@ -2713,9 +2719,14 @@ fn test_oc_bad_signatures() { &bad_authorized_signer_keypair, None, ); - client - .send_transaction_to_upcoming_leaders(&vote_tx) - .unwrap(); + LocalCluster::send_transaction_with_retries( + &client, + &[&cluster_funding_keypair], + &mut vote_tx, + 5, + 0, + ) + .unwrap(); num_votes_simulated.fetch_add(1, Ordering::Relaxed); }