Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

v1.17: Add feature flag for LastIndex and Erasure duplicate proofs (backport of #34360) #34541

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `window_service` handles the data plane incoming shreds, storing them in
//! blockstore and retransmitting where required
//!

use {
crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
Expand Down Expand Up @@ -28,7 +29,12 @@ use {
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::clock::Slot,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
},
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -142,12 +148,31 @@ fn run_check_duplicate(
blockstore: &Blockstore,
shred_receiver: &Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: &DuplicateSlotSender,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
let mut root_bank = bank_forks.read().unwrap().root_bank();
let mut last_updated = Instant::now();
let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
// Grabs bank forks lock once a slot
last_updated = Instant::now();
root_bank = bank_forks.read().unwrap().root_bank();
}
let shred_slot = shred.slot();
let send_index_and_erasure_conflicts = cluster_nodes::check_feature_activation(
&feature_set::index_erasure_conflict_duplicate_proofs::id(),
shred_slot,
&root_bank,
);
let (shred1, shred2) = match shred {
PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
PossibleDuplicateShred::LastIndexConflict(shred, conflict)
| PossibleDuplicateShred::ErasureConflict(shred, conflict) => {
if send_index_and_erasure_conflicts {
(shred, conflict)
} else {
return Ok(());
}
}
PossibleDuplicateShred::Exists(shred) => {
// Unlike the other cases we have to wait until here to decide to handle the duplicate and store
// in blockstore. This is because the duplicate could have been part of the same insert batch,
Expand Down Expand Up @@ -342,6 +367,7 @@ impl WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();

let cluster_info = repair_info.cluster_info.clone();
let bank_forks = repair_info.bank_forks.clone();

let repair_service = RepairService::new(
blockstore.clone(),
Expand All @@ -366,6 +392,7 @@ impl WindowService {
blockstore.clone(),
duplicate_receiver,
duplicate_slots_sender,
bank_forks,
);

let t_insert = Self::start_window_insert_thread(
Expand All @@ -392,6 +419,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
duplicate_receiver: Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: DuplicateSlotSender,
bank_forks: Arc<RwLock<BankForks>>,
) -> JoinHandle<()> {
let handle_error = || {
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
Expand All @@ -405,6 +433,7 @@ impl WindowService {
&blockstore,
&duplicate_receiver,
&duplicate_slots_sender,
&bank_forks,
) {
if Self::should_exit_on_error(e, &handle_error) {
break;
Expand Down Expand Up @@ -507,9 +536,11 @@ mod test {
solana_gossip::contact_info::ContactInfo,
solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config,
get_tmp_ledger_path_auto_delete,
shred::{ProcessShredsStats, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -556,6 +587,8 @@ mod test {
#[test]
fn test_run_check_duplicate() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (sender, receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
Expand Down Expand Up @@ -587,6 +620,7 @@ mod test {
&blockstore,
&receiver,
&duplicate_slot_sender,
&bank_forks,
)
.unwrap();

Expand Down Expand Up @@ -616,6 +650,8 @@ mod test {
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

// Start duplicate thread receiving and inserting duplicates
let t_check_duplicate = WindowService::start_check_duplicate_thread(
Expand All @@ -624,6 +660,7 @@ mod test {
blockstore.clone(),
duplicate_shred_receiver,
duplicate_slot_sender,
bank_forks,
);

let handle_duplicate = |shred| {
Expand Down
7 changes: 6 additions & 1 deletion sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,10 @@ pub mod consume_blockstore_duplicate_proofs {
solana_sdk::declare_id!("6YsBCejwK96GZCkJ6mkZ4b68oP63z2PLoQmWjC7ggTqZ");
}

pub mod index_erasure_conflict_duplicate_proofs {
solana_sdk::declare_id!("dupPajaLy2SSn8ko42aZz4mHANDNrLe8Nw8VQgFecLa");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -914,7 +918,8 @@ lazy_static! {
(validate_fee_collector_account::id(), "validate fee collector account #33888"),
(enable_zk_transfer_with_fee::id(), "enable Zk Token proof program transfer with fee"),
(drop_legacy_shreds::id(), "drops legacy shreds #34328"),
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372")
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372"),
(index_erasure_conflict_duplicate_proofs::id(), "generate duplicate proofs for index and erasure conflicts #34360"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down
2 changes: 1 addition & 1 deletion turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool

// Returns true if the feature is effective for the shred slot.
#[must_use]
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
Expand Down