From 344e466e12ddef9c15564eab54101c92b968e008 Mon Sep 17 00:00:00 2001
From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
Date: Mon, 25 Sep 2023 18:17:47 -0700
Subject: [PATCH] Async connection creation in connection cache (#33302)
If there is a connection in the cache available, use it and create the additional connection asynchronously.
---
Cargo.lock | 1 +
bench-tps/src/bench_tps_client/tpu_client.rs | 5 +-
client/src/connection_cache.rs | 1 +
client/src/nonblocking/tpu_client.rs | 3 +
client/src/tpu_client.rs | 3 +
connection-cache/Cargo.toml | 1 +
connection-cache/src/connection_cache.rs | 252 ++++++++++++++-----
programs/sbf/Cargo.lock | 1 +
quic-client/src/lib.rs | 10 +-
quic-client/src/nonblocking/quic_client.rs | 8 +-
thin-client/src/thin_client.rs | 8 +-
tpu-client/src/nonblocking/tpu_client.rs | 7 +-
tpu-client/src/tpu_client.rs | 3 +-
udp-client/src/lib.rs | 8 +-
14 files changed, 238 insertions(+), 73 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 11b687ef77c962..9c8371372316d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5700,6 +5700,7 @@ version = "1.17.0"
dependencies = [
"async-trait",
"bincode",
+ "crossbeam-channel",
"futures-util",
"indexmap 2.0.0",
"indicatif",
diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs
index ae762e52922ec5..c56da2ae6e880b 100644
--- a/bench-tps/src/bench_tps_client/tpu_client.rs
+++ b/bench-tps/src/bench_tps_client/tpu_client.rs
@@ -1,7 +1,9 @@
use {
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
solana_client::tpu_client::TpuClient,
- solana_connection_cache::connection_cache::{ConnectionManager, ConnectionPool},
+ solana_connection_cache::connection_cache::{
+ ConnectionManager, ConnectionPool, NewConnectionConfig,
+ },
solana_sdk::{
account::Account, commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash,
message::Message, pubkey::Pubkey, signature::Signature, transaction::Transaction,
@@ -12,6 +14,7 @@ impl
BenchTpsClient for TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
fn send_transaction(&self, transaction: Transaction) -> Result {
let signature = transaction.signatures[0];
diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs
index 0bce6bbb3c90d2..44673c06f4d087 100644
--- a/client/src/connection_cache.rs
+++ b/client/src/connection_cache.rs
@@ -5,6 +5,7 @@ use {
client_connection::ClientConnection,
connection_cache::{
BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
+ NewConnectionConfig,
},
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
diff --git a/client/src/nonblocking/tpu_client.rs b/client/src/nonblocking/tpu_client.rs
index d04df3e451ae67..5e71eae36bd6e4 100644
--- a/client/src/nonblocking/tpu_client.rs
+++ b/client/src/nonblocking/tpu_client.rs
@@ -3,6 +3,7 @@ use {
crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
+ NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::nonblocking::rpc_client::RpcClient,
@@ -30,6 +31,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
@@ -99,6 +101,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub async fn new_with_connection_cache(
diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs
index 2abba3f7772894..45394151340070 100644
--- a/client/src/tpu_client.rs
+++ b/client/src/tpu_client.rs
@@ -2,6 +2,7 @@ use {
crate::connection_cache::ConnectionCache,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
+ NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
@@ -34,6 +35,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
@@ -90,6 +92,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub fn new_with_connection_cache(
diff --git a/connection-cache/Cargo.toml b/connection-cache/Cargo.toml
index c0d37f40017b88..acf52f05f9ba5d 100644
--- a/connection-cache/Cargo.toml
+++ b/connection-cache/Cargo.toml
@@ -12,6 +12,7 @@ edition = { workspace = true }
[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
+crossbeam-channel = { workspace = true }
futures-util = { workspace = true }
indexmap = { workspace = true }
indicatif = { workspace = true, optional = true }
diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs
index 66ec89d44f056f..306a8df2722091 100644
--- a/connection-cache/src/connection_cache.rs
+++ b/connection-cache/src/connection_cache.rs
@@ -4,13 +4,16 @@ use {
connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
+ crossbeam_channel::{Receiver, RecvError, Sender},
indexmap::map::IndexMap,
+ log::*,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
+ thread::{Builder, JoinHandle},
},
thiserror::Error,
};
@@ -27,9 +30,9 @@ pub enum Protocol {
QUIC,
}
-pub trait ConnectionManager {
+pub trait ConnectionManager: Send + Sync + 'static {
type ConnectionPool: ConnectionPool;
- type NewConnectionConfig;
+ type NewConnectionConfig: NewConnectionConfig;
const PROTOCOL: Protocol;
@@ -43,18 +46,20 @@ pub struct ConnectionCache<
T, // NewConnectionConfig
> {
name: &'static str,
- map: RwLock>,
- connection_manager: S,
+ map: Arc>>,
+ connection_manager: Arc,
stats: Arc,
last_stats: AtomicInterval,
connection_pool_size: usize,
- connection_config: T,
+ connection_config: Arc,
+ sender: Sender<(usize, SocketAddr)>,
}
impl ConnectionCache
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
pub fn new(
name: &'static str,
@@ -76,17 +81,61 @@ where
connection_config: C,
connection_manager: M,
) -> Self {
+ let (sender, receiver) = crossbeam_channel::unbounded();
+
+ let map = Arc::new(RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)));
+ let config = Arc::new(connection_config);
+ let connection_manager = Arc::new(connection_manager);
+ let connection_pool_size = 1.max(connection_pool_size); // The minimum pool size is 1.
+
+ let stats = Arc::new(ConnectionCacheStats::default());
+
+ let _async_connection_thread =
+ Self::create_connection_async_thread(map.clone(), receiver, stats.clone());
Self {
name,
- map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
- stats: Arc::new(ConnectionCacheStats::default()),
+ map,
+ stats,
connection_manager,
last_stats: AtomicInterval::default(),
- connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1.
- connection_config,
+ connection_pool_size,
+ connection_config: config,
+ sender,
}
}
+ /// This actually triggers the connection creation by sending empty data
+ fn create_connection_async_thread(
+ map: Arc>>,
+ receiver: Receiver<(usize, SocketAddr)>,
+ stats: Arc,
+ ) -> JoinHandle<()> {
+ Builder::new()
+ .name("solQAsynCon".to_string())
+ .spawn(move || loop {
+ let recv_result = receiver.recv();
+ match recv_result {
+ Err(RecvError) => {
+ break;
+ }
+ Ok((idx, addr)) => {
+ let map = map.read().unwrap();
+ let pool = map.get(&addr);
+ if let Some(pool) = pool {
+ let conn = pool.get(idx);
+ if let Ok(conn) = conn {
+ drop(map);
+ let conn = conn.new_blocking_connection(addr, stats.clone());
+ let result = conn.send_data(&[]);
+ debug!("Create async connection result {result:?} for {addr}");
+ }
+ }
+ }
+ }
+ })
+ .unwrap()
+ }
+
/// Create a lazy connection object under the exclusive lock of the cache map if there is not
/// enough used connections in the connection pool for the specified address.
/// Returns CreateConnectionResult.
@@ -102,47 +151,37 @@ where
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
- let should_create_connection = map
+ let pool_status = map
.get(addr)
- .map(|pool| pool.need_new_connection(self.connection_pool_size))
- .unwrap_or(true);
-
- let (cache_hit, num_evictions, eviction_timing_ms) = if should_create_connection {
- // evict a connection if the cache is reaching upper bounds
- let mut num_evictions = 0;
- let mut get_connection_cache_eviction_measure =
- Measure::start("get_connection_cache_eviction_measure");
- let existing_index = map.get_index_of(addr);
- while map.len() >= MAX_CONNECTIONS {
- let mut rng = thread_rng();
- let n = rng.gen_range(0..MAX_CONNECTIONS);
- if let Some(index) = existing_index {
- if n == index {
- continue;
- }
- }
- map.swap_remove_index(n);
- num_evictions += 1;
- }
- get_connection_cache_eviction_measure.stop();
-
- map.entry(*addr)
- .and_modify(|pool| {
- pool.add_connection(&self.connection_config, addr);
- })
- .or_insert_with(|| {
- let mut pool = self.connection_manager.new_connection_pool();
- pool.add_connection(&self.connection_config, addr);
- pool
- });
- (
- false,
- num_evictions,
- get_connection_cache_eviction_measure.as_ms(),
- )
- } else {
- (true, 0, 0)
- };
+ .map(|pool| pool.check_pool_status(self.connection_pool_size))
+ .unwrap_or(PoolStatus::Empty);
+
+ let (cache_hit, num_evictions, eviction_timing_ms) =
+ if matches!(pool_status, PoolStatus::Empty) {
+ Self::create_connection_internal(
+ &self.connection_config,
+ &self.connection_manager,
+ &mut map,
+ addr,
+ self.connection_pool_size,
+ None,
+ )
+ } else {
+ (true, 0, 0)
+ };
+
+ if matches!(pool_status, PoolStatus::PartiallyFull) {
+ // trigger an async connection create
+ debug!("Triggering async connection for {addr:?}");
+ Self::create_connection_internal(
+ &self.connection_config,
+ &self.connection_manager,
+ &mut map,
+ addr,
+ self.connection_pool_size,
+ Some(&self.sender),
+ );
+ }
let pool = map.get(addr).unwrap();
let connection = pool.borrow_connection();
@@ -156,6 +195,63 @@ where
}
}
+ fn create_connection_internal(
+ config: &Arc,
+ connection_manager: &Arc,
+ map: &mut std::sync::RwLockWriteGuard<'_, IndexMap>,
+ addr: &SocketAddr,
+ connection_pool_size: usize,
+ async_connection_sender: Option<&Sender<(usize, SocketAddr)>>,
+ ) -> (bool, u64, u64) {
+ // evict a connection if the cache is reaching upper bounds
+ let mut num_evictions = 0;
+ let mut get_connection_cache_eviction_measure =
+ Measure::start("get_connection_cache_eviction_measure");
+ let existing_index = map.get_index_of(addr);
+ while map.len() >= MAX_CONNECTIONS {
+ let mut rng = thread_rng();
+ let n = rng.gen_range(0..MAX_CONNECTIONS);
+ if let Some(index) = existing_index {
+ if n == index {
+ continue;
+ }
+ }
+ map.swap_remove_index(n);
+ num_evictions += 1;
+ }
+ get_connection_cache_eviction_measure.stop();
+
+ let mut hit_cache = false;
+ map.entry(*addr)
+ .and_modify(|pool| {
+ if matches!(
+ pool.check_pool_status(connection_pool_size),
+ PoolStatus::PartiallyFull
+ ) {
+ let idx = pool.add_connection(config, addr);
+ if let Some(sender) = async_connection_sender {
+ debug!(
+ "Sending async connection creation {} for {addr}",
+ pool.num_connections() - 1
+ );
+ sender.send((idx, *addr)).unwrap();
+ };
+ } else {
+ hit_cache = true;
+ }
+ })
+ .or_insert_with(|| {
+ let mut pool = connection_manager.new_connection_pool();
+ pool.add_connection(config, addr);
+ pool
+ });
+ (
+ hit_cache,
+ num_evictions,
+ get_connection_cache_eviction_measure.as_ms(),
+ )
+ }
+
fn get_or_add_connection(
&self,
addr: &SocketAddr,
@@ -179,12 +275,26 @@ where
eviction_timing_ms,
} = match map.get(addr) {
Some(pool) => {
- if pool.need_new_connection(self.connection_pool_size) {
+ let pool_status = pool.check_pool_status(self.connection_pool_size);
+ if matches!(pool_status, PoolStatus::Empty) {
// create more connection and put it in the pool
drop(map);
self.create_connection(&mut lock_timing_ms, addr)
} else {
let connection = pool.borrow_connection();
+ if matches!(pool_status, PoolStatus::PartiallyFull) {
+ debug!("Creating connection async for {addr}");
+ drop(map);
+ let mut map = self.map.write().unwrap();
+ Self::create_connection_internal(
+ &self.connection_config,
+ &self.connection_manager,
+ &mut map,
+ addr,
+ self.connection_pool_size,
+ Some(&self.sender),
+ );
+ }
CreateConnectionResult {
connection,
cache_hit: true,
@@ -299,12 +409,22 @@ pub enum ClientError {
IoError(#[from] std::io::Error),
}
-pub trait ConnectionPool {
- type NewConnectionConfig;
+pub trait NewConnectionConfig: Sized + Send + Sync + 'static {
+ fn new() -> Result;
+}
+
+pub enum PoolStatus {
+ Empty,
+ PartiallyFull,
+ Full,
+}
+
+pub trait ConnectionPool: Send + Sync + 'static {
+ type NewConnectionConfig: NewConnectionConfig;
type BaseClientConnection: BaseClientConnection;
- /// Add a connection to the pool
- fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr);
+ /// Add a connection to the pool and return its index
+ fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize;
/// Get the number of current connections in the pool
fn num_connections(&self) -> usize;
@@ -319,10 +439,17 @@ pub trait ConnectionPool {
let n = rng.gen_range(0..self.num_connections());
self.get(n).expect("index is within num_connections")
}
+
/// Check if we need to create a new connection. If the count of the connections
- /// is smaller than the pool size.
- fn need_new_connection(&self, required_pool_size: usize) -> bool {
- self.num_connections() < required_pool_size
+ /// is smaller than the pool size and if there is no connection at all.
+ fn check_pool_status(&self, required_pool_size: usize) -> PoolStatus {
+ if self.num_connections() == 0 {
+ PoolStatus::Empty
+ } else if self.num_connections() < required_pool_size {
+ PoolStatus::PartiallyFull
+ } else {
+ PoolStatus::Full
+ }
}
fn create_pool_entry(
@@ -393,9 +520,16 @@ mod tests {
type NewConnectionConfig = MockUdpConfig;
type BaseClientConnection = MockUdp;
- fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
+ /// Add a connection into the pool and return its index in the pool.
+ fn add_connection(
+ &mut self,
+ config: &Self::NewConnectionConfig,
+ addr: &SocketAddr,
+ ) -> usize {
let connection = self.create_pool_entry(config, addr);
+ let idx = self.connections.len();
self.connections.push(connection);
+ idx
}
fn num_connections(&self) -> usize {
@@ -436,7 +570,7 @@ mod tests {
}
}
- impl MockUdpConfig {
+ impl NewConnectionConfig for MockUdpConfig {
fn new() -> Result {
Ok(Self {
udp_socket: Arc::new(
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index d6fbe9f6f79707..67f6c9862b071c 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -4765,6 +4765,7 @@ version = "1.17.0"
dependencies = [
"async-trait",
"bincode",
+ "crossbeam-channel",
"futures-util",
"indexmap 2.0.0",
"log",
diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs
index 0357969d8296d3..90a55deaa691ed 100644
--- a/quic-client/src/lib.rs
+++ b/quic-client/src/lib.rs
@@ -19,7 +19,7 @@ use {
solana_connection_cache::{
connection_cache::{
BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
- ConnectionPoolError, Protocol,
+ ConnectionPoolError, NewConnectionConfig, Protocol,
},
connection_cache_stats::ConnectionCacheStats,
},
@@ -53,9 +53,11 @@ impl ConnectionPool for QuicPool {
type BaseClientConnection = Quic;
type NewConnectionConfig = QuicConfig;
- fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
+ fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
let connection = self.create_pool_entry(config, addr);
+ let idx = self.connections.len();
self.connections.push(connection);
+ idx
}
fn num_connections(&self) -> usize {
@@ -93,8 +95,8 @@ pub struct QuicConfig {
client_endpoint: Option,
}
-impl QuicConfig {
- pub fn new() -> Result {
+impl NewConnectionConfig for QuicConfig {
+ fn new() -> Result {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))?;
Ok(Self {
diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs
index 78f21ac2565f95..66a55f8f3fd78f 100644
--- a/quic-client/src/nonblocking/quic_client.rs
+++ b/quic-client/src/nonblocking/quic_client.rs
@@ -340,16 +340,18 @@ impl QuicClient {
Ok(conn) => {
*conn_guard = Some(conn.clone());
info!(
- "Made connection to {} id {} try_count {}",
+ "Made connection to {} id {} try_count {}, from connection cache warming?: {}",
self.addr,
conn.connection.stable_id(),
- connection_try_count
+ connection_try_count,
+ data.is_empty(),
);
connection_try_count += 1;
conn.connection.clone()
}
Err(err) => {
- info!("Cannot make connection to {}, error {:}", self.addr, err);
+ info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
+ self.addr, err, data.is_empty());
return Err(err);
}
}
diff --git a/thin-client/src/thin_client.rs b/thin-client/src/thin_client.rs
index c61addfb500c06..b1ae08fd7c01a3 100644
--- a/thin-client/src/thin_client.rs
+++ b/thin-client/src/thin_client.rs
@@ -8,7 +8,9 @@ use {
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::{
client_connection::ClientConnection,
- connection_cache::{ConnectionCache, ConnectionManager, ConnectionPool},
+ connection_cache::{
+ ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
+ },
},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
@@ -124,6 +126,7 @@ impl ThinClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
@@ -324,6 +327,7 @@ impl Client for ThinClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
fn tpu_addr(&self) -> String {
self.tpu_addr().to_string()
@@ -334,6 +338,7 @@ impl SyncClient for ThinClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
fn send_and_confirm_message(
&self,
@@ -618,6 +623,7 @@ impl AsyncClient for ThinClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
fn async_send_versioned_transaction(
&self,
diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs
index ea1bb98a569f7c..57a9b0b4033c61 100644
--- a/tpu-client/src/nonblocking/tpu_client.rs
+++ b/tpu-client/src/nonblocking/tpu_client.rs
@@ -9,7 +9,7 @@ use {
log::*,
solana_connection_cache::{
connection_cache::{
- ConnectionCache, ConnectionManager, ConnectionPool, Protocol,
+ ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, Protocol,
DEFAULT_CONNECTION_POOL_SIZE,
},
nonblocking::client_connection::ClientConnection,
@@ -268,6 +268,7 @@ fn send_wire_transaction_futures<'a, P, M, C>(
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32);
@@ -339,6 +340,7 @@ async fn sleep_and_send_wire_transaction_to_addr(
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
sleep(sleep_duration).await;
send_wire_transaction_to_addr(connection_cache, &addr, wire_transaction).await
@@ -352,6 +354,7 @@ async fn send_wire_transaction_to_addr(
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data(&wire_transaction).await
@@ -365,6 +368,7 @@ async fn send_wire_transaction_batch_to_addr(
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data_batch(wire_transactions).await
@@ -374,6 +378,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs
index f2e9155f39116e..9d5a159686d426 100644
--- a/tpu-client/src/tpu_client.rs
+++ b/tpu-client/src/tpu_client.rs
@@ -3,7 +3,7 @@ use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::{
- ConnectionCache, ConnectionManager, ConnectionPool,
+ ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
@@ -71,6 +71,7 @@ impl TpuClient
where
P: ConnectionPool,
M: ConnectionManager,
+ C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs
index c4ed99b3b7f82b..06eeca00185898 100644
--- a/udp-client/src/lib.rs
+++ b/udp-client/src/lib.rs
@@ -11,7 +11,7 @@ use {
solana_connection_cache::{
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
- ConnectionPoolError, Protocol,
+ ConnectionPoolError, NewConnectionConfig, Protocol,
},
connection_cache_stats::ConnectionCacheStats,
},
@@ -28,9 +28,11 @@ impl ConnectionPool for UdpPool {
type BaseClientConnection = Udp;
type NewConnectionConfig = UdpConfig;
- fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
+ fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
let connection = self.create_pool_entry(config, addr);
+ let idx = self.connections.len();
self.connections.push(connection);
+ idx
}
fn num_connections(&self) -> usize {
@@ -57,7 +59,7 @@ pub struct UdpConfig {
udp_socket: Arc,
}
-impl UdpConfig {
+impl NewConnectionConfig for UdpConfig {
fn new() -> Result {
let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(Into::::into)?;