Skip to content

Commit

Permalink
adds code path for broadcasting shreds using QUIC (solana-labs#31610)
Browse files Browse the repository at this point in the history
adds QUIC connection cache to turbine

Working towards migrating turbine to QUIC.
  • Loading branch information
behzadnouri authored and wen-coding committed Aug 15, 2023
1 parent 96582f8 commit 51019be
Show file tree
Hide file tree
Showing 17 changed files with 254 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-poh = { workspace = true }
solana-program-runtime = { workspace = true }
solana-quic-client = { workspace = true }
solana-rayon-threadlimit = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down
16 changes: 13 additions & 3 deletions core/benches/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
},
cluster_nodes::ClusterNodesCache,
validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
Expand All @@ -18,16 +19,17 @@ use {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::{Shred, ShredFlags},
},
solana_quic_client::new_quic_connection_cache,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
pubkey,
signature::{Keypair, Signer},
timing::{timestamp, AtomicInterval},
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
collections::HashMap,
net::UdpSocket,
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{Arc, RwLock},
time::Duration,
},
Expand All @@ -38,14 +40,21 @@ use {
fn broadcast_shreds_bench(bencher: &mut Bencher) {
solana_logger::setup();
let leader_keypair = Arc::new(Keypair::new());
let quic_connection_cache = new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap();
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let cluster_info = ClusterInfo::new(
leader_info.info,
leader_keypair,
SocketAddrSpace::Unspecified,
);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_benches(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
Expand Down Expand Up @@ -74,6 +83,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&socket,
&shreds,
&cluster_nodes_cache,
&quic_connection_cache,
&last_datapoint,
&mut TransmitShredsStats::default(),
&cluster_info,
Expand Down
17 changes: 14 additions & 3 deletions core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate test;
use {
crossbeam_channel::unbounded,
log::*,
solana_core::retransmit_stage::retransmitter,
solana_core::{retransmit_stage::retransmitter, validator::TURBINE_QUIC_CONNECTION_POOL_SIZE},
solana_entry::entry::Entry,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
Expand All @@ -26,10 +26,10 @@ use {
system_transaction,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
std::{
iter::repeat_with,
net::{Ipv4Addr, UdpSocket},
net::{IpAddr, Ipv4Addr, UdpSocket},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -100,6 +100,16 @@ fn bench_retransmitter(bencher: &mut Bencher) {
.collect();

let keypair = Keypair::new();
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);
let slot = 0;
let parent = 0;
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
Expand All @@ -118,6 +128,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {

let retransmitter_handles = retransmitter(
Arc::new(sockets),
quic_connection_cache,
bank_forks,
leader_schedule_cache,
cluster_info,
Expand Down
55 changes: 43 additions & 12 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use {
standard_broadcast_run::StandardBroadcastRun,
},
crate::{
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
result::{Error, Result},
},
crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
itertools::Itertools,
itertools::{Either, Itertools},
solana_client::tpu_connection::TpuConnection,
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::Protocol,
Expand All @@ -22,6 +23,7 @@ use {
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
solana_poh::poh_recorder::WorkingBankEntry,
solana_quic_client::QuicConnectionCache,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -87,6 +89,7 @@ impl BroadcastStageType {
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
quic_connection_cache: Arc<QuicConnectionCache>,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
Expand All @@ -97,7 +100,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
StandardBroadcastRun::new(shred_version),
StandardBroadcastRun::new(shred_version, quic_connection_cache),
),

BroadcastStageType::FailEntryVerification => BroadcastStage::new(
Expand All @@ -108,7 +111,7 @@ impl BroadcastStageType {
exit_sender,
blockstore,
bank_forks,
FailEntryVerificationBroadcastRun::new(shred_version),
FailEntryVerificationBroadcastRun::new(shred_version, quic_connection_cache),
),

BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
Expand Down Expand Up @@ -392,6 +395,7 @@ pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
quic_connection_cache: &QuicConnectionCache,
last_datapoint_submit: &AtomicInterval,
transmit_stats: &mut TransmitShredsStats,
cluster_info: &ClusterInfo,
Expand All @@ -404,7 +408,7 @@ pub fn broadcast_shreds(
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let packets: Vec<_> = shreds
let (packets, quic_packets): (Vec<_>, Vec<_>) = shreds
.iter()
.group_by(|shred| shred.slot())
.into_iter()
Expand All @@ -413,33 +417,48 @@ pub fn broadcast_shreds(
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
update_peer_stats(&cluster_nodes, last_datapoint_submit);
shreds.filter_map(move |shred| {
let key = shred.id();
let protocol = cluster_nodes::get_broadcast_protocol(&key);
cluster_nodes
.get_broadcast_peer(&shred.id())?
.tvu(Protocol::UDP)
.get_broadcast_peer(&key)?
.tvu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))
.map(|addr| (shred.payload(), addr))
.map(|addr| {
(match protocol {
Protocol::QUIC => Either::Right,
Protocol::UDP => Either::Left,
})((shred.payload(), addr))
})
})
})
.collect();
.partition_map(std::convert::identity);
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();

let mut send_mmsg_time = Measure::start("send_mmsg");
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) {
transmit_stats.dropped_packets += num_failed;
transmit_stats.dropped_packets_udp += num_failed;
result = Err(Error::Io(ioerr));
}
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
transmit_stats.total_packets += packets.len();
for (shred, addr) in &quic_packets {
let conn = quic_connection_cache.get_connection(addr);
if let Err(err) = conn.send_data(shred) {
transmit_stats.dropped_packets_quic += 1;
result = Err(Error::from(err));
}
}
transmit_stats.total_packets += packets.len() + quic_packets.len();
result
}

#[cfg(test)]
pub mod test {
use {
super::*,
crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
crossbeam_channel::unbounded,
solana_entry::entry::create_ticks,
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand All @@ -454,7 +473,9 @@ pub mod test {
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::streamer::StakedNodes,
std::{
net::{IpAddr, Ipv4Addr},
path::Path,
sync::{atomic::AtomicBool, Arc},
thread::sleep,
Expand Down Expand Up @@ -586,6 +607,16 @@ pub mod test {
) -> MockBroadcastStage {
// Make the database ledger
let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
let quic_connection_cache = Arc::new(
solana_quic_client::new_quic_connection_cache(
"connection_cache_test",
&leader_keypair,
IpAddr::V4(Ipv4Addr::LOCALHOST),
&Arc::<RwLock<StakedNodes>>::default(),
TURBINE_QUIC_CONNECTION_POOL_SIZE,
)
.unwrap(),
);

// Make the leader node and scheduler
let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
Expand Down Expand Up @@ -619,7 +650,7 @@ pub mod test {
exit_sender,
blockstore.clone(),
bank_forks,
StandardBroadcastRun::new(0),
StandardBroadcastRun::new(0, quic_connection_cache),
);

MockBroadcastStage {
Expand Down
35 changes: 26 additions & 9 deletions core/src/broadcast_stage/broadcast_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub struct TransmitShredsStats {
pub shred_select: u64,
pub num_shreds: usize,
pub total_packets: usize,
pub dropped_packets: usize,
pub(crate) dropped_packets_udp: usize,
pub(crate) dropped_packets_quic: usize,
}

impl BroadcastStats for TransmitShredsStats {
Expand All @@ -32,7 +33,8 @@ impl BroadcastStats for TransmitShredsStats {
self.num_shreds += new_stats.num_shreds;
self.shred_select += new_stats.shred_select;
self.total_packets += new_stats.total_packets;
self.dropped_packets += new_stats.dropped_packets;
self.dropped_packets_udp += new_stats.dropped_packets_udp;
self.dropped_packets_quic += new_stats.dropped_packets_quic;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
if was_interrupted {
Expand All @@ -45,7 +47,12 @@ impl BroadcastStats for TransmitShredsStats {
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
(
"dropped_packets_quic",
self.dropped_packets_quic as i64,
i64
),
);
} else {
datapoint_info!(
Expand All @@ -64,7 +71,12 @@ impl BroadcastStats for TransmitShredsStats {
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
("dropped_packets_udp", self.dropped_packets_udp as i64, i64),
(
"dropped_packets_quic",
self.dropped_packets_quic as i64,
i64
),
);
}
}
Expand Down Expand Up @@ -210,7 +222,8 @@ mod test {
shred_select: 4,
num_shreds: 5,
total_packets: 6,
dropped_packets: 7,
dropped_packets_udp: 7,
dropped_packets_quic: 8,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
Expand All @@ -230,7 +243,8 @@ mod test {
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);

slot_broadcast_stats.update(
&TransmitShredsStats {
Expand All @@ -240,7 +254,8 @@ mod test {
shred_select: 14,
num_shreds: 15,
total_packets: 16,
dropped_packets: 17,
dropped_packets_udp: 17,
dropped_packets_quic: 18,
},
&None,
);
Expand All @@ -255,7 +270,8 @@ mod test {
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_udp, 7);
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets_quic, 8);

// If another batch is given, then total number of batches == num_expected_batches == 2,
// so the batch should be purged from the HashMap
Expand All @@ -267,7 +283,8 @@ mod test {
shred_select: 1,
num_shreds: 1,
total_packets: 1,
dropped_packets: 1,
dropped_packets_udp: 1,
dropped_packets_quic: 1,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
Expand Down
Loading

0 comments on commit 51019be

Please sign in to comment.