Skip to content

Commit

Permalink
makes last erasure batch size >= 64 shreds (#34330)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Dec 13, 2023
1 parent 70cab76 commit 7500235
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 40 deletions.
95 changes: 67 additions & 28 deletions ledger/src/shred/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ pub(super) fn make_shreds_from_data(
}
}
let now = Instant::now();
let erasure_batch_size = shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK);
let erasure_batch_size =
shredder::get_erasure_batch_size(DATA_SHREDS_PER_FEC_BLOCK, is_last_in_slot);
let proof_size = get_proof_size(erasure_batch_size);
let data_buffer_size = ShredData::capacity(proof_size)?;
let chunk_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_size;
Expand Down Expand Up @@ -872,7 +873,8 @@ pub(super) fn make_shreds_from_data(
let data_buffer_size = ShredData::capacity(proof_size).ok()?;
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
let num_data_shreds = num_data_shreds.max(1);
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds);
let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
(proof_size == get_proof_size(erasure_batch_size))
.then_some((proof_size, data_buffer_size))
})
Expand Down Expand Up @@ -932,7 +934,8 @@ pub(super) fn make_shreds_from_data(
.scan(next_code_index, |next_code_index, chunk| {
let out = Some(*next_code_index);
let num_data_shreds = chunk.len();
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds);
let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
let num_coding_shreds = erasure_batch_size - num_data_shreds;
*next_code_index += num_coding_shreds as u32;
out
Expand All @@ -945,7 +948,13 @@ pub(super) fn make_shreds_from_data(
.into_iter()
.zip(next_code_index)
.map(|(shreds, next_code_index)| {
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
make_erasure_batch(
keypair,
shreds,
next_code_index,
is_last_in_slot,
reed_solomon_cache,
)
})
.collect()
} else {
Expand All @@ -954,7 +963,13 @@ pub(super) fn make_shreds_from_data(
.into_par_iter()
.zip(next_code_index)
.map(|(shreds, next_code_index)| {
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
make_erasure_batch(
keypair,
shreds,
next_code_index,
is_last_in_slot,
reed_solomon_cache,
)
})
.collect()
})
Expand All @@ -969,10 +984,11 @@ fn make_erasure_batch(
keypair: &Keypair,
shreds: Vec<ShredData>,
next_code_index: u32,
is_last_in_slot: bool,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
let num_data_shreds = shreds.len();
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds);
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
let num_coding_shreds = erasure_batch_size - num_data_shreds;
let proof_size = get_proof_size(erasure_batch_size);
debug_assert!(shreds
Expand Down Expand Up @@ -1056,7 +1072,10 @@ mod test {
itertools::Itertools,
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::ThreadPoolBuilder,
solana_sdk::signature::{Keypair, Signer},
solana_sdk::{
packet::PACKET_DATA_SIZE,
signature::{Keypair, Signer},
},
std::{cmp::Ordering, iter::repeat_with},
test_case::test_case,
};
Expand Down Expand Up @@ -1124,8 +1143,7 @@ mod test {
assert_eq!(entry, &bytes[..SIZE_OF_MERKLE_PROOF_ENTRY]);
}

fn run_merkle_tree_round_trip(size: usize) {
let mut rng = rand::thread_rng();
fn run_merkle_tree_round_trip<R: Rng>(rng: &mut R, size: usize) {
let nodes = repeat_with(|| rng.gen::<[u8; 32]>()).map(Hash::from);
let nodes: Vec<_> = nodes.take(size).collect();
let tree = make_merkle_tree(nodes.clone());
Expand All @@ -1145,8 +1163,9 @@ mod test {

#[test]
fn test_merkle_tree_round_trip() {
for size in [1, 2, 3, 4, 5, 6, 7, 8, 9, 19, 37, 64, 79] {
run_merkle_tree_round_trip(size);
let mut rng = rand::thread_rng();
for size in 1..=143 {
run_merkle_tree_round_trip(&mut rng, size);
}
}

Expand Down Expand Up @@ -1327,32 +1346,49 @@ mod test {
}
}

#[test_case(0)]
#[test_case(15600)]
#[test_case(31200)]
#[test_case(46800)]
fn test_make_shreds_from_data(data_size: usize) {
#[test_case(0, false)]
#[test_case(0, true)]
#[test_case(15600, false)]
#[test_case(15600, true)]
#[test_case(31200, false)]
#[test_case(31200, true)]
#[test_case(46800, false)]
#[test_case(46800, true)]
fn test_make_shreds_from_data(data_size: usize, is_last_in_slot: bool) {
let mut rng = rand::thread_rng();
let data_size = data_size.saturating_sub(16);
let reed_solomon_cache = ReedSolomonCache::default();
for data_size in data_size..data_size + 32 {
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache);
run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
}
}

#[test]
fn test_make_shreds_from_data_rand() {
#[test_case(false)]
#[test_case(true)]
fn test_make_shreds_from_data_rand(is_last_in_slot: bool) {
let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default();
for _ in 0..32 {
let data_size = rng.gen_range(0..31200 * 7);
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache);
run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
}
}

#[ignore]
#[test_case(false)]
#[test_case(true)]
fn test_make_shreds_from_data_paranoid(is_last_in_slot: bool) {
let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default();
for data_size in 0..=PACKET_DATA_SIZE * 4 * 64 {
run_make_shreds_from_data(&mut rng, data_size, is_last_in_slot, &reed_solomon_cache);
}
}

fn run_make_shreds_from_data<R: Rng>(
rng: &mut R,
data_size: usize,
is_last_in_slot: bool,
reed_solomon_cache: &ReedSolomonCache,
) {
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
Expand All @@ -1373,7 +1409,7 @@ mod test {
parent_slot,
shred_version,
reference_tick,
true, // is_last_in_slot
is_last_in_slot,
next_shred_index,
next_code_index,
reed_solomon_cache,
Expand Down Expand Up @@ -1480,14 +1516,17 @@ mod test {
.flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT))
.count(),
1
if is_last_in_slot { 1 } else { 0 }
);
assert_eq!(
data_shreds
.last()
.unwrap()
.data_header
.flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT),
is_last_in_slot
);
assert!(data_shreds
.last()
.unwrap()
.data_header
.flags
.contains(ShredFlags::LAST_SHRED_IN_SLOT));
// Assert that data shreds can be recovered from coding shreds.
let recovered_data_shreds: Vec<_> = shreds
.iter()
Expand Down
46 changes: 34 additions & 12 deletions ledger/src/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,13 @@ impl Shredder {
.iter()
.scan(next_code_index, |next_code_index, chunk| {
let num_data_shreds = chunk.len();
let erasure_batch_size = get_erasure_batch_size(num_data_shreds);
let is_last_in_slot = chunk
.last()
.copied()
.map(Shred::last_in_slot)
.unwrap_or(true);
let erasure_batch_size =
get_erasure_batch_size(num_data_shreds, is_last_in_slot);
*next_code_index += (erasure_batch_size - num_data_shreds) as u32;
Some(*next_code_index)
}),
Expand Down Expand Up @@ -276,7 +282,12 @@ impl Shredder {
&& shred.version() == version
&& shred.fec_set_index() == fec_set_index));
let num_data = data.len();
let num_coding = get_erasure_batch_size(num_data)
let is_last_in_slot = data
.last()
.map(Borrow::borrow)
.map(Shred::last_in_slot)
.unwrap_or(true);
let num_coding = get_erasure_batch_size(num_data, is_last_in_slot)
.checked_sub(num_data)
.unwrap();
assert!(num_coding > 0);
Expand Down Expand Up @@ -434,11 +445,16 @@ impl Default for ReedSolomonCache {
}

/// Maps number of data shreds in each batch to the erasure batch size.
pub(crate) fn get_erasure_batch_size(num_data_shreds: usize) -> usize {
ERASURE_BATCH_SIZE
pub(crate) fn get_erasure_batch_size(num_data_shreds: usize, is_last_in_slot: bool) -> usize {
let erasure_batch_size = ERASURE_BATCH_SIZE
.get(num_data_shreds)
.copied()
.unwrap_or(2 * num_data_shreds)
.unwrap_or(2 * num_data_shreds);
if is_last_in_slot {
erasure_batch_size.max(2 * DATA_SHREDS_PER_FEC_BLOCK)
} else {
erasure_batch_size
}
}

// Returns offsets to fec_set_index when spliting shreds into erasure batches.
Expand Down Expand Up @@ -518,17 +534,19 @@ mod tests {
})
.collect();

let is_last_in_slot = true;
let size = serialized_size(&entries).unwrap() as usize;
// Integer division to ensure we have enough shreds to fit all the data
let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_coding_shreds =
get_erasure_batch_size(num_expected_data_shreds) - num_expected_data_shreds;
get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot)
- num_expected_data_shreds;
let start_index = 0;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
&entries,
true, // is_last_in_slot
is_last_in_slot,
start_index, // next_shred_index
start_index, // next_code_index
true, // merkle_variant
Expand Down Expand Up @@ -792,7 +810,7 @@ mod tests {
assert_eq!(data_shreds.len(), num_data_shreds);
assert_eq!(
num_coding_shreds,
get_erasure_batch_size(num_data_shreds) - num_data_shreds
get_erasure_batch_size(num_data_shreds, is_last_in_slot) - num_data_shreds
);

let all_shreds = data_shreds
Expand Down Expand Up @@ -1189,7 +1207,10 @@ mod tests {
.iter()
.group_by(|shred| shred.fec_set_index())
.into_iter()
.map(|(_, chunk)| get_erasure_batch_size(chunk.count()))
.map(|(_, chunk)| {
let chunk: Vec<_> = chunk.collect();
get_erasure_batch_size(chunk.len(), chunk.last().unwrap().last_in_slot())
})
.sum();
assert_eq!(coding_shreds.len(), num_shreds - data_shreds.len());
}
Expand Down Expand Up @@ -1232,9 +1253,10 @@ mod tests {
#[test]
fn test_max_shreds_per_slot() {
for num_data_shreds in 32..128 {
let num_coding_shreds = get_erasure_batch_size(num_data_shreds)
.checked_sub(num_data_shreds)
.unwrap();
let num_coding_shreds =
get_erasure_batch_size(num_data_shreds, /*is_last_in_slot:*/ false)
.checked_sub(num_data_shreds)
.unwrap();
assert!(
MAX_DATA_SHREDS_PER_SLOT * num_coding_shreds
<= MAX_CODE_SHREDS_PER_SLOT * num_data_shreds
Expand Down
1 change: 1 addition & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

#[test]
#[serial]
#[ignore]
fn test_rpc_block_subscribe() {
let total_stake = 100 * DEFAULT_NODE_STAKE;
let leader_stake = total_stake;
Expand Down

0 comments on commit 7500235

Please sign in to comment.