Skip to content

Commit

Permalink
Remove the thread pool used for fetching Entries
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Czabaniuk committed Jan 12, 2024
1 parent 98a2873 commit 86a3b03
Showing 1 changed file with 55 additions and 49 deletions.
104 changes: 55 additions & 49 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use {
datapoint_debug, datapoint_error,
poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
},
solana_rayon_threadlimit::get_max_thread_count,
solana_runtime::bank::Bank,
solana_sdk::{
account::ReadableAccount,
Expand Down Expand Up @@ -96,11 +95,6 @@ pub use {
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solBstore{i:02}"))
.build()
.unwrap();
static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|i| format!("solBstoreAll{i:02}"))
Expand Down Expand Up @@ -3084,29 +3078,7 @@ impl Blockstore {
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
.unwrap_or(0);

let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
completed_ranges
.into_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
})
.collect()
} else {
PAR_THREAD_POOL.install(|| {
completed_ranges
.into_par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(
slot,
start_index,
end_index,
Some(&slot_meta),
)
})
.collect()
})
};
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;
Ok((entries, num_shreds, slot_meta.is_full()))
}

Expand Down Expand Up @@ -3216,14 +3188,24 @@ impl Blockstore {
.collect()
}

pub fn get_entries_in_data_block(
/// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i < s_i+1 < e_i+1
/// e_i == s_i+1
fn get_slot_entries_in_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
completed_ranges: CompletedRanges,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
let keys: Vec<(Slot, u64)> = (start_index..=end_index)
assert!(!completed_ranges.is_empty());

let (total_start_index, _) = *completed_ranges.first().unwrap();
let (_, total_end_index) = *completed_ranges.last().unwrap();
let keys: Vec<(Slot, u64)> = (total_start_index..=total_end_index)
.map(|index| (slot, u64::from(index)))
.collect();

Expand All @@ -3233,7 +3215,6 @@ impl Blockstore {
.into_iter()
.collect();
let data_shreds = data_shreds?;

let data_shreds: Result<Vec<Shred>> =
data_shreds
.into_iter()
Expand All @@ -3249,8 +3230,8 @@ impl Blockstore {
idx,
slot_meta.consumed,
slot_meta.completed_data_indexes,
start_index,
end_index
total_start_index,
total_end_index
);
}
}
Expand All @@ -3268,21 +3249,46 @@ impl Blockstore {
})
.collect();
let data_shreds = data_shreds?;
let last_shred = data_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"Could not reconstruct data block from constituent shreds, error: {e:?}"
))))
})?;
let mut entries = vec![];
for (start_index, end_index) in completed_ranges.iter() {
// The indices from completed_ranges refer to shred indices in the
// block; map those indices to indices within data_shreds
let start_index = (*start_index - total_start_index) as usize;
let end_index = (*end_index - total_start_index) as usize;
let range_shreds = &data_shreds[start_index..=end_index];

let last_shred = range_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

let deshred_payload = Shredder::deshred(&range_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries buffer from shreds: {e:?}"
))))
})?;

debug!("{:?} shreds in last FEC set", data_shreds.len());
let range_entries =
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries: {e:?}"),
)))
})?;

debug!("{:?} shreds in last FEC set", data_shreds.len(),);
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries: {e:?}"
))))
})
entries.extend(range_entries);
}
Ok(entries)
}

// TODO: probably delete this entry point, used by completed_data_sets_service.rs
pub fn get_entries_in_data_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {
Expand Down

0 comments on commit 86a3b03

Please sign in to comment.