From 38ab1c618472c88b65b4df69cf85bf944c2f7534 Mon Sep 17 00:00:00 2001 From: Greg Cusack Date: Thu, 13 Jun 2024 10:31:10 -0700 Subject: [PATCH] Remove `ThinClient` from `LocalCluster` (#1300) * setup tpu client methods required for localcluster to use TpuClient * add new_tpu_quic_client() for local cluster tests * update local-cluster src files to use TpuClient. tests next * finish removing thinclient from localcluster * address comments * add note for send_and_confirm_transaction_with_retries * remove retry logic from tpu-client. Send directly to upcoming leaders without retry. --- local-cluster/src/cluster.rs | 3 +- local-cluster/src/cluster_tests.rs | 122 +++++++++-------- local-cluster/src/local_cluster.rs | 126 ++++++++++-------- .../src/local_cluster_snapshot_utils.rs | 3 +- local-cluster/tests/local_cluster.rs | 74 +++++----- tpu-client/src/nonblocking/tpu_client.rs | 19 ++- tpu-client/src/tpu_client.rs | 82 +++++++++++- 7 files changed, 281 insertions(+), 148 deletions(-) diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 99f31bf93504f7..c2e3acc60751a3 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,5 +1,4 @@ use { - solana_client::thin_client::ThinClient, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_ledger::shred::Shred, @@ -41,7 +40,7 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; - fn get_validator_client(&self, pubkey: &Pubkey) -> Option; + fn get_validator_client(&self, pubkey: &Pubkey) -> Option; fn build_tpu_quic_client(&self) -> Result; fn build_tpu_quic_client_with_commitment( &self, diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 09ee39d2e47dd8..b31c874b9e6db3 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -4,12 +4,10 @@ /// discover the rest of the network. use log::*; use { + crate::cluster::QuicTpuClient, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, - solana_client::{ - connection_cache::{ConnectionCache, Protocol}, - thin_client::ThinClient, - }, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_core::consensus::{ tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, @@ -24,8 +22,8 @@ use { gossip_service::{self, discover_cluster, GossipService}, }, solana_ledger::blockstore::Blockstore, + solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ - client::SyncClient, clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, @@ -40,6 +38,7 @@ use { transport::TransportError, }, solana_streamer::socket::SocketAddrSpace, + solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError}, solana_vote::vote_transaction::VoteTransaction, solana_vote_program::vote_transaction, std::{ @@ -89,9 +88,9 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -99,21 +98,24 @@ pub fn spend_and_verify_all_nodes( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .unwrap(); - let mut transaction = + let transaction = system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; - let sig = client - .retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs) + client + .send_transaction_to_upcoming_leaders(&transaction) .unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(validator.pubkey()) { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(&sig, confs).unwrap(); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + client + .rpc_client() + .poll_for_signature_confirmation(&transaction.signatures[0], confs) + .unwrap(); } }); } @@ -123,10 +125,10 @@ pub fn verify_balances( node: &ContactInfo, connection_cache: Arc, ) { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = ThinClient::new(rpc, tpu, connection_cache); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); for (pk, b) in expected_balances { let bal = client + .rpc_client() .poll_get_balance_with_commitment(&pk, CommitmentConfig::processed()) .expect("balance in source"); assert_eq!(bal, b); @@ -140,12 +142,12 @@ pub fn send_many_transactions( max_tokens_per_transfer: u64, num_txs: u64, ) -> HashMap { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); let mut expected_balances = HashMap::new(); for _ in 0..num_txs { let random_keypair = Keypair::new(); let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -153,11 +155,12 @@ pub fn send_many_transactions( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), transfer_amount, @@ -165,7 +168,7 @@ pub fn send_many_transactions( ); client - .retry_transfer(funding_keypair, &mut transaction, 5) + .send_transaction_to_upcoming_leaders(&transaction) .unwrap(); expected_balances.insert(random_keypair.pubkey(), transfer_amount); @@ -238,14 +241,14 @@ pub fn kill_entry_and_spend_and_verify_rest( ) .unwrap(); assert!(cluster_nodes.len() >= nodes); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap(); // sleep long enough to make sure we are in epoch 3 let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1); for ingress_node in &cluster_nodes { client + .rpc_client() .poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed()) .unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err)); } @@ -266,9 +269,9 @@ pub fn kill_entry_and_spend_and_verify_rest( continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let balance = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -286,9 +289,10 @@ pub fn kill_entry_and_spend_and_verify_rest( let random_keypair = Keypair::new(); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), 1, @@ -296,28 +300,16 @@ pub fn kill_entry_and_spend_and_verify_rest( ); let confs = VOTE_THRESHOLD_DEPTH + 1; - let sig = { - let sig = client.retry_transfer_until_confirmed( - funding_keypair, - &mut transaction, - 5, - confs, - ); - match sig { - Err(e) => { - result = Err(e); - continue; - } - - Ok(sig) => sig, - } - }; + if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) { + result = Err(e); + continue; + } info!("poll_all_nodes_for_signature()"); match poll_all_nodes_for_signature( entry_point_info, &cluster_nodes, connection_cache, - &sig, + &transaction.signatures[0], confs, ) { Err(e) => { @@ -353,10 +345,10 @@ pub fn check_min_slot_is_rooted( let loop_start = Instant::now(); let loop_timeout = Duration::from_secs(180); for ingress_node in contact_infos.iter() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); loop { let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 { @@ -394,9 +386,9 @@ pub fn check_for_new_roots( assert!(loop_start.elapsed() < loop_timeout); for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); roots[i].insert(root_slot); @@ -427,13 +419,14 @@ pub fn check_no_new_roots( .iter() .enumerate() .map(|(i, ingress_node)| { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let initial_root = client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())); roots[i] = initial_root; client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())) }) @@ -446,9 +439,9 @@ pub fn check_no_new_roots( let mut reached_end_slot = false; loop { for contact_info in contact_infos { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap(); current_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey())); if current_slot > end_slot { @@ -472,10 +465,10 @@ pub fn check_no_new_roots( } for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); assert_eq!( client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())), roots[i] @@ -494,9 +487,10 @@ fn poll_all_nodes_for_signature( if validator.pubkey() == entry_point_info.pubkey() { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(sig, confs)?; + let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap(); + client + .rpc_client() + .poll_for_signature_confirmation(sig, confs)?; } Ok(()) @@ -678,3 +672,23 @@ pub fn submit_vote_to_cluster_gossip( socket_addr_space, ) } + +pub fn new_tpu_quic_client( + contact_info: &ContactInfo, + connection_cache: Arc, +) -> Result { + let rpc_pubsub_url = format!("ws://{}/", contact_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", contact_info.rpc().unwrap()); + + let cache = match &*connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), + }; + + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) +} diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index e07b11677b181a..64fefb1114f99e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -8,9 +8,7 @@ use { itertools::izip, log::*, solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs, - solana_client::{ - connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient, - }, + solana_client::{connection_cache::ConnectionCache, rpc_client::RpcClient}, solana_core::{ consensus::tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -30,7 +28,6 @@ use { }, solana_sdk::{ account::{Account, AccountSharedData}, - client::SyncClient, clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, @@ -482,11 +479,7 @@ impl LocalCluster { mut voting_keypair: Option>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &self.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); + let client = self.build_tpu_quic_client().expect("tpu_client"); // Must have enough tokens to fund vote account and set delegate let should_create_vote_pubkey = voting_keypair.is_none(); @@ -579,11 +572,7 @@ impl LocalCluster { } pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &self.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); + let client = self.build_tpu_quic_client().expect("new tpu quic client"); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) } @@ -677,36 +666,39 @@ impl LocalCluster { } fn transfer_with_client( - client: &ThinClient, + client: &QuicTpuClient, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64, ) -> u64 { trace!("getting leader blockhash"); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); + let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); info!( "executing transfer of {} from {} to {}", lamports, source_keypair.pubkey(), *dest_pubkey ); + client - .retry_transfer(source_keypair, &mut tx, 10) - .expect("client transfer"); + .send_transaction_to_upcoming_leaders(&tx) + .expect("client transfer should succeed"); client + .rpc_client() .wait_for_balance_with_commitment( dest_pubkey, Some(lamports), CommitmentConfig::processed(), ) - .expect("get balance") + .expect("get balance should succeed") } fn setup_vote_and_stake_accounts( - client: &ThinClient, + client: &QuicTpuClient, vote_account: &Keypair, from_account: &Arc, amount: u64, @@ -722,6 +714,7 @@ impl LocalCluster { // Create the vote account if necessary if client + .rpc_client() .poll_get_balance_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()) .unwrap_or(0) == 0 @@ -731,6 +724,7 @@ impl LocalCluster { // as the cluster is already running, and using the wrong account size will cause the // InitializeAccount tx to fail let use_current_vote_state = client + .rpc_client() .poll_get_balance_with_commitment( &feature_set::vote_state_add_vote_latency::id(), CommitmentConfig::processed(), @@ -755,18 +749,20 @@ impl LocalCluster { }, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let mut transaction = Transaction::new( + let transaction = Transaction::new( &[from_account.as_ref(), vote_account], message, client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap() .0, ); client - .retry_transfer(from_account, &mut transaction, 10) + .send_transaction_to_upcoming_leaders(&transaction) .expect("fund vote"); client + .rpc_client() .wait_for_balance_with_commitment( &vote_account_pubkey, Some(amount), @@ -783,24 +779,21 @@ impl LocalCluster { amount, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let mut transaction = Transaction::new( + let transaction = Transaction::new( &[from_account.as_ref(), &stake_account_keypair], message, client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap() .0, ); client - .send_and_confirm_transaction( - &[from_account.as_ref(), &stake_account_keypair], - &mut transaction, - 5, - 0, - ) + .send_transaction_to_upcoming_leaders(&transaction) .expect("delegate stake"); client + .rpc_client() .wait_for_balance_with_commitment( &stake_account_pubkey, Some(amount), @@ -816,36 +809,58 @@ impl LocalCluster { info!("Checking for vote account registration of {}", node_pubkey); match ( client + .rpc_client() .get_account_with_commitment(&stake_account_pubkey, CommitmentConfig::processed()), - client.get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()), + client + .rpc_client() + .get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()), ) { - (Ok(Some(stake_account)), Ok(Some(vote_account))) => { - match ( - stake_state::stake_from(&stake_account), - vote_state::from(&vote_account), - ) { - (Some(stake_state), Some(vote_state)) => { - if stake_state.delegation.voter_pubkey != vote_account_pubkey - || stake_state.delegation.stake != amount - { - Err(Error::new(ErrorKind::Other, "invalid stake account state")) - } else if vote_state.node_pubkey != node_pubkey { - Err(Error::new(ErrorKind::Other, "invalid vote account state")) - } else { - info!("node {} {:?} {:?}", node_pubkey, stake_state, vote_state); - - Ok(()) + (Ok(stake_account), Ok(vote_account)) => { + match (stake_account.value, vote_account.value) { + (Some(stake_account), Some(vote_account)) => { + match ( + stake_state::stake_from(&stake_account), + vote_state::from(&vote_account), + ) { + (Some(stake_state), Some(vote_state)) => { + if stake_state.delegation.voter_pubkey != vote_account_pubkey + || stake_state.delegation.stake != amount + { + Err(Error::new(ErrorKind::Other, "invalid stake account state")) + } else if vote_state.node_pubkey != node_pubkey { + Err(Error::new(ErrorKind::Other, "invalid vote account state")) + } else { + info!( + "node {} {:?} {:?}", + node_pubkey, stake_state, vote_state + ); + + return Ok(()); + } + } + (None, _) => { + Err(Error::new(ErrorKind::Other, "invalid stake account data")) + } + (_, None) => { + Err(Error::new(ErrorKind::Other, "invalid vote account data")) + } } } - (None, _) => Err(Error::new(ErrorKind::Other, "invalid stake account data")), - (_, None) => Err(Error::new(ErrorKind::Other, "invalid vote account data")), + (None, _) => Err(Error::new( + ErrorKind::Other, + "unable to retrieve stake account data", + )), + (_, None) => Err(Error::new( + ErrorKind::Other, + "unable to retrieve vote account data", + )), } } - (Ok(None), _) | (Err(_), _) => Err(Error::new( + (Err(_), _) => Err(Error::new( ErrorKind::Other, "unable to retrieve stake account data", )), - (_, Ok(None)) | (_, Err(_)) => Err(Error::new( + (_, Err(_)) => Err(Error::new( ErrorKind::Other, "unable to retrieve vote account data", )), @@ -897,13 +912,10 @@ impl Cluster for LocalCluster { self.validators.keys().cloned().collect() } - fn get_validator_client(&self, pubkey: &Pubkey) -> Option { - self.validators.get(pubkey).map(|f| { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &f.info.contact_info, - ); - ThinClient::new(rpc, tpu, self.connection_cache.clone()) + fn get_validator_client(&self, pubkey: &Pubkey) -> Option { + self.validators.get(pubkey).map(|_| { + self.build_tpu_quic_client() + .expect("should build tpu quic client") }) } diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 259b9e1559ab69..3ba9cda04d9489 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -7,7 +7,7 @@ use { }, snapshot_utils, }, - solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig}, + solana_sdk::commitment_config::CommitmentConfig, std::{ path::Path, thread::sleep, @@ -76,6 +76,7 @@ impl LocalCluster { .get_validator_client(self.entry_point_info.pubkey()) .unwrap(); let last_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .expect("Couldn't get slot"); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 3bc1759af4e057..f0613a0d0e5469 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,6 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::thin_client::ThinClient, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -31,7 +30,7 @@ use { use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup, }, solana_local_cluster::{ - cluster::{Cluster, ClusterValidatorInfo}, + cluster::{Cluster, ClusterValidatorInfo, QuicTpuClient}, cluster_tests, integration_tests::{ copy_blocks, create_custom_leader_schedule, @@ -62,7 +61,7 @@ use { }, solana_sdk::{ account::AccountSharedData, - client::{AsyncClient, SyncClient}, + client::AsyncClient, clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, MINIMUM_SLOTS_PER_EPOCH}, @@ -216,17 +215,14 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - non_bootstrap_info, - ); - let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let tx_client = cluster.build_tpu_quic_client().unwrap(); let (blockhash, _) = tx_client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( &cluster.funding_keypair, &solana_sdk::pubkey::new_rand(), 10, @@ -244,7 +240,7 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); tx_client - .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) + .send_transaction_to_upcoming_leaders(&transaction) .unwrap(); let mut got_received_notification = false; @@ -371,6 +367,7 @@ fn test_restart_node() { ticks_per_slot, slots_per_epoch, stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, ..ClusterConfig::default() }, SocketAddrSpace::Unspecified, @@ -422,11 +419,7 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - &cluster.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let client = cluster.build_tpu_quic_client().unwrap(); // Programs that are available at epoch 0 for program_id in [ @@ -444,8 +437,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (_program_id, Some(_)) ); @@ -457,8 +452,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (program_id, None) ); @@ -995,6 +992,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st let validator_current_slot = cluster .get_validator_client(&validator_identity.pubkey()) .unwrap() + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap(); trace!("validator current slot: {validator_current_slot}"); @@ -1230,6 +1228,7 @@ fn test_snapshot_restart_tower() { safe_clone_config(&leader_snapshot_test_config.validator_config), safe_clone_config(&validator_snapshot_test_config.validator_config), ], + skip_warmup_slots: true, ..ClusterConfig::default() }; @@ -1373,7 +1372,10 @@ fn test_snapshots_blockstore_floor() { let target_slot = slot_floor + 40; while current_slot <= target_slot { trace!("current_slot: {}", current_slot); - if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) { + if let Ok(slot) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::processed()) + { current_slot = slot; } else { continue; @@ -1565,6 +1567,7 @@ fn test_no_voting() { .unwrap(); loop { let last_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .expect("Couldn't get slot"); if last_slot > 4 * VOTE_THRESHOLD_DEPTH as u64 { @@ -1628,6 +1631,7 @@ fn test_optimistic_confirmation_violation_detection() { let mut prev_voted_slot = 0; loop { let last_voted_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap(); if last_voted_slot > 50 { @@ -1681,6 +1685,7 @@ fn test_optimistic_confirmation_violation_detection() { let client = cluster.get_validator_client(&node_to_restart).unwrap(); loop { let last_root = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap(); if last_root > prev_voted_slot { @@ -1758,7 +1763,10 @@ fn test_validator_saves_tower() { // Wait for some votes to be generated loop { - if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) { + if let Ok(slot) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::processed()) + { trace!("current slot: {}", slot); if slot > 2 { break; @@ -1783,7 +1791,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!("current root: {}", root); if root > 0 { break root; @@ -1812,7 +1823,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!( "current root: {}, last_replayed_root: {}", root, @@ -1845,7 +1859,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!("current root: {}, last tower root: {}", root, tower3_root); if root > tower3_root { break root; @@ -2651,12 +2668,7 @@ fn test_oc_bad_signatures() { ); // 3) Start up a spy to listen for and push votes to leader TPU - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - &cluster.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); - let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); + let client = cluster.build_tpu_quic_client().unwrap(); let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); let gossip_voter = cluster_tests::start_gossip_voter( @@ -2691,7 +2703,7 @@ fn test_oc_bad_signatures() { let vote_slots: Vec = vec![vote_slot]; let bad_authorized_signer_keypair = Keypair::new(); - let mut vote_tx = vote_transaction::new_vote_transaction( + let vote_tx = vote_transaction::new_vote_transaction( vote_slots, vote_hash, leader_vote_tx.message.recent_blockhash, @@ -2702,7 +2714,7 @@ fn test_oc_bad_signatures() { None, ); client - .retry_transfer(&cluster_funding_keypair, &mut vote_tx, 5) + .send_transaction_to_upcoming_leaders(&vote_tx) .unwrap(); num_votes_simulated.fetch_add(1, Ordering::Relaxed); @@ -2856,8 +2868,8 @@ fn setup_transfer_scan_threads( num_starting_accounts: usize, exit: Arc, scan_commitment: CommitmentConfig, - update_client_receiver: Receiver, - scan_client_receiver: Receiver, + update_client_receiver: Receiver, + scan_client_receiver: Receiver, ) -> ( JoinHandle<()>, JoinHandle<()>, @@ -2895,6 +2907,7 @@ fn setup_transfer_scan_threads( return; } let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); for i in 0..starting_keypairs_.len() { @@ -2941,6 +2954,7 @@ fn setup_transfer_scan_threads( return; } if let Some(total_scan_balance) = client + .rpc_client() .get_program_accounts_with_config( &system_program::id(), scan_commitment_config.clone(), diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 57a9b0b4033c61..ce274b8245d4d5 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -676,6 +676,23 @@ where self.exit.store(true, Ordering::Relaxed); self.leader_tpu_service.join().await; } + + pub fn get_connection_cache(&self) -> &Arc> + where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, + { + &self.connection_cache + } + + pub fn get_leader_tpu_service(&self) -> &LeaderTpuService { + &self.leader_tpu_service + } + + pub fn get_fanout_slots(&self) -> u64 { + self.fanout_slots + } } impl Drop for TpuClient { @@ -752,7 +769,7 @@ impl LeaderTpuService { self.recent_slots.estimated_current_slot() } - fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { + pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { let current_slot = self.recent_slots.estimated_current_slot(); self.leader_tpu_cache .read() diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 6b1e7dbe44f7ac..469b4105fe27ec 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -2,11 +2,20 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_connection_cache::connection_cache::{ - ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + solana_connection_cache::{ + client_connection::ClientConnection, + connection_cache::{ + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + }, }, solana_rpc_client::rpc_client::RpcClient, - solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, + solana_sdk::{ + client::AsyncClient, + clock::Slot, + signature::Signature, + transaction::{Transaction, VersionedTransaction}, + transport::Result as TransportResult, + }, std::{ collections::VecDeque, net::UdpSocket, @@ -95,6 +104,32 @@ where self.invoke(self.tpu_client.try_send_transaction(transaction)) } + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// NOTE: send_wire_transaction() and try_send_transaction() above both fail in a specific case when used in LocalCluster + /// They both invoke the nonblocking TPUClient and both fail when calling "transfer_with_client()" multiple times + /// I do not full understand WHY the nonblocking TPUClient fails in this specific case. But the method defined below + /// does work although it has only been tested in LocalCluster integration tests + pub fn send_transaction_to_upcoming_leaders( + &self, + transaction: &Transaction, + ) -> TransportResult<()> { + let wire_transaction = + bincode::serialize(&transaction).expect("should serialize transaction"); + + let leaders = self + .tpu_client + .get_leader_tpu_service() + .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); + + for tpu_address in &leaders { + let cache = self.tpu_client.get_connection_cache(); + let conn = cache.get_connection(tpu_address); + conn.send_data_async(wire_transaction.clone())?; + } + + Ok(()) + } + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according /// to fanout size /// Returns the last error if all sends fail @@ -115,6 +150,16 @@ where self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) } + pub fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + self.invoke( + self.tpu_client + .try_send_wire_transaction_batch(wire_transactions), + ) + } + /// Create a new client that disconnects when dropped pub fn new( name: &'static str, @@ -187,6 +232,37 @@ where } } +// Methods below are required for calls to client.async_transfer() +// where client is of type TpuClient +impl AsyncClient for TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ + fn async_send_versioned_transaction( + &self, + transaction: VersionedTransaction, + ) -> TransportResult { + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + self.send_wire_transaction(wire_transaction); + Ok(transaction.signatures[0]) + } + + fn async_send_versioned_transaction_batch( + &self, + batch: Vec, + ) -> TransportResult<()> { + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.try_send_wire_transaction_batch(buffers)?; + Ok(()) + } +} + // 48 chosen because it's unlikely that 12 leaders in a row will miss their slots const MAX_SLOT_SKIP_DISTANCE: u64 = 48;