diff --git a/Cargo.lock b/Cargo.lock
index 2d608c5f984260..218b32aff3b7b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7495,6 +7495,7 @@ dependencies = [
  "solana-runtime",
  "solana-sdk",
  "solana-streamer",
+ "static_assertions",
  "test-case",
  "thiserror",
  "tokio",
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 79d0c9ff79bab6..f2b7a84ed1e2ca 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -757,11 +757,8 @@ pub mod layout {
             .map(Hash::new)
     }

-    pub(crate) fn set_retransmitter_signature(
-        shred: &mut [u8],
-        signature: &Signature,
-    ) -> Result<(), Error> {
-        let offset = match get_shred_variant(shred)? {
+    fn get_retransmitter_signature_offset(shred: &[u8]) -> Result<usize, Error> {
+        match get_shred_variant(shred)? {
             ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant),
             ShredVariant::MerkleCode {
                 proof_size,
@@ -777,7 +774,23 @@ pub mod layout {
             } => {
                 merkle::ShredData::get_retransmitter_signature_offset(proof_size, chained, resigned)
             }
-        }?;
+        }
+    }
+
+    pub fn get_retransmitter_signature(shred: &[u8]) -> Result<Signature, Error> {
+        let offset = get_retransmitter_signature_offset(shred)?;
+        shred
+            .get(offset..offset + SIZE_OF_SIGNATURE)
+            .map(|bytes| <[u8; SIZE_OF_SIGNATURE]>::try_from(bytes).unwrap())
+            .map(Signature::from)
+            .ok_or(Error::InvalidPayloadSize(shred.len()))
+    }
+
+    pub(crate) fn set_retransmitter_signature(
+        shred: &mut [u8],
+        signature: &Signature,
+    ) -> Result<(), Error> {
+        let offset = get_retransmitter_signature_offset(shred)?;
         let Some(buffer) = shred.get_mut(offset..offset + SIZE_OF_SIGNATURE) else {
             return Err(Error::InvalidPayloadSize(shred.len()));
         };
diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml
index 2d2b0a79574d27..7b29f085fa0db9 100644
--- a/turbine/Cargo.toml
+++ b/turbine/Cargo.toml
@@ -36,6 +36,7 @@
 solana-rpc-client-api = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
 solana-streamer = { workspace = true }
+static_assertions = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs
index beadf7b331f59d..8cc5f29033fd86 100644
--- a/turbine/src/cluster_nodes.rs
+++ b/turbine/src/cluster_nodes.rs
@@ -242,8 +242,7 @@ impl ClusterNodes<RetransmitStage> {
     // Returns the parent node in the turbine broadcast tree.
     // Returns None if the node is the root of the tree or if it is not staked.
-    #[allow(unused)]
-    fn get_retransmit_parent(
+    pub(crate) fn get_retransmit_parent(
         &self,
         leader: &Pubkey,
         shred: &ShredId,
diff --git a/turbine/src/sigverify_shreds.rs b/turbine/src/sigverify_shreds.rs
index ca7de2b5facef8..88eaecd006764d 100644
--- a/turbine/src/sigverify_shreds.rs
+++ b/turbine/src/sigverify_shreds.rs
@@ -1,4 +1,8 @@
 use {
+    crate::{
+        cluster_nodes::{self, ClusterNodesCache},
+        retransmit_stage::RetransmitStage,
+    },
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_gossip::cluster_info::ClusterInfo,
@@ -9,15 +13,22 @@ use {
     },
     solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
     solana_rayon_threadlimit::get_thread_count,
-    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_runtime::{
+        bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
+        bank_forks::BankForks,
+    },
     solana_sdk::{
         clock::Slot,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
     },
+    static_assertions::const_assert_eq,
     std::{
         collections::HashMap,
-        sync::{Arc, RwLock},
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, RwLock,
+        },
         thread::{Builder, JoinHandle},
         time::{Duration, Instant},
     },
@@ -30,6 +41,16 @@
 const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
 const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
 const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
+// Num epochs capacity should be at least 2 because near the epoch boundary we
+// may receive shreds from the other side of the epoch boundary. Because of the
+// TTL based eviction it does not make sense to cache more than
+// MAX_LEADER_SCHEDULE_STAKES epochs.
+const_assert_eq!(CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, 5);
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = MAX_LEADER_SCHEDULE_STAKES as usize;
+// Because for ClusterNodes::get_retransmit_parent only pubkeys of staked nodes
+// are needed, we can use longer durations for cache TTL.
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(30);
+
 #[allow(clippy::enum_variant_names)]
 enum Error {
     RecvDisconnected,
@@ -48,6 +69,10 @@ pub fn spawn_shred_sigverify(
     let recycler_cache = RecyclerCache::warmed();
     let mut stats = ShredSigVerifyStats::new(Instant::now());
     let cache = RwLock::new(LruCache::new(SIGVERIFY_LRU_CACHE_CAPACITY));
+    let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
+        CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+        CLUSTER_NODES_CACHE_TTL,
+    );
     let thread_pool = ThreadPoolBuilder::new()
         .num_threads(get_thread_count())
         .thread_name(|i| format!("solSvrfyShred{i:02}"))
@@ -66,6 +91,7 @@ pub fn spawn_shred_sigverify(
            match run_shred_sigverify(
                &thread_pool,
                &keypair,
+               &cluster_info,
                &bank_forks,
                &leader_schedule_cache,
                &recycler_cache,
@@ -73,6 +99,7 @@ pub fn spawn_shred_sigverify(
                &shred_fetch_receiver,
                &retransmit_sender,
                &verified_sender,
+               &cluster_nodes_cache,
                &cache,
                &mut stats,
            ) {
@@ -94,6 +121,7 @@
 fn run_shred_sigverify(
     thread_pool: &ThreadPool,
     keypair: &Keypair,
+    cluster_info: &ClusterInfo,
     bank_forks: &RwLock<BankForks>,
     leader_schedule_cache: &LeaderScheduleCache,
     recycler_cache: &RecyclerCache,
@@ -101,6 +129,7 @@ fn run_shred_sigverify(
     shred_fetch_receiver: &Receiver<PacketBatch>,
     retransmit_sender: &Sender<Vec<Vec<u8>>>,
     verified_sender: &Sender<Vec<PacketBatch>>,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
     cache: &RwLock<LruCache>,
     stats: &mut ShredSigVerifyStats,
 ) -> Result<(), Error> {
@@ -128,17 +157,22 @@ fn run_shred_sigverify(
             .map(|packet| packet.meta_mut().set_discard(true))
             .count()
     });
+    let (working_bank, root_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.working_bank(), bank_forks.root_bank())
+    };
     verify_packets(
         thread_pool,
         &keypair.pubkey(),
-        bank_forks,
+        &working_bank,
         leader_schedule_cache,
         recycler_cache,
         &mut packets,
         cache,
     );
     stats.num_discards_post += count_discards(&packets);
-    // Resign shreds Merkle root as the retransmitter node.
+    // Verify retransmitter's signature, and resign shreds
+    // Merkle root as the retransmitter node.
     let resign_start = Instant::now();
     thread_pool.install(|| {
         packets
@@ -146,16 +180,36 @@ fn run_shred_sigverify(
             .flatten()
             .filter(|packet| !packet.meta().discard())
             .for_each(|packet| {
-                if let Some(shred) = shred::layout::get_shred_mut(packet) {
-                    // We can ignore Error::InvalidShredVariant because that
-                    // basically means that the shred is of a variant which
-                    // cannot be signed by the retransmitter node.
-                    if !matches!(
-                        shred::layout::resign_shred(shred, keypair),
-                        Ok(()) | Err(shred::Error::InvalidShredVariant)
-                    ) {
-                        packet.meta_mut().set_discard(true);
-                    }
+                let repair = packet.meta().repair();
+                let Some(shred) = shred::layout::get_shred_mut(packet) else {
+                    packet.meta_mut().set_discard(true);
+                    return;
+                };
+                // Repair packets do not follow turbine tree and
+                // are verified using the trailing nonce.
+                if !repair
+                    && !verify_retransmitter_signature(
+                        shred,
+                        &root_bank,
+                        &working_bank,
+                        cluster_info,
+                        leader_schedule_cache,
+                        cluster_nodes_cache,
+                        stats,
+                    )
+                {
+                    stats
+                        .num_invalid_retransmitter
+                        .fetch_add(1, Ordering::Relaxed);
+                }
+                // We can ignore Error::InvalidShredVariant because that
+                // basically means that the shred is of a variant which
+                // cannot be signed by the retransmitter node.
+                if !matches!(
+                    shred::layout::resign_shred(shred, keypair),
+                    Ok(()) | Err(shred::Error::InvalidShredVariant)
+                ) {
+                    packet.meta_mut().set_discard(true);
                 }
             })
     });
@@ -175,18 +229,64 @@ fn run_shred_sigverify(
     Ok(())
 }

+#[must_use]
+fn verify_retransmitter_signature(
+    shred: &[u8],
+    root_bank: &Bank,
+    working_bank: &Bank,
+    cluster_info: &ClusterInfo,
+    leader_schedule_cache: &LeaderScheduleCache,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    stats: &ShredSigVerifyStats,
+) -> bool {
+    let signature = match shred::layout::get_retransmitter_signature(shred) {
+        Ok(signature) => signature,
+        // If the shred is not of resigned variant,
+        // then there is nothing to verify.
+        Err(shred::Error::InvalidShredVariant) => return true,
+        Err(_) => return false,
+    };
+    let Some(merkle_root) = shred::layout::get_merkle_root(shred) else {
+        return false;
+    };
+    let Some(shred) = shred::layout::get_shred_id(shred) else {
+        return false;
+    };
+    let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), Some(working_bank))
+    else {
+        stats
+            .num_unknown_slot_leader
+            .fetch_add(1, Ordering::Relaxed);
+        return false;
+    };
+    let cluster_nodes =
+        cluster_nodes_cache.get(shred.slot(), root_bank, working_bank, cluster_info);
+    let data_plane_fanout = cluster_nodes::get_data_plane_fanout(shred.slot(), root_bank);
+    let parent = match cluster_nodes.get_retransmit_parent(&leader, &shred, data_plane_fanout) {
+        Ok(Some(parent)) => parent,
+        Ok(None) => return true,
+        Err(err) => {
+            error!("get_retransmit_parent: {err:?}");
+            stats
+                .num_unknown_turbine_parent
+                .fetch_add(1, Ordering::Relaxed);
+            return false;
+        }
+    };
+    signature.verify(parent.as_ref(), merkle_root.as_ref())
+}
+
 fn verify_packets(
     thread_pool: &ThreadPool,
     self_pubkey: &Pubkey,
-    bank_forks: &RwLock<BankForks>,
+    working_bank: &Bank,
     leader_schedule_cache: &LeaderScheduleCache,
     recycler_cache: &RecyclerCache,
     packets: &mut [PacketBatch],
     cache: &RwLock<LruCache>,
 ) {
-    let working_bank = bank_forks.read().unwrap().working_bank();
     let leader_slots: HashMap<Slot, Pubkey> =
-        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
+        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, working_bank)
             .into_iter()
             .filter_map(|(slot, pubkey)| Some((slot, pubkey?)))
             .chain(std::iter::once((Slot::MAX, Pubkey::default())))
@@ -262,7 +362,10 @@ struct ShredSigVerifyStats {
     num_discards_post: usize,
     num_discards_pre: usize,
     num_duplicates: usize,
+    num_invalid_retransmitter: AtomicUsize,
     num_retransmit_shreds: usize,
+    num_unknown_slot_leader: AtomicUsize,
+    num_unknown_turbine_parent: AtomicUsize,
     elapsed_micros: u64,
     resign_micros: u64,
 }
@@ -280,7 +383,10 @@ impl ShredSigVerifyStats {
             num_deduper_saturations: 0usize,
             num_discards_post: 0usize,
             num_duplicates: 0usize,
+            num_invalid_retransmitter: AtomicUsize::default(),
             num_retransmit_shreds: 0usize,
+            num_unknown_slot_leader: AtomicUsize::default(),
+            num_unknown_turbine_parent: AtomicUsize::default(),
             elapsed_micros: 0u64,
             resign_micros: 0u64,
         }
@@ -299,7 +405,22 @@ impl ShredSigVerifyStats {
             ("num_deduper_saturations", self.num_deduper_saturations, i64),
             ("num_discards_post", self.num_discards_post, i64),
             ("num_duplicates", self.num_duplicates, i64),
+            (
+                "num_invalid_retransmitter",
+                self.num_invalid_retransmitter.load(Ordering::Relaxed),
+                i64
+            ),
             ("num_retransmit_shreds", self.num_retransmit_shreds, i64),
+            (
+                "num_unknown_slot_leader",
+                self.num_unknown_slot_leader.load(Ordering::Relaxed),
+                i64
+            ),
+            (
+                "num_unknown_turbine_parent",
+                self.num_unknown_turbine_parent.load(Ordering::Relaxed),
+                i64
+            ),
             ("elapsed_micros", self.elapsed_micros, i64),
             ("resign_micros", self.resign_micros, i64),
         );
@@ -365,10 +486,11 @@ mod tests {
         let cache = RwLock::new(LruCache::new(/*capacity:*/ 128));
         let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+        let working_bank = bank_forks.read().unwrap().working_bank();
         verify_packets(
             &thread_pool,
             &Pubkey::new_unique(), // self_pubkey
-            &bank_forks,
+            &working_bank,
             &leader_schedule_cache,
             &RecyclerCache::warmed(),
             &mut batches,
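
For reference (not part of the patch): the check added in verify_retransmitter_signature reduces to an ed25519 signature over the shred's Merkle root, verified against the pubkey of the expected turbine parent. Below is a minimal sketch using only solana-sdk primitives; the bank, cluster, and turbine-tree lookups are elided and the variable names are illustrative.

use solana_sdk::{
    hash::Hash,
    signature::{Keypair, Signer},
};

fn main() {
    // The retransmitting node signs the shred's 32-byte Merkle root with its
    // identity keypair (what resigning a resigned Merkle shred variant does).
    let retransmitter = Keypair::new();
    let merkle_root = Hash::new_unique();
    let signature = retransmitter.sign_message(merkle_root.as_ref());

    // A downstream node resolves the turbine parent it expects the shred to
    // arrive from and accepts the trailing signature only if it verifies
    // against that parent's pubkey over the same Merkle root.
    let expected_parent = retransmitter.pubkey();
    assert!(signature.verify(expected_parent.as_ref(), merkle_root.as_ref()));

    // A signature produced by any other node fails verification.
    let other = Keypair::new().sign_message(merkle_root.as_ref());
    assert!(!other.verify(expected_parent.as_ref(), merkle_root.as_ref()));
}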