diff --git a/core/src/window_service.rs b/core/src/window_service.rs index e4d162602edf64..44c0a2956d55a7 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -486,7 +486,7 @@ mod test { solana_gossip::contact_info::ContactInfo, solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, - get_tmp_ledger_path, + get_tmp_ledger_path_auto_delete, shred::{ProcessShredsStats, Shredder}, }, solana_sdk::{ @@ -519,8 +519,8 @@ mod test { #[test] fn test_process_shred() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let num_entries = 10; let original_entries = create_ticks(num_entries, 0, Hash::default()); let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new()); @@ -530,28 +530,27 @@ mod test { .expect("Expect successful processing of shred"); assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries); - - drop(blockstore); - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } #[test] fn test_run_check_duplicate() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let (sender, receiver) = unbounded(); let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded(); let (shreds, _) = make_many_slot_entries(5, 5, 10); blockstore .insert_shreds(shreds.clone(), None, false) .unwrap(); + let duplicate_index = 0; + let original_shred = shreds[duplicate_index].clone(); let duplicate_shred = { let (mut shreds, _) = make_many_slot_entries(5, 1, 10); - shreds.swap_remove(0) + shreds.swap_remove(duplicate_index) }; assert_eq!(duplicate_shred.slot(), shreds[0].slot()); let duplicate_shred_slot = duplicate_shred.slot(); - sender.send(duplicate_shred).unwrap(); + sender.send(duplicate_shred.clone()).unwrap(); assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); let keypair = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); @@ -567,13 +566,87 @@ mod test { &duplicate_slot_sender, ) .unwrap(); - assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); + + // Make sure the correct duplicate proof was stored + let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap(); + assert_eq!(duplicate_proof.shred1, *original_shred.payload()); + assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload()); + + // Make sure a duplicate signal was sent assert_eq!( duplicate_slot_receiver.try_recv().unwrap(), duplicate_shred_slot ); } + #[test] + fn test_store_duplicate_shreds_same_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded(); + let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded(); + let exit = Arc::new(AtomicBool::new(false)); + let keypair = Keypair::new(); + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); + let cluster_info = Arc::new(ClusterInfo::new( + contact_info, + Arc::new(keypair), + SocketAddrSpace::Unspecified, + )); + + // Start duplicate thread receiving and inserting duplicates + let t_check_duplicate = WindowService::start_check_duplicate_thread( + cluster_info, + exit.clone(), + blockstore.clone(), + duplicate_shred_receiver, + duplicate_slot_sender, + ); + + let handle_duplicate = |shred| { + let _ = duplicate_shred_sender.send(shred); + }; + let num_trials = 100; + for slot in 0..num_trials { + let (shreds, _) = make_many_slot_entries(slot, 1, 10); + let duplicate_index = 0; + let original_shred = shreds[duplicate_index].clone(); + let duplicate_shred = { + let (mut shreds, _) = make_many_slot_entries(slot, 1, 10); + shreds.swap_remove(duplicate_index) + }; + assert_eq!(duplicate_shred.slot(), slot); + // Simulate storing both duplicate shreds in the same batch + blockstore + .insert_shreds_handle_duplicate( + vec![original_shred.clone(), duplicate_shred.clone()], + vec![false, false], + None, + false, // is_trusted + None, + &handle_duplicate, + &ReedSolomonCache::default(), + &mut BlockstoreInsertionMetrics::default(), + ) + .unwrap(); + + // Make sure a duplicate signal was sent + assert_eq!( + duplicate_slot_receiver + .recv_timeout(Duration::from_millis(5_000)) + .unwrap(), + slot + ); + + // Make sure the correct duplicate proof was stored + let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap(); + assert_eq!(duplicate_proof.shred1, *original_shred.payload()); + assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload()); + } + exit.store(true, Ordering::Relaxed); + t_check_duplicate.join().unwrap(); + } + #[test] fn test_prune_shreds() { use crate::serve_repair::ShredRepairType; diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index a33c98726f476c..2a5ea113fd4928 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -135,6 +135,11 @@ impl std::fmt::Display for InsertDataShredError { } } +pub struct InsertResults { + completed_data_set_infos: Vec, + duplicate_shreds: Vec, +} + /// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single /// serialized [`Vec`]. /// @@ -817,20 +822,16 @@ impl Blockstore { /// On success, the function returns an Ok result with a vector of /// `CompletedDataSetInfo` and a vector of its corresponding index in the /// input `shreds` vector. - pub fn insert_shreds_handle_duplicate( + fn do_insert_shreds( &self, shreds: Vec, is_repaired: Vec, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, retransmit_sender: Option<&Sender>>>, - handle_duplicate: &F, reed_solomon_cache: &ReedSolomonCache, metrics: &mut BlockstoreInsertionMetrics, - ) -> Result> - where - F: Fn(Shred), - { + ) -> Result { assert_eq!(shreds.len(), is_repaired.len()); let mut total_start = Measure::start("Total elapsed"); let mut start = Measure::start("Blockstore lock"); @@ -845,6 +846,7 @@ impl Blockstore { let mut erasure_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); + let mut duplicate_shreds = vec![]; metrics.num_shreds += shreds.len(); let mut start = Measure::start("Shred insertion"); @@ -867,7 +869,7 @@ impl Blockstore { &mut just_inserted_shreds, &mut index_meta_time_us, is_trusted, - handle_duplicate, + &mut duplicate_shreds, leader_schedule, shred_source, ) { @@ -902,7 +904,7 @@ impl Blockstore { &mut write_batch, &mut just_inserted_shreds, &mut index_meta_time_us, - handle_duplicate, + &mut duplicate_shreds, is_trusted, shred_source, metrics, @@ -951,7 +953,7 @@ impl Blockstore { &mut just_inserted_shreds, &mut index_meta_time_us, is_trusted, - &handle_duplicate, + &mut duplicate_shreds, leader_schedule, ShredSource::Recovered, ) { @@ -1035,7 +1037,44 @@ impl Blockstore { metrics.total_elapsed_us += total_start.as_us(); metrics.index_meta_time_us += index_meta_time_us; - Ok(newly_completed_data_sets) + Ok(InsertResults { + completed_data_set_infos: newly_completed_data_sets, + duplicate_shreds, + }) + } + + pub fn insert_shreds_handle_duplicate( + &self, + shreds: Vec, + is_repaired: Vec, + leader_schedule: Option<&LeaderScheduleCache>, + is_trusted: bool, + retransmit_sender: Option<&Sender>>>, + handle_duplicate: &F, + reed_solomon_cache: &ReedSolomonCache, + metrics: &mut BlockstoreInsertionMetrics, + ) -> Result> + where + F: Fn(Shred), + { + let InsertResults { + completed_data_set_infos, + duplicate_shreds, + } = self.do_insert_shreds( + shreds, + is_repaired, + leader_schedule, + is_trusted, + retransmit_sender, + reed_solomon_cache, + metrics, + )?; + + for shred in duplicate_shreds { + handle_duplicate(shred); + } + + Ok(completed_data_set_infos) } pub fn add_new_shred_signal(&self, s: Sender) { @@ -1113,20 +1152,20 @@ impl Blockstore { is_trusted: bool, ) -> Result> { let shreds_len = shreds.len(); - self.insert_shreds_handle_duplicate( + let insert_results = self.do_insert_shreds( shreds, vec![false; shreds_len], leader_schedule, is_trusted, - None, // retransmit-sender - &|_| {}, // handle-duplicates + None, // retransmit-sender &ReedSolomonCache::default(), &mut BlockstoreInsertionMetrics::default(), - ) + )?; + Ok(insert_results.completed_data_set_infos) } #[allow(clippy::too_many_arguments)] - fn check_insert_coding_shred( + fn check_insert_coding_shred( &self, shred: Shred, erasure_metas: &mut HashMap, @@ -1134,14 +1173,11 @@ impl Blockstore { write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, index_meta_time_us: &mut u64, - handle_duplicate: &F, + duplicate_shreds: &mut Vec, is_trusted: bool, shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, - ) -> bool - where - F: Fn(Shred), - { + ) -> bool { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -1156,7 +1192,7 @@ impl Blockstore { if !is_trusted { if index_meta.coding().contains(shred_index) { metrics.num_coding_shreds_exists += 1; - handle_duplicate(shred); + duplicate_shreds.push(shred); return false; } @@ -1300,7 +1336,7 @@ impl Blockstore { /// whether it is okay to insert the input shred. /// - `shred_source`: the source of the shred. #[allow(clippy::too_many_arguments)] - fn check_insert_data_shred( + fn check_insert_data_shred( &self, shred: Shred, erasure_metas: &mut HashMap, @@ -1310,13 +1346,10 @@ impl Blockstore { just_inserted_shreds: &mut HashMap, index_meta_time_us: &mut u64, is_trusted: bool, - handle_duplicate: &F, + duplicate_shreds: &mut Vec, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, - ) -> std::result::Result, InsertDataShredError> - where - F: Fn(Shred), - { + ) -> std::result::Result, InsertDataShredError> { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -1337,7 +1370,7 @@ impl Blockstore { if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { - handle_duplicate(shred); + duplicate_shreds.push(shred); return Err(InsertDataShredError::Exists); } @@ -6509,32 +6542,27 @@ pub mod tests { &mut write_batch, &mut just_received_shreds, &mut index_meta_time_us, - &|_shred| { - panic!("no dupes"); - }, + &mut vec![], false, ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); // insert again fails on dupe - use std::sync::atomic::{AtomicUsize, Ordering}; - let counter = AtomicUsize::new(0); + let mut duplicate_shreds = vec![]; assert!(!blockstore.check_insert_coding_shred( - coding_shred, + coding_shred.clone(), &mut erasure_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, &mut index_meta_time_us, - &|_shred| { - counter.fetch_add(1, Ordering::Relaxed); - }, + &mut duplicate_shreds, false, ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); - assert_eq!(counter.load(Ordering::Relaxed), 1); + assert_eq!(duplicate_shreds, vec![coding_shred]); } #[test]