Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.18: adds rollout path for chained Merkle shreds (backport of #35076) #35305

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ledger/src/shred/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct ProcessShredsStats {
num_data_shreds_hist: [usize; 5],
// If the blockstore already has shreds for the broadcast slot.
pub num_extant_slots: u64,
// When looking up chained merkle root from parent slot fails.
pub err_unknown_chained_merkle_root: u64,
pub(crate) data_buffer_residual: usize,
pub num_merkle_data_shreds: usize,
pub num_merkle_coding_shreds: usize,
Expand Down Expand Up @@ -89,6 +91,11 @@ impl ProcessShredsStats {
("sign_coding_time", self.sign_coding_elapsed, i64),
("coding_send_time", self.coding_send_elapsed, i64),
("num_extant_slots", self.num_extant_slots, i64),
(
"err_unknown_chained_merkle_root",
self.err_unknown_chained_merkle_root,
i64
),
("data_buffer_residual", self.data_buffer_residual, i64),
("num_data_shreds_07", self.num_data_shreds_hist[0], i64),
("num_data_shreds_15", self.num_data_shreds_hist[1], i64),
Expand Down Expand Up @@ -161,6 +168,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
coalesce_elapsed,
num_data_shreds_hist,
num_extant_slots,
err_unknown_chained_merkle_root,
data_buffer_residual,
num_merkle_data_shreds,
num_merkle_coding_shreds,
Expand All @@ -175,6 +183,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots;
self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root;
self.data_buffer_residual += data_buffer_residual;
self.num_merkle_data_shreds += num_merkle_data_shreds;
self.num_merkle_coding_shreds += num_merkle_coding_shreds;
Expand Down
1 change: 1 addition & 0 deletions turbine/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(super) struct ReceiveResults {

#[derive(Clone)]
pub struct UnfinishedSlotInfo {
pub(super) chained_merkle_root: Hash,
pub next_shred_index: u32,
pub(crate) next_code_index: u32,
pub slot: Slot,
Expand Down
60 changes: 52 additions & 8 deletions turbine/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use {
shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
},
solana_sdk::{
genesis_config::ClusterType,
hash::Hash,
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
},
Expand Down Expand Up @@ -69,6 +71,7 @@ impl StandardBroadcastRun {
&mut self,
keypair: &Keypair,
max_ticks_in_slot: u8,
cluster_type: ClusterType,
stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits();
Expand All @@ -85,7 +88,8 @@ impl StandardBroadcastRun {
keypair,
&[], // entries
true, // is_last_in_slot,
None, // chained_merkle_root
should_chain_merkle_shreds(state.slot, cluster_type)
.then_some(state.chained_merkle_root),
state.next_shred_index,
state.next_code_index,
true, // merkle_variant
Expand All @@ -110,6 +114,7 @@ impl StandardBroadcastRun {
blockstore: &Blockstore,
reference_tick: u8,
is_slot_end: bool,
cluster_type: ClusterType,
process_stats: &mut ProcessShredsStats,
max_data_shreds_per_slot: u32,
max_code_shreds_per_slot: u32,
Expand All @@ -121,8 +126,12 @@ impl StandardBroadcastRun {
BroadcastError,
> {
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
let (next_shred_index, next_code_index) = match &self.unfinished_slot {
Some(state) => (state.next_shred_index, state.next_code_index),
let (next_shred_index, next_code_index, chained_merkle_root) = match &self.unfinished_slot {
Some(state) => (
state.next_shred_index,
state.next_code_index,
state.chained_merkle_root,
),
None => {
// If the blockstore has shreds for the slot, it should not
// recreate the slot:
Expand All @@ -135,7 +144,17 @@ impl StandardBroadcastRun {
return Ok((Vec::default(), Vec::default()));
}
}
(0u32, 0u32)
let chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
slot,
parent_slot,
blockstore,
)
.unwrap_or_else(|err| {
error!("Unknown chained Merkle root: {err}");
process_stats.err_unknown_chained_merkle_root += 1;
Hash::default()
});
(0u32, 0u32, chained_merkle_root)
}
};
let shredder =
Expand All @@ -144,7 +163,7 @@ impl StandardBroadcastRun {
keypair,
entries,
is_slot_end,
None, // chained_merkle_root
should_chain_merkle_shreds(slot, cluster_type).then_some(chained_merkle_root),
next_shred_index,
next_code_index,
true, // merkle_variant
Expand All @@ -153,6 +172,10 @@ impl StandardBroadcastRun {
);
process_stats.num_merkle_data_shreds += data_shreds.len();
process_stats.num_merkle_coding_shreds += coding_shreds.len();
let chained_merkle_root = match data_shreds.iter().max_by_key(|shred| shred.index()) {
None => chained_merkle_root,
Some(shred) => shred.merkle_root().unwrap(),
};
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1,
None => next_shred_index,
Expand All @@ -169,6 +192,7 @@ impl StandardBroadcastRun {
return Err(BroadcastError::TooManyShreds);
}
self.unfinished_slot = Some(UnfinishedSlotInfo {
chained_merkle_root,
next_shred_index,
next_code_index,
slot,
Expand Down Expand Up @@ -232,10 +256,15 @@ impl StandardBroadcastRun {
let mut process_stats = ProcessShredsStats::default();

let mut to_shreds_time = Measure::start("broadcast_to_shreds");
let cluster_type = bank.cluster_type();

// 1) Check if slot was interrupted
let prev_slot_shreds =
self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats);
let prev_slot_shreds = self.finish_prev_slot(
keypair,
bank.ticks_per_slot() as u8,
cluster_type,
&mut process_stats,
);

// 2) Convert entries to shreds and coding shreds
let is_last_in_slot = last_tick_height == bank.max_tick_height();
Expand All @@ -247,6 +276,7 @@ impl StandardBroadcastRun {
blockstore,
reference_tick as u8,
is_last_in_slot,
cluster_type,
&mut process_stats,
blockstore::MAX_DATA_SHREDS_PER_SLOT as u32,
shred_code::MAX_CODE_SHREDS_PER_SLOT as u32,
Expand Down Expand Up @@ -497,10 +527,15 @@ impl BroadcastRun for StandardBroadcastRun {
}
}

fn should_chain_merkle_shreds(_slot: Slot, _cluster_type: ClusterType) -> bool {
false
}

#[cfg(test)]
mod test {
use {
super::*,
rand::Rng,
solana_entry::entry::create_ticks,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -510,6 +545,7 @@ mod test {
solana_runtime::bank::Bank,
solana_sdk::{
genesis_config::GenesisConfig,
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
Expand Down Expand Up @@ -569,6 +605,7 @@ mod test {
let slot = 1;
let parent = 0;
run.unfinished_slot = Some(UnfinishedSlotInfo {
chained_merkle_root: Hash::new_from_array(rand::thread_rng().gen()),
next_shred_index,
next_code_index: 17,
slot,
Expand All @@ -580,7 +617,12 @@ mod test {
run.current_slot_and_parent = Some((4, 2));

// Slot 2 interrupted slot 1
let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default());
let shreds = run.finish_prev_slot(
&keypair,
0, // max_ticks_in_slot
ClusterType::Development,
&mut ProcessShredsStats::default(),
);
let shred = shreds
.first()
.expect("Expected a shred that signals an interrupt");
Expand Down Expand Up @@ -831,6 +873,7 @@ mod test {
&blockstore,
0,
false,
ClusterType::Development,
&mut stats,
1000,
1000,
Expand All @@ -846,6 +889,7 @@ mod test {
&blockstore,
0,
false,
ClusterType::Development,
&mut stats,
10,
10,
Expand Down
Loading