diff --git a/ledger/src/shred/stats.rs b/ledger/src/shred/stats.rs index 5b4a75a2489bbb..60dfa9a79859c2 100644 --- a/ledger/src/shred/stats.rs +++ b/ledger/src/shred/stats.rs @@ -23,6 +23,8 @@ pub struct ProcessShredsStats { num_data_shreds_hist: [usize; 5], // If the blockstore already has shreds for the broadcast slot. pub num_extant_slots: u64, + // When looking up chained merkle root from parent slot fails. + pub err_unknown_chained_merkle_root: u64, pub(crate) data_buffer_residual: usize, pub num_merkle_data_shreds: usize, pub num_merkle_coding_shreds: usize, @@ -89,6 +91,11 @@ impl ProcessShredsStats { ("sign_coding_time", self.sign_coding_elapsed, i64), ("coding_send_time", self.coding_send_elapsed, i64), ("num_extant_slots", self.num_extant_slots, i64), + ( + "err_unknown_chained_merkle_root", + self.err_unknown_chained_merkle_root, + i64 + ), ("data_buffer_residual", self.data_buffer_residual, i64), ("num_data_shreds_07", self.num_data_shreds_hist[0], i64), ("num_data_shreds_15", self.num_data_shreds_hist[1], i64), @@ -161,6 +168,7 @@ impl AddAssign for ProcessShredsStats { coalesce_elapsed, num_data_shreds_hist, num_extant_slots, + err_unknown_chained_merkle_root, data_buffer_residual, num_merkle_data_shreds, num_merkle_coding_shreds, @@ -175,6 +183,7 @@ impl AddAssign for ProcessShredsStats { self.get_leader_schedule_elapsed += get_leader_schedule_elapsed; self.coalesce_elapsed += coalesce_elapsed; self.num_extant_slots += num_extant_slots; + self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root; self.data_buffer_residual += data_buffer_residual; self.num_merkle_data_shreds += num_merkle_data_shreds; self.num_merkle_coding_shreds += num_merkle_coding_shreds; diff --git a/turbine/src/broadcast_stage/broadcast_utils.rs b/turbine/src/broadcast_stage/broadcast_utils.rs index 3468a86dfd64ff..be231581e7fbfe 100644 --- a/turbine/src/broadcast_stage/broadcast_utils.rs +++ b/turbine/src/broadcast_stage/broadcast_utils.rs @@ -28,6 +28,7 @@ pub(super) struct ReceiveResults { #[derive(Clone)] pub struct UnfinishedSlotInfo { + pub(super) chained_merkle_root: Hash, pub next_shred_index: u32, pub(crate) next_code_index: u32, pub slot: Slot, diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index e2b8871b4bc3c2..6378c0df40a8d3 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -14,6 +14,8 @@ use { shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, }, solana_sdk::{ + genesis_config::ClusterType, + hash::Hash, signature::Keypair, timing::{duration_as_us, AtomicInterval}, }, @@ -69,6 +71,7 @@ impl StandardBroadcastRun { &mut self, keypair: &Keypair, max_ticks_in_slot: u8, + cluster_type: ClusterType, stats: &mut ProcessShredsStats, ) -> Vec { const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits(); @@ -85,7 +88,8 @@ impl StandardBroadcastRun { keypair, &[], // entries true, // is_last_in_slot, - None, // chained_merkle_root + should_chain_merkle_shreds(state.slot, cluster_type) + .then_some(state.chained_merkle_root), state.next_shred_index, state.next_code_index, true, // merkle_variant @@ -110,6 +114,7 @@ impl StandardBroadcastRun { blockstore: &Blockstore, reference_tick: u8, is_slot_end: bool, + cluster_type: ClusterType, process_stats: &mut ProcessShredsStats, max_data_shreds_per_slot: u32, max_code_shreds_per_slot: u32, @@ -121,8 +126,12 @@ impl StandardBroadcastRun { BroadcastError, > { let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); - let (next_shred_index, next_code_index) = match &self.unfinished_slot { - Some(state) => (state.next_shred_index, state.next_code_index), + let (next_shred_index, next_code_index, chained_merkle_root) = match &self.unfinished_slot { + Some(state) => ( + state.next_shred_index, + state.next_code_index, + state.chained_merkle_root, + ), None => { // If the blockstore has shreds for the slot, it should not // recreate the slot: @@ -135,7 +144,17 @@ impl StandardBroadcastRun { return Ok((Vec::default(), Vec::default())); } } - (0u32, 0u32) + let chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent( + slot, + parent_slot, + blockstore, + ) + .unwrap_or_else(|err| { + error!("Unknown chained Merkle root: {err}"); + process_stats.err_unknown_chained_merkle_root += 1; + Hash::default() + }); + (0u32, 0u32, chained_merkle_root) } }; let shredder = @@ -144,7 +163,7 @@ impl StandardBroadcastRun { keypair, entries, is_slot_end, - None, // chained_merkle_root + should_chain_merkle_shreds(slot, cluster_type).then_some(chained_merkle_root), next_shred_index, next_code_index, true, // merkle_variant @@ -153,6 +172,10 @@ impl StandardBroadcastRun { ); process_stats.num_merkle_data_shreds += data_shreds.len(); process_stats.num_merkle_coding_shreds += coding_shreds.len(); + let chained_merkle_root = match data_shreds.iter().max_by_key(|shred| shred.index()) { + None => chained_merkle_root, + Some(shred) => shred.merkle_root().unwrap(), + }; let next_shred_index = match data_shreds.iter().map(Shred::index).max() { Some(index) => index + 1, None => next_shred_index, @@ -169,6 +192,7 @@ impl StandardBroadcastRun { return Err(BroadcastError::TooManyShreds); } self.unfinished_slot = Some(UnfinishedSlotInfo { + chained_merkle_root, next_shred_index, next_code_index, slot, @@ -232,10 +256,15 @@ impl StandardBroadcastRun { let mut process_stats = ProcessShredsStats::default(); let mut to_shreds_time = Measure::start("broadcast_to_shreds"); + let cluster_type = bank.cluster_type(); // 1) Check if slot was interrupted - let prev_slot_shreds = - self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats); + let prev_slot_shreds = self.finish_prev_slot( + keypair, + bank.ticks_per_slot() as u8, + cluster_type, + &mut process_stats, + ); // 2) Convert entries to shreds and coding shreds let is_last_in_slot = last_tick_height == bank.max_tick_height(); @@ -247,6 +276,7 @@ impl StandardBroadcastRun { blockstore, reference_tick as u8, is_last_in_slot, + cluster_type, &mut process_stats, blockstore::MAX_DATA_SHREDS_PER_SLOT as u32, shred_code::MAX_CODE_SHREDS_PER_SLOT as u32, @@ -497,10 +527,15 @@ impl BroadcastRun for StandardBroadcastRun { } } +fn should_chain_merkle_shreds(_slot: Slot, _cluster_type: ClusterType) -> bool { + false +} + #[cfg(test)] mod test { use { super::*, + rand::Rng, solana_entry::entry::create_ticks, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -510,6 +545,7 @@ mod test { solana_runtime::bank::Bank, solana_sdk::{ genesis_config::GenesisConfig, + hash::Hash, signature::{Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, @@ -569,6 +605,7 @@ mod test { let slot = 1; let parent = 0; run.unfinished_slot = Some(UnfinishedSlotInfo { + chained_merkle_root: Hash::new_from_array(rand::thread_rng().gen()), next_shred_index, next_code_index: 17, slot, @@ -580,7 +617,12 @@ mod test { run.current_slot_and_parent = Some((4, 2)); // Slot 2 interrupted slot 1 - let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default()); + let shreds = run.finish_prev_slot( + &keypair, + 0, // max_ticks_in_slot + ClusterType::Development, + &mut ProcessShredsStats::default(), + ); let shred = shreds .first() .expect("Expected a shred that signals an interrupt"); @@ -831,6 +873,7 @@ mod test { &blockstore, 0, false, + ClusterType::Development, &mut stats, 1000, 1000, @@ -846,6 +889,7 @@ mod test { &blockstore, 0, false, + ClusterType::Development, &mut stats, 10, 10,