Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the Blockstore thread pool used for fetching Entries #34768

Merged
merged 6 commits into from
Feb 27, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Remove the thread pool used for fetching Entries
  • Loading branch information
Steven Czabaniuk authored and steviez committed Feb 26, 2024
commit 3c314171882addd33ad77701b24771c93315f1b1
104 changes: 55 additions & 49 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ use {
datapoint_debug, datapoint_error,
poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
},
solana_rayon_threadlimit::get_max_thread_count,
solana_runtime::bank::Bank,
solana_sdk::{
account::ReadableAccount,
@@ -97,11 +96,6 @@ pub use {
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solBstore{i:02}"))
.build()
.unwrap();
static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|i| format!("solBstoreAll{i:02}"))
@@ -3097,29 +3091,7 @@ impl Blockstore {
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
.unwrap_or(0);

let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
completed_ranges
.into_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
})
.collect()
} else {
PAR_THREAD_POOL.install(|| {
completed_ranges
.into_par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(
slot,
start_index,
end_index,
Some(&slot_meta),
)
})
.collect()
})
};
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;
Ok((entries, num_shreds, slot_meta.is_full()))
}

@@ -3229,14 +3201,24 @@ impl Blockstore {
.collect()
}

pub fn get_entries_in_data_block(
/// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i < s_i+1 < e_i+1
/// e_i == s_i+1
fn get_slot_entries_in_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
completed_ranges: CompletedRanges,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
let keys: Vec<(Slot, u64)> = (start_index..=end_index)
assert!(!completed_ranges.is_empty());
steviez marked this conversation as resolved.
Show resolved Hide resolved

let (total_start_index, _) = *completed_ranges.first().unwrap();
steviez marked this conversation as resolved.
Show resolved Hide resolved
let (_, total_end_index) = *completed_ranges.last().unwrap();
let keys: Vec<(Slot, u64)> = (total_start_index..=total_end_index)
.map(|index| (slot, u64::from(index)))
.collect();

@@ -3246,7 +3228,6 @@ impl Blockstore {
.into_iter()
.collect();
let data_shreds = data_shreds?;

let data_shreds: Result<Vec<Shred>> =
data_shreds
.into_iter()
@@ -3262,8 +3243,8 @@ impl Blockstore {
idx,
slot_meta.consumed,
slot_meta.completed_data_indexes,
start_index,
end_index
total_start_index,
total_end_index
);
}
}
@@ -3281,21 +3262,46 @@ impl Blockstore {
})
.collect();
let data_shreds = data_shreds?;
let last_shred = data_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"Could not reconstruct data block from constituent shreds, error: {e:?}"
))))
})?;
let mut entries = vec![];
for (start_index, end_index) in completed_ranges.iter() {
steviez marked this conversation as resolved.
Show resolved Hide resolved
// The indices from completed_ranges refer to shred indices in the
// block; map those indices to indices within data_shreds
let start_index = (*start_index - total_start_index) as usize;
let end_index = (*end_index - total_start_index) as usize;
let range_shreds = &data_shreds[start_index..=end_index];
steviez marked this conversation as resolved.
Show resolved Hide resolved

let last_shred = range_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

let deshred_payload = Shredder::deshred(range_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries buffer from shreds: {e:?}"
))))
})?;

debug!("{:?} shreds in last FEC set", data_shreds.len());
steviez marked this conversation as resolved.
Show resolved Hide resolved
let range_entries =
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries: {e:?}"),
)))
})?;

debug!("{:?} shreds in last FEC set", data_shreds.len(),);
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries: {e:?}"
))))
})
entries.extend(range_entries);
}
Ok(entries)
}

// TODO: probably delete this entry point, used by completed_data_sets_service.rs
steviez marked this conversation as resolved.
Show resolved Hide resolved
pub fn get_entries_in_data_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {