Skip to content

Commit

Permalink
add retries to transaction sending in LocalCluster (#1747)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack authored Jun 17, 2024
1 parent f43cfc8 commit 7f2beb2
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 36 deletions.
53 changes: 38 additions & 15 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/// discover the rest of the network.
use log::*;
use {
crate::cluster::QuicTpuClient,
crate::{cluster::QuicTpuClient, local_cluster::LocalCluster},
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
solana_client::connection_cache::{ConnectionCache, Protocol},
Expand Down Expand Up @@ -101,12 +101,17 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
let transaction =
let mut transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
10,
confs,
)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
Expand Down Expand Up @@ -160,16 +165,21 @@ pub fn send_many_transactions(
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);

let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
transfer_amount,
blockhash,
);

client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
}
Expand Down Expand Up @@ -292,24 +302,37 @@ pub fn kill_entry_and_spend_and_verify_rest(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
1,
blockhash,
);

let confs = VOTE_THRESHOLD_DEPTH + 1;
if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
result = Err(e);
continue;
}
let sig = {
let sig = LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
confs,
);
match sig {
Err(e) => {
result = Err(e);
continue;
}

Ok(sig) => sig,
}
};
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
&transaction.signatures[0],
&sig,
confs,
) {
Err(e) => {
Expand Down
85 changes: 72 additions & 13 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ use {
},
solana_sdk::{
account::{Account, AccountSharedData},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
feature_set,
genesis_config::{ClusterType, GenesisConfig},
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
signature::{Keypair, Signature, Signer},
signers::Signers,
stake::{
instruction as stake_instruction,
state::{Authorized, Lockup},
},
system_transaction,
transaction::Transaction,
transport::TransportError,
},
solana_stake_program::stake_state,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
Expand All @@ -61,6 +63,7 @@ use {
net::{IpAddr, Ipv4Addr, UdpSocket},
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::Instant,
},
};

Expand Down Expand Up @@ -665,6 +668,53 @@ impl LocalCluster {
info!("{} done waiting for roots", test_name);
}

/// Attempt to send and confirm tx "attempts" times
/// Wait for signature confirmation before returning
/// Return the transaction signature
pub fn send_transaction_with_retries<T: Signers + ?Sized>(
client: &QuicTpuClient,
keypairs: &T,
transaction: &mut Transaction,
attempts: usize,
pending_confirmations: usize,
) -> std::result::Result<Signature, TransportError> {
for attempt in 0..attempts {
let now = Instant::now();
let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE;

while now.elapsed().as_secs() < wait_time as u64 {
if num_confirmed == 0 {
client.send_transaction_to_upcoming_leaders(transaction)?;
}

if let Ok(confirmed_blocks) = client.rpc_client().poll_for_signature_confirmation(
&transaction.signatures[0],
pending_confirmations,
) {
num_confirmed = confirmed_blocks;
if confirmed_blocks >= pending_confirmations {
return Ok(transaction.signatures[0]);
}
// Since network has seen the transaction, wait longer to receive
// all pending confirmations. Resending the transaction could result into
// extra transaction fees
wait_time = wait_time.max(
MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
);
}
}
info!("{attempt} tries failed transfer");
let blockhash = client.rpc_client().get_latest_blockhash()?;
transaction.sign(keypairs, blockhash);
}
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to confirm transaction".to_string(),
)
.into())
}

fn transfer_with_client(
client: &QuicTpuClient,
source_keypair: &Keypair,
Expand All @@ -676,16 +726,15 @@ impl LocalCluster {
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
info!(
"executing transfer of {} from {} to {}",
lamports,
source_keypair.pubkey(),
*dest_pubkey
);

client
.send_transaction_to_upcoming_leaders(&tx)
LocalCluster::send_transaction_with_retries(client, &[source_keypair], &mut tx, 10, 0)
.expect("client transfer should succeed");
client
.rpc_client()
Expand Down Expand Up @@ -749,7 +798,7 @@ impl LocalCluster {
},
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let transaction = Transaction::new(
let mut transaction = Transaction::new(
&[from_account.as_ref(), vote_account],
message,
client
Expand All @@ -758,9 +807,14 @@ impl LocalCluster {
.unwrap()
.0,
);
client
.send_transaction_to_upcoming_leaders(&transaction)
.expect("fund vote");
LocalCluster::send_transaction_with_retries(
client,
&[from_account],
&mut transaction,
10,
0,
)
.expect("should fund vote");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand All @@ -779,7 +833,7 @@ impl LocalCluster {
amount,
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let transaction = Transaction::new(
let mut transaction = Transaction::new(
&[from_account.as_ref(), &stake_account_keypair],
message,
client
Expand All @@ -789,9 +843,14 @@ impl LocalCluster {
.0,
);

client
.send_transaction_to_upcoming_leaders(&transaction)
.expect("delegate stake");
LocalCluster::send_transaction_with_retries(
client,
&[from_account.as_ref(), &stake_account_keypair],
&mut transaction,
5,
0,
)
.expect("should delegate stake");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand Down
27 changes: 19 additions & 8 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() {
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();

let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
&cluster.funding_keypair,
&solana_sdk::pubkey::new_rand(),
10,
Expand All @@ -239,9 +239,14 @@ fn test_local_cluster_signature_subscribe() {
)
.unwrap();

tx_client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&tx_client,
&[&cluster.funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

let mut got_received_notification = false;
loop {
Expand Down Expand Up @@ -2669,6 +2674,7 @@ fn test_oc_bad_signatures() {

// 3) Start up a spy to listen for and push votes to leader TPU
let client = cluster.build_tpu_quic_client().unwrap();
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
let gossip_voter = cluster_tests::start_gossip_voter(
Expand Down Expand Up @@ -2703,7 +2709,7 @@ fn test_oc_bad_signatures() {
let vote_slots: Vec<Slot> = vec![vote_slot];

let bad_authorized_signer_keypair = Keypair::new();
let vote_tx = vote_transaction::new_vote_transaction(
let mut vote_tx = vote_transaction::new_vote_transaction(
vote_slots,
vote_hash,
leader_vote_tx.message.recent_blockhash,
Expand All @@ -2713,9 +2719,14 @@ fn test_oc_bad_signatures() {
&bad_authorized_signer_keypair,
None,
);
client
.send_transaction_to_upcoming_leaders(&vote_tx)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[&cluster_funding_keypair],
&mut vote_tx,
5,
0,
)
.unwrap();

num_votes_simulated.fetch_add(1, Ordering::Relaxed);
}
Expand Down

0 comments on commit 7f2beb2

Please sign in to comment.