diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs
index 53b0102a00f11b..a55326ae0f2b0b 100644
--- a/bench-tps/src/bench_tps_client/tpu_client.rs
+++ b/bench-tps/src/bench_tps_client/tpu_client.rs
@@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient {
         Ok(signature)
     }
     fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
-        for transaction in transactions {
-            BenchTpsClient::send_transaction(self, transaction)?;
-        }
+        self.try_send_transaction_batch(&transactions)?;
         Ok(())
     }
     fn get_latest_blockhash(&self) -> Result<Hash> {
diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs
index 4dd39c7685407b..deb34e97114b56 100644
--- a/client/src/nonblocking/tpu_client.rs
+++ b/client/src/nonblocking/tpu_client.rs
@@ -77,6 +77,15 @@ async fn send_wire_transaction_to_addr(
     conn.send_wire_transaction(wire_transaction.clone()).await
 }
 
+async fn send_wire_transaction_batch_to_addr(
+    connection_cache: &ConnectionCache,
+    addr: &SocketAddr,
+    wire_transactions: &[Vec<u8>],
+) -> TransportResult<()> {
+    let conn = connection_cache.get_nonblocking_connection(addr);
+    conn.send_wire_transaction_batch(wire_transactions).await
+}
+
 impl TpuClient {
     /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
     /// size
@@ -140,6 +149,50 @@ impl TpuClient {
         }
     }
 
+    /// Send a batch of wire transactions to the current and upcoming leader TPUs according to
+    /// fanout size
+    /// Returns the last error if all sends fail
+    pub async fn try_send_wire_transaction_batch(
+        &self,
+        wire_transactions: Vec<Vec<u8>>,
+    ) -> TransportResult<()> {
+        let leaders = self
+            .leader_tpu_service
+            .leader_tpu_sockets(self.fanout_slots);
+        let futures = leaders
+            .iter()
+            .map(|addr| {
+                send_wire_transaction_batch_to_addr(
+                    &self.connection_cache,
+                    addr,
+                    &wire_transactions,
+                )
+            })
+            .collect::<Vec<_>>();
+        let results: Vec<TransportResult<()>> = join_all(futures).await;
+
+        let mut last_error: Option<TransportError> = None;
+        let mut some_success = false;
+        for result in results {
+            if let Err(e) = result {
+                if last_error.is_none() {
+                    last_error = Some(e);
+                }
+            } else {
+                some_success = true;
+            }
+        }
+        if !some_success {
+            Err(if let Some(err) = last_error {
+                err
+            } else {
+                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
+            })
+        } else {
+            Ok(())
+        }
+    }
+
     /// Create a new client that disconnects when dropped
     pub async fn new(
         rpc_client: Arc<RpcClient>,
diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs
new file mode 100644
index 00000000000000..7ce01f671d7e6e
--- /dev/null
+++ b/tpu-client/src/tpu_client.rs
@@ -0,0 +1,253 @@
+pub use crate::nonblocking::tpu_client::TpuSenderError;
+use {
+    crate::{
+        connection_cache::ConnectionCache,
+        nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
+    },
+    rayon::iter::{IntoParallelIterator, ParallelIterator},
+    solana_rpc_client::rpc_client::RpcClient,
+    solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
+    std::{
+        collections::VecDeque,
+        net::UdpSocket,
+        sync::{Arc, RwLock},
+    },
+};
+#[cfg(feature = "spinner")]
+use {
+    solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
+    tokio::time::Duration,
+};
+
+type Result<T> = std::result::Result<T, TpuSenderError>;
+
+/// Default number of slots used to build TPU socket fanout set
+pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
+
+/// Maximum number of slots used to build TPU socket fanout set
+pub const MAX_FANOUT_SLOTS: u64 = 100;
+
+/// Send at ~100 TPS
+#[cfg(feature = "spinner")]
+pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
+/// Retry batch send after 4 seconds
+#[cfg(feature = "spinner")]
+pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
+
+/// Config params for `TpuClient`
+#[derive(Clone, Debug)]
+pub struct TpuClientConfig {
+    /// The range of upcoming slots to include when determining which
+    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
+    pub fanout_slots: u64,
+}
+
+impl Default for TpuClientConfig {
+    fn default() -> Self {
+        Self {
+            fanout_slots: DEFAULT_FANOUT_SLOTS,
+        }
+    }
+}
+
+/// Client which sends transactions directly to the current leader's TPU port over UDP.
+/// The client uses RPC to determine the current leader and fetch node contact info
+pub struct TpuClient {
+    _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
+    //todo: get rid of this field
+    rpc_client: Arc<RpcClient>,
+    tpu_client: Arc<NonblockingTpuClient>,
+}
+
+impl TpuClient {
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
+    pub fn send_transaction(&self, transaction: &Transaction) -> bool {
+        self.invoke(self.tpu_client.send_transaction(transaction))
+    }
+
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
+    pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
+        self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
+    }
+
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
+    /// Returns the last error if all sends fail
+    pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
+        self.invoke(self.tpu_client.try_send_transaction(transaction))
+    }
+
+    /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
+    /// to fanout size
+    /// Returns the last error if all sends fail
+    pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
+        let wire_transactions = transactions
+            .into_par_iter()
+            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
+            .collect::<Vec<_>>();
+        self.invoke(
+            self.tpu_client
+                .try_send_wire_transaction_batch(wire_transactions),
+        )
+    }
+
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
+    /// Returns the last error if all sends fail
+    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+        self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
+    }
+
+    /// Create a new client that disconnects when dropped
+    pub fn new(
+        rpc_client: Arc<RpcClient>,
+        websocket_url: &str,
+        config: TpuClientConfig,
+    ) -> Result<Self> {
+        let create_tpu_client =
+            NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config);
+        let tpu_client =
+            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
+
+        Ok(Self {
+            _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
+            rpc_client,
+            tpu_client: Arc::new(tpu_client),
+        })
+    }
+
+    /// Create a new client that disconnects when dropped
+    pub fn new_with_connection_cache(
+        rpc_client: Arc<RpcClient>,
+        websocket_url: &str,
+        config: TpuClientConfig,
+        connection_cache: Arc<ConnectionCache>,
+    ) -> Result<Self> {
+        let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
+            rpc_client.get_inner_client().clone(),
+            websocket_url,
+            config,
+            connection_cache,
+        );
+        let tpu_client =
+            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
+
+        Ok(Self {
+            _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
+            rpc_client,
+            tpu_client: Arc::new(tpu_client),
+        })
+    }
+
+    #[cfg(feature = "spinner")]
+    pub fn send_and_confirm_messages_with_spinner<T: Signers>(
+        &self,
+        messages: &[Message],
+        signers: &T,
+    ) -> Result<Vec<Option<TransactionError>>> {
+        self.invoke(
+            self.tpu_client
+                .send_and_confirm_messages_with_spinner(messages, signers),
+        )
+    }
+
+    pub fn rpc_client(&self) -> &RpcClient {
+        &self.rpc_client
+    }
+
+    fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
+        // `block_on()` panics if called within an asynchronous execution context. Whereas
+        // `block_in_place()` only panics if called from a current_thread runtime, which is the
+        // lesser evil.
+        tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
+    }
+}
+
+// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
+const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
+
+#[derive(Clone, Debug)]
+pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
+impl RecentLeaderSlots {
+    pub(crate) fn new(current_slot: Slot) -> Self {
+        let mut recent_slots = VecDeque::new();
+        recent_slots.push_back(current_slot);
+        Self(Arc::new(RwLock::new(recent_slots)))
+    }
+
+    pub(crate) fn record_slot(&self, current_slot: Slot) {
+        let mut recent_slots = self.0.write().unwrap();
+        recent_slots.push_back(current_slot);
+        // 12 recent slots should be large enough to avoid a misbehaving
+        // validator from affecting the median recent slot
+        while recent_slots.len() > 12 {
+            recent_slots.pop_front();
+        }
+    }
+
+    // Estimate the current slot from recent slot notifications.
+    pub(crate) fn estimated_current_slot(&self) -> Slot {
+        let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
+        assert!(!recent_slots.is_empty());
+        recent_slots.sort_unstable();
+
+        // Validators can broadcast invalid blocks that are far in the future
+        // so check if the current slot is in line with the recent progression.
+        let max_index = recent_slots.len() - 1;
+        let median_index = max_index / 2;
+        let median_recent_slot = recent_slots[median_index];
+        let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
+        let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
+
+        // Return the highest slot that doesn't exceed what we believe is a
+        // reasonable slot.
+        recent_slots
+            .into_iter()
+            .rev()
+            .find(|slot| *slot <= max_reasonable_current_slot)
+            .unwrap()
+    }
+}
+
+#[cfg(test)]
+impl From<Vec<Slot>> for RecentLeaderSlots {
+    fn from(recent_slots: Vec<Slot>) -> Self {
+        assert!(!recent_slots.is_empty());
+        Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
+        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
+    }
+
+    #[test]
+    fn test_recent_leader_slots() {
+        assert_slot(RecentLeaderSlots::new(0), 0);
+
+        let mut recent_slots: Vec<Slot> = (1..=12).collect();
+        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
+
+        recent_slots.reverse();
+        assert_slot(RecentLeaderSlots::from(recent_slots), 12);
+
+        assert_slot(
+            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
+            1 + MAX_SLOT_SKIP_DISTANCE,
+        );
+        assert_slot(
+            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
+            0,
+        );
+
+        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
+        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
+        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
+        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
+        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
+    }
+}
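
For reviewers, a minimal usage sketch of the new batch path (not part of the diff itself): `try_send_transaction_batch` serializes the slice of transactions in parallel with rayon, then hands the wire batch to the nonblocking client's `try_send_wire_transaction_batch`, which fans it out to the upcoming leader TPUs and only errors if every send fails. The endpoint URLs, keypair, and transfer amounts below are placeholders, and the `solana_tpu_client` crate path is assumed from the new file's location.

```rust
use {
    solana_rpc_client::rpc_client::RpcClient,
    solana_sdk::{signature::Keypair, signer::Signer, system_transaction},
    solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig},
    std::sync::Arc,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoints; point these at a real RPC node.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let tpu_client = TpuClient::new(
        rpc_client.clone(),
        "ws://127.0.0.1:8900",
        TpuClientConfig::default(),
    )?;

    let payer = Keypair::new(); // placeholder; use a funded keypair in practice
    let blockhash = rpc_client.get_latest_blockhash()?;

    // Build a small batch of self-transfers. The whole slice is serialized in
    // parallel and sent to the leader TPUs in one batched call, replacing the
    // old one-send-per-transaction loop in bench-tps.
    let transactions: Vec<_> = (0..16)
        .map(|lamports| system_transaction::transfer(&payer, &payer.pubkey(), lamports, blockhash))
        .collect();
    tpu_client.try_send_transaction_batch(&transactions)?;
    Ok(())
}
```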