Skip to content

Commit

Permalink
refactor: use Read trait to convert Bytes to Snapshot (apache#1612)
Browse files (browse the repository at this point in the history)
## Rationale


## Detailed Changes


## Test Plan
CI
  • Loading branch information
jiacai2050 authored Dec 18, 2024
1 parent cc3352a commit 58db4f5
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 96 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Release Disk Quota
run: |
sudo make ensure-disk-quota
- name: Setup Build Environment
run: |
sudo apt update
Expand All @@ -81,9 +78,6 @@ jobs:
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- name: Release Disk Quota
run: |
sudo make ensure-disk-quota
- name: Setup Build Environment
run: |
sudo apt update
Expand Down
148 changes: 58 additions & 90 deletions src/metric_engine/src/manifest/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::io::{Cursor, Write};
use std::io::{Cursor, Read, Write};

use anyhow::Context;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::Bytes;
use parquet::data_type::AsBytes;
use bytes::{Buf, Bytes};

use crate::{
ensure,
Expand Down Expand Up @@ -88,35 +87,41 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
/// - The length field (u64) represents the total length of the subsequent
/// records and serves as a straightforward method for verifying their
/// integrity. (length = record_length * record_count)
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct SnapshotHeader {
pub magic: u32,
pub version: u8,
pub flag: u8,
pub length: u64,
}

impl TryFrom<&[u8]> for SnapshotHeader {
type Error = Error;
impl SnapshotHeader {
pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/;
pub const MAGIC: u32 = 0xCAFE_1234;

fn try_from(bytes: &[u8]) -> Result<Self> {
ensure!(
bytes.len() >= Self::LENGTH,
"invalid bytes, length: {}",
bytes.len()
);
pub fn new(length: u64) -> Self {
Self {
magic: SnapshotHeader::MAGIC,
version: SnapshotRecord::VERSION,
flag: 0,
length,
}
}

let mut cursor = Cursor::new(bytes);
let magic = cursor
pub fn try_new<R>(mut reader: R) -> Result<Self>
where
R: Read,
{
let magic = reader
.read_u32::<LittleEndian>()
.context("read snapshot header magic")?;
ensure!(
magic == SnapshotHeader::MAGIC,
"invalid bytes to convert to header."
);
let version = cursor.read_u8().context("read snapshot header version")?;
let flag = cursor.read_u8().context("read snapshot header flag")?;
let length = cursor
let version = reader.read_u8().context("read snapshot header version")?;
let flag = reader.read_u8().context("read snapshot header flag")?;
let length = reader
.read_u64::<LittleEndian>()
.context("read snapshot header length")?;
Ok(Self {
Expand All @@ -126,20 +131,6 @@ impl TryFrom<&[u8]> for SnapshotHeader {
length,
})
}
}

impl SnapshotHeader {
pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/;
pub const MAGIC: u32 = 0xCAFE_1234;

pub fn new(length: u64) -> Self {
Self {
magic: SnapshotHeader::MAGIC,
version: SnapshotRecord::VERSION,
flag: 0,
length,
}
}

pub fn write_to<W>(&self, mut writer: W) -> Result<()>
where
Expand Down Expand Up @@ -167,6 +158,7 @@ impl SnapshotHeader {
/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
/// +---------+-------------------+------------+-----------------+
/// ```
#[derive(Debug, PartialEq, Eq)]
pub struct SnapshotRecord {
id: u64,
time_range: TimeRange,
Expand Down Expand Up @@ -216,30 +208,24 @@ impl From<SstFile> for SnapshotRecord {
}
}

impl TryFrom<&[u8]> for SnapshotRecord {
type Error = Error;

fn try_from(value: &[u8]) -> Result<Self> {
ensure!(
value.len() >= SnapshotRecord::LENGTH,
"invalid value len: {}",
value.len()
);

let mut cursor = Cursor::new(value);
let id = cursor
impl SnapshotRecord {
fn try_new<R>(mut reader: R) -> Result<Self>
where
R: Read,
{
let id = reader
.read_u64::<LittleEndian>()
.context("read record id")?;
let start = cursor
let start = reader
.read_i64::<LittleEndian>()
.context("read record start")?;
let end = cursor
let end = reader
.read_i64::<LittleEndian>()
.context("read record end")?;
let size = cursor
let size = reader
.read_u32::<LittleEndian>()
.context("read record size")?;
let num_rows = cursor
let num_rows = reader
.read_u32::<LittleEndian>()
.context("read record num_rows")?;
Ok(SnapshotRecord {
Expand Down Expand Up @@ -286,21 +272,20 @@ impl TryFrom<Bytes> for Snapshot {
if bytes.is_empty() {
return Ok(Snapshot::default());
}
let header = SnapshotHeader::try_from(bytes.as_bytes())?;
let bytes_len = bytes.len();
let mut cursor = Cursor::new(bytes);
let header = SnapshotHeader::try_new(&mut cursor)?;
let record_total_length = header.length as usize;
ensure!(
record_total_length > 0
&& record_total_length % SnapshotRecord::LENGTH == 0
&& record_total_length + SnapshotHeader::LENGTH == bytes.len(),
"create snapshot from bytes failed, header:{header:?}, bytes_length: {}",
bytes.len()
&& record_total_length + SnapshotHeader::LENGTH == bytes_len,
"create snapshot from bytes failed, header:{header:?}, bytes_length: {bytes_len}",
);
let mut index = SnapshotHeader::LENGTH;
let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH);
while index < bytes.len() {
let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?;
while cursor.has_remaining() {
let record = SnapshotRecord::try_new(&mut cursor)?;
records.push(record);
index += SnapshotRecord::LENGTH;
}

Ok(Self { header, records })
Expand Down Expand Up @@ -351,23 +336,17 @@ mod tests {
let mut writer = vec.as_mut_slice();
header.write_to(&mut writer).unwrap();
assert!(writer.is_empty());
let mut cursor = Cursor::new(vec);
let cursor = Cursor::new(vec);
let header = SnapshotHeader::try_new(cursor).unwrap();

assert_eq!(
SnapshotHeader::MAGIC,
cursor.read_u32::<LittleEndian>().unwrap()
);
assert_eq!(
1, // version
cursor.read_u8().unwrap()
);
assert_eq!(
0, // flag
cursor.read_u8().unwrap()
);
assert_eq!(
257, // length
cursor.read_u64::<LittleEndian>().unwrap()
SnapshotHeader {
magic: SnapshotHeader::MAGIC,
version: 1,
flag: 0,
length: 257
},
header
);
}

Expand All @@ -388,27 +367,16 @@ mod tests {
record.write_to(&mut writer).unwrap();

assert!(writer.is_empty());
let mut cursor = Cursor::new(vec);

assert_eq!(
99, // id
cursor.read_u64::<LittleEndian>().unwrap()
);
assert_eq!(
100, // start range
cursor.read_i64::<LittleEndian>().unwrap()
);
let cursor = Cursor::new(vec);
let record = SnapshotRecord::try_new(cursor).unwrap();
assert_eq!(
200, // end range
cursor.read_i64::<LittleEndian>().unwrap()
);
assert_eq!(
938, // size
cursor.read_u32::<LittleEndian>().unwrap()
);
assert_eq!(
100, // num rows
cursor.read_u32::<LittleEndian>().unwrap()
SnapshotRecord {
id: 99,
time_range: (100..200).into(),
size: 938,
num_rows: 100
},
record
);
}
}

0 comments on commit 58db4f5

Please sign in to comment.