Skip to content

Commit

Permalink
hides implementation details of shred from its public interface (#24563)
Browse files Browse the repository at this point in the history
Working towards embedding versioning into shreds binary, so that a new
variant of shred struct can include merkle tree hashes of the erasure
set.
  • Loading branch information
behzadnouri authored Apr 25, 2022
1 parent 758fcd3 commit 895f76a
Show file tree
Hide file tree
Showing 19 changed files with 366 additions and 390 deletions.
4 changes: 2 additions & 2 deletions core/benches/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic(
num_simulated_shreds: usize,
) {
for i in 0..num_simulated_shreds {
shred.common_header.index = i as u32;
shred.set_index(i as u32);
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
*slot_leader,
shred,
Expand All @@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>();
let mut shred = Shred::new_empty_data_shred();
shred.common_header.slot = slot;
shred.set_slot(slot);
b.iter(|| {
get_retransmit_peers_deterministic(
&cluster_nodes,
Expand Down
28 changes: 16 additions & 12 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS,
SIZE_OF_DATA_SHRED_PAYLOAD,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, signature::Keypair},
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
test::Bencher,
};

// Copied these values here to avoid exposing shreds
// internals only for the sake of benchmarks.

// size of nonce: 4
// size of common shred header: 83
// size of coding shred header: 6
const VALID_SHRED_DATA_LEN: usize = PACKET_DATA_SIZE - 4 - 83 - 6;

fn make_test_entry(txs_per_entry: u64) -> Entry {
Entry {
num_hashes: 100_000,
Expand Down Expand Up @@ -54,11 +61,10 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {

fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
let data_shreds = make_shreds(num_shreds);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
let mut data: Vec<u8> = vec![0; num_shreds * VALID_SHRED_DATA_LEN];
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
.copy_from_slice(&shred.payload[..valid_shred_data_len]);
data[i * VALID_SHRED_DATA_LEN..(i + 1) * VALID_SHRED_DATA_LEN]
.copy_from_slice(&shred.payload()[..VALID_SHRED_DATA_LEN]);
}

data
Expand Down Expand Up @@ -120,7 +126,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);

bencher.iter(|| {
let payload = shred.payload.clone();
let payload = shred.payload().clone();
let _ = Shred::new_from_serialized_shred(payload).unwrap();
})
}
Expand Down Expand Up @@ -157,9 +163,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
bencher.iter(|| {
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
encoder.get_encoded_packets(symbol_count);
})
}
Expand All @@ -168,8 +173,7 @@ fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
packets.shuffle(&mut rand::thread_rng());

Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ pub fn broadcast_shreds(
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
shred,
&root_bank,
DATA_PLANE_FANOUT,
Expand Down
4 changes: 2 additions & 2 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.filter_map(|pubkey| {
let tvu = cluster_info
.lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?;
Some((&shred.payload, tvu))
Some((shred.payload(), tvu))
})
.collect(),
);
}

Some(vec![(&shred.payload, node.tvu)])
Some(vec![(shred.payload(), node.tvu)])
})
.flatten()
.collect();
Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
data_shreds.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
sock.send_to(b.payload(), &peer.tvu_forwards).unwrap();
});
}
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/packet_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl PacketHasher {
}

pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(&shred.payload)
self.hash_data(shred.payload())
}

