From 3c314171882addd33ad77701b24771c93315f1b1 Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Fri, 12 Jan 2024 00:33:23 -0600
Subject: [PATCH 1/6] Remove the thread pool used for fetching Entries

---
 ledger/src/blockstore.rs | 104 +++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 49 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index cda801bb296e45..7c421c7a4929ad 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -44,7 +44,6 @@ use {
         datapoint_debug, datapoint_error,
         poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
     },
-    solana_rayon_threadlimit::get_max_thread_count,
     solana_runtime::bank::Bank,
     solana_sdk::{
         account::ReadableAccount,
@@ -97,11 +96,6 @@ pub use {
 // get_max_thread_count to match number of threads in the old code.
 // see: https://github.com/solana-labs/solana/pull/24853
 lazy_static! {
-    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
-        .num_threads(get_max_thread_count())
-        .thread_name(|i| format!("solBstore{i:02}"))
-        .build()
-        .unwrap();
     static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
         .num_threads(num_cpus::get())
         .thread_name(|i| format!("solBstoreAll{i:02}"))
@@ -3097,29 +3091,7 @@ impl Blockstore {
             .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
             .unwrap_or(0);

-        let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
-            completed_ranges
-                .into_iter()
-                .map(|(start_index, end_index)| {
-                    self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
-                })
-                .collect()
-        } else {
-            PAR_THREAD_POOL.install(|| {
-                completed_ranges
-                    .into_par_iter()
-                    .map(|(start_index, end_index)| {
-                        self.get_entries_in_data_block(
-                            slot,
-                            start_index,
-                            end_index,
-                            Some(&slot_meta),
-                        )
-                    })
-                    .collect()
-            })
-        };
-        let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
+        let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;

         Ok((entries, num_shreds, slot_meta.is_full()))
     }
@@ -3229,14 +3201,24 @@ impl Blockstore {
             .collect()
     }

-    pub fn get_entries_in_data_block(
+    /// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
+    /// This function takes advantage of the fact that `completed_ranges` are both
+    /// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
+    ///   completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
+    /// Then, the following statements are true:
+    ///   s_i < e_i < s_i+1 < e_i+1
+    ///   e_i == s_i+1
+    fn get_slot_entries_in_block(
         &self,
         slot: Slot,
-        start_index: u32,
-        end_index: u32,
+        completed_ranges: CompletedRanges,
         slot_meta: Option<&SlotMeta>,
     ) -> Result<Vec<Entry>> {
-        let keys: Vec<(Slot, u64)> = (start_index..=end_index)
+        assert!(!completed_ranges.is_empty());
+
+        let (total_start_index, _) = *completed_ranges.first().unwrap();
+        let (_, total_end_index) = *completed_ranges.last().unwrap();
+        let keys: Vec<(Slot, u64)> = (total_start_index..=total_end_index)
             .map(|index| (slot, u64::from(index)))
             .collect();

@@ -3246,7 +3228,6 @@ impl Blockstore {
             .into_iter()
             .collect();
         let data_shreds = data_shreds?;
-
         let data_shreds: Result<Vec<Shred>> = data_shreds
             .into_iter()
             .enumerate()
@@ -3262,8 +3243,8 @@ impl Blockstore {
                                 idx,
                                 slot_meta.consumed,
                                 slot_meta.completed_data_indexes,
-                                start_index,
-                                end_index
+                                total_start_index,
+                                total_end_index
                             );
                         }
                     }
@@ -3281,21 +3262,46 @@ impl Blockstore {
                 })
                 .collect();
         let data_shreds = data_shreds?;
-        let last_shred = data_shreds.last().unwrap();
-        assert!(last_shred.data_complete() || last_shred.last_in_slot());
-        let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
-            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
-                "Could not reconstruct data block from constituent shreds, error: {e:?}"
-            ))))
-        })?;
+        let mut entries = vec![];
+        for (start_index, end_index) in completed_ranges.iter() {
+            // The indices from completed_ranges refer to shred indices in the
+            // block; map those indices to indices within data_shreds
+            let start_index = (*start_index - total_start_index) as usize;
+            let end_index = (*end_index - total_start_index) as usize;
+            let range_shreds = &data_shreds[start_index..=end_index];
+
+            let last_shred = range_shreds.last().unwrap();
+            assert!(last_shred.data_complete() || last_shred.last_in_slot());
+
+            let deshred_payload = Shredder::deshred(range_shreds).map_err(|e| {
+                BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
+                    "could not reconstruct entries buffer from shreds: {e:?}"
+                ))))
+            })?;
+
+            debug!("{:?} shreds in last FEC set", data_shreds.len());
+            let range_entries =
+                bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
+                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                        format!("could not reconstruct entries: {e:?}"),
+                    )))
+                })?;

-        debug!("{:?} shreds in last FEC set", data_shreds.len(),);
-        bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
-            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
-                "could not reconstruct entries: {e:?}"
-            ))))
-        })
+            entries.extend(range_entries);
+        }
+        Ok(entries)
+    }
+
+    // TODO: probably delete this entry point, used by completed_data_sets_service.rs
+    pub fn get_entries_in_data_block(
+        &self,
+        slot: Slot,
+        start_index: u32,
+        end_index: u32,
+        slot_meta: Option<&SlotMeta>,
+    ) -> Result<Vec<Entry>> {
+        self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
     }

     fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {

From 4797558d01e12b7633d28e1ed9c97f2ca51feded Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 26 Feb 2024 14:40:00 -0600
Subject: [PATCH 2/6] Fix off-by-one error in comment

---
 ledger/src/blockstore.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 7c421c7a4929ad..831a348fff81bc 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3207,7 +3207,7 @@ impl Blockstore {
     ///   completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
     /// Then, the following statements are true:
     ///   s_i < e_i < s_i+1 < e_i+1
-    ///   e_i == s_i+1
+    ///   e_i + 1 == s_i+1
     fn get_slot_entries_in_block(
         &self,
         slot: Slot,

From 6ad51f0e4b0c088deec6a5985258dfd9f0bd1733 Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 26 Feb 2024 14:40:40 -0600
Subject: [PATCH 3/6] Remove outdated TODO

---
 ledger/src/blockstore.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 831a348fff81bc..8c407f19ace04f 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3293,7 +3293,6 @@ impl Blockstore {
         Ok(entries)
     }

-    // TODO: probably delete this entry point, used by completed_data_sets_service.rs
     pub fn get_entries_in_data_block(
         &self,
         slot: Slot,

From 1db5c93bba86d35f13288cbe287ffe5b19bc18fa Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 26 Feb 2024 18:05:35 -0600
Subject: [PATCH 4/6] Tweak FEC set log and demote to trace

---
 ledger/src/blockstore.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 8c407f19ace04f..6ecd9cbe79a408 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3273,6 +3273,7 @@ impl Blockstore {

             let last_shred = range_shreds.last().unwrap();
             assert!(last_shred.data_complete() || last_shred.last_in_slot());
+            trace!("{:?} data shreds in last FEC set", data_shreds.len());

             let deshred_payload = Shredder::deshred(range_shreds).map_err(|e| {
                 BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
@@ -3280,7 +3281,6 @@ impl Blockstore {
                 ))))
             })?;

-            debug!("{:?} shreds in last FEC set", data_shreds.len());
             let range_entries =
                 bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
                     BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(

From 9a80b64bfbec00e65f7750b2276072d90a1aae1c Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 26 Feb 2024 18:02:55 -0600
Subject: [PATCH 5/6] Use combinators over for-loop

---
 ledger/src/blockstore.rs | 59 ++++++++++++++++++++--------------------
 1 file changed, 30 insertions(+), 29 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 6ecd9cbe79a408..e146d6170bc3d1 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -28,6 +28,7 @@ use {
     bincode::{deserialize, serialize},
     crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
     dashmap::DashSet,
+    itertools::Itertools,
     log::*,
     rand::Rng,
     rayon::{
@@ -3263,34 +3264,35 @@ impl Blockstore {
             .collect();
         let data_shreds = data_shreds?;

-        let mut entries = vec![];
-        for (start_index, end_index) in completed_ranges.iter() {
-            // The indices from completed_ranges refer to shred indices in the
-            // block; map those indices to indices within data_shreds
-            let start_index = (*start_index - total_start_index) as usize;
-            let end_index = (*end_index - total_start_index) as usize;
-            let range_shreds = &data_shreds[start_index..=end_index];
-
-            let last_shred = range_shreds.last().unwrap();
-            assert!(last_shred.data_complete() || last_shred.last_in_slot());
-            trace!("{:?} data shreds in last FEC set", data_shreds.len());
-
-            let deshred_payload = Shredder::deshred(range_shreds).map_err(|e| {
-                BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
-                    "could not reconstruct entries buffer from shreds: {e:?}"
-                ))))
-            })?;
-
-            let range_entries =
-                bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
-                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
-                        format!("could not reconstruct entries: {e:?}"),
-                    )))
-                })?;
-
-            entries.extend(range_entries);
-        }
-        Ok(entries)
+        completed_ranges
+            .into_iter()
+            .map(|(start_index, end_index)| {
+                // The indices from completed_ranges refer to shred indices in the
+                // block; map those indices to indices within data_shreds
+                let start_index = (start_index - total_start_index) as usize;
+                let end_index = (end_index - total_start_index) as usize;
+                let range_shreds = &data_shreds[start_index..=end_index];
+
+                let last_shred = range_shreds.last().unwrap();
+                assert!(last_shred.data_complete() || last_shred.last_in_slot());
+                trace!("{:?} data shreds in last FEC set", data_shreds.len());
+
+                Shredder::deshred(range_shreds)
+                    .map_err(|e| {
+                        BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                            format!("could not reconstruct entries buffer from shreds: {e:?}"),
+                        )))
+                    })
+                    .and_then(|payload| {
+                        bincode::deserialize::<Vec<Entry>>(&payload).map_err(|e| {
+                            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                                format!("could not reconstruct entries: {e:?}"),
+                            )))
+                        })
+                    })
+            })
+            .flatten_ok()
+            .collect()
     }

     pub fn get_entries_in_data_block(
         &self,
         slot: Slot,
@@ -4800,7 +4802,6 @@ pub mod tests {
         assert_matches::assert_matches,
         bincode::serialize,
         crossbeam_channel::unbounded,
-        itertools::Itertools,
         rand::{seq::SliceRandom, thread_rng},
         solana_account_decoder::parse_token::UiTokenAmount,
         solana_entry::entry::{next_entry, next_entry_mut},

From c89bdaae1feed737cd835c363b8c84b54a9ee28c Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 26 Feb 2024 18:19:50 -0600
Subject: [PATCH 6/6] Rename variables for more clarity between all and individual ranges

---
 ledger/src/blockstore.rs | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index e146d6170bc3d1..c01b1806a8fa27 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3217,9 +3217,9 @@ impl Blockstore {
     ) -> Result<Vec<Entry>> {
         assert!(!completed_ranges.is_empty());

-        let (total_start_index, _) = *completed_ranges.first().unwrap();
-        let (_, total_end_index) = *completed_ranges.last().unwrap();
-        let keys: Vec<(Slot, u64)> = (total_start_index..=total_end_index)
+        let (all_ranges_start_index, _) = *completed_ranges.first().unwrap();
+        let (_, all_ranges_end_index) = *completed_ranges.last().unwrap();
+        let keys: Vec<(Slot, u64)> = (all_ranges_start_index..=all_ranges_end_index)
             .map(|index| (slot, u64::from(index)))
             .collect();

@@ -3244,8 +3244,8 @@ impl Blockstore {
                                 idx,
                                 slot_meta.consumed,
                                 slot_meta.completed_data_indexes,
-                                total_start_index,
-                                total_end_index
+                                all_ranges_start_index,
+                                all_ranges_end_index
                             );
                         }
                     }
@@ -3268,10 +3268,10 @@ impl Blockstore {
             .into_iter()
             .map(|(start_index, end_index)| {
                 // The indices from completed_ranges refer to shred indices in the
-                // block; map those indices to indices within data_shreds
-                let start_index = (start_index - total_start_index) as usize;
-                let end_index = (end_index - total_start_index) as usize;
-                let range_shreds = &data_shreds[start_index..=end_index];
+                // entire block; map those indices to indices within data_shreds
+                let range_start_index = (start_index - all_ranges_start_index) as usize;
+                let range_end_index = (end_index - all_ranges_start_index) as usize;
+                let range_shreds = &data_shreds[range_start_index..=range_end_index];

                 let last_shred = range_shreds.last().unwrap();
                 assert!(last_shred.data_complete() || last_shred.last_in_slot());
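
Editor's note: the standalone sketch below is not part of the patch series. It is a minimal illustration of the index arithmetic that get_slot_entries_in_block relies on, assuming inclusive (start, end) ranges that satisfy e_i + 1 == s_i+1. The range values and the plain integers standing in for shreds are hypothetical, chosen only to show how block-level shred indices map onto the single batch of shreds fetched for the whole span.

    // Illustrative sketch only: hypothetical inclusive (start, end) ranges
    // standing in for CompletedRanges, and integers standing in for shreds.
    fn main() {
        let completed_ranges: Vec<(u32, u32)> = vec![(10, 12), (13, 13), (14, 17)];

        // The first range's start and the last range's end bound the whole span.
        let (all_ranges_start_index, _) = *completed_ranges.first().unwrap();
        let (_, all_ranges_end_index) = *completed_ranges.last().unwrap();

        // Stand-in for the shreds fetched in one batch for that span;
        // each "shred" here is just its own index.
        let data_shreds: Vec<u32> = (all_ranges_start_index..=all_ranges_end_index).collect();

        for (start_index, end_index) in &completed_ranges {
            // Because the ranges are contiguous (e_i + 1 == s_i+1), block-level
            // indices map directly onto offsets within data_shreds.
            let range_start_index = (start_index - all_ranges_start_index) as usize;
            let range_end_index = (end_index - all_ranges_start_index) as usize;
            let range_shreds = &data_shreds[range_start_index..=range_end_index];
            println!("range ({start_index}, {end_index}) -> {range_shreds:?}");
        }
    }

If the ranges were not contiguous and sorted, the subtraction above would not line up with positions in the fetched batch, which is why the doc comment spells out that invariant.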