diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs index c2302fcabcd115..9b63a40c672c75 100644 --- a/core/benches/cluster_nodes.rs +++ b/core/benches/cluster_nodes.rs @@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic( num_simulated_shreds: usize, ) { for i in 0..num_simulated_shreds { - shred.common_header.index = i as u32; + shred.set_index(i as u32); let (_neighbors, _children) = cluster_nodes.get_retransmit_peers( *slot_leader, shred, @@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; let slot = rand::random::(); let mut shred = Shred::new_empty_data_shred(); - shred.common_header.slot = slot; + shred.set_slot(slot); b.iter(|| { get_retransmit_peers_deterministic( &cluster_nodes, diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 565f8ced2dda3b..039bc5b55dae46 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -58,7 +58,7 @@ fn make_concatenated_shreds(num_shreds: usize) -> Vec { let mut data: Vec = vec![0; num_shreds * valid_shred_data_len]; for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() { data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len] - .copy_from_slice(&shred.payload[..valid_shred_data_len]); + .copy_from_slice(&shred.payload()[..valid_shred_data_len]); } data @@ -120,7 +120,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) { let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1); bencher.iter(|| { - let payload = shred.payload.clone(); + let payload = shred.payload().clone(); let _ = Shred::new_from_serialized_shred(payload).unwrap(); }) } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 84cdb9fa22000d..11e7bf979afea2 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -424,7 +424,7 @@ pub fn broadcast_shreds( update_peer_stats(&cluster_nodes, last_datapoint_submit); let root_bank = root_bank.clone(); shreds.flat_map(move |shred| { - repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs( + repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs( shred, &root_bank, DATA_PLANE_FANOUT, diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index f50943c24e5780..e4ba405193775d 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun { .filter_map(|pubkey| { let tvu = cluster_info .lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?; - Some((&shred.payload, tvu)) + Some((shred.payload(), tvu)) }) .collect(), ); } - Some(vec![(&shred.payload, node.tvu)]) + Some(vec![(shred.payload(), node.tvu)]) }) .flatten() .collect(); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index a0bf77153a1cd0..90f404933c65a6 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { if fake == (i <= self.partition) { // Send fake shreds to the first N peers data_shreds.iter().for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + sock.send_to(b.payload(), &peer.tvu_forwards).unwrap(); }); } }); diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs index 575c9733fdb8f5..51746445bdfb7e 100644 --- a/core/src/packet_hasher.rs +++ b/core/src/packet_hasher.rs @@ -31,7 +31,7 @@ impl PacketHasher { } pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 { - self.hash_data(&shred.payload) + self.hash_data(shred.payload()) } fn hash_data(&self, data: &[u8]) -> u64 { diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs index 201efc455404be..b2235c087016d8 100644 --- a/core/src/repair_response.rs +++ b/core/src/repair_response.rs @@ -79,10 +79,10 @@ mod test { assert_eq!(shred.slot(), slot); let keypair = Keypair::new(); shred.sign(&keypair); - trace!("signature {}", shred.common_header.signature); + trace!("signature {}", shred.signature()); let nonce = 9; let mut packet = repair_response_packet_from_bytes( - shred.payload, + shred.into_payload(), &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), nonce, ) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index dcb85ab6a0bcc7..ad0fcdf28ae25c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3133,10 +3133,7 @@ pub mod tests { create_new_tmp_ledger, genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, get_tmp_ledger_path, - shred::{ - CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED, - SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, - }, + shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD}, }, solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -3152,7 +3149,6 @@ pub mod tests { genesis_config, hash::{hash, Hash}, instruction::InstructionError, - packet::PACKET_DATA_SIZE, poh_config::PohConfig, signature::{Keypair, Signer}, system_transaction, @@ -3736,26 +3732,19 @@ pub mod tests { fn test_dead_fork_entry_deserialize_failure() { // Insert entry that causes deserialization failure let res = check_dead_fork(|_, bank| { - let gibberish = [0xa5u8; PACKET_DATA_SIZE]; - let mut data_header = DataShredHeader::default(); - data_header.flags |= DATA_COMPLETE_SHRED; - // Need to provide the right size for Shredder::deshred. - data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16; - data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16; - let shred_common_header = ShredCommonHeader { - slot: bank.slot(), - ..ShredCommonHeader::default() - }; - let mut shred = Shred::new_empty_from_header( - shred_common_header, - data_header, - CodingShredHeader::default(), + let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let parent_offset = bank.slot() - bank.parent_slot(); + let shred = Shred::new_from_data( + bank.slot(), + 0, // index, + parent_offset as u16, + Some(&gibberish), + true, // is_last_data + false, // is_last_in_slot + 0, // reference_tick + 0, // version + 0, // fec_set_index ); - bincode::serialize_into( - &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..], - &gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD], - ) - .unwrap(); vec![shred] }); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index e89fe32bb03e9b..b3b51df34bff4c 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -295,7 +295,7 @@ fn retransmit( .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed); let mut retransmit_time = Measure::start("retransmit_to"); - let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) { + let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) { Ok(()) => addrs.len(), Err(SendPktsError::IoError(ioerr, num_failed)) => { stats diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 137cdbaa0ca4cd..6227d142bd4bef 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1095,11 +1095,12 @@ mod tests { // Create slots [1, 2] with 1 shred apiece let (mut shreds, _) = make_many_slot_entries(1, 2, 1); - // Make shred for slot 1 too large assert_eq!(shreds[0].slot(), 1); assert_eq!(shreds[0].index(), 0); - shreds[0].payload.push(10); - shreds[0].data_header.size = shreds[0].payload.len() as u16; + // TODO: The test previously relied on corrupting shred payload + // size which we no longer want to expose. Current test no longer + // covers packet size check in repair_response_packet_from_bytes. + shreds.remove(0); blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 4ac60a29712ca5..2175fe5113c696 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -94,8 +94,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0xc0de_dead, @@ -110,8 +110,8 @@ pub mod tests { ); shred.sign(&keypair); batches[1].packets.resize(1, Packet::default()); - batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[1].packets[0].meta.size = shred.payload.len(); + batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[1].packets[0].meta.size = shred.payload().len(); let expected: HashSet = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); assert_eq!(ShredSigVerifier::read_slots(&batches), expected); @@ -143,8 +143,8 @@ pub mod tests { 0xc0de, ); shred.sign(&leader_keypair); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0, @@ -159,8 +159,8 @@ pub mod tests { ); let wrong_keypair = Keypair::new(); shred.sign(&wrong_keypair); - batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[1].meta.size = shred.payload.len(); + batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[1].meta.size = shred.payload().len(); let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); let rv = verifier.verify_batches(batches, num_packets); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f5eaa11864308c..5b12d6f44186d3 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist( } else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 { inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1); false - } else if shred.data_header.size as usize > shred.payload.len() { - inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1); + } else if !shred.sanitize() { + inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1); false } else { true @@ -215,13 +215,13 @@ fn run_check_duplicate( let shred_slot = shred.slot(); if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { if let Some(existing_shred_payload) = - blockstore.is_shred_duplicate(shred.id(), shred.payload.clone()) + blockstore.is_shred_duplicate(shred.id(), shred.payload().clone()) { cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; blockstore.store_duplicate_slot( shred_slot, existing_shred_payload, - shred.payload, + shred.into_payload(), )?; duplicate_slot_sender.send(shred_slot)?; @@ -717,7 +717,7 @@ mod test { blockstore::{make_many_slot_entries, Blockstore}, genesis_utils::create_genesis_config_with_leader, get_tmp_ledger_path, - shred::{DataShredHeader, Shredder}, + shred::Shredder, }, solana_sdk::{ epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, @@ -825,21 +825,9 @@ mod test { 0 )); - // with a bad header size - let mut bad_header_shred = shreds[0].clone(); - bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16; - assert!(!should_retransmit_and_persist( - &bad_header_shred, - Some(bank.clone()), - &cache, - &me_id, - 0, - 0 - )); - // with an invalid index, shred gets thrown out let mut bad_index_shred = shreds[0].clone(); - bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32; + bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32); assert!(!should_retransmit_and_persist( &bad_index_shred, Some(bank.clone()), @@ -875,7 +863,7 @@ mod test { )); // coding shreds don't contain parent slot information, test that slot >= root - let (common, coding) = Shred::new_coding_shred_header( + let mut coding_shred = Shred::new_empty_coding( 5, // slot 5, // index 5, // fec_set_index @@ -884,8 +872,6 @@ mod test { 3, // position 0, // version ); - let mut coding_shred = - Shred::new_empty_from_header(common, DataShredHeader::default(), coding); coding_shred.sign(&leader_keypair); // shred.slot() > root, shred continues assert!(should_retransmit_and_persist( @@ -959,7 +945,7 @@ mod test { std::net::{IpAddr, Ipv4Addr}, }; solana_logger::setup(); - let (common, coding) = Shred::new_coding_shred_header( + let shred = Shred::new_empty_coding( 5, // slot 5, // index 5, // fec_set_index @@ -968,7 +954,6 @@ mod test { 4, // position 0, // version ); - let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding); let mut shreds = vec![shred.clone(), shred.clone(), shred]; let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let repair_meta = RepairMeta { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 426829e89d3e2b..e70d1b6d3446a9 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3343,9 +3343,12 @@ mod tests { let keypair = Keypair::new(); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); - let next_shred_index = rng.gen(); + let next_shred_index = rng.gen_range(0, 32_000); let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload; + let other_payload = { + let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + other_shred.into_payload() + }; let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 9a8d0437dcd56f..33c81f5fb677af 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -91,7 +91,7 @@ fn check_shreds( Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred2.shred_type() { Err(Error::ShredTypeMismatch) - } else if shred1.payload == shred2.payload { + } else if shred1.payload() == shred2.payload() { Err(Error::InvalidDuplicateShreds) } else { if let Some(leader_schedule) = leader_schedule { @@ -152,14 +152,14 @@ pub(crate) fn from_shred( wallclock: u64, max_size: usize, // Maximum serialized size of each DuplicateShred. ) -> Result, Error> { - if shred.payload == other_payload { + if shred.payload() == &other_payload { return Err(Error::InvalidDuplicateShreds); } let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?; check_shreds(leader_schedule, &shred, &other_shred)?; let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type()); let proof = DuplicateSlotProof { - shred1: shred.payload, + shred1: shred.into_payload(), shred2: other_payload, }; let data = bincode::serialize(&proof)?; @@ -259,7 +259,7 @@ pub fn into_shreds( Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type { Err(Error::ShredTypeMismatch) - } else if shred1.payload == shred2.payload { + } else if shred1.payload() == shred2.payload() { Err(Error::InvalidDuplicateShreds) } else if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { Err(Error::InvalidSignature) @@ -352,7 +352,7 @@ pub(crate) mod tests { let leader = Arc::new(Keypair::new()); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); - let next_shred_index = rng.gen(); + let next_shred_index = rng.gen_range(0, 32_000); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let leader_schedule = |s| { @@ -365,7 +365,7 @@ pub(crate) mod tests { let chunks: Vec<_> = from_shred( shred1.clone(), Pubkey::new_unique(), // self_pubkey - shred2.payload.clone(), + shred2.payload().clone(), Some(leader_schedule), rng.gen(), // wallclock 512, // max_size diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index ff4194e9078e6e..a654a46145de0d 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1120,12 +1120,6 @@ impl Blockstore { ) } - fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool { - shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds - || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds - || shred1.first_coding_index() != shred2.first_coding_index() - } - #[allow(clippy::too_many_arguments)] fn check_insert_coding_shred( &self, @@ -1186,7 +1180,11 @@ impl Blockstore { ); if let Some(conflicting_shred) = conflicting_shred { if self - .store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone()) + .store_duplicate_if_not_existing( + slot, + conflicting_shred, + shred.payload().clone(), + ) .is_err() { warn!("bad duplicate store.."); @@ -1198,10 +1196,15 @@ impl Blockstore { // ToDo: This is a potential slashing condition warn!("Received multiple erasure configs for the same erasure set!!!"); warn!( - "Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", - slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header, + "Slot: {}, shred index: {}, erasure_set: {:?}, \ + is_duplicate: {}, stored config: {:#?}, new shred: {:#?}", + slot, + shred.index(), + erasure_set, + self.has_duplicate_shreds_in_slot(slot), + erasure_meta.config(), + shred, ); - return false; } @@ -1239,15 +1242,15 @@ impl Blockstore { let maybe_shred = self.get_coding_shred(slot, coding_index); if let Ok(Some(shred_data)) = maybe_shred { let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap(); - if Self::erasure_mismatch(&potential_shred, shred) { - return Some(potential_shred.payload); + if shred.erasure_mismatch(&potential_shred).unwrap_or_default() { + return Some(potential_shred.into_payload()); } } else if let Some(potential_shred) = { let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code); just_received_shreds.get(&key) } { - if Self::erasure_mismatch(potential_shred, shred) { - return Some(potential_shred.payload.clone()); + if shred.erasure_mismatch(potential_shred).unwrap_or_default() { + return Some(potential_shred.payload().clone()); } } } @@ -1397,7 +1400,7 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), &shred.payload)?; + write_batch.put_bytes::((slot, shred_index), shred.payload())?; index_meta.coding_mut().insert(shred_index); Ok(()) @@ -1417,7 +1420,7 @@ impl Blockstore { ) -> Cow<'a, Vec> { let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data); if let Some(shred) = just_inserted_shreds.get(&key) { - Cow::Borrowed(&shred.payload) + Cow::Borrowed(shred.payload()) } else { // If it doesn't exist in the just inserted set, it must exist in // the backing store @@ -1442,25 +1445,7 @@ impl Blockstore { } else { false }; - - if shred.data_header.size == 0 { - let leader_pubkey = leader_schedule - .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - - datapoint_error!( - "blockstore_error", - ( - "error", - format!( - "Leader {:?}, slot {}: received index {} is empty", - leader_pubkey, slot, shred_index, - ), - String - ) - ); - return false; - } - if shred.payload.len() > SHRED_PAYLOAD_SIZE { + if !shred.sanitize() { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); @@ -1469,15 +1454,14 @@ impl Blockstore { ( "error", format!( - "Leader {:?}, slot {}: received index {} shred.payload.len() > SHRED_PAYLOAD_SIZE", - leader_pubkey, slot, shred_index, + "Leader {:?}, slot {}: received invalid shred", + leader_pubkey, slot, ), String ) ); return false; } - // Check that we do not receive shred_index >= than the last_index // for the slot let last_index = slot_meta.last_index; @@ -1495,7 +1479,7 @@ impl Blockstore { .store_duplicate_if_not_existing( slot, ending_shred.into_owned(), - shred.payload.clone(), + shred.payload().clone(), ) .is_err() { @@ -1531,7 +1515,7 @@ impl Blockstore { .store_duplicate_if_not_existing( slot, ending_shred.into_owned(), - shred.payload.clone(), + shred.payload().clone(), ) .is_err() { @@ -1616,12 +1600,7 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::( - (slot, index), - // Payload will be padded out to SHRED_PAYLOAD_SIZE - // But only need to store the bytes within data_header.size - &shred.payload[..shred.data_header.size as usize], - )?; + write_batch.put_bytes::((slot, index), shred.bytes_to_store())?; data_index.insert(index); let newly_completed_data_sets = update_slot_meta( last_in_slot, @@ -3132,7 +3111,7 @@ impl Blockstore { let size = payload.len().max(SHRED_PAYLOAD_SIZE); payload.resize(size, 0u8); let new_shred = Shred::new_from_serialized_shred(payload).unwrap(); - (existing_shred != new_shred.payload).then(|| existing_shred) + (existing_shred != *new_shred.payload()).then(|| existing_shred) } pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool { @@ -4294,7 +4273,7 @@ pub mod tests { blockstore_db::BlockstoreRocksFifoOptions, genesis_utils::{create_genesis_config, GenesisConfigInfo}, leader_schedule::{FixedSchedule, LeaderSchedule}, - shred::{max_ticks_per_n_shreds, DataShredHeader}, + shred::max_ticks_per_n_shreds, }, assert_matches::assert_matches, bincode::serialize, @@ -4577,7 +4556,7 @@ pub mod tests { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100); let num_shreds = shreds.len() as u64; - let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect(); + let shred_bufs: Vec<_> = shreds.iter().map(Shred::payload).cloned().collect(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -5846,31 +5825,23 @@ pub mod tests { .unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap(); - // Corrupt shred by making it too large - let mut shred5 = shreds[5].clone(); - shred5.payload.push(10); - shred5.data_header.size = shred5.payload.len() as u16; - assert!(!blockstore.should_insert_data_shred( - &shred5, - &slot_meta, - &HashMap::new(), - &last_root, - None, - ShredSource::Turbine - )); + let shred5 = shreds[5].clone(); // Ensure that an empty shred (one with no data) would get inserted. Such shreds // may be used as signals (broadcast does so to indicate a slot was interrupted) // Reuse shred5's header values to avoid a false negative result - let mut empty_shred = Shred::new_from_data( - shred5.common_header.slot, - shred5.common_header.index, - shred5.data_header.parent_offset, + let empty_shred = Shred::new_from_data( + shred5.slot(), + shred5.index(), + { + let parent_offset = shred5.slot() - shred5.parent().unwrap(); + parent_offset as u16 + }, None, // data true, // is_last_data true, // is_last_in_slot 0, // reference_tick - shred5.common_header.version, + shred5.version(), shred5.fec_set_index(), ); assert!(blockstore.should_insert_data_shred( @@ -5881,16 +5852,6 @@ pub mod tests { None, ShredSource::Repaired, )); - empty_shred.data_header.size = 0; - assert!(!blockstore.should_insert_data_shred( - &empty_shred, - &slot_meta, - &HashMap::new(), - &last_root, - None, - ShredSource::Recovered, - )); - // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 blockstore @@ -5977,7 +5938,7 @@ pub mod tests { let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let slot = 1; - let (shred, coding) = Shred::new_coding_shred_header( + let coding_shred = Shred::new_empty_coding( slot, 11, // index 11, // fec_set_index 11, // num_data_shreds @@ -5985,7 +5946,6 @@ pub mod tests { 8, // position 0, // version ); - let coding_shred = Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); let mut erasure_metas = HashMap::new(); let mut index_working_set = HashMap::new(); @@ -6034,7 +5994,7 @@ pub mod tests { let last_root = RwLock::new(0); let slot = 1; - let (mut shred, coding) = Shred::new_coding_shred_header( + let mut coding_shred = Shred::new_empty_coding( slot, 11, // index 11, // fec_set_index 11, // num_data_shreds @@ -6042,8 +6002,6 @@ pub mod tests { 8, // position 0, // version ); - let coding_shred = - Shred::new_empty_from_header(shred.clone(), DataShredHeader::default(), coding.clone()); // Insert a good coding shred assert!(Blockstore::should_insert_coding_shred( @@ -6065,28 +6023,16 @@ pub mod tests { )); } - shred.index += 1; - // Establish a baseline that works - { - let coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - assert!(Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } + coding_shred.set_index(coding_shred.index() + 1); + assert!(Blockstore::should_insert_coding_shred( + &coding_shred, + &last_root + )); // Trying to insert a shred with index < position should fail { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); + let mut coding_shred = coding_shred.clone(); let index = coding_shred.index() - coding_shred.fec_set_index() - 1; coding_shred.set_index(index as u32); @@ -6096,76 +6042,9 @@ pub mod tests { )); } - // Trying to insert shred with num_coding == 0 should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - coding_shred.coding_header.num_coding_shreds = 0; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } - - // Trying to insert shred with pos >= num_coding should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index(); - coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } - - // Trying to insert with set_index with num_coding that would imply the last shred - // has index > u32::MAX should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - coding_shred.common_header.fec_set_index = std::u32::MAX - 1; - coding_shred.coding_header.num_data_shreds = 2; - coding_shred.coding_header.num_coding_shreds = 4; - coding_shred.coding_header.position = 1; - coding_shred.common_header.index = std::u32::MAX - 1; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - coding_shred.coding_header.num_coding_shreds = 2000; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - // Decreasing the number of num_coding_shreds will put it within the allowed limit - coding_shred.coding_header.num_coding_shreds = 2; - assert!(Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - // Insertion should succeed - blockstore - .insert_shreds(vec![coding_shred], None, false) - .unwrap(); - } - // Trying to insert value into slot <= than last root should fail { - let mut coding_shred = - Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); + let mut coding_shred = coding_shred.clone(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, @@ -8350,10 +8229,7 @@ pub mod tests { blockstore .insert_shreds(coding_shreds, Some(&leader_schedule_cache), false) .unwrap(); - let shred_bufs: Vec<_> = data_shreds - .iter() - .map(|shred| shred.payload.clone()) - .collect(); + let shred_bufs: Vec<_> = data_shreds.iter().map(Shred::payload).cloned().collect(); // Check all the data shreds were recovered for (s, buf) in data_shreds.iter().zip(shred_bufs) { @@ -8606,20 +8482,24 @@ pub mod tests { assert_eq!( blockstore.is_shred_duplicate( ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()), - duplicate_shred.payload.clone(), + duplicate_shred.payload().clone(), ), - Some(shred.payload.to_vec()) + Some(shred.payload().clone()) ); assert!(blockstore .is_shred_duplicate( ShredId::new(slot, /*index:*/ 0, non_duplicate_shred.shred_type()), - non_duplicate_shred.payload, + non_duplicate_shred.into_payload(), ) .is_none()); // Store a duplicate shred blockstore - .store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone()) + .store_duplicate_slot( + slot, + shred.payload().clone(), + duplicate_shred.payload().clone(), + ) .unwrap(); // Slot is now marked as duplicate @@ -8627,8 +8507,8 @@ pub mod tests { // Check ability to fetch the duplicates let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap(); - assert_eq!(duplicate_proof.shred1, shred.payload); - assert_eq!(duplicate_proof.shred2, duplicate_shred.payload); + assert_eq!(duplicate_proof.shred1, *shred.payload()); + assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload()); } #[test] @@ -8926,30 +8806,6 @@ pub mod tests { assert!(blockstore.has_duplicate_shreds_in_slot(slot)); } - #[test] - fn test_large_num_coding() { - solana_logger::setup(); - let slot = 1; - let (_data_shreds, mut coding_shreds, leader_schedule_cache) = - setup_erasure_shreds(slot, 0, 100); - - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - coding_shreds[1].coding_header.num_coding_shreds = u16::MAX; - blockstore - .insert_shreds( - vec![coding_shreds[1].clone()], - Some(&leader_schedule_cache), - false, - ) - .unwrap(); - - // Check no coding shreds are inserted - let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap(); - assert!(res.is_empty()); - } - #[test] pub fn test_insert_data_shreds_same_slot_last_index() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -9074,7 +8930,7 @@ pub mod tests { if i <= smaller_last_shred_index as u64 { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), - shreds[i as usize].payload + *shreds[i as usize].payload() ); } else { assert!(blockstore.get_data_shred(slot, i).unwrap().is_none()); @@ -9087,11 +8943,14 @@ pub mod tests { // Case 2: Inserting a duplicate with an even smaller last shred index should not // mark the slot as dead since the Slotmeta is full. - let mut even_smaller_last_shred_duplicate = shreds[smaller_last_shred_index - 1].clone(); - even_smaller_last_shred_duplicate.set_last_in_slot(); - // Flip a byte to create a duplicate shred - even_smaller_last_shred_duplicate.payload[0] = - std::u8::MAX - even_smaller_last_shred_duplicate.payload[0]; + let even_smaller_last_shred_duplicate = { + let mut payload = shreds[smaller_last_shred_index - 1].payload().clone(); + // Flip a byte to create a duplicate shred + payload[0] = std::u8::MAX - payload[0]; + let mut shred = Shred::new_from_serialized_shred(payload).unwrap(); + shred.set_last_in_slot(); + shred + }; assert!(blockstore .is_shred_duplicate( ShredId::new( @@ -9099,7 +8958,7 @@ pub mod tests { even_smaller_last_shred_duplicate.index(), ShredType::Data ), - even_smaller_last_shred_duplicate.payload.clone(), + even_smaller_last_shred_duplicate.payload().clone(), ) .is_some()); blockstore @@ -9110,7 +8969,7 @@ pub mod tests { if i <= smaller_last_shred_index as u64 { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), - shreds[i as usize].payload + *shreds[i as usize].payload() ); } else { assert!(blockstore.get_data_shred(slot, i).unwrap().is_none()); @@ -9146,7 +9005,7 @@ pub mod tests { .get_data_shred(slot, shred_index) .unwrap() .unwrap(), - shred_to_check.payload + *shred_to_check.payload() ); } else { assert!(blockstore @@ -9177,7 +9036,7 @@ pub mod tests { .get_data_shred(slot, shred_index) .unwrap() .unwrap(), - shred_to_check.payload + *shred_to_check.payload() ); } else { assert!(blockstore diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 7544c7b24ff7cd..41b6ffb179b0e6 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -234,8 +234,8 @@ impl ErasureMeta { ShredType::Data => None, ShredType::Code => { let config = ErasureConfig::new( - usize::from(shred.coding_header.num_data_shreds), - usize::from(shred.coding_header.num_coding_shreds), + usize::from(shred.num_data_shreds().ok()?), + usize::from(shred.num_coding_shreds().ok()?), ); let first_coding_index = u64::from(shred.first_coding_index()?); let erasure_meta = ErasureMeta { diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index c2083a14ca40f1..086b13fa4a47dc 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -68,7 +68,7 @@ use { pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }, - std::{cell::RefCell, mem::size_of}, + std::{cell::RefCell, mem::size_of, ops::RangeInclusive}, thiserror::Error, }; @@ -79,11 +79,11 @@ pub type Nonce = u32; /// Constants are used over lazy_static for performance reasons. pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; pub const SIZE_OF_DATA_SHRED_HEADER: usize = 5; -pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6; -pub const SIZE_OF_SIGNATURE: usize = 64; -pub const SIZE_OF_SHRED_TYPE: usize = 1; -pub const SIZE_OF_SHRED_SLOT: usize = 8; -pub const SIZE_OF_SHRED_INDEX: usize = 4; +const SIZE_OF_CODING_SHRED_HEADER: usize = 6; +const SIZE_OF_SIGNATURE: usize = 64; +const SIZE_OF_SHRED_TYPE: usize = 1; +const SIZE_OF_SHRED_SLOT: usize = 8; +const SIZE_OF_SHRED_INDEX: usize = 4; pub const SIZE_OF_NONCE: usize = 4; pub const SIZE_OF_CODING_SHRED_HEADERS: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; @@ -92,10 +92,11 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_HEADER - SIZE_OF_CODING_SHRED_HEADERS - SIZE_OF_NONCE; +const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; -pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; -pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; -pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; +const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; +const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; +const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() @@ -170,37 +171,37 @@ impl<'de> Deserialize<'de> for ShredType { /// A common header that is present in data and code shred headers #[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] -pub struct ShredCommonHeader { - pub signature: Signature, - pub shred_type: ShredType, - pub slot: Slot, - pub index: u32, - pub version: u16, - pub fec_set_index: u32, +struct ShredCommonHeader { + signature: Signature, + shred_type: ShredType, + slot: Slot, + index: u32, + version: u16, + fec_set_index: u32, } /// The data shred header has parent offset and flags #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] -pub struct DataShredHeader { - pub parent_offset: u16, - pub flags: u8, - pub size: u16, +struct DataShredHeader { + parent_offset: u16, + flags: u8, + size: u16, // common shred header + data shred header + data } /// The coding shred header has FEC information #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] -pub struct CodingShredHeader { - pub num_data_shreds: u16, - pub num_coding_shreds: u16, - pub position: u16, +struct CodingShredHeader { + num_data_shreds: u16, + num_coding_shreds: u16, + position: u16, } #[derive(Clone, Debug, PartialEq)] pub struct Shred { - pub common_header: ShredCommonHeader, - pub data_header: DataShredHeader, - pub coding_header: CodingShredHeader, - pub payload: Vec, + common_header: ShredCommonHeader, + data_header: DataShredHeader, + coding_header: CodingShredHeader, + payload: Vec, } /// Tuple which uniquely identifies a shred should it exists. @@ -267,6 +268,7 @@ impl Shred { packet.meta.size = len; } + // TODO: Should this sanitize output? pub fn new_from_data( slot: Slot, index: u32, @@ -321,7 +323,7 @@ impl Shred { &data_header, ) .expect("Failed to write data header into shred buffer"); - + // TODO: Need to check if data is too large! if let Some(data) = data { payload[start..start + data.len()].clone_from_slice(data); } @@ -361,13 +363,14 @@ impl Shred { coding_header, payload, }; + // TODO: Should return why sanitize failed. shred .sanitize() .then(|| shred) .ok_or(ShredError::InvalidPayload) } - pub fn new_coding_shred_header( + pub fn new_empty_coding( slot: Slot, index: u32, fec_set_index: u32, @@ -375,7 +378,7 @@ impl Shred { num_coding_shreds: u16, position: u16, version: u16, - ) -> (ShredCommonHeader, CodingShredHeader) { + ) -> Self { let header = ShredCommonHeader { shred_type: ShredType::Code, index, @@ -384,38 +387,15 @@ impl Shred { fec_set_index, ..ShredCommonHeader::default() }; - ( - header, - CodingShredHeader { - num_data_shreds, - num_coding_shreds, - position, - }, - ) - } - - pub fn new_empty_coding( - slot: Slot, - index: u32, - fec_set_index: u32, - num_data: u16, - num_code: u16, - position: u16, - version: u16, - ) -> Self { - let (header, coding_header) = Self::new_coding_shred_header( - slot, - index, - fec_set_index, - num_data, - num_code, + let coding_header = CodingShredHeader { + num_data_shreds, + num_coding_shreds, position, - version, - ); + }; Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header) } - pub fn new_empty_from_header( + fn new_empty_from_header( common_header: ShredCommonHeader, data_header: DataShredHeader, coding_header: CodingShredHeader, @@ -453,6 +433,7 @@ impl Shred { } } + // TODO Replace this with default? pub fn new_empty_data_shred() -> Self { Self::new_empty_from_header( ShredCommonHeader::default(), @@ -495,7 +476,28 @@ impl Shred { self.common_header.index } - pub(crate) fn fec_set_index(&self) -> u32 { + pub fn payload(&self) -> &Vec { + &self.payload + } + + // Possibly trimmed payload; + // Should only be used when storing shreds to blockstore. + pub(crate) fn bytes_to_store(&self) -> &[u8] { + match self.shred_type() { + ShredType::Code => &self.payload, + ShredType::Data => { + // Payload will be padded out to SHRED_PAYLOAD_SIZE. + // But only need to store the bytes within data_header.size. + &self.payload[..self.data_header.size as usize] + } + } + } + + pub fn into_payload(self) -> Vec { + self.payload + } + + pub fn fec_set_index(&self) -> u32 { self.common_header.fec_set_index } @@ -510,12 +512,19 @@ impl Shred { } // Returns true if the shred passes sanity checks. - pub(crate) fn sanitize(&self) -> bool { - self.erasure_block_index().is_some() + // TODO: Should return why sanitize failed! + pub fn sanitize(&self) -> bool { + self.payload.len() <= SHRED_PAYLOAD_SIZE + && self.erasure_block_index().is_some() && match self.shred_type() { ShredType::Data => { - self.parent().is_ok() - && usize::from(self.data_header.size) <= self.payload.len() + const DATA_SHRED_SIZE_RANGE: RangeInclusive = + SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD; + let size = usize::from(self.data_header.size); + self.index() < MAX_DATA_SHREDS_PER_SLOT as u32 + && self.parent().is_ok() + && size <= self.payload.len() + && DATA_SHRED_SIZE_RANGE.contains(&size) } ShredType::Code => { u32::from(self.coding_header.num_coding_shreds) @@ -652,7 +661,7 @@ impl Shred { } #[cfg(test)] - pub fn unset_data_complete(&mut self) { + pub(crate) fn unset_data_complete(&mut self) { if self.is_data() { self.data_header.flags &= !DATA_COMPLETE_SHRED; } @@ -677,7 +686,7 @@ impl Shred { } } - pub fn reference_tick(&self) -> u8 { + pub(crate) fn reference_tick(&self) -> u8 { if self.is_data() { self.data_header.flags & SHRED_TICK_REFERENCE_MASK } else { @@ -697,7 +706,7 @@ impl Shred { limited_deserialize::(&p.data[slot_start..slot_end]).ok() } - pub fn reference_tick_from_data(data: &[u8]) -> u8 { + pub(crate) fn reference_tick_from_data(data: &[u8]) -> u8 { let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::() - size_of::()]; @@ -708,12 +717,38 @@ impl Shred { self.signature() .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..]) } + + // Returns true if the erasure coding of the two shreds mismatch. + pub(crate) fn erasure_mismatch(self: &Shred, shred: &Shred) -> Result { + if self.shred_type() != ShredType::Code || shred.shred_type() != ShredType::Code { + return Err(ShredError::InvalidShredType); + } + Ok( + self.coding_header.num_coding_shreds != shred.coding_header.num_coding_shreds + || self.coding_header.num_data_shreds != shred.coding_header.num_data_shreds + || self.first_coding_index() != shred.first_coding_index(), + ) + } + + pub(crate) fn num_data_shreds(self: &Shred) -> Result { + match self.shred_type() { + ShredType::Data => Err(ShredError::InvalidShredType), + ShredType::Code => Ok(self.coding_header.num_data_shreds), + } + } + + pub(crate) fn num_coding_shreds(self: &Shred) -> Result { + match self.shred_type() { + ShredType::Data => Err(ShredError::InvalidShredType), + ShredType::Code => Ok(self.coding_header.num_coding_shreds), + } + } } #[derive(Debug)] pub struct Shredder { - pub slot: Slot, - pub parent_slot: Slot, + slot: Slot, + parent_slot: Slot, version: u16, reference_tick: u8, } @@ -1015,7 +1050,6 @@ impl Shredder { /// Combines all shreds to recreate the original buffer pub fn deshred(shreds: &[Shred]) -> std::result::Result, reed_solomon_erasure::Error> { use reed_solomon_erasure::Error::TooFewDataShards; - const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?; let index = shreds.first().ok_or(TooFewDataShards)?.index(); let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i); @@ -1167,7 +1201,7 @@ pub fn verify_test_data_shred( } #[cfg(test)] -pub mod tests { +mod tests { use { super::*, bincode::serialized_size, @@ -1992,7 +2026,7 @@ pub mod tests { assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); assert_eq!(1, stats.index_out_of_bounds); - let (header, coding_header) = Shred::new_coding_shred_header( + let shred = Shred::new_empty_coding( 8, // slot 2, // index 10, // fec_set_index @@ -2001,7 +2035,6 @@ pub mod tests { 3, // position 200, // version ); - let shred = Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header); shred.copy_to_packet(&mut packet); packet.data[OFFSET_OF_SHRED_TYPE] = u8::MAX; @@ -2035,4 +2068,98 @@ pub mod tests { Ok(ShredType::Code) ); } + + #[test] + fn test_sanitize_data_shred() { + let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let mut shred = Shred::new_from_data( + 420, // slot + 19, // index + 5, // parent_offset + Some(&data), + true, // is_last_data + false, // is_last_in_slot + 3, // reference_tick + 1, // version + 16, // fec_set_index + ); + assert!(shred.sanitize()); + // Corrupt shred by making it too large + { + let mut shred = shred.clone(); + shred.payload.push(10u8); + assert!(!shred.sanitize()); + } + { + let mut shred = shred.clone(); + shred.data_header.size += 1; + assert!(!shred.sanitize()); + } + { + let mut shred = shred.clone(); + shred.data_header.size = 0; + assert!(!shred.sanitize()); + } + { + shred.data_header.size = shred.payload().len() as u16 + 1; + assert!(!shred.sanitize()); + } + } + + #[test] + fn test_sanitize_coding_shred() { + let mut shred = Shred::new_empty_coding( + 1, // slot + 12, // index + 11, // fec_set_index + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + assert!(shred.sanitize()); + // index < position should fail + { + let mut shred = shred.clone(); + let index = shred.index() - shred.fec_set_index() - 1; + shred.set_index(index as u32); + assert!(!shred.sanitize()); + } + // num_coding == 0 should fail + { + let mut shred = shred.clone(); + shred.coding_header.num_coding_shreds = 0; + assert!(!shred.sanitize()); + } + // pos >= num_coding should fail + { + let mut shred = shred.clone(); + let num_coding_shreds = shred.index() - shred.fec_set_index(); + shred.coding_header.num_coding_shreds = num_coding_shreds as u16; + assert!(!shred.sanitize()); + } + // set_index with num_coding that would imply the last + // shred has index > u32::MAX should fail. + { + let mut shred = shred.clone(); + shred.common_header.fec_set_index = std::u32::MAX - 1; + shred.coding_header.num_data_shreds = 2; + shred.coding_header.num_coding_shreds = 4; + shred.coding_header.position = 1; + shred.common_header.index = std::u32::MAX - 1; + assert!(!shred.sanitize()); + + shred.coding_header.num_coding_shreds = 2000; + assert!(!shred.sanitize()); + + // Decreasing the number of num_coding_shreds will put it within + // the allowed limit. + shred.coding_header.num_coding_shreds = 2; + assert!(shred.sanitize()); + } + { + shred.coding_header.num_coding_shreds = u16::MAX; + assert!(!shred.sanitize()); + } + } } diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 7bd7a5b72735aa..e5c9dabbe95e45 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -480,9 +480,9 @@ pub mod tests { assert_eq!(shred.slot(), slot); let keypair = Keypair::new(); shred.sign(&keypair); - trace!("signature {}", shred.common_header.signature); - packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload); - packet.meta.size = shred.payload.len(); + trace!("signature {}", shred.signature()); + packet.data[0..shred.payload().len()].copy_from_slice(shred.payload()); + packet.meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -526,8 +526,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -581,8 +581,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -693,8 +693,8 @@ pub mod tests { 0xc0de, ); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 7a1cf976d8afdf..cdc25189e78d80 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -163,7 +163,7 @@ fn sort_data_coding_into_fec_sets( assert!(!data_slot_and_index.contains(&key)); data_slot_and_index.insert(key); let fec_entry = fec_data - .entry(shred.common_header.fec_set_index) + .entry(shred.fec_set_index()) .or_insert_with(Vec::new); fec_entry.push(shred); } @@ -174,7 +174,7 @@ fn sort_data_coding_into_fec_sets( assert!(!coding_slot_and_index.contains(&key)); coding_slot_and_index.insert(key); let fec_entry = fec_coding - .entry(shred.common_header.fec_set_index) + .entry(shred.fec_set_index()) .or_insert_with(Vec::new); fec_entry.push(shred); }