diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index cda801bb296e45..7c421c7a4929ad 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -44,7 +44,6 @@ use {
         datapoint_debug, datapoint_error,
         poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
     },
-    solana_rayon_threadlimit::get_max_thread_count,
     solana_runtime::bank::Bank,
     solana_sdk::{
         account::ReadableAccount,
@@ -97,11 +96,6 @@ pub use {
 // get_max_thread_count to match number of threads in the old code.
 // see: https://github.com/solana-labs/solana/pull/24853
 lazy_static! {
-    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
-        .num_threads(get_max_thread_count())
-        .thread_name(|i| format!("solBstore{i:02}"))
-        .build()
-        .unwrap();
     static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
         .num_threads(num_cpus::get())
         .thread_name(|i| format!("solBstoreAll{i:02}"))
@@ -3097,29 +3091,7 @@ impl Blockstore {
             .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
             .unwrap_or(0);
 
-        let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
-            completed_ranges
-                .into_iter()
-                .map(|(start_index, end_index)| {
-                    self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
-                })
-                .collect()
-        } else {
-            PAR_THREAD_POOL.install(|| {
-                completed_ranges
-                    .into_par_iter()
-                    .map(|(start_index, end_index)| {
-                        self.get_entries_in_data_block(
-                            slot,
-                            start_index,
-                            end_index,
-                            Some(&slot_meta),
-                        )
-                    })
-                    .collect()
-            })
-        };
-        let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
+        let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;
         Ok((entries, num_shreds, slot_meta.is_full()))
     }
 
@@ -3229,14 +3201,24 @@ impl Blockstore {
             .collect()
     }
 
-    pub fn get_entries_in_data_block(
+    /// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
+    /// This function takes advantage of the fact that `completed_ranges` are both
+    /// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
+    ///   completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
+    /// Then, the following statements are true:
+    ///   s_i < e_i < s_i+1 < e_i+1
+    ///   e_i + 1 == s_i+1
+    fn get_slot_entries_in_block(
         &self,
         slot: Slot,
-        start_index: u32,
-        end_index: u32,
+        completed_ranges: CompletedRanges,
         slot_meta: Option<&SlotMeta>,
     ) -> Result<Vec<Entry>> {
-        let keys: Vec<(Slot, u64)> = (start_index..=end_index)
+        assert!(!completed_ranges.is_empty());
+
+        let (total_start_index, _) = *completed_ranges.first().unwrap();
+        let (_, total_end_index) = *completed_ranges.last().unwrap();
+        let keys: Vec<(Slot, u64)> = (total_start_index..=total_end_index)
             .map(|index| (slot, u64::from(index)))
             .collect();
 
@@ -3246,7 +3228,6 @@ impl Blockstore {
             .into_iter()
             .collect();
         let data_shreds = data_shreds?;
-
         let data_shreds: Result<Vec<Shred>> =
             data_shreds
                 .into_iter()
@@ -3262,8 +3243,8 @@ impl Blockstore {
                                     idx,
                                     slot_meta.consumed,
                                     slot_meta.completed_data_indexes,
-                                    start_index,
-                                    end_index
+                                    total_start_index,
+                                    total_end_index
                                 );
                             }
                         }
@@ -3281,21 +3262,46 @@ impl Blockstore {
                 })
                 .collect();
         let data_shreds = data_shreds?;
-        let last_shred = data_shreds.last().unwrap();
-        assert!(last_shred.data_complete() || last_shred.last_in_slot());
 
-        let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
-            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
-                "Could not reconstruct data block from constituent shreds, error: {e:?}"
-            ))))
-        })?;
+        let mut entries = vec![];
+        for (start_index, end_index) in completed_ranges.iter() {
+            // The indices from completed_ranges refer to shred indices in the
+            // block; map those indices to indices within data_shreds
+            let start_index = (*start_index - total_start_index) as usize;
+            let end_index = (*end_index - total_start_index) as usize;
+            let range_shreds = &data_shreds[start_index..=end_index];
+
+            let last_shred = range_shreds.last().unwrap();
+            assert!(last_shred.data_complete() || last_shred.last_in_slot());
+
+            let deshred_payload = Shredder::deshred(range_shreds).map_err(|e| {
+                BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
+                    "could not reconstruct entries buffer from shreds: {e:?}"
+                ))))
+            })?;
+
+            debug!("{:?} shreds in last FEC set", data_shreds.len());
+            let range_entries =
+                bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
+                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                        format!("could not reconstruct entries: {e:?}"),
+                    )))
+                })?;
 
-        debug!("{:?} shreds in last FEC set", data_shreds.len(),);
-        bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
-            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
-                "could not reconstruct entries: {e:?}"
-            ))))
-        })
+            entries.extend(range_entries);
+        }
+        Ok(entries)
+    }
+
+    // TODO: probably delete this entry point, used by completed_data_sets_service.rs
+    pub fn get_entries_in_data_block(
+        &self,
+        slot: Slot,
+        start_index: u32,
+        end_index: u32,
+        slot_meta: Option<&SlotMeta>,
+    ) -> Result<Vec<Entry>> {
+        self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
     }
 
     fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {
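Reviewer note (not part of the patch): the sketch below illustrates the range arithmetic that the new get_slot_entries_in_block() relies on, assuming the inclusive, contiguous, sorted (start, end) pairs described in the added doc comment. The helper name remap_ranges and the sample values are hypothetical, for illustration only; the real code performs the same subtraction inline before slicing data_shreds.

// Hypothetical, illustration-only helper mirroring the index math in the patch.
// Map block-level shred index ranges onto offsets within the single data_shreds
// vector fetched for the span [total_start_index, total_end_index].
fn remap_ranges(completed_ranges: &[(u32, u32)]) -> Vec<(usize, usize)> {
    assert!(!completed_ranges.is_empty());
    let (total_start_index, _) = completed_ranges[0];

    // Invariants from the doc comment: ranges are sorted, non-overlapping and
    // contiguous, i.e. e_i + 1 == s_{i+1} for consecutive ranges.
    for pair in completed_ranges.windows(2) {
        let (_, e_i) = pair[0];
        let (s_next, _) = pair[1];
        assert_eq!(e_i + 1, s_next);
    }

    completed_ranges
        .iter()
        .map(|&(start, end)| {
            // Same subtraction the new per-range loop performs before slicing data_shreds.
            (
                (start - total_start_index) as usize,
                (end - total_start_index) as usize,
            )
        })
        .collect()
}

fn main() {
    // Three completed data ranges covering shred indices 10..=20 of a slot.
    let completed_ranges = [(10u32, 12), (13, 17), (18, 20)];
    // With total_start_index == 10, the ranges map to slices [0..=2], [3..=7]
    // and [8..=10] of the fetched data_shreds.
    assert_eq!(
        remap_ranges(&completed_ranges),
        vec![(0, 2), (3, 7), (8, 10)]
    );
}

Because the ranges are contiguous, one lookup over total_start_index..=total_end_index fetches every needed shred exactly once, which is what lets the patch drop the per-range get_entries_in_data_block() calls and retire PAR_THREAD_POOL.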