Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
hides implementation details of shred from its public interface
Browse files Browse the repository at this point in the history
Working towards embedding versioning into shreds binary, so that a new
variant of shred struct can include merkle tree hashes of the erasure
set.
  • Loading branch information
behzadnouri committed Apr 22, 2022
1 parent 2d4defa commit 85b56ad
Show file tree
Hide file tree
Showing 19 changed files with 339 additions and 375 deletions.
4 changes: 2 additions & 2 deletions core/benches/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic(
num_simulated_shreds: usize,
) {
for i in 0..num_simulated_shreds {
shred.common_header.index = i as u32;
shred.set_index(i as u32);
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
*slot_leader,
shred,
Expand All @@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>();
let mut shred = Shred::new_empty_data_shred();
shred.common_header.slot = slot;
shred.set_slot(slot);
b.iter(|| {
get_retransmit_peers_deterministic(
&cluster_nodes,
Expand Down
4 changes: 2 additions & 2 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
.copy_from_slice(&shred.payload[..valid_shred_data_len]);
.copy_from_slice(&shred.payload()[..valid_shred_data_len]);
}

data
Expand Down Expand Up @@ -120,7 +120,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);

bencher.iter(|| {
let payload = shred.payload.clone();
let payload = shred.payload().clone();
let _ = Shred::new_from_serialized_shred(payload).unwrap();
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ pub fn broadcast_shreds(
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
shred,
&root_bank,
DATA_PLANE_FANOUT,
Expand Down
4 changes: 2 additions & 2 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.filter_map(|pubkey| {
let tvu = cluster_info
.lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?;
Some((&shred.payload, tvu))
Some((shred.payload(), tvu))
})
.collect(),
);
}

Some(vec![(&shred.payload, node.tvu)])
Some(vec![(shred.payload(), node.tvu)])
})
.flatten()
.collect();
Expand Down
2 changes: 1 addition & 1 deletion core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
data_shreds.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
sock.send_to(b.payload(), &peer.tvu_forwards).unwrap();
});
}
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/packet_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl PacketHasher {
}

pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(&shred.payload)
self.hash_data(shred.payload())
}

