Pad shreds on retrieval from blockstore only when needed #17204

Closed · wants to merge 11 commits
37 changes: 32 additions & 5 deletions core/benches/shredder.rs
@@ -114,18 +114,45 @@ fn bench_deshredder(bencher: &mut Bencher) {
})
}

#[bench]
fn bench_deserialize_hdr(bencher: &mut Bencher) {
let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
fn make_trimmed_serialized_shred() -> Vec<u8> {
let data = vec![0; 512];
let mut shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);
// Trim off the zero padding that Shred::new_from_data would have added;
// we want to emulate a "non-full" shred
shred.data_header.size = 512;
shred.payload.truncate(512);
// Even though shred will no longer be used, explicitly clone its payload
// into trimmed_payload so that a new buffer of exactly 512 bytes is
// allocated. Otherwise, truncate would drop the number of elements but
// keep the underlying buffer.
#[allow(clippy::redundant_clone)]
let trimmed_payload = shred.payload.clone();
// Ensure our new buffer is as expected
assert!(trimmed_payload.len() == 512);
assert!(trimmed_payload.capacity() == 512);

trimmed_payload
}
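
For context, a standalone sketch (not part of the diff) of the Vec behavior the comment above relies on:

```rust
let mut v = vec![0u8; 1024];
v.truncate(512); // shortens len but keeps the original allocation
assert_eq!(v.len(), 512);
assert!(v.capacity() >= 1024);
let fresh = v.clone(); // allocates a fresh buffer sized to the contents
assert_eq!(fresh.capacity(), 512);
```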

let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);
#[bench]
fn bench_deserialize_shred_no_padding(bencher: &mut Bencher) {
let trimmed_payload = make_trimmed_serialized_shred();

bencher.iter(|| {
let payload = shred.payload.clone();
let payload = trimmed_payload.clone();
let _ = Shred::new_from_serialized_shred(payload).unwrap();
})
}

#[bench]
fn bench_deserialize_shred_padding(bencher: &mut Bencher) {
let trimmed_payload = make_trimmed_serialized_shred();

bencher.iter(|| {
let payload = trimmed_payload.clone();
let _ = Shred::new_from_serialized_shred_pad_out(payload).unwrap();
})
}

#[bench]
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
2 changes: 1 addition & 1 deletion core/src/repair_response.rs
@@ -14,7 +14,7 @@ pub fn repair_response_packet(
nonce: Nonce,
) -> Option<Packet> {
let shred = blockstore
.get_data_shred(slot, shred_index)
.get_padded_data_shred(slot, shred_index)
.expect("Blockstore could not get data shred");
shred
.map(|shred| repair_response_packet_from_shred(shred, dest, nonce))
7 changes: 3 additions & 4 deletions core/src/serve_repair.rs
@@ -664,7 +664,7 @@ mod tests {
.into_iter()
.filter_map(|b| {
assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
Shred::new_from_serialized_shred(b.data.to_vec()).ok()
Shred::copy_from_packet(&b).ok()
})
.collect();
assert!(!rv.is_empty());
@@ -749,7 +749,7 @@
.into_iter()
.filter_map(|b| {
assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
Shred::new_from_serialized_shred(b.data.to_vec()).ok()
Shred::copy_from_packet(&b).ok()
})
.collect();
assert_eq!(rv[0].index(), 1);
@@ -1112,8 +1112,7 @@

fn verify_responses<'a>(request: &RepairType, packets: impl Iterator<Item = &'a Packet>) {
for packet in packets {
let shred_payload = packet.data.to_vec();
let shred = Shred::new_from_serialized_shred(shred_payload).unwrap();
let shred = Shred::copy_from_packet(packet).unwrap();
request.verify_response(&shred);
}
}
9 changes: 4 additions & 5 deletions core/src/window_service.rs
@@ -247,12 +247,11 @@
);
None
} else {
// shred fetch stage should be sending packets
// with sufficiently large buffers. Needed to ensure
// call to `new_from_serialized_shred` is safe.
// shred fetch stage should be sending packets with
// sufficiently large buffers. Needed to ensure call
// to `copy_from_packet` (which deserializes shred) is safe.
assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
let serialized_shred = packet.data.to_vec();
if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) {
if let Ok(shred) = Shred::copy_from_packet(&packet) {
let repair_info = {
if packet.meta.repair {
if let Some(nonce) = repair_response::nonce(&packet.data) {
63 changes: 35 additions & 28 deletions ledger/src/blockstore.rs
@@ -551,7 +551,7 @@ impl Blockstore {
if index.data().is_present(i) {
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.get_padded_bytes((slot, i), crate::shred::SHRED_PAYLOAD_SIZE)
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
@@ -1473,17 +1473,15 @@
Ok(newly_completed_data_sets)
}

/// Gets a shred from the blockstore without the zero padding it may have had at insert time
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
use crate::shred::SHRED_PAYLOAD_SIZE;
self.data_shred_cf.get_bytes((slot, index)).map(|data| {
data.map(|mut d| {
// Only data_header.size bytes stored in the blockstore so
// pad the payload out to SHRED_PAYLOAD_SIZE so that the
// erasure recovery works properly.
d.resize(cmp::max(d.len(), SHRED_PAYLOAD_SIZE), 0);
d
})
})
self.data_shred_cf.get_bytes((slot, index))
}

/// Gets a shred from the blockstore with zero padding out to SHRED_PAYLOAD_SIZE bytes
pub fn get_padded_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
self.data_shred_cf
.get_padded_bytes((slot, index), crate::shred::SHRED_PAYLOAD_SIZE)
}
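
As a usage note, the contract between the two accessors can be sketched as follows (hypothetical call site; `blockstore`, `slot`, and `index` are assumed in scope, with the stored shred shorter than SHRED_PAYLOAD_SIZE):

```rust
let trimmed = blockstore.get_data_shred(slot, index)?.unwrap();
let padded = blockstore.get_padded_data_shred(slot, index)?.unwrap();
// Same stored bytes up front, zeros behind, fixed total length.
assert_eq!(padded.len(), crate::shred::SHRED_PAYLOAD_SIZE);
assert_eq!(&padded[..trimmed.len()], &trimmed[..]);
assert!(padded[trimmed.len()..].iter().all(|&b| b == 0));
```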

pub fn get_data_shreds_for_slot(
@@ -2699,6 +2697,7 @@ impl Blockstore {
let last_shred = data_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

// Shreds are combined back into entries; no need for the shreds' zero padding here
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"Could not reconstruct data block from constituent shreds, error: {:?}",
@@ -2855,18 +2854,14 @@
is_data: bool,
) -> Option<Vec<u8>> {
let res = if is_data {
self.get_data_shred(slot, index as u64)
self.get_padded_data_shred(slot, index as u64)
.expect("fetch from DuplicateSlots column family failed")
} else {
self.get_coding_shred(slot, index as u64)
.expect("fetch from DuplicateSlots column family failed")
};

let mut payload = new_shred_raw.to_vec();
payload.resize(
std::cmp::max(new_shred_raw.len(), crate::shred::SHRED_PAYLOAD_SIZE),
0,
);
let payload = new_shred_raw.to_vec();
let new_shred = Shred::new_from_serialized_shred(payload).unwrap();
res.map(|existing_shred| {
if existing_shred != new_shred.payload {
@@ -3692,6 +3687,13 @@ pub fn make_many_slot_entries(
(shreds, entries)
}

// Check shreds for equality, ignoring any zero padding that either may have
// used for tests only
pub fn shred_payloads_equal_ignore_padding(shred1: &Shred, shred2: &Shred) -> bool {
shred1.payload[..shred1.data_header.size as usize]
== shred2.payload[..shred2.data_header.size as usize]
}

// Create shreds for slots that have a parent-child relationship defined by the input `chain`
// used for tests only
pub fn make_chaining_slot_entries(
@@ -3850,7 +3852,10 @@
.unwrap();
let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();

assert_eq!(last_shred, deserialized_shred);
assert!(shred_payloads_equal_ignore_padding(
&last_shred,
&deserialized_shred
));
// Destroying database without closing it first is undefined behavior
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
@@ -5753,7 +5758,9 @@
.filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok())
.collect();
assert_eq!(result.len(), slot_8_shreds.len());
assert_eq!(result, slot_8_shreds);
for (left, right) in result.iter().zip(slot_8_shreds.iter()) {
assert!(shred_payloads_equal_ignore_padding(left, right));
}

drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
@@ -7625,20 +7632,20 @@
blockstore
.insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
.unwrap();
let shred_bufs: Vec<_> = data_shreds
.iter()
.map(|shred| shred.payload.clone())
.collect();

// Check all the data shreds were recovered
for (s, buf) in data_shreds.iter().zip(shred_bufs) {
assert_eq!(
for shred in data_shreds.iter() {
let recovered_shred = Shred::new_from_serialized_shred(
blockstore
.get_data_shred(s.slot(), s.index() as u64)
.get_data_shred(shred.slot(), shred.index() as u64)
.unwrap()
.unwrap(),
buf
);
)
.unwrap();
assert!(shred_payloads_equal_ignore_padding(
shred,
&recovered_shred
));
}

verify_index_integrity(&blockstore, slot);
Expand Down
18 changes: 18 additions & 0 deletions ledger/src/blockstore_db.rs
@@ -357,6 +357,18 @@ impl Rocks {
Ok(opt)
}

fn get_padded_cf(&self, cf: &ColumnFamily, key: &[u8], size: usize) -> Result<Option<Vec<u8>>> {
let opt = self.0.get_cf(cf, key)?.map(|db_vec| {
let mut bytes = vec![0; size];
{
let (data, _padding) = bytes.split_at_mut(db_vec.len());
data.copy_from_slice(&db_vec[..]);
}
bytes
});
Ok(opt)
}
@steviez (Contributor, Author) commented on lines +360 to +370, May 26, 2021:
The reason to break this out into a separate function was because of get_cf()'s implementation:

fn get_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
   let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec());
   Ok(opt)
}

Note the db_vec.to_vec(); this clones db_vec into a new vector. If we did the resize later, a second allocation & memcpy would occur in Vec::resize(); doing it here shaves that off.
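
To make the allocation argument concrete, here is a minimal standalone sketch of the two patterns (hypothetical helper names; assumes `size >= db_vec.len()`):

```rust
// Pattern avoided: clone first, then resize, costing two allocations + copies.
fn clone_then_resize(db_vec: &[u8], size: usize) -> Vec<u8> {
    let mut bytes = db_vec.to_vec(); // allocation #1 + memcpy
    bytes.resize(size, 0);           // reallocation + copy when size > len
    bytes
}

// Pattern used by get_padded_cf: allocate the final size once up front.
fn pad_in_one_pass(db_vec: &[u8], size: usize) -> Vec<u8> {
    let mut bytes = vec![0; size];                 // single zeroed allocation
    bytes[..db_vec.len()].copy_from_slice(db_vec); // single memcpy
    bytes
}
```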


fn put_cf(&self, cf: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
self.0.put_cf(cf, key, value)?;
Ok(())
@@ -847,6 +859,12 @@
self.backend.get_cf(self.handle(), &C::key(key))
}

// Similar to get_bytes, but will zero-pad the vector (if necessary) out to `size` bytes
pub fn get_padded_bytes(&self, key: C::Index, size: usize) -> Result<Option<Vec<u8>>> {
self.backend
.get_padded_cf(self.handle(), &C::key(key), size)
}

pub fn iter(
&self,
iterator_mode: IteratorMode<C::Index>,
78 changes: 71 additions & 7 deletions ledger/src/shred.rs
@@ -247,6 +247,13 @@ impl Shred {
packet.meta.size = len;
}

pub fn copy_from_packet(packet: &Packet) -> Result<Self> {
let mut serialized_shred = vec![0; SHRED_PAYLOAD_SIZE];
// TODO: assert packet.data.len() >= SHRED_PAYLOAD_SIZE / == PACKET_DATA_SIZE ?
@steviez (Contributor, Author) commented:
Thoughts on this assertion? We had something like this in window service; we could move it here to avoid repeated lines for all the callers of this.

serialized_shred.copy_from_slice(&packet.data[..SHRED_PAYLOAD_SIZE]);
Shred::new_from_serialized_shred(serialized_shred)
}
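
One possible shape of the hoisted assertion floated in the comment above, as a sketch only (the hypothetical `copy_from_packet_checked` is not part of this diff), assuming upstream stages always hand over full-size packet buffers:

```rust
impl Shred {
    // Hypothetical variant with the size check hoisted in, so each caller
    // does not have to repeat the assertion from window_service.
    pub fn copy_from_packet_checked(packet: &Packet) -> Result<Self> {
        assert!(packet.data.len() >= SHRED_PAYLOAD_SIZE);
        let mut serialized_shred = vec![0; SHRED_PAYLOAD_SIZE];
        serialized_shred.copy_from_slice(&packet.data[..SHRED_PAYLOAD_SIZE]);
        Shred::new_from_serialized_shred(serialized_shred)
    }
}
```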

pub fn new_from_data(
slot: Slot,
index: u32,
@@ -314,16 +321,65 @@
}
}

pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self> {
/// Deserializes a shred into its headers and payload
pub fn new_from_serialized_shred(payload: Vec<u8>) -> Result<Self> {
// A shred can be deserialized in several cases; payload length will vary for these:
// payload.len() <= SHRED_PAYLOAD_SIZE when payload is retrieved from the blockstore
// payload.len() == SHRED_PAYLOAD_SIZE when a new shred is created
// payload.len() == PACKET_DATA_SIZE when payload comes from a packet (window service)
// The assertion below requires that callers shorten packet buffers before calling
assert!(payload.len() <= SHRED_PAYLOAD_SIZE);

let mut start = 0;
let common_header: ShredCommonHeader =
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;

let slot = common_header.slot;
// Shreds should be padded out to SHRED_PAYLOAD_SIZE so that erasure
// generation/recovery works correctly, but only data_header.size bytes
// are stored in the blockstore.

let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
let coding_header: CodingShredHeader =
Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
Self {
common_header,
data_header: DataShredHeader::default(),
coding_header,
payload,
}
} else if common_header.shred_type == ShredType(DATA_SHRED) {
let data_header: DataShredHeader =
Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?;
if u64::from(data_header.parent_offset) > common_header.slot {
return Err(ShredError::InvalidParentOffset {
slot,
parent_offset: data_header.parent_offset,
});
}
Self {
common_header,
data_header,
coding_header: CodingShredHeader::default(),
payload,
}
} else {
return Err(ShredError::InvalidShredType);
};

Ok(shred)
}
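
For orientation, a sketch of how a caller picks between this constructor and the pad_out variant defined just below (hypothetical call site; `blockstore_bytes` stands for a trimmed payload fetched from the blockstore):

```rust
// Trimmed payload from the blockstore: len() <= SHRED_PAYLOAD_SIZE.
let trimmed: Vec<u8> = blockstore_bytes;
// Keeps the buffer as-is; fine when the zero padding is never read.
let shred = Shred::new_from_serialized_shred(trimmed.clone())?;
// Zero-pads out to SHRED_PAYLOAD_SIZE; needed before erasure coding.
let padded = Shred::new_from_serialized_shred_pad_out(trimmed)?;
```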

pub fn new_from_serialized_shred_pad_out(mut payload: Vec<u8>) -> Result<Self> {
// A shred can be deserialized in several cases; payload length will vary for these:
// payload.len() <= SHRED_PAYLOAD_SIZE when payload is retrieved from the blockstore
// payload.len() == SHRED_PAYLOAD_SIZE when a new shred is created
// payload.len() == PACKET_DATA_SIZE when payload comes from a packet (window service)

// Pad out the shred
payload.resize(SHRED_PAYLOAD_SIZE, 0);

let mut start = 0;
let common_header: ShredCommonHeader =
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
let slot = common_header.slot;

let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
let coding_header: CodingShredHeader =
Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
@@ -933,7 +989,6 @@ impl Shredder {
pub fn deshred(shreds: &[Shred]) -> std::result::Result<Vec<u8>, reed_solomon_erasure::Error> {
use reed_solomon_erasure::Error::TooFewDataShards;
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
Self::verify_consistent_shred_payload_sizes(&"deshred()", shreds)?;
let index = shreds.first().ok_or(TooFewDataShards)?.index();
let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i);
let data_complete = {
@@ -1876,7 +1931,7 @@
let shred = Shred::new_from_data(10, 0, 1000, Some(&[1, 2, 3]), false, false, 0, 1, 0);
let mut packet = Packet::default();
shred.copy_to_packet(&mut packet);
let shred_res = Shred::new_from_serialized_shred(packet.data.to_vec());
let shred_res = Shred::copy_from_packet(&packet);
assert_matches!(
shred_res,
Err(ShredError::InvalidParentOffset {
@@ -1940,4 +1995,13 @@
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert_eq!(1, stats.bad_shred_type);
}

#[test]
fn test_shred_copy_to_from_packet() {
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
let mut packet = Packet::default();
shred.copy_to_packet(&mut packet);
let copied_shred = Shred::copy_from_packet(&packet).unwrap();
assert_eq!(shred, copied_shred);
}
}