Skip to content

Commit

Permalink
fix cr
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Oct 16, 2024
1 parent 8bbf115 commit 825f9b6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 33 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ members = [
"src/wal"
]

default-members = [
"src/horaedb"
]
default-members = ["src/horaedb"]

[workspace.dependencies]
alloc_tracker = { path = "src/components/alloc_tracker" }
Expand Down
50 changes: 23 additions & 27 deletions src/wal/src/local_storage_impl/record_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,15 @@ pub struct Record {
}

impl Record {
pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Result<Self> {
let mut record = Record {
pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Self {
Record {
version: NEWEST_RECORD_ENCODING_VERSION,
crc: 0,
crc: compute_crc32(table_id, sequence_num, value),
table_id,
sequence_num,
value_length: value.len() as u32,
value: value.to_vec(),
};

// Calculate CRC
let mut h = Hasher::new();
h.update(&table_id.to_le_bytes());
h.update(&sequence_num.to_le_bytes());
h.update(&record.value_length.to_le_bytes());
h.update(value);
record.crc = h.finalize();

Ok(record)
}
}

// Return the length of the record
Expand Down Expand Up @@ -160,9 +150,9 @@ impl RecordEncoding {
pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result<Record> {
// Ensure that buf is not shorter than the shortest record.
ensure!(
buf.remaining() >= VERSION_SIZE + CRC_SIZE + VALUE_LENGTH_SIZE,
buf.remaining() >= RECORD_HEADER_SIZE,
LengthMismatch {
expected: VERSION_SIZE + CRC_SIZE + VALUE_LENGTH_SIZE,
expected: RECORD_HEADER_SIZE,
actual: buf.remaining()
}
);
Expand All @@ -183,15 +173,11 @@ impl RecordEncoding {
let table_id = buf.try_get_u64().context(Decoding)?;
let sequence_num = buf.try_get_u64().context(Decoding)?;
let value_length = buf.try_get_u32().context(Decoding)?;
let value = buf[0..value_length as usize].to_vec();
let mut value = vec![0; value_length as usize];
buf.try_copy_to_slice(&mut value).context(Decoding)?;

// Verify CRC
let mut h = Hasher::new();
h.update(&table_id.to_le_bytes());
h.update(&sequence_num.to_le_bytes());
h.update(&value_length.to_le_bytes());
h.update(&value);
let computed_crc = h.finalize();
let computed_crc = compute_crc32(table_id, sequence_num, &value);
ensure!(
computed_crc == crc,
ChecksumMismatch {
Expand All @@ -200,8 +186,6 @@ impl RecordEncoding {
}
);

buf.advance(value_length as usize);

Ok(Record {
version,
crc,
Expand All @@ -213,6 +197,18 @@ impl RecordEncoding {
}
}

/// The crc32 checksum is calculated over the table_id, sequence_num,
/// value_length and value.
// This function does the same with `crc32fast::hash`.
fn compute_crc32(table_id: u64, seq_num: u64, value: &[u8]) -> u32 {
let mut h = Hasher::new();
h.update(&table_id.to_le_bytes());
h.update(&seq_num.to_le_bytes());
h.update(&value.len().to_le_bytes());
h.update(value);
h.finalize()
}

#[cfg(test)]
mod tests {
use bytes_ext::BytesMut;
Expand All @@ -225,7 +221,7 @@ mod tests {
let table_id = 1;
let sequence_num = 2;
let value = b"test_value";
let record = Record::new(table_id, sequence_num, value).unwrap();
let record = Record::new(table_id, sequence_num, value);

let encoder = RecordEncoding::newest();
let mut buf = BytesMut::new();
Expand All @@ -240,7 +236,7 @@ mod tests {
let table_id = 1;
let sequence_num = 2;
let value = b"test_value";
let record = Record::new(table_id, sequence_num, value).unwrap();
let record = Record::new(table_id, sequence_num, value);

let encoder = RecordEncoding::newest();
let mut buf = BytesMut::new();
Expand Down
4 changes: 1 addition & 3 deletions src/wal/src/local_storage_impl/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,7 @@ impl Region {

for entry in &batch.entries {
// Encode the record
let record = Record::new(table_id, next_sequence_num, &entry.payload)
.box_err()
.context(Encoding)?;
let record = Record::new(table_id, next_sequence_num, &entry.payload);
self.record_encoding
.encode(&mut data, &record)
.box_err()
Expand Down

0 comments on commit 825f9b6

Please sign in to comment.