fn hash_data(&self, data: &[u8]) -> u64 {
Expand Down
4 changes: 2 additions & 2 deletions core/src/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ mod test {
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
shred.sign(&keypair);
trace!("signature {}", shred.common_header.signature);
trace!("signature {}", shred.signature());
let nonce = 9;
let mut packet = repair_response_packet_from_bytes(
shred.payload,
shred.into_payload(),
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)
Expand Down
37 changes: 13 additions & 24 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3133,10 +3133,7 @@ pub mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
},
shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD},
},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
Expand All @@ -3152,7 +3149,6 @@ pub mod tests {
genesis_config,
hash::{hash, Hash},
instruction::InstructionError,
packet::PACKET_DATA_SIZE,
poh_config::PohConfig,
signature::{Keypair, Signer},
system_transaction,
Expand Down Expand Up @@ -3736,26 +3732,19 @@ pub mod tests {
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, bank| {
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut data_header = DataShredHeader::default();
data_header.flags |= DATA_COMPLETE_SHRED;
// Need to provide the right size for Shredder::deshred.
data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16;
data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16;
let shred_common_header = ShredCommonHeader {
slot: bank.slot(),
..ShredCommonHeader::default()
};
let mut shred = Shred::new_empty_from_header(
shred_common_header,
data_header,
CodingShredHeader::default(),
let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let parent_offset = bank.slot() - bank.parent_slot();
let shred = Shred::new_from_data(
bank.slot(),
0, // index,
parent_offset as u16,
Some(&gibberish),
true, // is_last_data
false, // is_last_in_slot
0, // reference_tick
0, // version
0, // fec_set_index
);
bincode::serialize_into(
&mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
&gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD],
)
.unwrap();
vec![shred]
});

Expand Down
2 changes: 1 addition & 1 deletion core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ fn retransmit(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);

let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats
Expand Down
7 changes: 4 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,11 +1095,12 @@ mod tests {
// Create slots [1, 2] with 1 shred apiece
let (mut shreds, _) = make_many_slot_entries(1, 2, 1);

// Make shred for slot 1 too large
assert_eq!(shreds[0].slot(), 1);
assert_eq!(shreds[0].index(), 0);
shreds[0].payload.push(10);
shreds[0].data_header.size = shreds[0].payload.len() as u16;
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
Expand Down
16 changes: 8 additions & 8 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0xc0de_dead,
Expand All @@ -110,8 +110,8 @@ pub mod tests {
);
shred.sign(&keypair);
batches[1].packets.resize(1, Packet::default());
batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[1].packets[0].meta.size = shred.payload.len();
batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[1].packets[0].meta.size = shred.payload().len();

let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
Expand Down Expand Up @@ -143,8 +143,8 @@ pub mod tests {
0xc0de,
);
shred.sign(&leader_keypair);
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();

let mut shred = Shred::new_from_data(
0,
Expand All @@ -159,8 +159,8 @@ pub mod tests {
);
let wrong_keypair = Keypair::new();
shred.sign(&wrong_keypair);
batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[1].meta.size = shred.payload.len();
batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[1].meta.size = shred.payload().len();

let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
let rv = verifier.verify_batches(batches, num_packets);
Expand Down
31 changes: 8 additions & 23 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist(
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else if shred.data_header.size as usize > shred.payload.len() {
inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1);
} else if !shred.sanitize() {
inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {
true
Expand All @@ -215,13 +215,13 @@ fn run_check_duplicate(
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) =
blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
blockstore.is_shred_duplicate(shred.id(), shred.payload().clone())
{
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
shred.payload,
shred.into_payload(),
)?;

duplicate_slot_sender.send(shred_slot)?;
Expand Down Expand Up @@ -717,7 +717,7 @@ mod test {
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path,
shred::{DataShredHeader, Shredder},
shred::Shredder,
},
solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
Expand Down Expand Up @@ -825,21 +825,9 @@ mod test {
0
));

// with a bad header size
let mut bad_header_shred = shreds[0].clone();
bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16;
assert!(!should_retransmit_and_persist(
&bad_header_shred,
Some(bank.clone()),
&cache,
&me_id,
0,
0
));

// with an invalid index, shred gets thrown out
let mut bad_index_shred = shreds[0].clone();
bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32;
bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32);
assert!(!should_retransmit_and_persist(
&bad_index_shred,
Some(bank.clone()),
Expand Down Expand Up @@ -875,7 +863,7 @@ mod test {
));

// coding shreds don't contain parent slot information, test that slot >= root
let (common, coding) = Shred::new_coding_shred_header(
let mut coding_shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
Expand All @@ -884,8 +872,6 @@ mod test {
3, // position
0, // version
);
let mut coding_shred =
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
coding_shred.sign(&leader_keypair);
// shred.slot() > root, shred continues
assert!(should_retransmit_and_persist(
Expand Down Expand Up @@ -959,7 +945,7 @@ mod test {
std::net::{IpAddr, Ipv4Addr},
};
solana_logger::setup();
let (common, coding) = Shred::new_coding_shred_header(
let shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
Expand All @@ -968,7 +954,6 @@ mod test {
4, // position
0, // version
);
let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
let mut shreds = vec![shred.clone(), shred.clone(), shred];
let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let repair_meta = RepairMeta {
Expand Down
7 changes: 5 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3343,9 +3343,12 @@ mod tests {
let keypair = Keypair::new();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let next_shred_index = rng.gen_range(0, 32_000);
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload;
let other_payload = {
let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
other_shred.into_payload()
};
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
Expand Down
12 changes: 6 additions & 6 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn check_shreds(
Err(Error::ShredIndexMismatch)
} else if shred1.shred_type() != shred2.shred_type() {
Err(Error::ShredTypeMismatch)
} else if shred1.payload == shred2.payload {
} else if shred1.payload() == shred2.payload() {
Err(Error::InvalidDuplicateShreds)
} else {
if let Some(leader_schedule) = leader_schedule {
Expand Down Expand Up @@ -152,14 +152,14 @@ pub(crate) fn from_shred(
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
if shred.payload == other_payload {
if shred.payload() == &other_payload {
return Err(Error::InvalidDuplicateShreds);
}
let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type());
let proof = DuplicateSlotProof {
shred1: shred.payload,
shred1: shred.into_payload(),
shred2: other_payload,
};
let data = bincode::serialize(&proof)?;
Expand Down Expand Up @@ -259,7 +259,7 @@ pub fn into_shreds(
Err(Error::ShredIndexMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
} else if shred1.payload == shred2.payload {
} else if shred1.payload() == shred2.payload() {
Err(Error::InvalidDuplicateShreds)
} else if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
Err(Error::InvalidSignature)
Expand Down Expand Up @@ -352,7 +352,7 @@ pub(crate) mod tests {
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let next_shred_index = rng.gen_range(0, 32_000);
let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let leader_schedule = |s| {
Expand All @@ -365,7 +365,7 @@ pub(crate) mod tests {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload.clone(),
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
Expand Down
Loading

0 comments on commit 85b56ad

Please sign in to comment.