Skip to content

Commit

Permalink
Quic Connection Cache (#23598)
Browse files Browse the repository at this point in the history
Add a connection cache to allow add modules that send data to get or create connections (e.g. for quic) associated with a certain SocketAddr
  • Loading branch information
ryleung-solana authored Mar 15, 2022
1 parent 2d3501d commit 9b46f9b
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ futures-util = "0.3.21"
indicatif = "0.16.2"
itertools = "0.10.2"
jsonrpc-core = "18.0.0"
lazy_static = "1.4.0"
log = "0.4.14"
quinn = "0.8.0"
rand = "0.7.0"
rand_chacha = "0.2.2"
rayon = "1.5.1"
reqwest = { version = "0.11.10", default-features = false, features = ["blocking", "rustls-tls", "json"] }
rustls = { version = "0.20.2", features = ["dangerous_configuration"] }
Expand Down
155 changes: 155 additions & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use {
crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection},
lazy_static::lazy_static,
std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
net::{SocketAddr, UdpSocket},
sync::{Arc, Mutex},
},
};

// Should be non-zero
static MAX_CONNECTIONS: usize = 64;

struct ConnMap {
// Keeps track of the connection associated with an addr and the last time it was used
map: HashMap<SocketAddr, (Arc<dyn TpuConnection + 'static + Sync + Send>, u64)>,
// Helps to find the least recently used connection. The search and inserts are O(log(n))
// but since we're bounding the size of the collections, this should be constant
// (and hopefully negligible) time. In theory, we can do this in constant time
// with a queue implemented as a doubly-linked list (and all the
// HashMap entries holding a "pointer" to the corresponding linked-list node),
// so we can push, pop and bump a used connection back to the end of the queue in O(1) time, but
// that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though...
last_used_times: BTreeMap<u64, SocketAddr>,
ticks: u64,
}

impl ConnMap {
pub fn new() -> Self {
Self {
map: HashMap::new(),
last_used_times: BTreeMap::new(),
ticks: 0,
}
}
}

lazy_static! {
static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new());
}

#[allow(dead_code)]
// TODO: see https://github.com/solana-labs/solana/issues/23661
// remove lazy_static and optimize and refactor this
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> {
let mut map = (*CONNECTION_MAP).lock().unwrap();
let ticks = map.ticks;

let (conn, target_ticks) = match map.map.entry(*addr) {
Entry::Occupied(mut entry) => {
let mut pair = entry.get_mut();
let old_ticks = pair.1;
pair.1 = ticks;
(pair.0.clone(), old_ticks)
}
Entry::Vacant(entry) => {
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
// TODO: see https://github.com/solana-labs/solana/issues/23659
// make it configurable (e.g. via the command line) whether to use UDP or Quic
let conn = Arc::new(UdpTpuConnection::new(send_socket, *addr));
entry.insert((conn.clone(), ticks));
(
conn as Arc<dyn TpuConnection + 'static + Sync + Send>,
ticks,
)
}
};

let num_connections = map.map.len();
if num_connections > MAX_CONNECTIONS {
let (old_ticks, target_addr) = {
let (old_ticks, target_addr) = map.last_used_times.iter().next().unwrap();
(*old_ticks, *target_addr)
};
map.map.remove(&target_addr);
map.last_used_times.remove(&old_ticks);
}

if target_ticks != ticks {
map.last_used_times.remove(&target_ticks);
}
map.last_used_times.insert(ticks, *addr);

map.ticks += 1;
conn
}

#[cfg(test)]
mod tests {
use {
crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS},
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
std::net::SocketAddr,
};

fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
let a = rng.gen_range(1, 255);
let b = rng.gen_range(1, 255);
let c = rng.gen_range(1, 255);
let d = rng.gen_range(1, 255);

let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d);

addr_str.parse().expect("Invalid address")
}

#[test]
fn test_connection_cache() {
// Allow the test to run deterministically
// with the same pseudorandom sequence between runs
// and on different platforms - the cryptographic security
// property isn't important here but ChaChaRng provides a way
// to get the same pseudorandom sequence on different platforms
let mut rng = ChaChaRng::seed_from_u64(42);

// Generate a bunch of random addresses and create TPUConnections to them
// Since TPUConnection::new is infallible, it should't matter whether or not
// we can actually connect to those addresses - TPUConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let first_addr = get_addr(&mut rng);
assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip());
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
let addr = get_addr(&mut rng);
get_connection(&addr);
addr
})
.collect::<Vec<_>>();
{
let map = (*CONNECTION_MAP).lock().unwrap();
addrs.iter().for_each(|a| {
let conn = map.map.get(a).expect("Address not found");
assert!(a.ip() == conn.0.tpu_addr().ip());
});

assert!(map.map.get(&first_addr).is_none());
}

// Test that get_connection updates which connection is next up for eviction
// when an existing connection is used. Initially, addrs[0] should be next up for eviction, since
// it was the earliest added. But we do get_connection(&addrs[0]), thereby using
// that connection, and bumping it back to the end of the queue. So addrs[1] should be
// the next up for eviction. So we add a new connection, and test that addrs[0] is not
// evicted but addrs[1] is.
get_connection(&addrs[0]);
get_connection(&get_addr(&mut rng));

let map = (*CONNECTION_MAP).lock().unwrap();
assert!(map.map.get(&addrs[0]).is_some());
assert!(map.map.get(&addrs[1]).is_none());
}
}
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate serde_derive;

pub mod blockhash_query;
pub mod client_error;
pub mod connection_cache;
pub(crate) mod http_sender;
pub(crate) mod mock_sender;
pub mod nonblocking;
Expand Down
4 changes: 3 additions & 1 deletion client/src/tpu_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use {
};

pub trait TpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self
where
Self: Sized;

fn tpu_addr(&self) -> &SocketAddr;

Expand Down
3 changes: 3 additions & 0 deletions programs/bpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9b46f9b

Please sign in to comment.