Skip to content

Commit

Permalink
Add feature flag for LastIndex and Erasure duplicate proofs (#34360)
Browse files Browse the repository at this point in the history
* Add feature flag for LastIndex and Erasure duplicate proofs

* pr feedback: use root bank instead of 2 params

* pr feedback: & instead of &Arc

* pr feedback: reuse fn, remove redundant clones

* rebase: fix feature set conflict
  • Loading branch information
AshwinSekar authored Dec 20, 2023
1 parent 210d320 commit def3bc4
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 5 deletions.
43 changes: 40 additions & 3 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `window_service` handles the data plane incoming shreds, storing them in
//! blockstore and retransmitting where required
//!
use {
crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
Expand Down Expand Up @@ -28,7 +29,12 @@ use {
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::clock::Slot,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
},
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -142,12 +148,31 @@ fn run_check_duplicate(
blockstore: &Blockstore,
shred_receiver: &Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: &DuplicateSlotSender,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
let mut root_bank = bank_forks.read().unwrap().root_bank();
let mut last_updated = Instant::now();
let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
// Grabs bank forks lock once a slot
last_updated = Instant::now();
root_bank = bank_forks.read().unwrap().root_bank();
}
let shred_slot = shred.slot();
let send_index_and_erasure_conflicts = cluster_nodes::check_feature_activation(
&feature_set::index_erasure_conflict_duplicate_proofs::id(),
shred_slot,
&root_bank,
);
let (shred1, shred2) = match shred {
PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::LastIndexConflict(shred, conflict)
| PossibleDuplicateShred::ErasureConflict(shred, conflict) => {
if send_index_and_erasure_conflicts {
(shred, conflict)
} else {
return Ok(());
}
}
PossibleDuplicateShred::Exists(shred) => {
// Unlike the other cases we have to wait until here to decide to handle the duplicate and store
// in blockstore. This is because the duplicate could have been part of the same insert batch,
Expand Down Expand Up @@ -342,6 +367,7 @@ impl WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

let cluster_info = repair_info.cluster_info.clone();
let bank_forks = repair_info.bank_forks.clone();

let repair_service = RepairService::new(
blockstore.clone(),
Expand All @@ -366,6 +392,7 @@ impl WindowService {
blockstore.clone(),
duplicate_receiver,
duplicate_slots_sender,
bank_forks,
);

let t_insert = Self::start_window_insert_thread(
Expand All @@ -392,6 +419,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
duplicate_receiver: Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: DuplicateSlotSender,
bank_forks: Arc<RwLock<BankForks>>,
) -> JoinHandle<()> {
let handle_error = || {
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
Expand All @@ -405,6 +433,7 @@ impl WindowService {
&blockstore,
&duplicate_receiver,
&duplicate_slots_sender,
&bank_forks,
) {
if Self::should_exit_on_error(e, &handle_error) {
break;
Expand Down Expand Up @@ -507,9 +536,11 @@ mod test {
solana_gossip::contact_info::ContactInfo,
solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config,
get_tmp_ledger_path_auto_delete,
shred::{ProcessShredsStats, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -556,6 +587,8 @@ mod test {
#[test]
fn test_run_check_duplicate() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (sender, receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
Expand Down Expand Up @@ -587,6 +620,7 @@ mod test {
&blockstore,
&receiver,
&duplicate_slot_sender,
&bank_forks,
)
.unwrap();

Expand Down Expand Up @@ -616,6 +650,8 @@ mod test {
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

// Start duplicate thread receiving and inserting duplicates
let t_check_duplicate = WindowService::start_check_duplicate_thread(
Expand All @@ -624,6 +660,7 @@ mod test {
blockstore.clone(),
duplicate_shred_receiver,
duplicate_slot_sender,
bank_forks,
);

let handle_duplicate = |shred| {
Expand Down
7 changes: 6 additions & 1 deletion sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,10 @@ pub mod consume_blockstore_duplicate_proofs {
solana_sdk::declare_id!("6YsBCejwK96GZCkJ6mkZ4b68oP63z2PLoQmWjC7ggTqZ");
}

pub mod index_erasure_conflict_duplicate_proofs {
solana_sdk::declare_id!("dupPajaLy2SSn8ko42aZz4mHANDNrLe8Nw8VQgFecLa");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -924,7 +928,8 @@ lazy_static! {
(enable_zk_transfer_with_fee::id(), "enable Zk Token proof program transfer with fee"),
(drop_legacy_shreds::id(), "drops legacy shreds #34328"),
(allow_commission_decrease_at_any_time::id(), "Allow commission decrease at any time in epoch #33843"),
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372")
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372"),
(index_erasure_conflict_duplicate_proofs::id(), "generate duplicate proofs for index and erasure conflicts #34360"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down
2 changes: 1 addition & 1 deletion turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool

// Returns true if the feature is effective for the shred slot.
#[must_use]
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
Expand Down

0 comments on commit def3bc4

Please sign in to comment.