removes buffering when generating coding shreds in broadcast (solana-labs#25807)

Given the 32:32 erasure recovery scheme, the current implementation requires
exactly 32 data shreds to generate coding shreds for a batch (except
for the final erasure batch in each slot).
As a result, when serializing ledger entries to data shreds, if the
number of data shreds is not a multiple of 32, the coding shreds for the
last batch cannot be generated until there are more data shreds to
complete the batch to 32 data shreds. This adds latency in generating
and broadcasting coding shreds.

In addition, with the Merkle variants of shreds, data shreds cannot be
signed and broadcast until the coding shreds are also generated. As a
result, *both* code and data shreds will be delayed before broadcast if
we still require exactly 32 data shreds for each batch.

This commit instead always generates and broadcasts coding shreds as soon
as any number of data shreds is available. When serializing entries
to shreds:
* if the number of resulting data shreds is less than 32, then more
  coding shreds will be generated so that the resulting erasure batch
  has the same recovery probabilities as a 32:32 batch.
* if the number of data shreds is more than 32, then the data shreds are
  split uniformly into erasure batches with _at least_ 32 data shreds in
  each batch. Each erasure batch will have the same number of code and
  data shreds.

For example:
* If there are 19 data shreds, 27 coding shreds are generated. The
  resulting 19(data):27(code) erasure batch has the same recovery
  probabilities as a 32:32 batch.
* If there are 107 data shreds, they are split into 3 batches of 36:36,
  36:36 and 35:35 data:code shreds each.
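
A minimal sketch of the splitting rule above (assuming only what this
message states; `erasure_batches` is a hypothetical name, not the upstream
Shredder API):

```rust
// Sketch: partition >= 32 data shreds into balanced erasure batches, each
// with at least DATA_SHREDS_PER_FEC_BLOCK data shreds and an equal number
// of coding shreds. For fewer than 32 data shreds, extra coding shreds are
// generated instead (e.g. 19 data shreds get 27 coding shreds); that lookup
// is not reproduced here.
const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

fn erasure_batches(num_data_shreds: usize) -> Vec<(usize, usize)> {
    assert!(num_data_shreds >= DATA_SHREDS_PER_FEC_BLOCK);
    let num_batches = num_data_shreds / DATA_SHREDS_PER_FEC_BLOCK;
    (0..num_batches)
        .map(|i| {
            // Spread the remainder over the leading batches so that batch
            // sizes differ by at most one data shred.
            let data = num_data_shreds / num_batches
                + usize::from(i < num_data_shreds % num_batches);
            (data, data) // equal numbers of data and coding shreds
        })
        .collect()
}

fn main() {
    // 107 data shreds -> [(36, 36), (36, 36), (35, 35)], as in the example above.
    assert_eq!(erasure_batches(107), vec![(36, 36), (36, 36), (35, 35)]);
}
```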

A consequence of this change is that code and data shred indices will
no longer align, as there will be more coding shreds than data shreds
(not only in the last batch in each slot but also in the intermediate
ones).
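
As a quick illustration (a sketch built from the 19:27 example above, not
the broadcast-stage code): each index only advances past shreds of its own
kind, so with coding shreds now outnumbering data shreds the code index
pulls ahead of the data index.

```rust
fn main() {
    // Hypothetical batches within one slot: a small 19:27 batch followed by
    // a 36:36 batch. Indices advance per kind ("max index in batch + 1").
    let (mut next_shred_index, mut next_code_index) = (0u32, 0u32);
    for (num_data, num_code) in [(19u32, 27u32), (36, 36)] {
        next_shred_index += num_data;
        next_code_index += num_code;
    }
    assert_eq!((next_shred_index, next_code_index), (55, 63)); // no longer aligned
}
```
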
behzadnouri authored Aug 11, 2022
1 parent ce23003 commit ac91cda
Showing 9 changed files with 203 additions and 254 deletions.
22 changes: 10 additions & 12 deletions core/benches/shredder.rs
@@ -9,7 +9,7 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, LEGACY_SHRED_DATA_CAPACITY, MAX_DATA_SHREDS_PER_FEC_BLOCK,
Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@@ -153,26 +153,24 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {

#[bench]
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
bencher.iter(|| {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
true, // is_last_in_slot
0, // next_code_index
0, // next_code_index
)
.len();
})
}

#[bench]
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
true, // is_last_in_slot
0, // next_code_index
0, // next_code_index
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
@@ -181,18 +179,18 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {

#[bench]
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count);
bencher.iter(|| {
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
encoder.get_encoded_packets(symbol_count);
encoder.get_encoded_packets(symbol_count as u32);
})
}

#[bench]
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
packets.shuffle(&mut rand::thread_rng());
4 changes: 0 additions & 4 deletions core/src/broadcast_stage/broadcast_utils.rs
@@ -2,7 +2,6 @@ use {
crate::result::Result,
crossbeam_channel::Receiver,
solana_entry::entry::Entry,
solana_ledger::shred::Shred,
solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank,
solana_sdk::clock::Slot,
@@ -25,9 +24,6 @@ pub struct UnfinishedSlotInfo {
pub(crate) next_code_index: u32,
pub slot: Slot,
pub parent: Slot,
// Data shreds buffered to make a batch of size
// MAX_DATA_SHREDS_PER_FEC_BLOCK.
pub(crate) data_shreds_buffer: Vec<Shred>,
}

/// This parameter tunes how many entries are received in one iteration of recv loop
156 changes: 40 additions & 116 deletions core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,9 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_entry::entry::Entry,
solana_ledger::shred::{
ProcessShredsStats, Shred, ShredFlags, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
},
solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
@@ -68,52 +66,41 @@ impl StandardBroadcastRun {
None => Vec::default(),
Some(ref state) if state.slot == current_slot => Vec::default(),
Some(ref mut state) => {
let parent_offset = state.slot - state.parent;
let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK;
let fec_set_offset = state
.data_shreds_buffer
.first()
.map(Shred::index)
.unwrap_or(state.next_shred_index);
let fec_set_index = Shredder::fec_set_index(state.next_shred_index, fec_set_offset);
let mut shred = Shred::new_from_data(
state.slot,
state.next_shred_index,
parent_offset as u16,
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
reference_tick,
self.shred_version,
fec_set_index.unwrap(),
);
shred.sign(keypair);
state.data_shreds_buffer.push(shred.clone());
let mut shreds = make_coding_shreds(
let shredder =
Shredder::new(state.slot, state.parent, reference_tick, self.shred_version)
.unwrap();
let (mut shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&mut self.unfinished_slot,
true, // is_last_in_slot
&[], // entries
true, // is_last_in_slot,
state.next_shred_index,
state.next_code_index,
stats,
);
shreds.insert(0, shred);
self.report_and_reset_stats(true);
self.unfinished_slot = None;
shreds.extend(coding_shreds);
shreds
}
}
}

fn entries_to_data_shreds(
fn entries_to_shreds(
&mut self,
keypair: &Keypair,
entries: &[Entry],
blockstore: &Blockstore,
reference_tick: u8,
is_slot_end: bool,
process_stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
) -> (
Vec<Shred>, // data shreds
Vec<Shred>, // coding shreds
) {
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
let next_shred_index = match &self.unfinished_slot {
Some(state) => state.next_shred_index,
let (next_shred_index, next_code_index) = match &self.unfinished_slot {
Some(state) => (state.next_shred_index, state.next_code_index),
None => {
// If the blockstore has shreds for the slot, it should not
// recreate the slot:
@@ -123,46 +110,37 @@ impl StandardBroadcastRun {
process_stats.num_extant_slots += 1;
// This is a faulty situation that should not happen.
// Refrain from generating shreds for the slot.
return Vec::default();
return (Vec::default(), Vec::default());
}
}
0u32
}
};
let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
.unwrap()
.entries_to_data_shreds(
keypair,
entries,
is_slot_end,
next_shred_index,
0, // fec_set_offset
process_stats,
);
let mut data_shreds_buffer = match &mut self.unfinished_slot {
Some(state) => {
assert_eq!(state.slot, slot);
std::mem::take(&mut state.data_shreds_buffer)
(0u32, 0u32)
}
None => Vec::default(),
};
data_shreds_buffer.extend(data_shreds.clone());
let shredder =
Shredder::new(slot, parent_slot, reference_tick, self.shred_version).unwrap();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
entries,
is_slot_end,
next_shred_index,
next_code_index,
process_stats,
);
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1,
None => next_shred_index,
};
let next_code_index = match &self.unfinished_slot {
Some(state) => state.next_code_index,
None => 0,
let next_code_index = match coding_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1,
None => next_code_index,
};
self.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index,
next_code_index,
slot,
parent: parent_slot,
data_shreds_buffer,
});
data_shreds
(data_shreds, coding_shreds)
}

#[cfg(test)]
@@ -228,7 +206,7 @@ impl StandardBroadcastRun {
// 2) Convert entries to shreds and coding shreds
let is_last_in_slot = last_tick_height == bank.max_tick_height();
let reference_tick = bank.tick_height() % bank.ticks_per_slot();
let data_shreds = self.entries_to_data_shreds(
let (data_shreds, coding_shreds) = self.entries_to_shreds(
keypair,
&receive_results.entries,
blockstore,
@@ -300,13 +278,7 @@ impl StandardBroadcastRun {
socket_sender.send((data_shreds.clone(), batch_info.clone()))?;
blockstore_sender.send((data_shreds, batch_info.clone()))?;

// Create and send coding shreds
let coding_shreds = make_coding_shreds(
keypair,
&mut self.unfinished_slot,
is_last_in_slot,
&mut process_stats,
);
// Send coding shreds
let coding_shreds = Arc::new(coding_shreds);
debug_assert!(coding_shreds
.iter()
@@ -435,49 +407,6 @@ impl StandardBroadcastRun {
}
}

// Consumes data_shreds_buffer returning corresponding coding shreds.
fn make_coding_shreds(
keypair: &Keypair,
unfinished_slot: &mut Option<UnfinishedSlotInfo>,
is_slot_end: bool,
stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
let unfinished_slot = match unfinished_slot {
None => return Vec::default(),
Some(state) => state,
};
let data_shreds: Vec<_> = {
let size = unfinished_slot.data_shreds_buffer.len();
// Consume a multiple of 32, unless this is the slot end.
let offset = if is_slot_end {
0
} else {
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
};
unfinished_slot
.data_shreds_buffer
.drain(0..size - offset)
.collect()
};
let shreds = Shredder::data_shreds_to_coding_shreds(
keypair,
&data_shreds,
is_slot_end,
unfinished_slot.next_code_index,
stats,
)
.unwrap();
if let Some(index) = shreds
.iter()
.filter(|shred| shred.is_code())
.map(Shred::index)
.max()
{
unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1);
}
shreds
}

impl BroadcastRun for StandardBroadcastRun {
fn run(
&mut self,
@@ -591,7 +520,6 @@ mod test {
next_code_index: 17,
slot,
parent,
data_shreds_buffer: Vec::default(),
});
run.slot_broadcast_start = Some(Instant::now());

@@ -776,19 +704,15 @@ mod test {
while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
shreds.extend(recv_shreds.deref().clone());
}
assert!(shreds.len() < 32, "shreds.len(): {}", shreds.len());
assert!(shreds.iter().all(|shred| shred.is_data()));
// At least as many coding shreds as data shreds.
assert!(shreds.len() >= 29 * 2);
assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 29);
process_ticks(75);
while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
shreds.extend(recv_shreds.deref().clone());
}
assert!(shreds.len() > 64, "shreds.len(): {}", shreds.len());
let num_coding_shreds = shreds.iter().filter(|shred| shred.is_code()).count();
assert_eq!(
num_coding_shreds, 32,
"num coding shreds: {}",
num_coding_shreds
);
assert!(shreds.len() >= 33 * 2);
assert_eq!(shreds.iter().filter(|shred| shred.is_data()).count(), 33);
}

#[test]
3 changes: 1 addition & 2 deletions core/src/shred_fetch_stage.rs
@@ -293,8 +293,7 @@ mod tests {
));
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
false, // is_last_in_slot
3, // next_code_index
3, // next_code_index
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
10 changes: 2 additions & 8 deletions ledger/src/blockstore.rs
@@ -8969,14 +8969,8 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let coding1 = Shredder::generate_coding_shreds(
&shreds, false, // is_last_in_slot
0, // next_code_index
);
let coding2 = Shredder::generate_coding_shreds(
&shreds, true, // is_last_in_slot
0, // next_code_index
);
let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0);
let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1);
for shred in &shreds {
info!("shred {:?}", shred);
}
6 changes: 5 additions & 1 deletion ledger/src/shred.rs
@@ -99,7 +99,11 @@ const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;

pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
// Shreds are uniformly split into erasure batches with a "target" number of
// data shreds per each batch as below. The actual number of data shreds in
// each erasure batch depends on the number of shreds obtained from serializing
// a &[Entry].
pub const DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

// For legacy tests and benchmarks.
const_assert_eq!(LEGACY_SHRED_DATA_CAPACITY, 1051);
6 changes: 3 additions & 3 deletions ledger/src/shred/shred_code.rs
@@ -3,7 +3,7 @@ use {
common::dispatch,
legacy, merkle,
traits::{Shred, ShredCode as ShredCodeTrait},
CodingShredHeader, Error, ShredCommonHeader, ShredType, MAX_DATA_SHREDS_PER_FEC_BLOCK,
CodingShredHeader, Error, ShredCommonHeader, ShredType, DATA_SHREDS_PER_FEC_BLOCK,
MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE,
},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature},
@@ -132,8 +132,8 @@ pub(super) fn sanitize<T: ShredCodeTrait>(shred: &T) -> Result<(), Error> {
common_header.index,
));
}
let num_coding_shreds = u32::from(coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
if num_coding_shreds > 8 * DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
coding_header.num_coding_shreds,
));