Skip to content

Commit

Permalink
adds feature to (temporarily) drop merkle shreds from testnet
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Jan 14, 2023
1 parent c56a47b commit f65c519
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
7 changes: 6 additions & 1 deletion core/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,12 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool
}

// Returns true if the feature is effective for the shred slot.
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
#[must_use]
pub(crate) fn check_feature_activation(
feature: &Pubkey,
shred_slot: Slot,
root_bank: &Bank,
) -> bool {
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
Expand Down
53 changes: 48 additions & 5 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use {
crate::{packet_hasher::PacketHasher, serve_repair::ServeRepair},
crate::{
cluster_nodes::check_feature_activation, packet_hasher::PacketHasher,
serve_repair::ServeRepair,
},
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
genesis_config::ClusterType,
},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
Expand Down Expand Up @@ -43,6 +50,7 @@ impl ShredFetchStage {
.map(|(_, cluster_info)| cluster_info.keypair().clone());

// In the case of bank_forks=None, setup to accept any slot range
let mut root_bank = bank_forks.read().unwrap().root_bank();
let mut last_root = 0;
let mut last_slot = std::u64::MAX;
let mut slots_per_epoch = 0;
Expand All @@ -60,7 +68,7 @@ impl ShredFetchStage {
last_root = bank_forks_r.root();
let working_bank = bank_forks_r.working_bank();
last_slot = working_bank.slot();
let root_bank = bank_forks_r.root_bank();
root_bank = bank_forks_r.root_bank();
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
}
keypair = repair_context
Expand All @@ -84,6 +92,8 @@ impl ShredFetchStage {

// Limit shreds to 2 epochs away.
let max_slot = last_slot + 2 * slots_per_epoch;
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
Expand All @@ -92,6 +102,7 @@ impl ShredFetchStage {
shred_version,
&packet_hasher,
&mut shreds_received,
should_drop_merkle_shreds,
&mut stats,
) {
packet.meta_mut().set_discard(true);
Expand Down Expand Up @@ -229,9 +240,17 @@ fn should_discard_packet(
shred_version: u16,
packet_hasher: &PacketHasher,
shreds_received: &mut LruCache<u64, ()>,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
if should_discard_shred(packet, root, max_slot, shred_version, stats) {
if should_discard_shred(
packet,
root,
max_slot,
shred_version,
should_drop_merkle_shreds,
stats,
) {
return true;
}
let hash = packet_hasher.hash_packet(packet);
Expand All @@ -244,6 +263,21 @@ fn should_discard_packet(
}
}

#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
root_bank.cluster_type() == ClusterType::Testnet
&& check_feature_activation(
&feature_set::drop_merkle_shreds::id(),
shred_slot,
root_bank,
)
&& !check_feature_activation(
&feature_set::keep_merkle_shreds::id(),
shred_slot,
root_bank,
)
}

#[cfg(test)]
mod tests {
use {
Expand Down Expand Up @@ -288,6 +322,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
Expand All @@ -303,6 +338,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
}
Expand All @@ -329,6 +365,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.index_overrun, 1);
Expand All @@ -352,6 +389,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);
Expand All @@ -363,6 +401,7 @@ mod tests {
345, // shred_version
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);
Expand All @@ -375,6 +414,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

Expand All @@ -386,6 +426,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);
Expand All @@ -410,6 +451,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

Expand All @@ -423,6 +465,7 @@ mod tests {
shred_version,
&hasher,
&mut shreds_received,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
}
Expand Down
18 changes: 18 additions & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ pub fn should_discard_shred(
root: Slot,
max_slot: Slot,
shred_version: u16,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
debug_assert!(root < max_slot);
Expand Down Expand Up @@ -978,9 +979,15 @@ pub fn should_discard_shred(
match shred_variant {
ShredVariant::LegacyCode | ShredVariant::LegacyData => (),
ShredVariant::MerkleCode(_) => {
if should_drop_merkle_shreds(slot) {
return true;
}
stats.num_shreds_merkle_code = stats.num_shreds_merkle_code.saturating_add(1);
}
ShredVariant::MerkleData(_) => {
if should_drop_merkle_shreds(slot) {
return true;
}
stats.num_shreds_merkle_data = stats.num_shreds_merkle_data.saturating_add(1);
}
}
Expand Down Expand Up @@ -1178,6 +1185,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats, ShredFetchStats::default());
Expand All @@ -1188,6 +1196,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 1);
Expand All @@ -1198,6 +1207,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 2);
Expand All @@ -1208,6 +1218,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 3);
Expand All @@ -1218,6 +1229,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 4);
Expand All @@ -1228,6 +1240,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.bad_parent_offset, 1);
Expand All @@ -1248,6 +1261,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));

Expand All @@ -1267,6 +1281,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.index_out_of_bounds);
Expand All @@ -1287,6 +1302,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX;
Expand All @@ -1296,6 +1312,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
Expand All @@ -1307,6 +1324,7 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,14 @@ pub mod disable_turbine_fanout_experiments {
solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa");
}

pub mod drop_merkle_shreds {
solana_sdk::declare_id!("84zy5N23Q9vTZuLc9h1HWUtyM9yCFV2SCmyP9W9C3yHZ");
}

pub mod keep_merkle_shreds {
solana_sdk::declare_id!("HyNQzc7TMNmRhpVHXqDGjpsHzeQie82mDQXSF9hj7nAH");
}

pub mod move_serialized_len_ptr_in_cpi {
solana_sdk::declare_id!("74CoWuBmt3rUVUrCb2JiSTvh6nXyBWUsK4SaMj3CtE3T");
}
Expand Down Expand Up @@ -720,6 +728,8 @@ lazy_static! {
(commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"),
(enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"),
(disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"),
(drop_merkle_shreds::id(), "drop merkle shreds #29711"),
(keep_merkle_shreds::id(), "keep merkle shreds #29711"),
(move_serialized_len_ptr_in_cpi::id(), "cpi ignore serialized_len_ptr #29592"),
(update_hashes_per_tick::id(), "Update desired hashes per tick on epoch boundary"),
/*************** ADD NEW FEATURES HERE ***************/
Expand Down

0 comments on commit f65c519

Please sign in to comment.