fn hash_data(&self, data: &[u8]) -> u64 {
Expand Down
4 changes: 2 additions & 2 deletions core/src/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ mod test {
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
shred.sign(&keypair);
trace!("signature {}", shred.common_header.signature);
trace!("signature {}", shred.signature());
let nonce = 9;
let mut packet = repair_response_packet_from_bytes(
shred.payload,
shred.into_payload(),
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)
Expand Down
37 changes: 13 additions & 24 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3144,10 +3144,7 @@ pub mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
},
shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD},
},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
Expand All @@ -3163,7 +3160,6 @@ pub mod tests {
genesis_config,
hash::{hash, Hash},
instruction::InstructionError,
packet::PACKET_DATA_SIZE,
poh_config::PohConfig,
signature::{Keypair, Signer},
system_transaction,
Expand Down Expand Up @@ -3747,26 +3743,19 @@ pub mod tests {
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, bank| {
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut data_header = DataShredHeader::default();
data_header.flags |= DATA_COMPLETE_SHRED;
// Need to provide the right size for Shredder::deshred.
data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16;
data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16;
let shred_common_header = ShredCommonHeader {
slot: bank.slot(),
..ShredCommonHeader::default()
};
let mut shred = Shred::new_empty_from_header(
shred_common_header,
data_header,
CodingShredHeader::default(),
let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let parent_offset = bank.slot() - bank.parent_slot();
let shred = Shred::new_from_data(
bank.slot(),
0, // index,
parent_offset as u16,
Some(&gibberish),
true, // is_last_data
false, // is_last_in_slot
0, // reference_tick
0, // version
0, // fec_set_index
);
bincode::serialize_into(
&mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
&gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD],
)
.unwrap();
vec![shred]
});

Expand Down
2 changes: 1 addition & 1 deletion core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ fn retransmit(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);

let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
Expand Down
7 changes: 4 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,11 +1095,12 @@ mod tests {
// Create slots [1, 2] with 1 shred apiece
let (mut shreds, _) = make_many_slot_entries(1, 2, 1);

// Make shred for slot 1 too large
assert_eq!(shreds[0].slot(), 1);
assert_eq!(shreds[0].index(), 0);
shreds[0].payload.push(10);
shreds[0].data_header.size = shreds[0].payload.len() as u16;
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
Expand Down
16 changes: 8 additions & 8 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0xc0de_dead,
Expand All @@ -110,8 +110,8 @@ pub mod tests {
);
shred.sign(&keypair);
batches[1].packets.resize(1, Packet::default());
batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[1].packets[0].meta.size = shred.payload.len();
batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[1].packets[0].meta.size = shred.payload().len();

let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
Expand Down Expand Up @@ -143,8 +143,8 @@ pub mod tests {
0xc0de,
);
shred.sign(&leader_keypair);
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0,
Expand All @@ -159,8 +159,8 @@ pub mod tests {
);
let wrong_keypair = Keypair::new();
shred.sign(&wrong_keypair);
batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[1].meta.size = shred.payload.len();
batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[1].meta.size = shred.payload().len();

let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
let rv = verifier.verify_batches(batches, num_packets);
Expand Down
31 changes: 8 additions & 23 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist(
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else if shred.data_header.size as usize > shred.payload.len() {
inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1);
} else if !shred.sanitize() {
inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {
true
Expand All @@ -215,13 +215,13 @@ fn run_check_duplicate(
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) =
blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
blockstore.is_shred_duplicate(shred.id(), shred.payload().clone())
{
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
shred.payload,
shred.into_payload(),
)?;

duplicate_slot_sender.send(shred_slot)?;
Expand Down Expand Up @@ -717,7 +717,7 @@ mod test {
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path,
shred::{DataShredHeader, Shredder},
shred::Shredder,
},
solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
Expand Down Expand Up @@ -825,21 +825,9 @@ mod test {
0
));

// with a bad header size
let mut bad_header_shred = shreds[0].clone();
bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16;
assert!(!should_retransmit_and_persist(
&bad_header_shred,
Some(bank.clone()),
&cache,
&me_id,
0,
0
));

// with an invalid index, shred gets thrown out
let mut bad_index_shred = shreds[0].clone();
bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32;
bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32);
assert!(!should_retransmit_and_persist(
&bad_index_shred,
Some(bank.clone()),
Expand Down Expand Up @@ -875,7 +863,7 @@ mod test {
));

// coding shreds don't contain parent slot information, test that slot >= root
let (common, coding) = Shred::new_coding_shred_header(
let mut coding_shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
Expand All @@ -884,8 +872,6 @@ mod test {
3, // position
0, // version
);
let mut coding_shred =
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
coding_shred.sign(&leader_keypair);
// shred.slot() > root, shred continues
assert!(should_retransmit_and_persist(
Expand Down Expand Up @@ -959,7 +945,7 @@ mod test {
std::net::{IpAddr, Ipv4Addr},
};
solana_logger::setup();
let (common, coding) = Shred::new_coding_shred_header(
let shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
Expand All @@ -968,7 +954,6 @@ mod test {
4, // position
0, // version
);
let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
let mut shreds = vec![shred.clone(), shred.clone(), shred];
let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let repair_meta = RepairMeta {
Expand Down
7 changes: 5 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3343,9 +3343,12 @@ mod tests {
let keypair = Keypair::new();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let next_shred_index = rng.gen_range(0, 32_000);
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload;
let other_payload = {
let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
other_shred.into_payload()
};
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
Expand Down
Loading

0 comments on commit 895f76a

Please sign in to comment.