v1.14: Detect duplicates in the same insert batch (backport of #32528) #32591

Merged · 2 commits · Jul 28, 2023
Changes from all commits
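As the diff below shows, the fix is structural: do_insert_shreds now collects conflicting shreds into a duplicate_shreds vector, carried in the new InsertResults struct, instead of firing the handle_duplicate callback while the batch is still being inserted; the public insert_shreds_handle_duplicate wrapper then runs the callback over the collected shreds once insertion is done. Deferring the callback until the batch has landed is presumably what allows a duplicate proof to be built even when the original shred arrived in the same insert batch. What follows is a minimal, self-contained sketch of that pattern, not the actual Blockstore API: the Shred struct, the (slot, index) keying, and the in-memory HashMap store are all simplified stand-ins.

use std::collections::HashMap;

// Simplified stand-in for a shred: identified by (slot, index), carrying a payload.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Shred {
    slot: u64,
    index: u32,
    payload: Vec<u8>,
}

// Simplified analogue of the InsertResults struct introduced in this PR.
struct InsertResults {
    inserted_indices: Vec<usize>,
    duplicate_shreds: Vec<Shred>,
}

// Collect duplicates into a Vec while processing the batch instead of firing a
// callback mid-insert. A shred conflicting with one inserted earlier in this
// same call is caught too, because `store` already holds the earlier shred.
fn do_insert_shreds(store: &mut HashMap<(u64, u32), Shred>, shreds: Vec<Shred>) -> InsertResults {
    let mut inserted_indices = Vec::new();
    let mut duplicate_shreds = Vec::new();
    for (i, shred) in shreds.into_iter().enumerate() {
        let key = (shred.slot, shred.index);
        if let Some(existing) = store.get(&key) {
            // Same position, different payload: record a duplicate.
            if existing.payload != shred.payload {
                duplicate_shreds.push(shred);
            }
        } else {
            store.insert(key, shred);
            inserted_indices.push(i);
        }
    }
    InsertResults { inserted_indices, duplicate_shreds }
}

// Thin public wrapper mirroring the split in this PR: insert the whole batch
// first, then hand every collected duplicate to the caller's handler.
fn insert_shreds_handle_duplicate<F: Fn(Shred)>(
    store: &mut HashMap<(u64, u32), Shred>,
    shreds: Vec<Shred>,
    handle_duplicate: &F,
) -> Vec<usize> {
    let InsertResults { inserted_indices, duplicate_shreds } = do_insert_shreds(store, shreds);
    for shred in duplicate_shreds {
        handle_duplicate(shred);
    }
    inserted_indices
}

fn main() {
    let mut store = HashMap::new();
    let original = Shred { slot: 0, index: 0, payload: vec![1] };
    let duplicate = Shred { slot: 0, index: 0, payload: vec![2] };
    // The original and its conflicting twin arrive in one batch; the duplicate
    // is still detected and reported after the batch lands.
    let inserted = insert_shreds_handle_duplicate(&mut store, vec![original, duplicate], &|s| {
        println!("duplicate shred detected in slot {}", s.slot)
    });
    assert_eq!(inserted, vec![0]);
}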
95 changes: 84 additions & 11 deletions core/src/window_service.rs
@@ -486,7 +486,7 @@ mod test {
solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
- get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete,
shred::{ProcessShredsStats, Shredder},
},
solana_sdk::{
@@ -519,8 +519,8 @@ mod test {

#[test]
fn test_process_shred() {
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let num_entries = 10;
let original_entries = create_ticks(num_entries, 0, Hash::default());
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
@@ -530,28 +530,27 @@
.expect("Expect successful processing of shred");

assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);

- drop(blockstore);
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}

#[test]
fn test_run_check_duplicate() {
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (sender, receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let (shreds, _) = make_many_slot_entries(5, 5, 10);
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
let duplicate_index = 0;
let original_shred = shreds[duplicate_index].clone();
let duplicate_shred = {
let (mut shreds, _) = make_many_slot_entries(5, 1, 10);
- shreds.swap_remove(0)
+ shreds.swap_remove(duplicate_index)
};
assert_eq!(duplicate_shred.slot(), shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
- sender.send(duplicate_shred).unwrap();
+ sender.send(duplicate_shred.clone()).unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
@@ -567,13 +566,87 @@
&duplicate_slot_sender,
)
.unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));

// Make sure the correct duplicate proof was stored
let duplicate_proof = blockstore.get_duplicate_slot(duplicate_shred_slot).unwrap();
assert_eq!(duplicate_proof.shred1, *original_shred.payload());
assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());

// Make sure a duplicate signal was sent
assert_eq!(
duplicate_slot_receiver.try_recv().unwrap(),
duplicate_shred_slot
);
}

#[test]
fn test_store_duplicate_shreds_same_batch() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (duplicate_shred_sender, duplicate_shred_receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let exit = Arc::new(AtomicBool::new(false));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
let cluster_info = Arc::new(ClusterInfo::new(
contact_info,
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));

// Start duplicate thread receiving and inserting duplicates
let t_check_duplicate = WindowService::start_check_duplicate_thread(
cluster_info,
exit.clone(),
blockstore.clone(),
duplicate_shred_receiver,
duplicate_slot_sender,
);

let handle_duplicate = |shred| {
let _ = duplicate_shred_sender.send(shred);
};
let num_trials = 100;
for slot in 0..num_trials {
let (shreds, _) = make_many_slot_entries(slot, 1, 10);
let duplicate_index = 0;
let original_shred = shreds[duplicate_index].clone();
let duplicate_shred = {
let (mut shreds, _) = make_many_slot_entries(slot, 1, 10);
shreds.swap_remove(duplicate_index)
};
assert_eq!(duplicate_shred.slot(), slot);
// Simulate storing both duplicate shreds in the same batch
blockstore
.insert_shreds_handle_duplicate(
vec![original_shred.clone(), duplicate_shred.clone()],
vec![false, false],
None,
false, // is_trusted
None,
&handle_duplicate,
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
)
.unwrap();

// Make sure a duplicate signal was sent
assert_eq!(
duplicate_slot_receiver
.recv_timeout(Duration::from_millis(5_000))
.unwrap(),
slot
);

// Make sure the correct duplicate proof was stored
let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
assert_eq!(duplicate_proof.shred1, *original_shred.payload());
assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
}
exit.store(true, Ordering::Relaxed);
t_check_duplicate.join().unwrap();
}

#[test]
fn test_prune_shreds() {
use {
119 changes: 81 additions & 38 deletions ledger/src/blockstore.rs
@@ -134,6 +134,21 @@ impl std::fmt::Display for InsertDataShredError {
}
}

pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
inserted_indices: Vec<usize>,
duplicate_shreds: Vec<Shred>,
}

/// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single
/// serialized [`Vec<Entry>`].
///
/// Services such as the `WindowService` for a TVU, and `ReplayStage` for a TPU, piece together
/// these sets by inserting shreds via direct or indirect calls to
/// [`Blockstore::insert_shreds_handle_duplicate()`].
///
/// `solana_core::completed_data_sets_service::CompletedDataSetsService` is the main receiver of
/// `CompletedDataSetInfo`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompletedDataSetInfo {
pub slot: Slot,
@@ -803,20 +818,16 @@ impl Blockstore {
/// On success, the function returns an Ok result with a vector of
/// `CompletedDataSetInfo` and a vector of its corresponding index in the
/// input `shreds` vector.
- pub fn insert_shreds_handle_duplicate<F>(
+ fn do_insert_shreds(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
- handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
- ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
- where
- F: Fn(Shred),
- {
+ ) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock");
Expand All @@ -831,6 +842,7 @@ impl Blockstore {
let mut erasure_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let mut duplicate_shreds = vec![];

metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
@@ -854,7 +866,7 @@
&mut just_inserted_shreds,
&mut index_meta_time_us,
is_trusted,
- handle_duplicate,
+ &mut duplicate_shreds,
leader_schedule,
shred_source,
) {
@@ -881,7 +893,7 @@
&mut write_batch,
&mut just_inserted_shreds,
&mut index_meta_time_us,
- handle_duplicate,
+ &mut duplicate_shreds,
is_trusted,
shred_source,
metrics,
@@ -930,7 +942,7 @@
&mut just_inserted_shreds,
&mut index_meta_time_us,
is_trusted,
- &handle_duplicate,
+ &mut duplicate_shreds,
leader_schedule,
ShredSource::Recovered,
) {
@@ -1014,7 +1026,46 @@
metrics.total_elapsed_us += total_start.as_us();
metrics.index_meta_time_us += index_meta_time_us;

- Ok((newly_completed_data_sets, inserted_indices))
+ Ok(InsertResults {
+ completed_data_set_infos: newly_completed_data_sets,
+ inserted_indices,
+ duplicate_shreds,
+ })
}

pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
where
F: Fn(Shred),
{
let InsertResults {
completed_data_set_infos,
inserted_indices,
duplicate_shreds,
} = self.do_insert_shreds(
shreds,
is_repaired,
leader_schedule,
is_trusted,
retransmit_sender,
reed_solomon_cache,
metrics,
)?;

for shred in duplicate_shreds {
handle_duplicate(shred);
}

Ok((completed_data_set_infos, inserted_indices))
}

pub fn add_new_shred_signal(&self, s: Sender<bool>) {
@@ -1092,35 +1143,35 @@
is_trusted: bool,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
let shreds_len = shreds.len();
- self.insert_shreds_handle_duplicate(
+ let insert_results = self.do_insert_shreds(
shreds,
vec![false; shreds_len],
leader_schedule,
is_trusted,
- None, // retransmit-sender
- &|_| {}, // handle-duplicates
+ None, // retransmit-sender
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
- )
+ )?;
+ Ok((
+ insert_results.completed_data_set_infos,
+ insert_results.inserted_indices,
+ ))
}

#[allow(clippy::too_many_arguments)]
- fn check_insert_coding_shred<F>(
+ fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
- handle_duplicate: &F,
+ duplicate_shreds: &mut Vec<Shred>,
is_trusted: bool,
shred_source: ShredSource,
metrics: &mut BlockstoreInsertionMetrics,
- ) -> bool
- where
- F: Fn(Shred),
- {
+ ) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());

@@ -1135,7 +1186,7 @@
if !is_trusted {
if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1;
- handle_duplicate(shred);
+ duplicate_shreds.push(shred);
return false;
}

@@ -1279,7 +1330,7 @@
/// whether it is okay to insert the input shred.
/// - `shred_source`: the source of the shred.
#[allow(clippy::too_many_arguments)]
- fn check_insert_data_shred<F>(
+ fn check_insert_data_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<ErasureSetId, ErasureMeta>,
@@ -1289,13 +1340,10 @@
just_inserted_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
is_trusted: bool,
- handle_duplicate: &F,
+ duplicate_shreds: &mut Vec<Shred>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
- ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
- where
- F: Fn(Shred),
- {
+ ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());

@@ -1316,7 +1364,7 @@

if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) {
- handle_duplicate(shred);
+ duplicate_shreds.push(shred);
return Err(InsertDataShredError::Exists);
}

@@ -6211,32 +6259,27 @@ pub mod tests {
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
- &|_shred| {
- panic!("no dupes");
- },
+ &mut vec![],
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));

// insert again fails on dupe
- use std::sync::atomic::{AtomicUsize, Ordering};
- let counter = AtomicUsize::new(0);
+ let mut duplicate_shreds = vec![];
assert!(!blockstore.check_insert_coding_shred(
- coding_shred,
+ coding_shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_received_shreds,
&mut index_meta_time_us,
- &|_shred| {
- counter.fetch_add(1, Ordering::Relaxed);
- },
+ &mut duplicate_shreds,
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
- assert_eq!(counter.load(Ordering::Relaxed), 1);
+ assert_eq!(duplicate_shreds, vec![coding_shred]);
}

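A secondary effect of collecting duplicates instead of calling back mid-insert shows up in the last hunk above: the check_insert_coding_shred test drops its AtomicUsize counter and closure and asserts directly on the collected Vec, which also exposes which shreds were flagged rather than just how many. A tiny sketch of that testing trade-off, using hypothetical stand-in functions rather than the real Blockstore methods:

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the pre-PR shape: the insert routine reports each
// duplicate through a caller-supplied callback.
fn insert_with_callback(on_duplicate: &dyn Fn(u32)) {
    on_duplicate(7); // pretend the shred at index 7 was a duplicate
}

// Hypothetical stand-in for the post-PR shape: duplicates are pushed into a
// Vec owned by the caller.
fn insert_collecting(duplicates: &mut Vec<u32>) {
    duplicates.push(7);
}

fn main() {
    // Before: the callback can only be observed through shared mutable state,
    // so the assertion is reduced to a count.
    let counter = AtomicUsize::new(0);
    insert_with_callback(&|_index| {
        counter.fetch_add(1, Ordering::Relaxed);
    });
    assert_eq!(counter.load(Ordering::Relaxed), 1);

    // After: the flagged items themselves are in hand.
    let mut duplicates = Vec::new();
    insert_collecting(&mut duplicates);
    assert_eq!(duplicates, vec![7]);
}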