Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
adds quic connection cache to turbine
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed May 23, 2023
1 parent c4c619e commit 2a5b177
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-poh = { workspace = true }
solana-program-runtime = { workspace = true }
solana-quic-client = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions core/benches/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::{Shred, ShredFlags},
},
solana_quic_client::new_quic_connection_cache,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
pubkey,
signature::{Keypair, Signer},
timing::{timestamp, AtomicInterval},
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
collections::HashMap,
net::UdpSocket,
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{Arc, RwLock},
time::Duration,
},
Expand All @@ -38,14 +39,20 @@ use {
fn broadcast_shreds_bench(bencher: &mut Bencher) {
solana_logger::setup();
let leader_keypair = Arc::new(Keypair::new());
let quic_connection_cache = new_quic_connection_cache(
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap();
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let cluster_info = ClusterInfo::new(
leader_info.info,
leader_keypair,
SocketAddrSpace::Unspecified,
);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_benches(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
Expand Down Expand Up @@ -74,6 +81,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&socket,
&shreds,
&cluster_nodes_cache,
&quic_connection_cache,
&last_datapoint,
&mut TransmitShredsStats::default(),
&cluster_info,
Expand Down
14 changes: 12 additions & 2 deletions core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use {
system_transaction,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
iter::repeat_with,
net::{Ipv4Addr, UdpSocket},
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -100,6 +100,15 @@ fn bench_retransmitter(bencher: &mut Bencher) {
.collect();

let keypair = Keypair::new();
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap(),
);
let slot = 0;
let parent = 0;
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
Expand All @@ -118,6 +127,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {

let retransmitter_handles = retransmitter(
Arc::new(sockets),
quic_connection_cache,
bank_forks,
leader_schedule_cache,
cluster_info,
Expand Down
50 changes: 39 additions & 11 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use {
standard_broadcast_run::StandardBroadcastRun,
},
crate::{
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
result::{Error, Result},
},
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::Itertools,
itertools::{Either, Itertools},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::Protocol,
Expand All @@ -22,6 +23,7 @@ use {
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
solana_poh::poh_recorder::WorkingBankEntry,
solana_quic_client::QuicConnectionCache,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -87,6 +89,7 @@ impl BroadcastStageType {
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
quic_connection_cache: Arc<QuicConnectionCache>,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
Expand All @@ -97,7 +100,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
StandardBroadcastRun::new(shred_version),
StandardBroadcastRun::new(shred_version, quic_connection_cache),
),

BroadcastStageType::FailEntryVerification => BroadcastStage::new(
Expand All @@ -108,7 +111,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
FailEntryVerificationBroadcastRun::new(shred_version),
FailEntryVerificationBroadcastRun::new(shred_version, quic_connection_cache),
),

BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
Expand Down Expand Up @@ -392,6 +395,7 @@ pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
quic_connection_cache: &QuicConnectionCache,
last_datapoint_submit: &AtomicInterval,
transmit_stats: &mut TransmitShredsStats,
cluster_info: &ClusterInfo,
Expand All @@ -404,7 +408,7 @@ pub fn broadcast_shreds(
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let packets: Vec<_> = shreds
let (packets, quic_packets): (Vec<_>, Vec<_>) = shreds
.iter()
.group_by(|shred| shred.slot())
.into_iter()
Expand All @@ -413,15 +417,22 @@ pub fn broadcast_shreds(
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
shreds.filter_map(move |shred| {
let key = shred.id();
let protocol = cluster_nodes::get_broadcast_protocol(&key);
cluster_nodes
.get_broadcast_peer(&shred.id())?
.tvu(Protocol::UDP)
.get_broadcast_peer(&key)?
.tvu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))
.map(|addr| (shred.payload(), addr))
.map(|addr| {
(match protocol {
Protocol::QUIC => Either::Right,
Protocol::UDP => Either::Left,
})((shred.payload(), addr))
})
})
})
.collect();
.partition_map(std::convert::identity);
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();

Expand All @@ -432,7 +443,13 @@ pub fn broadcast_shreds(
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
transmit_stats.total_packets += packets.len();
for (shred, addr) in &quic_packets {
let conn = quic_connection_cache.get_connection(addr);
if let Err(err) = conn.send_data(shred) {
result = Err(Error::from(err));
}
}
transmit_stats.total_packets += packets.len() + quic_packets.len();
result
}

Expand All @@ -454,7 +471,9 @@ pub mod test {
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::streamer::StakedNodes,
std::{
net::{IpAddr, Ipv4Addr},
path::Path,
sync::{atomic::AtomicBool, Arc},
thread::sleep,
Expand Down Expand Up @@ -586,6 +605,15 @@ pub mod test {
) -> MockBroadcastStage {
// Make the database ledger
let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
4, // connection_pool_size
)
.unwrap(),
);

// Make the leader node and scheduler
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
Expand Down Expand Up @@ -619,7 +647,7 @@ pub mod test {
exit_sender,
blockstore.clone(),
bank_forks,
StandardBroadcastRun::new(0),
StandardBroadcastRun::new(0, quic_connection_cache),
);

MockBroadcastStage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
quic_connection_cache: Arc<QuicConnectionCache>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}

impl FailEntryVerificationBroadcastRun {
pub(super) fn new(shred_version: u16) -> Self {
pub(super) fn new(shred_version: u16, quic_connection_cache: Arc<QuicConnectionCache>) -> Self {
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
Expand All @@ -33,6 +34,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
quic_connection_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
Expand Down Expand Up @@ -168,6 +170,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock,
&shreds,
&self.cluster_nodes_cache,
&self.quic_connection_cache,
&AtomicInterval::default(),
&mut TransmitShredsStats::default(),
cluster_info,
Expand Down
Loading

0 comments on commit 2a5b177

Please sign in to comment.