From 58db4f5f99da6af0f184e47a590b7de44ff7d5e9 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Wed, 18 Dec 2024 11:55:24 +0800 Subject: [PATCH] refactor: use `Read` trait to convert Bytes to Snapshot (#1612) ## Rationale ## Detailed Changes ## Test Plan CI --- .github/workflows/ci.yml | 6 - src/metric_engine/src/manifest/encoding.rs | 148 ++++++++------------- 2 files changed, 58 insertions(+), 96 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73775bc6bb..ea01d87e09 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,9 +57,6 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - name: Release Disk Quota - run: | - sudo make ensure-disk-quota - name: Setup Build Environment run: | sudo apt update @@ -81,9 +78,6 @@ jobs: timeout-minutes: 60 steps: - uses: actions/checkout@v4 - - name: Release Disk Quota - run: | - sudo make ensure-disk-quota - name: Setup Build Environment run: | sudo apt update diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs index 72d43d6e96..9ec0ecd6d1 100644 --- a/src/metric_engine/src/manifest/encoding.rs +++ b/src/metric_engine/src/manifest/encoding.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::io::{Cursor, Write}; +use std::io::{Cursor, Read, Write}; use anyhow::Context; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use bytes::Bytes; -use parquet::data_type::AsBytes; +use bytes::{Buf, Bytes}; use crate::{ ensure, @@ -88,7 +87,7 @@ impl From for pb_types::ManifestUpdate { /// - The length field (u64) represents the total length of the subsequent /// records and serves as a straightforward method for verifying their /// integrity. (length = record_length * record_count) -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct SnapshotHeader { pub magic: u32, pub version: u8, @@ -96,27 +95,33 @@ pub struct SnapshotHeader { pub length: u64, } -impl TryFrom<&[u8]> for SnapshotHeader { - type Error = Error; +impl SnapshotHeader { + pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/; + pub const MAGIC: u32 = 0xCAFE_1234; - fn try_from(bytes: &[u8]) -> Result { - ensure!( - bytes.len() >= Self::LENGTH, - "invalid bytes, length: {}", - bytes.len() - ); + pub fn new(length: u64) -> Self { + Self { + magic: SnapshotHeader::MAGIC, + version: SnapshotRecord::VERSION, + flag: 0, + length, + } + } - let mut cursor = Cursor::new(bytes); - let magic = cursor + pub fn try_new(mut reader: R) -> Result + where + R: Read, + { + let magic = reader .read_u32::() .context("read snapshot header magic")?; ensure!( magic == SnapshotHeader::MAGIC, "invalid bytes to convert to header." ); - let version = cursor.read_u8().context("read snapshot header version")?; - let flag = cursor.read_u8().context("read snapshot header flag")?; - let length = cursor + let version = reader.read_u8().context("read snapshot header version")?; + let flag = reader.read_u8().context("read snapshot header flag")?; + let length = reader .read_u64::() .context("read snapshot header length")?; Ok(Self { @@ -126,20 +131,6 @@ impl TryFrom<&[u8]> for SnapshotHeader { length, }) } -} - -impl SnapshotHeader { - pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/; - pub const MAGIC: u32 = 0xCAFE_1234; - - pub fn new(length: u64) -> Self { - Self { - magic: SnapshotHeader::MAGIC, - version: SnapshotRecord::VERSION, - flag: 0, - length, - } - } pub fn write_to(&self, mut writer: W) -> Result<()> where @@ -167,6 +158,7 @@ impl SnapshotHeader { /// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) | /// +---------+-------------------+------------+-----------------+ /// ``` +#[derive(Debug, PartialEq, Eq)] pub struct SnapshotRecord { id: u64, time_range: TimeRange, @@ -216,30 +208,24 @@ impl From for SnapshotRecord { } } -impl TryFrom<&[u8]> for SnapshotRecord { - type Error = Error; - - fn try_from(value: &[u8]) -> Result { - ensure!( - value.len() >= SnapshotRecord::LENGTH, - "invalid value len: {}", - value.len() - ); - - let mut cursor = Cursor::new(value); - let id = cursor +impl SnapshotRecord { + fn try_new(mut reader: R) -> Result + where + R: Read, + { + let id = reader .read_u64::() .context("read record id")?; - let start = cursor + let start = reader .read_i64::() .context("read record start")?; - let end = cursor + let end = reader .read_i64::() .context("read record end")?; - let size = cursor + let size = reader .read_u32::() .context("read record size")?; - let num_rows = cursor + let num_rows = reader .read_u32::() .context("read record num_rows")?; Ok(SnapshotRecord { @@ -286,21 +272,20 @@ impl TryFrom for Snapshot { if bytes.is_empty() { return Ok(Snapshot::default()); } - let header = SnapshotHeader::try_from(bytes.as_bytes())?; + let bytes_len = bytes.len(); + let mut cursor = Cursor::new(bytes); + let header = SnapshotHeader::try_new(&mut cursor)?; let record_total_length = header.length as usize; ensure!( record_total_length > 0 && record_total_length % SnapshotRecord::LENGTH == 0 - && record_total_length + SnapshotHeader::LENGTH == bytes.len(), - "create snapshot from bytes failed, header:{header:?}, bytes_length: {}", - bytes.len() + && record_total_length + SnapshotHeader::LENGTH == bytes_len, + "create snapshot from bytes failed, header:{header:?}, bytes_length: {bytes_len}", ); - let mut index = SnapshotHeader::LENGTH; let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH); - while index < bytes.len() { - let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?; + while cursor.has_remaining() { + let record = SnapshotRecord::try_new(&mut cursor)?; records.push(record); - index += SnapshotRecord::LENGTH; } Ok(Self { header, records }) @@ -351,23 +336,17 @@ mod tests { let mut writer = vec.as_mut_slice(); header.write_to(&mut writer).unwrap(); assert!(writer.is_empty()); - let mut cursor = Cursor::new(vec); + let cursor = Cursor::new(vec); + let header = SnapshotHeader::try_new(cursor).unwrap(); assert_eq!( - SnapshotHeader::MAGIC, - cursor.read_u32::().unwrap() - ); - assert_eq!( - 1, // version - cursor.read_u8().unwrap() - ); - assert_eq!( - 0, // flag - cursor.read_u8().unwrap() - ); - assert_eq!( - 257, // length - cursor.read_u64::().unwrap() + SnapshotHeader { + magic: SnapshotHeader::MAGIC, + version: 1, + flag: 0, + length: 257 + }, + header ); } @@ -388,27 +367,16 @@ mod tests { record.write_to(&mut writer).unwrap(); assert!(writer.is_empty()); - let mut cursor = Cursor::new(vec); - - assert_eq!( - 99, // id - cursor.read_u64::().unwrap() - ); - assert_eq!( - 100, // start range - cursor.read_i64::().unwrap() - ); + let cursor = Cursor::new(vec); + let record = SnapshotRecord::try_new(cursor).unwrap(); assert_eq!( - 200, // end range - cursor.read_i64::().unwrap() - ); - assert_eq!( - 938, // size - cursor.read_u32::().unwrap() - ); - assert_eq!( - 100, // num rows - cursor.read_u32::().unwrap() + SnapshotRecord { + id: 99, + time_range: (100..200).into(), + size: 938, + num_rows: 100 + }, + record ); } }