hides implementation details of shred from its public interface
Working towards embedding versioning into the shred binary format, so that
a new variant of the shred struct can include Merkle tree hashes of the
erasure set.
behzadnouri committed Apr 22, 2022
1 parent 948abf0 commit cc4d9af
Showing 21 changed files with 379 additions and 390 deletions.
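The theme running through the diff: callers stop reaching into Shred's fields (shred.payload, shred.common_header.slot, shred.data_header.size) and go through accessors instead. As a rough sketch of the narrowed surface, with signatures inferred from the call sites in this commit rather than copied from the ledger crate:

pub type Slot = u64;

// Hypothetical stand-in for solana_ledger::shred::Shred, written only to
// illustrate the shape of the interface; method names come from the call
// sites below, bodies and field layout are ours.
pub struct Shred {
    slot: Slot,
    index: u32,
    payload: Vec<u8>, // now private: `shred.payload` no longer compiles for callers
}

impl Shred {
    /// Borrow the serialized shred, replacing direct reads of the old
    /// public `payload` field.
    pub fn payload(&self) -> &Vec<u8> {
        &self.payload
    }

    /// Consume the shred and hand the payload out by value, replacing
    /// moves out of the once-public field.
    pub fn into_payload(self) -> Vec<u8> {
        self.payload
    }

    pub fn slot(&self) -> Slot {
        self.slot
    }

    pub fn set_slot(&mut self, slot: Slot) {
        self.slot = slot;
    }

    pub fn set_index(&mut self, index: u32) {
        self.index = index;
    }

    /// Consolidated validity checks; see the window_service hunk below,
    /// where this replaces a hand-rolled header-size comparison.
    pub fn sanitize(&self) -> bool {
        !self.payload.is_empty() // placeholder; the real checks stay internal
    }
}

Keeping mutation behind set_slot/set_index and exposing bytes only through payload()/into_payload() is what frees a later commit to change the wire layout, e.g. the versioned variant with Merkle tree hashes mentioned above, without touching any of these call sites.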
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions core/benches/cluster_nodes.rs
@@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic(
num_simulated_shreds: usize,
) {
for i in 0..num_simulated_shreds {
-        shred.common_header.index = i as u32;
+        shred.set_index(i as u32);
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
*slot_leader,
shred,
@@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>();
let mut shred = Shred::new_empty_data_shred();
-    shred.common_header.slot = slot;
+    shred.set_slot(slot);
b.iter(|| {
get_retransmit_peers_deterministic(
&cluster_nodes,
28 changes: 16 additions & 12 deletions core/benches/shredder.rs
@@ -9,14 +9,21 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
-        MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS,
-        SIZE_OF_DATA_SHRED_PAYLOAD,
+        MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
},
solana_perf::test_tx,
-    solana_sdk::{hash::Hash, signature::Keypair},
+    solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
test::Bencher,
};

+// Copied these values here to avoid exposing shreds
+// internals only for the sake of benchmarks.
+
+// size of nonce: 4
+// size of common shred header: 83
+// size of coding shred header: 6
+const VALID_SHRED_DATA_LEN: usize = PACKET_DATA_SIZE - 4 - 83 - 6;
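// (For reference: with PACKET_DATA_SIZE = 1232, the solana-sdk value at the
// time as far as we know, this works out to 1232 - 4 - 83 - 6 = 1139 bytes,
// matching the old SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS
// expression that these benchmarks used to compute inline.)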

fn make_test_entry(txs_per_entry: u64) -> Entry {
Entry {
num_hashes: 100_000,
@@ -54,11 +61,10 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {

fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
let data_shreds = make_shreds(num_shreds);
-    let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
-    let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
+    let mut data: Vec<u8> = vec![0; num_shreds * VALID_SHRED_DATA_LEN];
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
-        data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
-            .copy_from_slice(&shred.payload[..valid_shred_data_len]);
+        data[i * VALID_SHRED_DATA_LEN..(i + 1) * VALID_SHRED_DATA_LEN]
+            .copy_from_slice(&shred.payload()[..VALID_SHRED_DATA_LEN]);
}

data
@@ -120,7 +126,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);

bencher.iter(|| {
-        let payload = shred.payload.clone();
+        let payload = shred.payload().clone();
let _ = Shred::new_from_serialized_shred(payload).unwrap();
})
}
@@ -157,9 +163,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
-    let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
bencher.iter(|| {
-        let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
+        let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
encoder.get_encoded_packets(symbol_count);
})
}
@@ -168,8 +173,7 @@ fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
-    let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
-    let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
+    let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
packets.shuffle(&mut rand::thread_rng());

2 changes: 1 addition & 1 deletion core/src/broadcast_stage.rs
@@ -424,7 +424,7 @@ pub fn broadcast_shreds(
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
-            repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
+            repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
shred,
&root_bank,
DATA_PLANE_FANOUT,
4 changes: 2 additions & 2 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.filter_map(|pubkey| {
let tvu = cluster_info
.lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?;
-                            Some((&shred.payload, tvu))
+                            Some((shred.payload(), tvu))
})
.collect(),
);
}

-                Some(vec![(&shred.payload, node.tvu)])
+                Some(vec![(shred.payload(), node.tvu)])
})
.flatten()
.collect();
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
data_shreds.iter().for_each(|b| {
-                        sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
+                        sock.send_to(b.payload(), &peer.tvu_forwards).unwrap();
});
}
});
2 changes: 1 addition & 1 deletion core/src/packet_hasher.rs
@@ -31,7 +31,7 @@ impl PacketHasher {
}

pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
-        self.hash_data(&shred.payload)
+        self.hash_data(shred.payload())
}

fn hash_data(&self, data: &[u8]) -> u64 {
4 changes: 2 additions & 2 deletions core/src/repair_response.rs
@@ -79,10 +79,10 @@ mod test {
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
shred.sign(&keypair);
trace!("signature {}", shred.common_header.signature);
trace!("signature {}", shred.signature());
let nonce = 9;
let mut packet = repair_response_packet_from_bytes(
-            shred.payload,
+            shred.into_payload(),
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)
37 changes: 13 additions & 24 deletions core/src/replay_stage.rs
@@ -3133,10 +3133,7 @@ pub mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
-        shred::{
-            CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
-            SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
-        },
+        shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD},
},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
@@ -3152,7 +3149,6 @@
genesis_config,
hash::{hash, Hash},
instruction::InstructionError,
-        packet::PACKET_DATA_SIZE,
poh_config::PohConfig,
signature::{Keypair, Signer},
system_transaction,
@@ -3736,26 +3732,19 @@
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, bank| {
-            let gibberish = [0xa5u8; PACKET_DATA_SIZE];
-            let mut data_header = DataShredHeader::default();
-            data_header.flags |= DATA_COMPLETE_SHRED;
-            // Need to provide the right size for Shredder::deshred.
-            data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16;
-            data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16;
-            let shred_common_header = ShredCommonHeader {
-                slot: bank.slot(),
-                ..ShredCommonHeader::default()
-            };
-            let mut shred = Shred::new_empty_from_header(
-                shred_common_header,
-                data_header,
-                CodingShredHeader::default(),
+            let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
+            let parent_offset = bank.slot() - bank.parent_slot();
+            let shred = Shred::new_from_data(
+                bank.slot(),
+                0, // index
+                parent_offset as u16,
+                Some(&gibberish),
+                true, // is_last_data
+                false, // is_last_in_slot
+                0, // reference_tick
+                0, // version
+                0, // fec_set_index
             );
-            bincode::serialize_into(
-                &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
-                &gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD],
-            )
-            .unwrap();
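            // (Our inference: new_from_data serializes the headers and the
            // gibberish into the shred payload itself, which is why the manual
            // bincode::serialize_into step above could be dropped.)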
vec![shred]
});

2 changes: 1 addition & 1 deletion core/src/retransmit_stage.rs
@@ -295,7 +295,7 @@ fn retransmit(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);

let mut retransmit_time = Measure::start("retransmit_to");
-        let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
+        let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
7 changes: 4 additions & 3 deletions core/src/serve_repair.rs
@@ -1095,11 +1095,12 @@ mod tests {
// Create slots [1, 2] with 1 shred apiece
let (mut shreds, _) = make_many_slot_entries(1, 2, 1);

-        // Make shred for slot 1 too large
        assert_eq!(shreds[0].slot(), 1);
        assert_eq!(shreds[0].index(), 0);
-        shreds[0].payload.push(10);
-        shreds[0].data_header.size = shreds[0].payload.len() as u16;
+        // TODO: The test previously relied on corrupting shred payload
+        // size which we no longer want to expose. Current test no longer
+        // covers packet size check in repair_response_packet_from_bytes.
+        shreds.remove(0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
16 changes: 8 additions & 8 deletions core/src/sigverify_shreds.rs
@@ -94,8 +94,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
-        batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
-        batches[0].packets[0].meta.size = shred.payload.len();
+        batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
+        batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0xc0de_dead,
@@ -110,8 +110,8 @@
);
shred.sign(&keypair);
batches[1].packets.resize(1, Packet::default());
-        batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
-        batches[1].packets[0].meta.size = shred.payload.len();
+        batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
+        batches[1].packets[0].meta.size = shred.payload().len();

let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
@@ -143,8 +143,8 @@ pub mod tests {
0xc0de,
);
shred.sign(&leader_keypair);
-        batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
-        batches[0].packets[0].meta.size = shred.payload.len();
+        batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
+        batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0,
@@ -159,8 +159,8 @@
);
let wrong_keypair = Keypair::new();
shred.sign(&wrong_keypair);
-        batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
-        batches[0].packets[1].meta.size = shred.payload.len();
+        batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload());
+        batches[0].packets[1].meta.size = shred.payload().len();

let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
let rv = verifier.verify_batches(batches, num_packets);
31 changes: 8 additions & 23 deletions core/src/window_service.rs
@@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist(
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
-    } else if shred.data_header.size as usize > shred.payload.len() {
-        inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1);
+    } else if !shred.sanitize() {
+        inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {
true
@@ -215,13 +215,13 @@ fn run_check_duplicate(
let shred_slot = shred.slot();
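    // (Note the ownership pattern below: payload() borrows for the duplicate
    // lookup, while into_payload() consumes the shred once the payload is
    // finally stored, avoiding an extra copy; inferred from the accessor
    // names, not stated in the commit.)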
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) =
-            blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
+            blockstore.is_shred_duplicate(shred.id(), shred.payload().clone())
{
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
-                shred.payload,
+                shred.into_payload(),
)?;

duplicate_slot_sender.send(shred_slot)?;
@@ -717,7 +717,7 @@ mod test {
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path,
-            shred::{DataShredHeader, Shredder},
+            shred::Shredder,
},
solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
@@ -825,21 +825,9 @@ mod test {
0
));

-        // with a bad header size
-        let mut bad_header_shred = shreds[0].clone();
-        bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16;
-        assert!(!should_retransmit_and_persist(
-            &bad_header_shred,
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            0,
-            0
-        ));

// with an invalid index, shred gets thrown out
let mut bad_index_shred = shreds[0].clone();
-        bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32;
+        bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32);
assert!(!should_retransmit_and_persist(
&bad_index_shred,
Some(bank.clone()),
Expand Down Expand Up @@ -875,7 +863,7 @@ mod test {
));

// coding shreds don't contain parent slot information, test that slot >= root
-        let (common, coding) = Shred::new_coding_shred_header(
+        let mut coding_shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
@@ ... @@
3, // position
0, // version
);
-        let mut coding_shred =
-            Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
coding_shred.sign(&leader_keypair);
// shred.slot() > root, shred continues
assert!(should_retransmit_and_persist(
@@ -959,7 +945,7 @@ mod test {
std::net::{IpAddr, Ipv4Addr},
};
solana_logger::setup();
-        let (common, coding) = Shred::new_coding_shred_header(
+        let shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
@@ ... @@
4, // position
0, // version
);
-        let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
let mut shreds = vec![shred.clone(), shred.clone(), shred];
let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let repair_meta = RepairMeta {
7 changes: 5 additions & 2 deletions gossip/src/cluster_info.rs
@@ -3343,9 +3343,12 @@ mod tests {
let keypair = Keypair::new();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
-        let next_shred_index = rng.gen();
+        let next_shred_index = rng.gen_range(0, 32_000);
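        // (Bounded index where rng.gen() allowed any u32; presumably the
        // index must stay below the per-slot shred cap for the shred to be
        // valid. That is an assumption on our part; the bound's origin is
        // not shown in this diff.)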
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
-        let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload;
+        let other_payload = {
+            let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
+            other_shred.into_payload()
+        };
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
Expand Down
