Skip to content

Commit

Permalink
v2.1: add fanout to tpu-client-next (backport of #3478) (#3523)
Browse files Browse the repository at this point in the history
* add fanout to tpu-client-next (#3478)

* Add tpu-client-next to the root Cargo.toml

* Change LeaderUpdater trait to accept mut self

* add fanout to the tpu-client-next

* Shutdown in separate task

* Use try_send instead, minor impromenets

* fix LeaderUpdaterError traits

* improve lifetimes in split_leaders

Co-authored-by: Illia Bobyr <[email protected]>

* address PR comments

* create connections in advance

* removed lookahead_slots

---------

Co-authored-by: Illia Bobyr <[email protected]>
(cherry picked from commit 2a618b5)

# Conflicts:
#	Cargo.toml

* resolve the conflict

---------

Co-authored-by: kirill lykov <[email protected]>
  • Loading branch information
mergify[bot] and KirillLykov authored Nov 25, 2024
1 parent 970606e commit 92f639b
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 224 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ solana-test-validator = { path = "test-validator", version = "=2.1.3" }
solana-thin-client = { path = "thin-client", version = "=2.1.3" }
solana-transaction-error = { path = "sdk/transaction-error", version = "=2.1.3" }
solana-tpu-client = { path = "tpu-client", version = "=2.1.3", default-features = false }
solana-tpu-client-next = { path = "tpu-client-next", version = "=2.1.3" }
solana-transaction-status = { path = "transaction-status", version = "=2.1.3" }
solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.1.3" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.1.3" }
Expand Down
26 changes: 13 additions & 13 deletions tpu-client-next/src/connection_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use {
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
timing::timestamp,
},
std::net::SocketAddr,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
},
tokio::{
sync::mpsc,
time::{sleep, Duration},
Expand Down Expand Up @@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker {
connection: ConnectionState,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: SendTransactionStats,
send_txs_stats: Arc<SendTransactionStats>,
cancel: CancellationToken,
}

Expand All @@ -93,6 +96,7 @@ impl ConnectionWorker {
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: Arc<SendTransactionStats>,
) -> (Self, CancellationToken) {
let cancel = CancellationToken::new();

Expand All @@ -103,7 +107,7 @@ impl ConnectionWorker {
connection: ConnectionState::NotSetup,
skip_check_transaction_age,
max_reconnect_attempts,
send_txs_stats: SendTransactionStats::default(),
send_txs_stats,
cancel: cancel.clone(),
};

Expand Down Expand Up @@ -155,11 +159,6 @@ impl ConnectionWorker {
}
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn transaction_stats(&self) -> &SendTransactionStats {
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
Expand All @@ -183,11 +182,12 @@ impl ConnectionWorker {

if let Err(error) = result {
trace!("Failed to send transaction over stream with error: {error}.");
record_error(error, &mut self.send_txs_stats);
record_error(error, &self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
} else {
self.send_txs_stats.successfully_sent =
self.send_txs_stats.successfully_sent.saturating_add(1);
self.send_txs_stats
.successfully_sent
.fetch_add(1, Ordering::Relaxed);
}
}
measure_send.stop();
Expand Down Expand Up @@ -221,14 +221,14 @@ impl ConnectionWorker {
}
Err(err) => {
warn!("Connection error {}: {}", self.peer, err);
record_error(err.into(), &mut self.send_txs_stats);
record_error(err.into(), &self.send_txs_stats);
self.connection =
ConnectionState::Retry(max_retries_attempt.saturating_add(1));
}
}
}
Err(connecting_error) => {
record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
record_error(connecting_error.clone().into(), &self.send_txs_stats);
match connecting_error {
ConnectError::EndpointStopping => {
debug!("Endpoint stopping, exit connection worker.");
Expand Down
122 changes: 80 additions & 42 deletions tpu-client-next/src/connection_workers_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use {
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
},
transaction_batch::TransactionBatch,
workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
SendTransactionStats,
},
log::*,
quinn::Endpoint,
Expand Down Expand Up @@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note, that the unit is number of leaders per
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance in order to hide the latency of opening new
/// connection. Hence, `connect` must be greater or equal to `send`
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send: usize,

/// The number of leaders to target for establishing connections.
pub connect: usize,
}

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
Expand Down Expand Up @@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}

impl ConnectionWorkersScheduler {
Expand All @@ -90,7 +108,7 @@ impl ConnectionWorkersScheduler {
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
Expand All @@ -99,6 +117,7 @@ impl ConnectionWorkersScheduler {
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

loop {
let transaction_batch = tokio::select! {
Expand All @@ -114,50 +133,49 @@ impl ConnectionWorkersScheduler {
break;
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}

tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we has failed to send batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);

// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
for peer in future_leaders {
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
// add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),
);
workers.push(*peer, worker).await;
maybe_shutdown_worker(workers.push(*peer, worker));
}
}

for new_leader in fanout_leaders {
if !workers.contains(new_leader) {
warn!("No existing worker for {new_leader:?}, skip sending to this leader.");
continue;
}

let send_res =
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we has failed to send batch, it will be dropped.
}
}
}
}
Expand All @@ -166,7 +184,7 @@ impl ConnectionWorkersScheduler {

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
Ok(workers.transaction_stats().clone())
Ok(send_stats_per_addr)
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
Expand All @@ -191,6 +209,7 @@ impl ConnectionWorkersScheduler {
worker_channel_size: usize,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
stats: Arc<SendTransactionStats>,
) -> WorkerInfo {
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
let endpoint = endpoint.clone();
Expand All @@ -202,12 +221,31 @@ impl ConnectionWorkersScheduler {
txs_receiver,
skip_check_transaction_age,
max_reconnect_attempts,
stats,
);
let handle = tokio::spawn(async move {
worker.run().await;
worker.transaction_stats().clone()
});

WorkerInfo::new(txs_sender, handle, cancel)
}
}

/// Splits `leaders` into two slices based on the `fanout` configuration:
/// * the first slice contains the leaders to which transactions will be sent,
/// * the second vector contains the leaders, used to warm up connections. This
/// slice includes the the first set.
fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
}
15 changes: 11 additions & 4 deletions tpu-client-next/src/leader_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use {
log::*,
solana_connection_cache::connection_cache::Protocol,
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS,
solana_tpu_client::nonblocking::tpu_client::LeaderTpuService,
std::{
fmt,
Expand All @@ -22,26 +23,30 @@ use {
Arc,
},
},
thiserror::Error,
};

/// [`LeaderUpdater`] trait abstracts out functionality required for the
/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to
/// identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
/// Returns next unique leaders for the next `lookahead_slots` starting from
/// Returns next leaders for the next `lookahead_leaders` starting from
/// current estimated slot.
///
/// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition.
///
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr>;

/// Stop [`LeaderUpdater`] and releases all associated resources.
async fn stop(&mut self);
}

/// Error type for [`LeaderUpdater`].
#[derive(Error, PartialEq)]
pub struct LeaderUpdaterError;

impl fmt::Display for LeaderUpdaterError {
Expand Down Expand Up @@ -98,7 +103,9 @@ struct LeaderUpdaterService {

#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
let lookahead_slots =
(lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
}

Expand All @@ -116,7 +123,7 @@ struct PinnedLeaderUpdater {

#[async_trait]
impl LeaderUpdater for PinnedLeaderUpdater {
fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec<SocketAddr> {
self.address.clone()
}

Expand Down
Loading

0 comments on commit 92f639b

Please sign in to comment.