diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 05408d031ac996..9c8515a23816a3 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -139,6 +139,8 @@ pub enum Error { InvalidErasureShardIndex(/*headers:*/ Box), #[error("Invalid merkle proof")] InvalidMerkleProof, + #[error("Invalid Merkle root")] + InvalidMerkleRoot, #[error("Invalid num coding shreds: {0}")] InvalidNumCodingShreds(u16), #[error("Invalid parent_offset: {parent_offset}, slot: {slot}")] @@ -602,6 +604,11 @@ pub mod layout { packet.data(..size) } + pub fn get_shred_mut(packet: &mut Packet) -> Option<&mut [u8]> { + let size = get_shred_size(packet)?; + packet.buffer_mut().get_mut(..size) + } + pub(crate) fn get_signature(shred: &[u8]) -> Option { shred .get(..SIZE_OF_SIGNATURE) @@ -780,6 +787,45 @@ pub mod layout { Ok(()) } + /// Resigns the shred's Merkle root as the retransmitter node in the + /// Turbine broadcast tree. This signature is in addition to leader's + /// signature which is left intact. + pub fn resign_shred(shred: &mut [u8], keypair: &Keypair) -> Result<(), Error> { + let (offset, merkle_root) = match get_shred_variant(shred)? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => { + return Err(Error::InvalidShredVariant) + } + ShredVariant::MerkleCode { + proof_size, + chained, + resigned, + } => ( + merkle::ShredCode::get_retransmitter_signature_offset( + proof_size, chained, resigned, + )?, + merkle::ShredCode::get_merkle_root(shred, proof_size, chained, resigned) + .ok_or(Error::InvalidMerkleRoot)?, + ), + ShredVariant::MerkleData { + proof_size, + chained, + resigned, + } => ( + merkle::ShredData::get_retransmitter_signature_offset( + proof_size, chained, resigned, + )?, + merkle::ShredData::get_merkle_root(shred, proof_size, chained, resigned) + .ok_or(Error::InvalidMerkleRoot)?, + ), + }; + let Some(buffer) = shred.get_mut(offset..offset + SIZE_OF_SIGNATURE) else { + return Err(Error::InvalidPayloadSize(shred.len())); + }; + let signature = keypair.sign_message(merkle_root.as_ref()); + buffer.copy_from_slice(signature.as_ref()); + Ok(()) + } + // Minimally corrupts the packet so that the signature no longer verifies. #[cfg(test)] pub(crate) fn corrupt_packet( diff --git a/turbine/src/sigverify_shreds.rs b/turbine/src/sigverify_shreds.rs index 25a8b082a05042..ca7de2b5facef8 100644 --- a/turbine/src/sigverify_shreds.rs +++ b/turbine/src/sigverify_shreds.rs @@ -10,7 +10,11 @@ use { solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{clock::Slot, pubkey::Pubkey}, + solana_sdk::{ + clock::Slot, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -56,11 +60,12 @@ pub fn spawn_shred_sigverify( if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) { stats.num_deduper_saturations += 1; } + // We can't store the keypair outside the loop + // because the identity might be hot swapped. + let keypair: Arc = cluster_info.keypair().clone(); match run_shred_sigverify( &thread_pool, - // We can't store the pubkey outside the loop - // because the identity might be hot swapped. - &cluster_info.id(), + &keypair, &bank_forks, &leader_schedule_cache, &recycler_cache, @@ -88,7 +93,7 @@ pub fn spawn_shred_sigverify( #[allow(clippy::too_many_arguments)] fn run_shred_sigverify( thread_pool: &ThreadPool, - self_pubkey: &Pubkey, + keypair: &Keypair, bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, recycler_cache: &RecyclerCache, @@ -125,7 +130,7 @@ fn run_shred_sigverify( }); verify_packets( thread_pool, - self_pubkey, + &keypair.pubkey(), bank_forks, leader_schedule_cache, recycler_cache, @@ -133,6 +138,28 @@ fn run_shred_sigverify( cache, ); stats.num_discards_post += count_discards(&packets); + // Resign shreds Merkle root as the retransmitter node. + let resign_start = Instant::now(); + thread_pool.install(|| { + packets + .par_iter_mut() + .flatten() + .filter(|packet| !packet.meta().discard()) + .for_each(|packet| { + if let Some(shred) = shred::layout::get_shred_mut(packet) { + // We can ignore Error::InvalidShredVariant because that + // basically means that the shred is of a variant which + // cannot be signed by the retransmitter node. + if !matches!( + shred::layout::resign_shred(shred, keypair), + Ok(()) | Err(shred::Error::InvalidShredVariant) + ) { + packet.meta_mut().set_discard(true); + } + } + }) + }); + stats.resign_micros += resign_start.elapsed().as_micros() as u64; // Exclude repair packets from retransmit. let shreds: Vec<_> = packets .iter() @@ -237,6 +264,7 @@ struct ShredSigVerifyStats { num_duplicates: usize, num_retransmit_shreds: usize, elapsed_micros: u64, + resign_micros: u64, } impl ShredSigVerifyStats { @@ -254,6 +282,7 @@ impl ShredSigVerifyStats { num_duplicates: 0usize, num_retransmit_shreds: 0usize, elapsed_micros: 0u64, + resign_micros: 0u64, } } @@ -272,6 +301,7 @@ impl ShredSigVerifyStats { ("num_duplicates", self.num_duplicates, i64), ("num_retransmit_shreds", self.num_retransmit_shreds, i64), ("elapsed_micros", self.elapsed_micros, i64), + ("resign_micros", self.resign_micros, i64), ); *self = Self::new(Instant::now()); }