From f49beb0cbc51ea63a893d345cfac68b88bdad2ce Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 25 Sep 2022 18:09:47 +0000 Subject: [PATCH] caches reed-solomon encoder/decoder instance (#27510) ReedSolomon::new(...) initializes a matrix and a data-decode-matrix cache: https://github.com/rust-rse/reed-solomon-erasure/blob/273ebbced/src/core.rs#L460-L466 In order to cache this computation, this commit caches the reed-solomon encoder/decoder instance for each (data_shards, parity_shards) pair. --- core/benches/retransmit_stage.rs | 3 +- core/benches/shredder.rs | 16 ++- core/src/broadcast_stage.rs | 3 +- .../broadcast_duplicates_run.rs | 7 +- .../broadcast_fake_shreds_run.rs | 6 +- .../fail_entry_verification_broadcast_run.rs | 7 +- .../broadcast_stage/standard_broadcast_run.rs | 6 +- core/src/shred_fetch_stage.rs | 3 +- core/src/window_service.rs | 7 +- gossip/src/duplicate_shred.rs | 3 +- ledger/src/blockstore.rs | 34 +++++- ledger/src/shred.rs | 15 ++- ledger/src/shred/merkle.rs | 52 ++++++-- ledger/src/shredder.rs | 111 +++++++++++++++--- ledger/tests/shred.rs | 14 ++- 15 files changed, 233 insertions(+), 54 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index bac12781f4b8b8..bad02d043a51f5 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -15,7 +15,7 @@ use { solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, leader_schedule_cache::LeaderScheduleCache, - shred::{ProcessShredsStats, Shredder}, + shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, }, solana_measure::measure::Measure, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -107,6 +107,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 9685b4b2e01304..6216fdbd871046 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -8,8 +8,8 @@ use { raptorq::{Decoder, Encoder}, solana_entry::entry::{create_ticks, Entry}, solana_ledger::shred::{ - max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags, - Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, + max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, + Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, }, solana_perf::test_tx, solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair}, @@ -53,6 +53,7 @@ fn make_shreds(num_shreds: usize) -> Vec { 0, // next_shred_index 0, // next_code_index false, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); assert!(data_shreds.len() >= num_shreds); @@ -78,6 +79,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { // ~1Mb let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); + let reed_solomon_cache = ReedSolomonCache::default(); bencher.iter(|| { let shredder = Shredder::new(1, 0, 0, 0).unwrap(); shredder.entries_to_shreds( @@ -87,6 +89,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { 0, 0, true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); }) @@ -104,6 +107,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { Some(shred_size), ); let entries = make_large_unchained_entries(txs_per_entry, 
num_entries); + let reed_solomon_cache = ReedSolomonCache::default(); // 1Mb bencher.iter(|| { let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -114,6 +118,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { 0, 0, true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); }) @@ -135,6 +140,7 @@ fn bench_deshredder(bencher: &mut Bencher) { 0, 0, true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); bencher.iter(|| { @@ -159,10 +165,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) { fn bench_shredder_coding(bencher: &mut Bencher) { let symbol_count = DATA_SHREDS_PER_FEC_BLOCK; let data_shreds = make_shreds(symbol_count); + let reed_solomon_cache = ReedSolomonCache::default(); bencher.iter(|| { Shredder::generate_coding_shreds( &data_shreds[..symbol_count], 0, // next_code_index + &reed_solomon_cache, ) .len(); }) @@ -172,12 +180,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) { fn bench_shredder_decoding(bencher: &mut Bencher) { let symbol_count = DATA_SHREDS_PER_FEC_BLOCK; let data_shreds = make_shreds(symbol_count); + let reed_solomon_cache = ReedSolomonCache::default(); let coding_shreds = Shredder::generate_coding_shreds( &data_shreds[..symbol_count], 0, // next_code_index + &reed_solomon_cache, ); bencher.iter(|| { - Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap(); + Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap(); }) } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 0163c5c10cf41a..a42c8a68851037 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -443,7 +443,7 @@ pub mod test { blockstore::Blockstore, genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, - shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder}, + shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder}, }, solana_runtime::bank::Bank, solana_sdk::{ @@ -482,6 +482,7 @@ pub mod test { 0, // next_shred_index, 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); ( diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index f7d48644030ed1..0315a343b98739 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -4,7 +4,7 @@ use { itertools::Itertools, solana_entry::entry::Entry, solana_gossip::contact_info::ContactInfo, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{ hash::Hash, signature::{Keypair, Signature, Signer}, @@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun { cluster_nodes_cache: Arc>, original_last_data_shreds: Arc>>, partition_last_data_shreds: Arc>>, + reed_solomon_cache: Arc, } impl BroadcastDuplicatesRun { @@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun { cluster_nodes_cache, original_last_data_shreds: Arc::>>::default(), partition_last_data_shreds: Arc::>>::default(), + reed_solomon_cache: Arc::::default(), } } } @@ -164,6 +166,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.next_shred_index, self.next_code_index, false, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -180,6 +183,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.next_shred_index, self.next_code_index, false, // merkle_variant + 
&self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); // Don't mark the last shred as last so that validators won't @@ -192,6 +196,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.next_shred_index, self.next_code_index, false, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); let sigs: Vec<_> = partition_last_data_shred diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 10b2f6c6b91c6a..69aaf410af9123 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -1,7 +1,7 @@ use { super::*, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{hash::Hash, signature::Keypair}, }; @@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun { partition: usize, shred_version: u16, next_code_index: u32, + reed_solomon_cache: Arc, } impl BroadcastFakeShredsRun { @@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun { partition, shred_version, next_code_index: 0, + reed_solomon_cache: Arc::::default(), } } } @@ -61,6 +63,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { next_shred_index, self.next_code_index, true, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -81,6 +84,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { next_shred_index, self.next_code_index, true, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index d5b77e59bdb022..e7b899ab0fdbbb 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,7 +1,7 @@ use { super::*, crate::cluster_nodes::ClusterNodesCache, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{hash::Hash, signature::Keypair}, std::{thread::sleep, time::Duration}, }; @@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun { next_shred_index: u32, next_code_index: u32, cluster_nodes_cache: Arc>, + reed_solomon_cache: Arc, } impl FailEntryVerificationBroadcastRun { @@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun { next_shred_index: 0, next_code_index: 0, cluster_nodes_cache, + reed_solomon_cache: Arc::::default(), } } } @@ -92,6 +94,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index, self.next_code_index, true, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -107,6 +110,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index, self.next_code_index, true, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); // Don't mark the last shred as last so that validators won't know @@ -119,6 +123,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index, self.next_code_index, true, // merkle_variant + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); self.next_shred_index += 1; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ba3e6b0962fa69..37c6584069efc4 100644 --- 
a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -9,7 +9,7 @@ use { broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache, }, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, solana_sdk::{ signature::Keypair, timing::{duration_as_us, AtomicInterval}, @@ -29,6 +29,7 @@ pub struct StandardBroadcastRun { last_datapoint_submit: Arc, num_batches: usize, cluster_nodes_cache: Arc>, + reed_solomon_cache: Arc, } impl StandardBroadcastRun { @@ -48,6 +49,7 @@ impl StandardBroadcastRun { last_datapoint_submit: Arc::default(), num_batches: 0, cluster_nodes_cache, + reed_solomon_cache: Arc::::default(), } } @@ -77,6 +79,7 @@ impl StandardBroadcastRun { state.next_shred_index, state.next_code_index, false, // merkle_variant + &self.reed_solomon_cache, stats, ); self.report_and_reset_stats(true); @@ -126,6 +129,7 @@ impl StandardBroadcastRun { next_shred_index, next_code_index, false, // merkle_variant + &self.reed_solomon_cache, process_stats, ); let next_shred_index = match data_shreds.iter().map(Shred::index).max() { diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 25c9b00cdf7c76..cde36c8389b2cc 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -251,7 +251,7 @@ mod tests { super::*, solana_ledger::{ blockstore::MAX_DATA_SHREDS_PER_SLOT, - shred::{Shred, ShredFlags}, + shred::{ReedSolomonCache, Shred, ShredFlags}, }, }; @@ -294,6 +294,7 @@ mod tests { let coding = solana_ledger::shred::Shredder::generate_coding_shreds( &[shred], 3, // next_code_index + &ReedSolomonCache::default(), ); coding[0].copy_to_packet(&mut packet); assert!(!should_discard_packet( diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 35330f50ff5db9..5e7d0e4bd5b0dc 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -16,7 +16,7 @@ use { solana_ledger::{ blockstore::{Blockstore, BlockstoreInsertionMetrics}, leader_schedule_cache::LeaderScheduleCache, - shred::{self, Nonce, Shred}, + shred::{self, Nonce, ReedSolomonCache, Shred}, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, @@ -220,6 +220,7 @@ fn run_insert( completed_data_sets_sender: &CompletedDataSetsSender, retransmit_sender: &Sender>, outstanding_requests: &RwLock, + reed_solomon_cache: &ReedSolomonCache, ) -> Result<()> where F: Fn(Shred), @@ -282,6 +283,7 @@ where false, // is_trusted Some(retransmit_sender), &handle_duplicate, + reed_solomon_cache, metrics, )?; for index in inserted_indices { @@ -411,6 +413,7 @@ impl WindowService { .thread_name(|i| format!("solWinInsert{:02}", i)) .build() .unwrap(); + let reed_solomon_cache = ReedSolomonCache::default(); Builder::new() .name("solWinInsert".to_string()) .spawn(move || { @@ -432,6 +435,7 @@ impl WindowService { &completed_data_sets_sender, &retransmit_sender, &outstanding_requests, + &reed_solomon_cache, ) { ws_metrics.record_error(&e); if Self::should_exit_on_error(e, &handle_error) { @@ -507,6 +511,7 @@ mod test { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 283d2c409388a8..d6861c1fc54daf 100644 --- a/gossip/src/duplicate_shred.rs +++ 
b/gossip/src/duplicate_shred.rs @@ -284,7 +284,7 @@ pub(crate) mod tests { super::*, rand::Rng, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{ hash, signature::{Keypair, Signer}, @@ -343,6 +343,7 @@ pub(crate) mod tests { next_shred_index, next_shred_index, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.swap_remove(0) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index bd059c6960508f..65d2fc285b790b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -17,8 +17,8 @@ use { leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ - self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, Shred, ShredData, - ShredId, ShredType, Shredder, + self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache, + Shred, ShredData, ShredId, ShredType, Shredder, }, slot_stats::{ShredSource, SlotsStats}, }, @@ -634,6 +634,7 @@ impl Blockstore { recovered_shreds: &mut Vec, data_cf: &LedgerColumn, code_cf: &LedgerColumn, + reed_solomon_cache: &ReedSolomonCache, ) { // Find shreds for this erasure set and try recovery let slot = index.slot; @@ -652,7 +653,7 @@ impl Blockstore { code_cf, )) .collect(); - if let Ok(mut result) = shred::recover(available_shreds) { + if let Ok(mut result) = shred::recover(available_shreds, reed_solomon_cache) { Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len()); recovered_shreds.append(&mut result); } else { @@ -712,6 +713,7 @@ impl Blockstore { erasure_metas: &HashMap, index_working_set: &mut HashMap, prev_inserted_shreds: &HashMap, + reed_solomon_cache: &ReedSolomonCache, ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); @@ -734,6 +736,7 @@ impl Blockstore { &mut recovered_shreds, &data_cf, &code_cf, + reed_solomon_cache, ); } ErasureMetaStatus::DataFull => { @@ -812,6 +815,7 @@ impl Blockstore { is_trusted: bool, retransmit_sender: Option<&Sender>>>, handle_duplicate: &F, + reed_solomon_cache: &ReedSolomonCache, metrics: &mut BlockstoreInsertionMetrics, ) -> Result<(Vec, Vec)> where @@ -899,6 +903,7 @@ impl Blockstore { &erasure_metas, &mut index_working_set, &just_inserted_shreds, + reed_solomon_cache, ); metrics.num_recovered += recovered_shreds @@ -1098,6 +1103,7 @@ impl Blockstore { is_trusted, None, // retransmit-sender &|_| {}, // handle-duplicates + &ReedSolomonCache::default(), &mut BlockstoreInsertionMetrics::default(), ) } @@ -1704,6 +1710,7 @@ impl Blockstore { let mut shredder = Shredder::new(current_slot, parent_slot, 0, version).unwrap(); let mut all_shreds = vec![]; let mut slot_entries = vec![]; + let reed_solomon_cache = ReedSolomonCache::default(); // Find all the entries for start_slot for entry in entries.into_iter() { if remaining_ticks_in_slot == 0 { @@ -1725,6 +1732,7 @@ impl Blockstore { start_index, // next_shred_index start_index, // next_code_index true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); all_shreds.append(&mut data_shreds); @@ -1752,6 +1760,7 @@ impl Blockstore { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); all_shreds.append(&mut data_shreds); @@ -3867,6 +3876,7 @@ pub fn create_new_ledger( 0, // next_shred_index 0, // next_code_index true, // merkle_variant + 
&ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); assert!(shreds.last().unwrap().last_in_slot()); @@ -4133,6 +4143,7 @@ pub fn entries_to_test_shreds( 0, // next_shred_index, 0, // next_code_index merkle_variant, + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ) .0 @@ -8770,6 +8781,7 @@ pub mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); @@ -8824,6 +8836,7 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); + let reed_solomon_cache = ReedSolomonCache::default(); let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); let (shreds, _) = shredder.entries_to_shreds( &leader_keypair, @@ -8832,6 +8845,7 @@ pub mod tests { 0, // next_shred_index 0, // next_code_index, true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let (duplicate_shreds, _) = shredder.entries_to_shreds( @@ -8841,6 +8855,7 @@ pub mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let shred = shreds[0].clone(); @@ -9188,8 +9203,17 @@ pub mod tests { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0); - let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1); + let reed_solomon_cache = ReedSolomonCache::default(); + let coding1 = Shredder::generate_coding_shreds( + &shreds, + 0, // next_code_index + &reed_solomon_cache, + ); + let coding2 = Shredder::generate_coding_shreds( + &shreds, + 1, // next_code_index + &reed_solomon_cache, + ); for shred in &shreds { info!("shred {:?}", shred); } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 94198ef9274256..06952362146009 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -76,7 +76,7 @@ pub use { shred_data::ShredData, stats::{ProcessShredsStats, ShredFetchStats}, }, - crate::shredder::Shredder, + crate::shredder::{ReedSolomonCache, Shredder}, }; mod common; @@ -717,20 +717,25 @@ impl TryFrom for ShredVariant { } } -pub(crate) fn recover(shreds: Vec) -> Result, Error> { +pub(crate) fn recover( + shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, +) -> Result, Error> { match shreds .first() .ok_or(TooFewShardsPresent)? .common_header() .shred_variant { - ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds), + ShredVariant::LegacyData | ShredVariant::LegacyCode => { + Shredder::try_recovery(shreds, reed_solomon_cache) + } ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => { let shreds = shreds .into_iter() .map(merkle::Shred::try_from) .collect::>()?; - Ok(merkle::recover(shreds)? + Ok(merkle::recover(shreds, reed_solomon_cache)? 
.into_iter() .map(Shred::from) .collect()) @@ -750,6 +755,7 @@ pub(crate) fn make_merkle_shreds_from_entries( is_last_in_slot: bool, next_shred_index: u32, next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, stats: &mut ProcessShredsStats, ) -> Result, Error> { let now = Instant::now(); @@ -766,6 +772,7 @@ pub(crate) fn make_merkle_shreds_from_entries( is_last_in_slot, next_shred_index, next_code_index, + reed_solomon_cache, stats, )?; Ok(shreds.into_iter().flatten().map(Shred::from).collect()) diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index 3a0ee68204fe50..725854766b39f0 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -13,7 +13,7 @@ use { ShredFlags, ShredVariant, DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_DATA_SHRED_HEADERS, SIZE_OF_SIGNATURE, }, - shredder::{self, ReedSolomon}, + shredder::{self, ReedSolomonCache}, }, assert_matches::debug_assert_matches, itertools::Itertools, @@ -597,7 +597,10 @@ fn make_merkle_branch( Some(MerkleBranch { root, proof }) } -pub(super) fn recover(mut shreds: Vec) -> Result, Error> { +pub(super) fn recover( + mut shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, +) -> Result, Error> { // Grab {common, coding} headers from first coding shred. let headers = shreds.iter().find_map(|shred| { let shred = match shred { @@ -674,7 +677,9 @@ pub(super) fn recover(mut shreds: Vec) -> Result, Error> { .iter() .map(|shred| Some(shred.as_ref()?.erasure_shard_as_slice().ok()?.to_vec())) .collect(); - ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct(&mut shards)?; + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds)? + .reconstruct(&mut shards)?; let mask: Vec<_> = shreds.iter().map(Option::is_some).collect(); // Reconstruct code and data shreds from erasure encoded shards. let mut shreds: Vec<_> = shreds @@ -778,6 +783,7 @@ pub(super) fn make_shreds_from_data( is_last_in_slot: bool, next_shred_index: u32, next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, stats: &mut ProcessShredsStats, ) -> Result>, Error> { fn new_shred_data( @@ -916,7 +922,9 @@ pub(super) fn make_shreds_from_data( shreds .into_par_iter() .zip(next_code_index) - .map(|(shreds, next_code_index)| make_erasure_batch(keypair, shreds, next_code_index)) + .map(|(shreds, next_code_index)| { + make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache) + }) .collect::, Error>>() }); stats.gen_coding_elapsed += now.elapsed().as_micros() as u64; @@ -929,6 +937,7 @@ fn make_erasure_batch( keypair: &Keypair, shreds: Vec, next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, ) -> Result, Error> { let num_data_shreds = shreds.len(); let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds); @@ -949,7 +958,9 @@ fn make_erasure_batch( // Shreds should have erasure encoded shard of the same length. debug_assert_eq!(data.iter().map(|shard| shard.len()).dedup().count(), 1); let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; - ReedSolomon::new(num_data_shreds, num_coding_shreds)?.encode_sep(&data, &mut parity[..])?; + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds)? + .encode_sep(&data, &mut parity[..])?; let mut shreds: Vec<_> = shreds.into_iter().map(Shred::ShredData).collect(); // Initialize coding shreds from erasure coding shards. 
common_header.index = next_code_index; @@ -1114,9 +1125,15 @@ mod test { #[test_case(73)] fn test_recover_merkle_shreds(num_shreds: usize) { let mut rng = rand::thread_rng(); + let reed_solomon_cache = ReedSolomonCache::default(); for num_data_shreds in 1..num_shreds { let num_coding_shreds = num_shreds - num_data_shreds; - run_recover_merkle_shreds(&mut rng, num_data_shreds, num_coding_shreds); + run_recover_merkle_shreds( + &mut rng, + num_data_shreds, + num_coding_shreds, + &reed_solomon_cache, + ); } } @@ -1124,6 +1141,7 @@ mod test { rng: &mut R, num_data_shreds: usize, num_coding_shreds: usize, + reed_solomon_cache: &ReedSolomonCache, ) { let keypair = Keypair::generate(rng); let num_shreds = num_data_shreds + num_coding_shreds; @@ -1177,7 +1195,8 @@ mod test { .collect::>() .unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; - ReedSolomon::new(num_data_shreds, num_coding_shreds) + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); @@ -1237,12 +1256,12 @@ mod test { ) }) { assert_matches!( - recover(shreds), + recover(shreds, reed_solomon_cache), Err(Error::ErasureError(TooFewParityShards)) ); continue; } - let recovered_shreds = recover(shreds).unwrap(); + let recovered_shreds = recover(shreds, reed_solomon_cache).unwrap(); assert_eq!(size + recovered_shreds.len(), num_shreds); assert_eq!(recovered_shreds.len(), removed_shreds.len()); removed_shreds.sort_by(|a, b| { @@ -1287,21 +1306,27 @@ mod test { fn test_make_shreds_from_data(data_size: usize) { let mut rng = rand::thread_rng(); let data_size = data_size.saturating_sub(16).max(1); + let reed_solomon_cache = ReedSolomonCache::default(); for data_size in (data_size..data_size + 32).step_by(3) { - run_make_shreds_from_data(&mut rng, data_size); + run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); } } #[test] fn test_make_shreds_from_data_rand() { let mut rng = rand::thread_rng(); + let reed_solomon_cache = ReedSolomonCache::default(); for _ in 0..32 { let data_size = rng.gen_range(0, 31200 * 7); - run_make_shreds_from_data(&mut rng, data_size); + run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache); } } - fn run_make_shreds_from_data(rng: &mut R, data_size: usize) { + fn run_make_shreds_from_data( + rng: &mut R, + data_size: usize, + reed_solomon_cache: &ReedSolomonCache, + ) { let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); let keypair = Keypair::new(); let slot = 149_745_689; @@ -1323,6 +1348,7 @@ mod test { true, // is_last_in_slot next_shred_index, next_code_index, + reed_solomon_cache, &mut ProcessShredsStats::default(), ) .unwrap(); @@ -1433,7 +1459,7 @@ mod test { }) .group_by(|shred| shred.common_header().fec_set_index) .into_iter() - .flat_map(|(_, shreds)| recover(shreds.collect()).unwrap()) + .flat_map(|(_, shreds)| recover(shreds.collect(), reed_solomon_cache).unwrap()) .collect(); assert_eq!(recovered_data_shreds.len(), data_shreds.len()); for (shred, other) in recovered_data_shreds.into_iter().zip(data_shreds) { diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index feebcdcb468b65..132cf670b8c637 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -4,6 +4,7 @@ use { }, itertools::Itertools, lazy_static::lazy_static, + lru::LruCache, rayon::{prelude::*, ThreadPool}, reed_solomon_erasure::{ galois_8::Field, @@ -13,7 +14,11 @@ use { solana_measure::measure::Measure, solana_rayon_threadlimit::get_thread_count, solana_sdk::{clock::Slot, 
signature::Keypair}, - std::{borrow::Borrow, fmt::Debug}, + std::{ + borrow::Borrow, + fmt::Debug, + sync::{Arc, Mutex}, + }, }; lazy_static! { @@ -33,7 +38,11 @@ pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ 55, 56, 58, 59, 60, 62, 63, 64, // 32 ]; -pub(crate) type ReedSolomon = reed_solomon_erasure::ReedSolomon; +type ReedSolomon = reed_solomon_erasure::ReedSolomon; + +pub struct ReedSolomonCache( + Mutex>>, +); #[derive(Debug)] pub struct Shredder { @@ -70,6 +79,7 @@ impl Shredder { next_shred_index: u32, next_code_index: u32, merkle_variant: bool, + reed_solomon_cache: &ReedSolomonCache, stats: &mut ProcessShredsStats, ) -> ( Vec, // data shreds @@ -87,6 +97,7 @@ impl Shredder { is_last_in_slot, next_shred_index, next_code_index, + reed_solomon_cache, stats, ) .unwrap() @@ -95,9 +106,14 @@ impl Shredder { } let data_shreds = self.entries_to_data_shreds(keypair, entries, is_last_in_slot, next_shred_index, stats); - let coding_shreds = - Self::data_shreds_to_coding_shreds(keypair, &data_shreds, next_code_index, stats) - .unwrap(); + let coding_shreds = Self::data_shreds_to_coding_shreds( + keypair, + &data_shreds, + next_code_index, + reed_solomon_cache, + stats, + ) + .unwrap(); (data_shreds, coding_shreds) } @@ -174,6 +190,7 @@ impl Shredder { keypair: &Keypair, data_shreds: &[Shred], next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, process_stats: &mut ProcessShredsStats, ) -> Result, Error> { if data_shreds.is_empty() { @@ -204,7 +221,7 @@ impl Shredder { .into_par_iter() .zip(next_code_index) .flat_map(|(shreds, next_code_index)| { - Shredder::generate_coding_shreds(&shreds, next_code_index) + Shredder::generate_coding_shreds(&shreds, next_code_index, reed_solomon_cache) }) .collect() }); @@ -228,6 +245,7 @@ impl Shredder { pub fn generate_coding_shreds>( data: &[T], next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, ) -> Vec { let (slot, index, version, fec_set_index) = { let shred = data.first().unwrap().borrow(); @@ -257,7 +275,8 @@ impl Shredder { .collect::>() .unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; - ReedSolomon::new(num_data, num_coding) + reed_solomon_cache + .get(num_data, num_coding) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); @@ -282,7 +301,10 @@ impl Shredder { .collect() } - pub fn try_recovery(shreds: Vec) -> Result, Error> { + pub fn try_recovery( + shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, + ) -> Result, Error> { let (slot, fec_set_index) = match shreds.first() { None => return Err(Error::from(TooFewShardsPresent)), Some(shred) => (shred.slot(), shred.fec_set_index()), @@ -322,7 +344,9 @@ impl Shredder { mask[index] = true; } } - ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct_data(&mut shards)?; + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds)? 
+ .reconstruct_data(&mut shards)?; let recovered_data = mask .into_iter() .zip(shards) @@ -365,6 +389,38 @@ impl Shredder { } } +impl ReedSolomonCache { + const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK; + + pub(crate) fn get( + &self, + data_shards: usize, + parity_shards: usize, + ) -> Result, reed_solomon_erasure::Error> { + let key = (data_shards, parity_shards); + { + let mut cache = self.0.lock().unwrap(); + if let Some(entry) = cache.get(&key) { + return Ok(entry.clone()); + } + } + let entry = ReedSolomon::new(data_shards, parity_shards)?; + let entry = Arc::new(entry); + { + let entry = entry.clone(); + let mut cache = self.0.lock().unwrap(); + cache.put(key, entry); + } + Ok(entry) + } +} + +impl Default for ReedSolomonCache { + fn default() -> Self { + Self(Mutex::new(LruCache::new(Self::CAPACITY))) + } +} + /// Maps number of data shreds in each batch to the erasure batch size. pub(crate) fn get_erasure_batch_size(num_data_shreds: usize) -> usize { ERASURE_BATCH_SIZE @@ -464,6 +520,7 @@ mod tests { start_index, // next_shred_index start_index, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; @@ -542,6 +599,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); let deserialized_shred = @@ -573,6 +631,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.iter().for_each(|s| { @@ -609,6 +668,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.iter().for_each(|s| { @@ -654,6 +714,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); for (i, s) in data_shreds.iter().enumerate() { @@ -701,6 +762,7 @@ mod tests { }) .collect(); + let reed_solomon_cache = ReedSolomonCache::default(); let serialized_entries = bincode::serialize(&entries).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, @@ -709,6 +771,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index false, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let num_coding_shreds = coding_shreds.len(); @@ -728,12 +791,17 @@ mod tests { // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail assert_eq!( - Shredder::try_recovery(data_shreds[..data_shreds.len() - 1].to_vec()).unwrap(), + Shredder::try_recovery( + data_shreds[..data_shreds.len() - 1].to_vec(), + &reed_solomon_cache + ) + .unwrap(), Vec::default() ); // Test1: Try recovery/reassembly with only data shreds. Hint: should work - let recovered_data = Shredder::try_recovery(data_shreds[..].to_vec()).unwrap(); + let recovered_data = + Shredder::try_recovery(data_shreds[..].to_vec(), &reed_solomon_cache).unwrap(); assert!(recovered_data.is_empty()); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. 
Hint: should work @@ -743,7 +811,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None }) .collect(); - let mut recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let mut recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing let recovered_shred = recovered_data.remove(0); @@ -783,7 +852,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { @@ -838,6 +908,7 @@ mod tests { 25, // next_shred_index, 25, // next_code_index false, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); // We should have 10 shreds now @@ -855,7 +926,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { @@ -879,7 +951,8 @@ mod tests { assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); // Test6: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert!(recovered_data.is_empty()); } @@ -923,6 +996,7 @@ mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); + let reed_solomon_cache = ReedSolomonCache::default(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, &[entry], @@ -930,6 +1004,7 @@ mod tests { next_shred_index, next_shred_index, // next_code_index false, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let num_data_shreds = data_shreds.len(); @@ -949,7 +1024,7 @@ mod tests { .filter(|shred| shred.is_data()) .map(|shred| shred.index()) .collect(); - let recovered_shreds = Shredder::try_recovery(shreds).unwrap(); + let recovered_shreds = Shredder::try_recovery(shreds, &reed_solomon_cache).unwrap(); assert_eq!( recovered_shreds, data_shreds @@ -991,6 +1066,7 @@ mod tests { 0, // next_shred_index 0, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); assert!(!data_shreds @@ -1024,6 +1100,7 @@ mod tests { start_index, // next_shred_index start_index, // next_code_index true, // merkle_variant + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK; @@ -1084,6 +1161,7 @@ mod tests { ); let next_code_index = data_shreds[0].index(); + let reed_solomon_cache = ReedSolomonCache::default(); for size in (1..data_shreds.len()).step_by(5) { let data_shreds = &data_shreds[..size]; @@ -1091,6 +1169,7 @@ mod tests { &keypair, data_shreds, next_code_index, + &reed_solomon_cache, &mut stats, ) .unwrap(); diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index c3caaf1b4b98c0..ef873bd8f585d8 
100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -2,8 +2,8 @@ use { solana_entry::entry::Entry, solana_ledger::shred::{ - max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder, - DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, + max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, ReedSolomonCache, + Shred, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, }, solana_sdk::{ clock::Slot, @@ -47,6 +47,7 @@ fn test_multi_fec_block_coding() { }) .collect(); + let reed_solomon_cache = ReedSolomonCache::default(); let serialized_entries = bincode::serialize(&entries).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, @@ -55,6 +56,7 @@ fn test_multi_fec_block_coding() { 0, // next_shred_index 0, // next_code_index false, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; @@ -82,7 +84,8 @@ fn test_multi_fec_block_coding() { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); for (i, recovered_shred) in recovered_data.into_iter().enumerate() { let index = shred_start_index + (i * 2); @@ -116,6 +119,7 @@ fn test_multi_fec_block_different_size_coding() { setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone()); let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum(); + let reed_solomon_cache = ReedSolomonCache::default(); // Test recovery for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) { let first_data_index = fec_data_shreds.first().unwrap().index() as usize; @@ -125,7 +129,7 @@ fn test_multi_fec_block_different_size_coding() { .chain(fec_coding_shreds.iter().step_by(2)) .cloned() .collect(); - let recovered_data = Shredder::try_recovery(all_shreds).unwrap(); + let recovered_data = Shredder::try_recovery(all_shreds, &reed_solomon_cache).unwrap(); // Necessary in order to ensure the last shred in the slot // is part of the recovered set, and that the below `index` // calcuation in the loop is correct @@ -219,6 +223,7 @@ fn setup_different_sized_fec_blocks( let mut coding_slot_and_index = HashSet::new(); let total_num_data_shreds: usize = 2 * num_shreds_per_iter; + let reed_solomon_cache = ReedSolomonCache::default(); for i in 0..2 { let is_last = i == 1; let (data_shreds, coding_shreds) = shredder.entries_to_shreds( @@ -228,6 +233,7 @@ fn setup_different_sized_fec_blocks( next_shred_index, next_code_index, false, // merkle_variant + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); for shred in &data_shreds {
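
A minimal, standalone sketch of the caching pattern this patch introduces (not part of the diff above): an LRU map keyed by (data_shards, parity_shards) that hands out shared ReedSolomon instances, so the matrix and data-decode-matrix setup done by ReedSolomon::new(...) runs once per shard shape. It assumes the lru crate's 0.7-style LruCache::new(usize) constructor used in the patch, the reed-solomon-erasure galois_8 API, and DATA_SHREDS_PER_FEC_BLOCK = 32; the main() usage is illustrative only.

use {
    lru::LruCache,
    std::sync::{Arc, Mutex},
};

// Same field/type alias as ledger/src/shredder.rs.
type ReedSolomon = reed_solomon_erasure::ReedSolomon<reed_solomon_erasure::galois_8::Field>;

pub struct ReedSolomonCache(
    // Keyed by (data_shards, parity_shards); values are Arc so callers can
    // encode/decode without holding the lock.
    Mutex<LruCache<(usize, usize), Arc<ReedSolomon>>>,
);

impl ReedSolomonCache {
    // Mirrors the patch's 4 * DATA_SHREDS_PER_FEC_BLOCK (assumed to be 32 here).
    const CAPACITY: usize = 4 * 32;

    pub fn get(
        &self,
        data_shards: usize,
        parity_shards: usize,
    ) -> Result<Arc<ReedSolomon>, reed_solomon_erasure::Error> {
        let key = (data_shards, parity_shards);
        // Fast path: reuse a previously constructed encoder/decoder.
        if let Some(entry) = self.0.lock().unwrap().get(&key) {
            return Ok(entry.clone());
        }
        // Slow path: build the matrices once, outside the lock, then cache.
        let entry = Arc::new(ReedSolomon::new(data_shards, parity_shards)?);
        self.0.lock().unwrap().put(key, entry.clone());
        Ok(entry)
    }
}

impl Default for ReedSolomonCache {
    fn default() -> Self {
        Self(Mutex::new(LruCache::new(Self::CAPACITY)))
    }
}

fn main() -> Result<(), reed_solomon_erasure::Error> {
    let cache = ReedSolomonCache::default();
    // One full erasure batch: 32 data shards + 32 parity shards.
    let rs = cache.get(32, 32)?;
    let data: Vec<Vec<u8>> = (0..32u8).map(|i| vec![i; 1024]).collect();
    let mut parity = vec![vec![0u8; 1024]; 32];
    rs.encode_sep(&data, &mut parity)?;
    // A second lookup with the same shape returns the cached instance
    // instead of re-deriving the encoding matrix.
    assert!(Arc::ptr_eq(&rs, &cache.get(32, 32)?));
    Ok(())
}

Because the cached instances sit behind Arc, the mutex is held only for the lookup itself; concurrent shredding and recovery threads share the cached matrices without serializing on encode/decode, which is why the patch threads a single ReedSolomonCache (or an Arc of one) through the broadcast and window-service call sites.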