From bb7acb22beb3ff99795562490d63c6d1f190d63a Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 1 Sep 2022 10:32:23 -0700 Subject: [PATCH 1/2] Use batch send in bench-tps to send transactions (#27527) * Use batch send in bench-tps to send transactions * serialize using par iter (cherry picked from commit 49df1c47e8188218772ff460aaa5dc1e621b02ef) # Conflicts: # tpu-client/src/tpu_client.rs --- bench-tps/src/bench_tps_client/tpu_client.rs | 4 +- client/src/nonblocking/tpu_client.rs | 53 ++++ tpu-client/src/tpu_client.rs | 253 +++++++++++++++++++ 3 files changed, 307 insertions(+), 3 deletions(-) create mode 100644 tpu-client/src/tpu_client.rs diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs index 53b0102a00f11b..a55326ae0f2b0b 100644 --- a/bench-tps/src/bench_tps_client/tpu_client.rs +++ b/bench-tps/src/bench_tps_client/tpu_client.rs @@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient { Ok(signature) } fn send_batch(&self, transactions: Vec) -> Result<()> { - for transaction in transactions { - BenchTpsClient::send_transaction(self, transaction)?; - } + self.try_send_transaction_batch(&transactions)?; Ok(()) } fn get_latest_blockhash(&self) -> Result { diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs index 4dd39c7685407b..deb34e97114b56 100644 --- a/client/src/nonblocking/tpu_client.rs +++ b/client/src/nonblocking/tpu_client.rs @@ -77,6 +77,15 @@ async fn send_wire_transaction_to_addr( conn.send_wire_transaction(wire_transaction.clone()).await } +async fn send_wire_transaction_batch_to_addr( + connection_cache: &ConnectionCache, + addr: &SocketAddr, + wire_transactions: &[Vec], +) -> TransportResult<()> { + let conn = connection_cache.get_nonblocking_connection(addr); + conn.send_wire_transaction_batch(wire_transactions).await +} + impl TpuClient { /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size @@ -140,6 +149,50 @@ impl TpuClient { } } + /// Send a batch of wire transactions to the current and upcoming leader TPUs according to + /// fanout size + /// Returns the last error if all sends fail + pub async fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + let leaders = self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots); + let futures = leaders + .iter() + .map(|addr| { + send_wire_transaction_batch_to_addr( + &self.connection_cache, + addr, + &wire_transactions, + ) + }) + .collect::>(); + let results: Vec> = join_all(futures).await; + + let mut last_error: Option = None; + let mut some_success = false; + for result in results { + if let Err(e) = result { + if last_error.is_none() { + last_error = Some(e); + } + } else { + some_success = true; + } + } + if !some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() + }) + } else { + Ok(()) + } + } + /// Create a new client that disconnects when dropped pub async fn new( rpc_client: Arc, diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs new file mode 100644 index 00000000000000..7ce01f671d7e6e --- /dev/null +++ b/tpu-client/src/tpu_client.rs @@ -0,0 +1,253 @@ +pub use crate::nonblocking::tpu_client::TpuSenderError; +use { + crate::{ + connection_cache::ConnectionCache, + nonblocking::tpu_client::TpuClient as NonblockingTpuClient, + }, + rayon::iter::{IntoParallelIterator, ParallelIterator}, + 
solana_rpc_client::rpc_client::RpcClient, + solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, + std::{ + collections::VecDeque, + net::UdpSocket, + sync::{Arc, RwLock}, + }, +}; +#[cfg(feature = "spinner")] +use { + solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, + tokio::time::Duration, +}; + +type Result = std::result::Result; + +/// Default number of slots used to build TPU socket fanout set +pub const DEFAULT_FANOUT_SLOTS: u64 = 12; + +/// Maximum number of slots used to build TPU socket fanout set +pub const MAX_FANOUT_SLOTS: u64 = 100; + +/// Send at ~100 TPS +#[cfg(feature = "spinner")] +pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10); +/// Retry batch send after 4 seconds +#[cfg(feature = "spinner")] +pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4); + +/// Config params for `TpuClient` +#[derive(Clone, Debug)] +pub struct TpuClientConfig { + /// The range of upcoming slots to include when determining which + /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`) + pub fanout_slots: u64, +} + +impl Default for TpuClientConfig { + fn default() -> Self { + Self { + fanout_slots: DEFAULT_FANOUT_SLOTS, + } + } +} + +/// Client which sends transactions directly to the current leader's TPU port over UDP. +/// The client uses RPC to determine the current leader and fetch node contact info +pub struct TpuClient { + _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket + //todo: get rid of this field + rpc_client: Arc, + tpu_client: Arc, +} + +impl TpuClient { + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// size + pub fn send_transaction(&self, transaction: &Transaction) -> bool { + self.invoke(self.tpu_client.send_transaction(transaction)) + } + + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size + pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { + self.invoke(self.tpu_client.send_wire_transaction(wire_transaction)) + } + + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// size + /// Returns the last error if all sends fail + pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { + self.invoke(self.tpu_client.try_send_transaction(transaction)) + } + + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according + /// to fanout size + /// Returns the last error if all sends fail + pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { + let wire_transactions = transactions + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.invoke( + self.tpu_client + .try_send_wire_transaction_batch(wire_transactions), + ) + } + + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size + /// Returns the last error if all sends fail + pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { + self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) + } + + /// Create a new client that disconnects when dropped + pub fn new( + rpc_client: Arc, + websocket_url: &str, + config: TpuClientConfig, + ) -> Result { + let create_tpu_client = + 
NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); + let tpu_client = + tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; + + Ok(Self { + _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + rpc_client, + tpu_client: Arc::new(tpu_client), + }) + } + + /// Create a new client that disconnects when dropped + pub fn new_with_connection_cache( + rpc_client: Arc, + websocket_url: &str, + config: TpuClientConfig, + connection_cache: Arc, + ) -> Result { + let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( + rpc_client.get_inner_client().clone(), + websocket_url, + config, + connection_cache, + ); + let tpu_client = + tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; + + Ok(Self { + _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + rpc_client, + tpu_client: Arc::new(tpu_client), + }) + } + + #[cfg(feature = "spinner")] + pub fn send_and_confirm_messages_with_spinner( + &self, + messages: &[Message], + signers: &T, + ) -> Result>> { + self.invoke( + self.tpu_client + .send_and_confirm_messages_with_spinner(messages, signers), + ) + } + + pub fn rpc_client(&self) -> &RpcClient { + &self.rpc_client + } + + fn invoke>(&self, f: F) -> T { + // `block_on()` panics if called within an asynchronous execution context. Whereas + // `block_in_place()` only panics if called from a current_thread runtime, which is the + // lesser evil. + tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f)) + } +} + +// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots +const MAX_SLOT_SKIP_DISTANCE: u64 = 48; + +#[derive(Clone, Debug)] +pub(crate) struct RecentLeaderSlots(Arc>>); +impl RecentLeaderSlots { + pub(crate) fn new(current_slot: Slot) -> Self { + let mut recent_slots = VecDeque::new(); + recent_slots.push_back(current_slot); + Self(Arc::new(RwLock::new(recent_slots))) + } + + pub(crate) fn record_slot(&self, current_slot: Slot) { + let mut recent_slots = self.0.write().unwrap(); + recent_slots.push_back(current_slot); + // 12 recent slots should be large enough to avoid a misbehaving + // validator from affecting the median recent slot + while recent_slots.len() > 12 { + recent_slots.pop_front(); + } + } + + // Estimate the current slot from recent slot notifications. + pub(crate) fn estimated_current_slot(&self) -> Slot { + let mut recent_slots: Vec = self.0.read().unwrap().iter().cloned().collect(); + assert!(!recent_slots.is_empty()); + recent_slots.sort_unstable(); + + // Validators can broadcast invalid blocks that are far in the future + // so check if the current slot is in line with the recent progression. + let max_index = recent_slots.len() - 1; + let median_index = max_index / 2; + let median_recent_slot = recent_slots[median_index]; + let expected_current_slot = median_recent_slot + (max_index - median_index) as u64; + let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE; + + // Return the highest slot that doesn't exceed what we believe is a + // reasonable slot. 
+ recent_slots + .into_iter() + .rev() + .find(|slot| *slot <= max_reasonable_current_slot) + .unwrap() + } +} + +#[cfg(test)] +impl From> for RecentLeaderSlots { + fn from(recent_slots: Vec) -> Self { + assert!(!recent_slots.is_empty()); + Self(Arc::new(RwLock::new(recent_slots.into_iter().collect()))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) { + assert_eq!(recent_slots.estimated_current_slot(), expected_slot); + } + + #[test] + fn test_recent_leader_slots() { + assert_slot(RecentLeaderSlots::new(0), 0); + + let mut recent_slots: Vec = (1..=12).collect(); + assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12); + + recent_slots.reverse(); + assert_slot(RecentLeaderSlots::from(recent_slots), 12); + + assert_slot( + RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]), + 1 + MAX_SLOT_SKIP_DISTANCE, + ); + assert_slot( + RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]), + 0, + ); + + assert_slot(RecentLeaderSlots::from(vec![1]), 1); + assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3); + } +} From 1a2c8fb4967c1a9e2f2fca3cf7d2d2904ff23cc4 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 1 Sep 2022 14:32:57 -0700 Subject: [PATCH 2/2] fix merge issues --- client/src/tpu_client.rs | 45 +++++++ tpu-client/src/tpu_client.rs | 253 ----------------------------------- 2 files changed, 45 insertions(+), 253 deletions(-) delete mode 100644 tpu-client/src/tpu_client.rs diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 5e94098e2624b9..f47c29e609c5aa 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -11,6 +11,7 @@ use { }, bincode::serialize, log::*, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_sdk::{ clock::Slot, commitment_config::CommitmentConfig, @@ -111,6 +112,17 @@ impl TpuClient { self.try_send_wire_transaction(wire_transaction) } + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according + /// to fanout size + /// Returns the last error if all sends fail + pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { + let wire_transactions = transactions + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.try_send_wire_transaction_batch(wire_transactions) + } + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { @@ -140,6 +152,39 @@ impl TpuClient { } } + /// Send a batch of wire transactions to the current and upcoming leader TPUs according to + /// fanout size + /// Returns the last error if all sends fail + fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + let mut last_error: Option = None; + let mut some_success = false; + + for tpu_address in self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots) + { + let conn = self.connection_cache.get_connection(&tpu_address); + let result = conn.send_wire_transaction_batch_async(wire_transactions.clone()); + if let Err(err) = result { + last_error = Some(err); + } else { + some_success = true; + } + } + if 
!some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() + }) + } else { + Ok(()) + } + } + /// Create a new client that disconnects when dropped pub fn new( rpc_client: Arc, diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs deleted file mode 100644 index 7ce01f671d7e6e..00000000000000 --- a/tpu-client/src/tpu_client.rs +++ /dev/null @@ -1,253 +0,0 @@ -pub use crate::nonblocking::tpu_client::TpuSenderError; -use { - crate::{ - connection_cache::ConnectionCache, - nonblocking::tpu_client::TpuClient as NonblockingTpuClient, - }, - rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_rpc_client::rpc_client::RpcClient, - solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, - std::{ - collections::VecDeque, - net::UdpSocket, - sync::{Arc, RwLock}, - }, -}; -#[cfg(feature = "spinner")] -use { - solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, - tokio::time::Duration, -}; - -type Result = std::result::Result; - -/// Default number of slots used to build TPU socket fanout set -pub const DEFAULT_FANOUT_SLOTS: u64 = 12; - -/// Maximum number of slots used to build TPU socket fanout set -pub const MAX_FANOUT_SLOTS: u64 = 100; - -/// Send at ~100 TPS -#[cfg(feature = "spinner")] -pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10); -/// Retry batch send after 4 seconds -#[cfg(feature = "spinner")] -pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4); - -/// Config params for `TpuClient` -#[derive(Clone, Debug)] -pub struct TpuClientConfig { - /// The range of upcoming slots to include when determining which - /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`) - pub fanout_slots: u64, -} - -impl Default for TpuClientConfig { - fn default() -> Self { - Self { - fanout_slots: DEFAULT_FANOUT_SLOTS, - } - } -} - -/// Client which sends transactions directly to the current leader's TPU port over UDP. 
-/// The client uses RPC to determine the current leader and fetch node contact info -pub struct TpuClient { - _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket - //todo: get rid of this field - rpc_client: Arc, - tpu_client: Arc, -} - -impl TpuClient { - /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout - /// size - pub fn send_transaction(&self, transaction: &Transaction) -> bool { - self.invoke(self.tpu_client.send_transaction(transaction)) - } - - /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size - pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { - self.invoke(self.tpu_client.send_wire_transaction(wire_transaction)) - } - - /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout - /// size - /// Returns the last error if all sends fail - pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { - self.invoke(self.tpu_client.try_send_transaction(transaction)) - } - - /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according - /// to fanout size - /// Returns the last error if all sends fail - pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { - let wire_transactions = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - self.invoke( - self.tpu_client - .try_send_wire_transaction_batch(wire_transactions), - ) - } - - /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size - /// Returns the last error if all sends fail - pub fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { - self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) - } - - /// Create a new client that disconnects when dropped - pub fn new( - rpc_client: Arc, - websocket_url: &str, - config: TpuClientConfig, - ) -> Result { - let create_tpu_client = - NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config); - let tpu_client = - tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; - - Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), - rpc_client, - tpu_client: Arc::new(tpu_client), - }) - } - - /// Create a new client that disconnects when dropped - pub fn new_with_connection_cache( - rpc_client: Arc, - websocket_url: &str, - config: TpuClientConfig, - connection_cache: Arc, - ) -> Result { - let create_tpu_client = NonblockingTpuClient::new_with_connection_cache( - rpc_client.get_inner_client().clone(), - websocket_url, - config, - connection_cache, - ); - let tpu_client = - tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; - - Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), - rpc_client, - tpu_client: Arc::new(tpu_client), - }) - } - - #[cfg(feature = "spinner")] - pub fn send_and_confirm_messages_with_spinner( - &self, - messages: &[Message], - signers: &T, - ) -> Result>> { - self.invoke( - self.tpu_client - .send_and_confirm_messages_with_spinner(messages, signers), - ) - } - - pub fn rpc_client(&self) -> &RpcClient { - &self.rpc_client - } - - fn invoke>(&self, f: F) -> T { - // `block_on()` panics if called within an asynchronous execution context. 
Whereas - // `block_in_place()` only panics if called from a current_thread runtime, which is the - // lesser evil. - tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f)) - } -} - -// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots -const MAX_SLOT_SKIP_DISTANCE: u64 = 48; - -#[derive(Clone, Debug)] -pub(crate) struct RecentLeaderSlots(Arc>>); -impl RecentLeaderSlots { - pub(crate) fn new(current_slot: Slot) -> Self { - let mut recent_slots = VecDeque::new(); - recent_slots.push_back(current_slot); - Self(Arc::new(RwLock::new(recent_slots))) - } - - pub(crate) fn record_slot(&self, current_slot: Slot) { - let mut recent_slots = self.0.write().unwrap(); - recent_slots.push_back(current_slot); - // 12 recent slots should be large enough to avoid a misbehaving - // validator from affecting the median recent slot - while recent_slots.len() > 12 { - recent_slots.pop_front(); - } - } - - // Estimate the current slot from recent slot notifications. - pub(crate) fn estimated_current_slot(&self) -> Slot { - let mut recent_slots: Vec = self.0.read().unwrap().iter().cloned().collect(); - assert!(!recent_slots.is_empty()); - recent_slots.sort_unstable(); - - // Validators can broadcast invalid blocks that are far in the future - // so check if the current slot is in line with the recent progression. - let max_index = recent_slots.len() - 1; - let median_index = max_index / 2; - let median_recent_slot = recent_slots[median_index]; - let expected_current_slot = median_recent_slot + (max_index - median_index) as u64; - let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE; - - // Return the highest slot that doesn't exceed what we believe is a - // reasonable slot. - recent_slots - .into_iter() - .rev() - .find(|slot| *slot <= max_reasonable_current_slot) - .unwrap() - } -} - -#[cfg(test)] -impl From> for RecentLeaderSlots { - fn from(recent_slots: Vec) -> Self { - assert!(!recent_slots.is_empty()); - Self(Arc::new(RwLock::new(recent_slots.into_iter().collect()))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) { - assert_eq!(recent_slots.estimated_current_slot(), expected_slot); - } - - #[test] - fn test_recent_leader_slots() { - assert_slot(RecentLeaderSlots::new(0), 0); - - let mut recent_slots: Vec = (1..=12).collect(); - assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12); - - recent_slots.reverse(); - assert_slot(RecentLeaderSlots::from(recent_slots), 12); - - assert_slot( - RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]), - 1 + MAX_SLOT_SKIP_DISTANCE, - ); - assert_slot( - RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]), - 0, - ); - - assert_slot(RecentLeaderSlots::from(vec![1]), 1); - assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1); - assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2); - assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3); - assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3); - } -}