Skip to content

Commit

Permalink
Detect duplicates in the same insert batch (#32528)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin authored Jul 20, 2023
1 parent 6d30429 commit b6927db
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 49 deletions.
95 changes: 84 additions & 11 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ mod test {
solana_gossip::contact_info::ContactInfo,
solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete,
shred::{ProcessShredsStats, Shredder},
},
solana_sdk::{
Expand Down Expand Up @@ -520,8 +520,8 @@ mod test {

#[test]
fn test_process_shred() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let num_entries = 10;
let original_entries = create_ticks(num_entries, 0, Hash::default());
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
Expand All @@ -531,28 +531,27 @@ mod test {
.expect("Expect successful processing of shred");

assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);

drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
fn test_run_check_duplicate() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (sender, receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let (shreds, _) = make_many_slot_entries(5, 5, 10);
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
let duplicate_index = 0;
let original_shred = shreds[duplicate_index].clone();
let duplicate_shred = {
let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
shreds.swap_remove(0)
shreds.swap_remove(duplicate_index)
};
assert_eq!(duplicate_shred.slot(), shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
sender.send(duplicate_shred).unwrap();
sender.send(duplicate_shred.clone()).unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
Expand All @@ -568,13 +567,87 @@ mod test {
&duplicate_slot_sender,
)
.unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));

// Make sure the correct duplicate proof was stored
let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
assert_eq!(duplicate_proof.shred1, *original_shred.payload());
assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

// Make sure a duplicate signal was sent
assert_eq!(
duplicate_slot_receiver.try_recv().unwrap(),
duplicate_shred_slot
);
}

#[test]
fn test_store_duplicate_shreds_same_batch() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let exit = Arc::new(AtomicBool::new(false));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
let cluster_info = Arc::new(ClusterInfo::new(
contact_info,
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));

// Start duplicate thread receiving and inserting duplicates
let t_check_duplicate = WindowService::start_check_duplicate_thread(
cluster_info,
exit.clone(),
blockstore.clone(),
duplicate_shred_receiver,
duplicate_slot_sender,
);

let handle_duplicate = |shred| {
let _ = duplicate_shred_sender.send(shred);
};
let num_trials = 100;
for slot in 0..num_trials {
let (shreds, _) = make_many_slot_entries(slot, 1, 10);
let duplicate_index = 0;
let original_shred = shreds[duplicate_index].clone();
let duplicate_shred = {
let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
shreds.swap_remove(duplicate_index)
};
assert_eq!(duplicate_shred.slot(), slot);
// Simulate storing both duplicate shreds in the same batch
blockstore
.insert_shreds_handle_duplicate(
vec![original_shred.clone(), duplicate_shred.clone()],
vec![false, false],
None,
false, // is_trusted
None,
&handle_duplicate,
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
)
.unwrap();

// Make sure a duplicate signal was sent
assert_eq!(
duplicate_slot_receiver
.recv_timeout(Duration::from_millis(5_000))
.unwrap(),
slot
);

// Make sure the correct duplicate proof was stored
let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
assert_eq!(duplicate_proof.shred1, *original_shred.payload());
assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
}
exit.store(true, Ordering::Relaxed);
t_check_duplicate.join().unwrap();
}

#[test]
fn test_prune_shreds() {
solana_logger::setup();
Expand Down
104 changes: 66 additions & 38 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl std::fmt::Display for InsertDataShredError {
}
}

pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
duplicate_shreds: Vec<Shred>,
}

/// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single
/// serialized [`Vec<Entry>`].
///
Expand Down Expand Up @@ -817,20 +822,16 @@ impl Blockstore {
/// On success, the function returns an Ok result with a vector of
/// `CompletedDataSetInfo` and a vector of its corresponding index in the
/// input `shreds` vector.
pub fn insert_shreds_handle_duplicate<F>(
fn do_insert_shreds(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<Vec<CompletedDataSetInfo>>
where
F: Fn(Shred),
{
) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock");
Expand All @@ -845,6 +846,7 @@ impl Blockstore {
let mut erasure_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let mut duplicate_shreds = vec![];

metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
Expand All @@ -867,7 +869,7 @@ impl Blockstore {
&mut just_inserted_shreds,
&mut index_meta_time_us,
is_trusted,
handle_duplicate,
&mut duplicate_shreds,
leader_schedule,
shred_source,
) {
Expand Down Expand Up @@ -902,7 +904,7 @@ impl Blockstore {
&mut write_batch,
&mut just_inserted_shreds,
&mut index_meta_time_us,
handle_duplicate,
&mut duplicate_shreds,
is_trusted,
shred_source,
metrics,
Expand Down Expand Up @@ -951,7 +953,7 @@ impl Blockstore {
&mut just_inserted_shreds,
&mut index_meta_time_us,
is_trusted,
&handle_duplicate,
&mut duplicate_shreds,
leader_schedule,
ShredSource::Recovered,
) {
Expand Down Expand Up @@ -1035,7 +1037,44 @@ impl Blockstore {
metrics.total_elapsed_us += total_start.as_us();
metrics.index_meta_time_us += index_meta_time_us;

Ok(newly_completed_data_sets)
Ok(InsertResults {
completed_data_set_infos: newly_completed_data_sets,
duplicate_shreds,
})
}

pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<Vec<CompletedDataSetInfo>>
where
F: Fn(Shred),
{
let InsertResults {
completed_data_set_infos,
duplicate_shreds,
} = self.do_insert_shreds(
shreds,
is_repaired,
leader_schedule,
is_trusted,
retransmit_sender,
reed_solomon_cache,
metrics,
)?;

for shred in duplicate_shreds {
handle_duplicate(shred);
}

Ok(completed_data_set_infos)
}

pub fn add_new_shred_signal(&self, s: Sender<bool>) {
Expand Down Expand Up @@ -1113,35 +1152,32 @@ impl Blockstore {
is_trusted: bool,
) -> Result<Vec<CompletedDataSetInfo>> {
let shreds_len = shreds.len();
self.insert_shreds_handle_duplicate(
let insert_results = self.do_insert_shreds(
shreds,
vec![false; shreds_len],
leader_schedule,
is_trusted,
None, // retransmit-sender
&|_| {}, // handle-duplicates
None, // retransmit-sender
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
)
)?;
Ok(insert_results.completed_data_set_infos)
}

#[allow(clippy::too_many_arguments)]
fn check_insert_coding_shred<F>(
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
handle_duplicate: &F,
duplicate_shreds: &mut Vec<Shred>,
is_trusted: bool,
shred_source: ShredSource,
metrics: &mut BlockstoreInsertionMetrics,
) -> bool
where
F: Fn(Shred),
{
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());

Expand All @@ -1156,7 +1192,7 @@ impl Blockstore {
if !is_trusted {
if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1;
handle_duplicate(shred);
duplicate_shreds.push(shred);
return false;
}

Expand Down Expand Up @@ -1300,7 +1336,7 @@ impl Blockstore {
/// whether it is okay to insert the input shred.
/// - `shred_source`: the source of the shred.
#[allow(clippy::too_many_arguments)]
fn check_insert_data_shred<F>(
fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
Expand All @@ -1310,13 +1346,10 @@ impl Blockstore {
just_inserted_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
is_trusted: bool,
handle_duplicate: &F,
duplicate_shreds: &mut Vec<Shred>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
where
F: Fn(Shred),
{
) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());

Expand All @@ -1337,7 +1370,7 @@ impl Blockstore {

if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) {
handle_duplicate(shred);
duplicate_shreds.push(shred);
return Err(InsertDataShredError::Exists);
}

Expand Down Expand Up @@ -6641,32 +6674,27 @@ pub mod tests {
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
&|_shred| {
panic!("no dupes");
},
&mut vec![],
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));

// insert again fails on dupe
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
let mut duplicate_shreds = vec![];
assert!(!blockstore.check_insert_coding_shred(
coding_shred,
coding_shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
&|_shred| {
counter.fetch_add(1, Ordering::Relaxed);
},
&mut duplicate_shreds,
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
assert_eq!(counter.load(Ordering::Relaxed), 1);
assert_eq!(duplicate_shreds, vec![coding_shred]);
}

#[test]
Expand Down

0 comments on commit b6927db

Please sign in to comment.