Skip to content

Commit

Permalink
adds rollout path for chained Merkle shreds (#35076)
Browse files Browse the repository at this point in the history
The commit adds should_chain_merkle_shreds to incrementally roll out
chained Merkle shreds to clusters.

(cherry picked from commit 0cfb06f)
  • Loading branch information
behzadnouri authored and mergify[bot] committed Feb 23, 2024
1 parent 6f13e1c commit df26261
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
9 changes: 9 additions & 0 deletions ledger/src/shred/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct ProcessShredsStats {
num_data_shreds_hist: [usize; 5],
// If the blockstore already has shreds for the broadcast slot.
pub num_extant_slots: u64,
// When looking up chained merkle root from parent slot fails.
pub err_unknown_chained_merkle_root: u64,
pub(crate) data_buffer_residual: usize,
pub num_merkle_data_shreds: usize,
pub num_merkle_coding_shreds: usize,
Expand Down Expand Up @@ -89,6 +91,11 @@ impl ProcessShredsStats {
("sign_coding_time", self.sign_coding_elapsed, i64),
("coding_send_time", self.coding_send_elapsed, i64),
("num_extant_slots", self.num_extant_slots, i64),
(
"err_unknown_chained_merkle_root",
self.err_unknown_chained_merkle_root,
i64
),
("data_buffer_residual", self.data_buffer_residual, i64),
("num_data_shreds_07", self.num_data_shreds_hist[0], i64),
("num_data_shreds_15", self.num_data_shreds_hist[1], i64),
Expand Down Expand Up @@ -161,6 +168,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
coalesce_elapsed,
num_data_shreds_hist,
num_extant_slots,
err_unknown_chained_merkle_root,
data_buffer_residual,
num_merkle_data_shreds,
num_merkle_coding_shreds,
Expand All @@ -175,6 +183,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots;
self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root;
self.data_buffer_residual += data_buffer_residual;
self.num_merkle_data_shreds += num_merkle_data_shreds;
self.num_merkle_coding_shreds += num_merkle_coding_shreds;
Expand Down
1 change: 1 addition & 0 deletions turbine/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(super) struct ReceiveResults {

#[derive(Clone)]
pub struct UnfinishedSlotInfo {
pub(super) chained_merkle_root: Hash,
pub next_shred_index: u32,
pub(crate) next_code_index: u32,
pub slot: Slot,
Expand Down
60 changes: 52 additions & 8 deletions turbine/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use {
shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
},
solana_sdk::{
genesis_config::ClusterType,
hash::Hash,
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
},
Expand Down Expand Up @@ -69,6 +71,7 @@ impl StandardBroadcastRun {
&mut self,
keypair: &Keypair,
max_ticks_in_slot: u8,
cluster_type: ClusterType,
stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits();
Expand All @@ -85,7 +88,8 @@ impl StandardBroadcastRun {
keypair,
&[], // entries
true, // is_last_in_slot,
None, // chained_merkle_root
should_chain_merkle_shreds(state.slot, cluster_type)
.then_some(state.chained_merkle_root),
state.next_shred_index,
state.next_code_index,
true, // merkle_variant
Expand All @@ -110,6 +114,7 @@ impl StandardBroadcastRun {
blockstore: &Blockstore,
reference_tick: u8,
is_slot_end: bool,
cluster_type: ClusterType,
process_stats: &mut ProcessShredsStats,
max_data_shreds_per_slot: u32,
max_code_shreds_per_slot: u32,
Expand All @@ -121,8 +126,12 @@ impl StandardBroadcastRun {
BroadcastError,
> {
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
let (next_shred_index, next_code_index) = match &self.unfinished_slot {
Some(state) => (state.next_shred_index, state.next_code_index),
let (next_shred_index, next_code_index, chained_merkle_root) = match &self.unfinished_slot {
Some(state) => (
state.next_shred_index,
state.next_code_index,
state.chained_merkle_root,
),
None => {
// If the blockstore has shreds for the slot, it should not
// recreate the slot:
Expand All @@ -135,7 +144,17 @@ impl StandardBroadcastRun {
return Ok((Vec::default(), Vec::default()));
}
}
(0u32, 0u32)
let chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
slot,
parent_slot,
blockstore,
)
.unwrap_or_else(|err| {
error!("Unknown chained Merkle root: {err}");
process_stats.err_unknown_chained_merkle_root += 1;
Hash::default()
});
(0u32, 0u32, chained_merkle_root)
}
};
let shredder =
Expand All @@ -144,7 +163,7 @@ impl StandardBroadcastRun {
keypair,
entries,
is_slot_end,
None, // chained_merkle_root
should_chain_merkle_shreds(slot, cluster_type).then_some(chained_merkle_root),
next_shred_index,
next_code_index,
true, // merkle_variant
Expand All @@ -153,6 +172,10 @@ impl StandardBroadcastRun {
);
process_stats.num_merkle_data_shreds += data_shreds.len();
process_stats.num_merkle_coding_shreds += coding_shreds.len();
let chained_merkle_root = match data_shreds.iter().max_by_key(|shred| shred.index()) {
None => chained_merkle_root,
Some(shred) => shred.merkle_root().unwrap(),
};
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1,
None => next_shred_index,
Expand All @@ -169,6 +192,7 @@ impl StandardBroadcastRun {
return Err(BroadcastError::TooManyShreds);
}
self.unfinished_slot = Some(UnfinishedSlotInfo {
chained_merkle_root,
next_shred_index,
next_code_index,
slot,
Expand Down Expand Up @@ -232,10 +256,15 @@ impl StandardBroadcastRun {
let mut process_stats = ProcessShredsStats::default();

let mut to_shreds_time = Measure::start("broadcast_to_shreds");
let cluster_type = bank.cluster_type();

// 1) Check if slot was interrupted
let prev_slot_shreds =
self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats);
let prev_slot_shreds = self.finish_prev_slot(
keypair,
bank.ticks_per_slot() as u8,
cluster_type,
&mut process_stats,
);

// 2) Convert entries to shreds and coding shreds
let is_last_in_slot = last_tick_height == bank.max_tick_height();
Expand All @@ -247,6 +276,7 @@ impl StandardBroadcastRun {
blockstore,
reference_tick as u8,
is_last_in_slot,
cluster_type,
&mut process_stats,
blockstore::MAX_DATA_SHREDS_PER_SLOT as u32,
shred_code::MAX_CODE_SHREDS_PER_SLOT as u32,
Expand Down Expand Up @@ -497,10 +527,15 @@ impl BroadcastRun for StandardBroadcastRun {
}
}

fn should_chain_merkle_shreds(_slot: Slot, _cluster_type: ClusterType) -> bool {
false
}

#[cfg(test)]
mod test {
use {
super::*,
rand::Rng,
solana_entry::entry::create_ticks,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -510,6 +545,7 @@ mod test {
solana_runtime::bank::Bank,
solana_sdk::{
genesis_config::GenesisConfig,
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
Expand Down Expand Up @@ -569,6 +605,7 @@ mod test {
let slot = 1;
let parent = 0;
run.unfinished_slot = Some(UnfinishedSlotInfo {
chained_merkle_root: Hash::new_from_array(rand::thread_rng().gen()),
next_shred_index,
next_code_index: 17,
slot,
Expand All @@ -580,7 +617,12 @@ mod test {
run.current_slot_and_parent = Some((4, 2));

// Slot 2 interrupted slot 1
let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default());
let shreds = run.finish_prev_slot(
&keypair,
0, // max_ticks_in_slot
ClusterType::Development,
&mut ProcessShredsStats::default(),
);
let shred = shreds
.first()
.expect("Expected a shred that signals an interrupt");
Expand Down Expand Up @@ -831,6 +873,7 @@ mod test {
&blockstore,
0,
false,
ClusterType::Development,
&mut stats,
1000,
1000,
Expand All @@ -846,6 +889,7 @@ mod test {
&blockstore,
0,
false,
ClusterType::Development,
&mut stats,
10,
10,
Expand Down

0 comments on commit df26261

Please sign in to comment.