diff --git a/src/metric_engine/src/compaction/executor.rs b/src/metric_engine/src/compaction/executor.rs index 9f41ec8202..c5cf83e2f9 100644 --- a/src/metric_engine/src/compaction/executor.rs +++ b/src/metric_engine/src/compaction/executor.rs @@ -99,7 +99,7 @@ impl Executor { let inused = self.inner.inused_memory.load(Ordering::Relaxed); let mem_limit = self.inner.mem_limit; ensure!( - inused + task_size > mem_limit, + inused + task_size <= mem_limit, "Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}" ); @@ -113,7 +113,7 @@ impl Executor { let task_size = task.input_size(); self.inner .inused_memory - .fetch_add(task_size, Ordering::Relaxed); + .fetch_sub(task_size, Ordering::Relaxed); } pub fn on_failure(&self, task: &Task) { @@ -137,7 +137,7 @@ impl Executor { executor: self.clone(), task, }; - runnable.run() + runnable.spawn() } // TODO: Merge input sst files into one new sst file @@ -257,7 +257,7 @@ pub struct Runnable { } impl Runnable { - fn run(self) { + fn spawn(self) { let rt = self.executor.inner.runtime.clone(); rt.spawn(async move { if let Err(e) = self.executor.do_compaction(&self.task).await { diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs index 01deb7a503..72d43d6e96 100644 --- a/src/metric_engine/src/manifest/encoding.rs +++ b/src/metric_engine/src/manifest/encoding.rs @@ -15,8 +15,17 @@ // specific language governing permissions and limitations // under the License. +use std::io::{Cursor, Write}; + +use anyhow::Context; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use bytes::Bytes; +use parquet::data_type::AsBytes; + use crate::{ - sst::{FileId, SstFile}, + ensure, + sst::{FileId, FileMeta, SstFile}, + types::TimeRange, Error, Result, }; @@ -66,3 +75,340 @@ impl From for pb_types::ManifestUpdate { } } } + +/// The layout for the header. +/// ```plaintext +/// +-------------+--------------+------------+--------------+ +/// | magic(u32) | version(u8) | flag(u8) | length(u32) | +/// +-------------+--------------+------------+--------------+ +/// ``` +/// - The Magic field (u32) is used to ensure the validity of the data source. +/// - The Flags field (u8) is reserved for future extensibility, such as +/// enabling compression or supporting additional features. +/// - The length field (u64) represents the total length of the subsequent +/// records and serves as a straightforward method for verifying their +/// integrity. (length = record_length * record_count) +#[derive(Debug)] +pub struct SnapshotHeader { + pub magic: u32, + pub version: u8, + pub flag: u8, + pub length: u64, +} + +impl TryFrom<&[u8]> for SnapshotHeader { + type Error = Error; + + fn try_from(bytes: &[u8]) -> Result { + ensure!( + bytes.len() >= Self::LENGTH, + "invalid bytes, length: {}", + bytes.len() + ); + + let mut cursor = Cursor::new(bytes); + let magic = cursor + .read_u32::() + .context("read snapshot header magic")?; + ensure!( + magic == SnapshotHeader::MAGIC, + "invalid bytes to convert to header." + ); + let version = cursor.read_u8().context("read snapshot header version")?; + let flag = cursor.read_u8().context("read snapshot header flag")?; + let length = cursor + .read_u64::() + .context("read snapshot header length")?; + Ok(Self { + magic, + version, + flag, + length, + }) + } +} + +impl SnapshotHeader { + pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/; + pub const MAGIC: u32 = 0xCAFE_1234; + + pub fn new(length: u64) -> Self { + Self { + magic: SnapshotHeader::MAGIC, + version: SnapshotRecord::VERSION, + flag: 0, + length, + } + } + + pub fn write_to(&self, mut writer: W) -> Result<()> + where + W: Write, + { + writer + .write_u32::(self.magic) + .context("write shall not fail.")?; + writer + .write_u8(self.version) + .context("write shall not fail.")?; + writer + .write_u8(self.flag) + .context("write shall not fail.")?; + writer + .write_u64::(self.length) + .context("write shall not fail.")?; + Ok(()) + } +} + +/// The layout for manifest Record: +/// ```plaintext +/// +---------+-------------------+------------+-----------------+ +/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) | +/// +---------+-------------------+------------+-----------------+ +/// ``` +pub struct SnapshotRecord { + id: u64, + time_range: TimeRange, + size: u32, + num_rows: u32, +} + +impl SnapshotRecord { + const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num rows*/; + pub const VERSION: u8 = 1; + + pub fn write_to(&self, mut writer: W) -> Result<()> + where + W: Write, + { + writer + .write_u64::(self.id) + .context("write shall not fail.")?; + writer + .write_i64::(*self.time_range.start) + .context("write shall not fail.")?; + writer + .write_i64::(*self.time_range.end) + .context("write shall not fail.")?; + writer + .write_u32::(self.size) + .context("write shall not fail.")?; + writer + .write_u32::(self.num_rows) + .context("write shall not fail.")?; + Ok(()) + } + + pub fn id(&self) -> u64 { + self.id + } +} + +impl From for SnapshotRecord { + fn from(value: SstFile) -> Self { + SnapshotRecord { + id: value.id(), + time_range: value.meta().time_range.clone(), + size: value.meta().size, + num_rows: value.meta().num_rows, + } + } +} + +impl TryFrom<&[u8]> for SnapshotRecord { + type Error = Error; + + fn try_from(value: &[u8]) -> Result { + ensure!( + value.len() >= SnapshotRecord::LENGTH, + "invalid value len: {}", + value.len() + ); + + let mut cursor = Cursor::new(value); + let id = cursor + .read_u64::() + .context("read record id")?; + let start = cursor + .read_i64::() + .context("read record start")?; + let end = cursor + .read_i64::() + .context("read record end")?; + let size = cursor + .read_u32::() + .context("read record size")?; + let num_rows = cursor + .read_u32::() + .context("read record num_rows")?; + Ok(SnapshotRecord { + id, + time_range: (start..end).into(), + size, + num_rows, + }) + } +} + +impl From for SstFile { + fn from(record: SnapshotRecord) -> Self { + let file_meta = FileMeta { + max_sequence: record.id(), + num_rows: record.num_rows, + size: record.size, + time_range: record.time_range.clone(), + }; + SstFile::new(record.id(), file_meta) + } +} + +pub struct Snapshot { + header: SnapshotHeader, + records: Vec, +} + +impl Default for Snapshot { + // create an empty Snapshot + fn default() -> Self { + let header = SnapshotHeader::new(0); + Self { + header, + records: Vec::new(), + } + } +} + +impl TryFrom for Snapshot { + type Error = Error; + + fn try_from(bytes: Bytes) -> Result { + if bytes.is_empty() { + return Ok(Snapshot::default()); + } + let header = SnapshotHeader::try_from(bytes.as_bytes())?; + let record_total_length = header.length as usize; + ensure!( + record_total_length > 0 + && record_total_length % SnapshotRecord::LENGTH == 0 + && record_total_length + SnapshotHeader::LENGTH == bytes.len(), + "create snapshot from bytes failed, header:{header:?}, bytes_length: {}", + bytes.len() + ); + let mut index = SnapshotHeader::LENGTH; + let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH); + while index < bytes.len() { + let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?; + records.push(record); + index += SnapshotRecord::LENGTH; + } + + Ok(Self { header, records }) + } +} + +impl Snapshot { + pub fn into_ssts(self) -> Vec { + if self.header.length == 0 { + Vec::new() + } else { + self.records.into_iter().map(|r| r.into()).collect() + } + } + + // TODO: Ensure no files duplicated + // https://github.com/apache/horaedb/issues/1608 + pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> { + self.records + .extend(update.to_adds.into_iter().map(SnapshotRecord::from)); + self.records + .retain(|record| !update.to_deletes.contains(&record.id)); + + self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as u64; + Ok(()) + } + + pub fn into_bytes(self) -> Result { + let buf = Vec::with_capacity(self.header.length as usize + SnapshotHeader::LENGTH); + let mut cursor = Cursor::new(buf); + + self.header.write_to(&mut cursor)?; + for record in self.records { + record.write_to(&mut cursor)?; + } + Ok(Bytes::from(cursor.into_inner())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_snapshot_header() { + let header = SnapshotHeader::new(257); + let mut vec = vec![0u8; SnapshotHeader::LENGTH]; + let mut writer = vec.as_mut_slice(); + header.write_to(&mut writer).unwrap(); + assert!(writer.is_empty()); + let mut cursor = Cursor::new(vec); + + assert_eq!( + SnapshotHeader::MAGIC, + cursor.read_u32::().unwrap() + ); + assert_eq!( + 1, // version + cursor.read_u8().unwrap() + ); + assert_eq!( + 0, // flag + cursor.read_u8().unwrap() + ); + assert_eq!( + 257, // length + cursor.read_u64::().unwrap() + ); + } + + #[test] + fn test_snapshot_record() { + let sstfile = SstFile::new( + 99, + FileMeta { + max_sequence: 99, + num_rows: 100, + size: 938, + time_range: (100..200).into(), + }, + ); + let record: SnapshotRecord = sstfile.into(); + let mut vec: Vec = vec![0u8; SnapshotRecord::LENGTH]; + let mut writer = vec.as_mut_slice(); + record.write_to(&mut writer).unwrap(); + + assert!(writer.is_empty()); + let mut cursor = Cursor::new(vec); + + assert_eq!( + 99, // id + cursor.read_u64::().unwrap() + ); + assert_eq!( + 100, // start range + cursor.read_i64::().unwrap() + ); + assert_eq!( + 200, // end range + cursor.read_i64::().unwrap() + ); + assert_eq!( + 938, // size + cursor.read_u32::().unwrap() + ); + assert_eq!( + 100, // num rows + cursor.read_u32::().unwrap() + ); + } +} diff --git a/src/metric_engine/src/manifest.rs b/src/metric_engine/src/manifest/mod.rs similarity index 59% rename from src/metric_engine/src/manifest.rs rename to src/metric_engine/src/manifest/mod.rs index 17b188d601..ffc4556762 100644 --- a/src/metric_engine/src/manifest.rs +++ b/src/metric_engine/src/manifest/mod.rs @@ -17,7 +17,6 @@ mod encoding; use std::{ - io::{Cursor, Write}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -27,12 +26,11 @@ use std::{ use anyhow::Context; use async_scoped::TokioScope; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::Bytes; pub use encoding::ManifestUpdate; +use encoding::Snapshot; use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, PutPayload}; -use parquet::data_type::AsBytes; use prost::Message; use tokio::sync::{ mpsc::{self, Receiver, Sender}, @@ -42,10 +40,9 @@ use tracing::error; use uuid::Uuid; use crate::{ - ensure, sst::{FileId, FileMeta, SstFile}, types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange}, - AnyhowError, Error, Result, + AnyhowError, Result, }; pub const PREFIX_PATH: &str = "manifest"; @@ -152,257 +149,6 @@ impl Manifest { } } -/// The layout for the header. -/// ```plaintext -/// +-------------+--------------+------------+--------------+ -/// | magic(u32) | version(u8) | flag(u8) | length(u32) | -/// +-------------+--------------+------------+--------------+ -/// ``` -/// - The Magic field (u32) is used to ensure the validity of the data source. -/// - The Flags field (u8) is reserved for future extensibility, such as -/// enabling compression or supporting additional features. -/// - The length field (u64) represents the total length of the subsequent -/// records and serves as a straightforward method for verifying their -/// integrity. (length = record_length * record_count) -#[derive(Debug)] -struct SnapshotHeader { - pub magic: u32, - pub version: u8, - pub flag: u8, - pub length: u64, -} - -impl TryFrom<&[u8]> for SnapshotHeader { - type Error = Error; - - fn try_from(bytes: &[u8]) -> Result { - ensure!( - bytes.len() >= Self::LENGTH, - "invalid bytes, length: {}", - bytes.len() - ); - - let mut cursor = Cursor::new(bytes); - let magic = cursor.read_u32::().unwrap(); - ensure!( - magic == SnapshotHeader::MAGIC, - "invalid bytes to convert to header." - ); - let version = cursor.read_u8().unwrap(); - let flag = cursor.read_u8().unwrap(); - let length = cursor.read_u64::().unwrap(); - Ok(Self { - magic, - version, - flag, - length, - }) - } -} - -impl SnapshotHeader { - pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/; - pub const MAGIC: u32 = 0xCAFE_6666; - - pub fn new(length: u64) -> Self { - Self { - magic: SnapshotHeader::MAGIC, - version: SnapshotRecordV1::VERSION, - flag: 0, - length, - } - } - - pub fn write_to(&self, mut writer: W) -> Result<()> - where - W: Write, - { - writer - .write_u32::(self.magic) - .context("write shall not fail.")?; - writer - .write_u8(self.version) - .context("write shall not fail.")?; - writer - .write_u8(self.flag) - .context("write shall not fail.")?; - writer - .write_u64::(self.length) - .context("write shall not fail.")?; - Ok(()) - } -} - -/// The layout for manifest Record: -/// ```plaintext -/// +---------+-------------------+------------+-----------------+ -/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) | -/// +---------+-------------------+------------+-----------------+ -/// ``` -struct SnapshotRecordV1 { - id: u64, - time_range: TimeRange, - size: u32, - num_rows: u32, -} - -impl SnapshotRecordV1 { - const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num rows*/; - pub const VERSION: u8 = 1; - - pub fn write_to(&self, mut writer: W) -> Result<()> - where - W: Write, - { - writer - .write_u64::(self.id) - .context("write shall not fail.")?; - writer - .write_i64::(*self.time_range.start) - .context("write shall not fail.")?; - writer - .write_i64::(*self.time_range.end) - .context("write shall not fail.")?; - writer - .write_u32::(self.size) - .context("write shall not fail.")?; - writer - .write_u32::(self.num_rows) - .context("write shall not fail.")?; - Ok(()) - } - - pub fn id(&self) -> u64 { - self.id - } -} - -impl From for SnapshotRecordV1 { - fn from(value: SstFile) -> Self { - SnapshotRecordV1 { - id: value.id(), - time_range: value.meta().time_range.clone(), - size: value.meta().size, - num_rows: value.meta().num_rows, - } - } -} - -impl TryFrom<&[u8]> for SnapshotRecordV1 { - type Error = Error; - - fn try_from(value: &[u8]) -> Result { - ensure!( - value.len() >= SnapshotRecordV1::LENGTH, - "invalid value len: {}", - value.len() - ); - - let mut cursor = Cursor::new(value); - let id = cursor.read_u64::().unwrap(); - let start = cursor.read_i64::().unwrap(); - let end = cursor.read_i64::().unwrap(); - let size = cursor.read_u32::().unwrap(); - let num_rows = cursor.read_u32::().unwrap(); - Ok(SnapshotRecordV1 { - id, - time_range: (start..end).into(), - size, - num_rows, - }) - } -} - -impl From for SstFile { - fn from(record: SnapshotRecordV1) -> Self { - let file_meta = FileMeta { - max_sequence: record.id(), - num_rows: record.num_rows, - size: record.size, - time_range: record.time_range.clone(), - }; - SstFile::new(record.id(), file_meta) - } -} - -struct Snapshot { - header: SnapshotHeader, - records: Vec, -} - -impl Default for Snapshot { - // create an empty Snapshot - fn default() -> Self { - let header = SnapshotHeader::new(0); - Self { - header, - records: Vec::new(), - } - } -} - -impl TryFrom for Snapshot { - type Error = Error; - - fn try_from(bytes: Bytes) -> Result { - if bytes.is_empty() { - return Ok(Snapshot::default()); - } - let header = SnapshotHeader::try_from(bytes.as_bytes())?; - let record_total_length = header.length as usize; - ensure!( - record_total_length > 0 - && record_total_length % SnapshotRecordV1::LENGTH == 0 - && record_total_length + SnapshotHeader::LENGTH == bytes.len(), - "create snapshot from bytes failed, header:{header:?}, bytes_length: {}", - bytes.len() - ); - let mut index = SnapshotHeader::LENGTH; - let mut records = Vec::with_capacity(record_total_length / SnapshotRecordV1::LENGTH); - while index < bytes.len() { - let record = - SnapshotRecordV1::try_from(&bytes[index..index + SnapshotRecordV1::LENGTH])?; - records.push(record); - index += SnapshotRecordV1::LENGTH; - } - - Ok(Self { header, records }) - } -} - -impl Snapshot { - pub fn into_ssts(self) -> Vec { - if self.header.length == 0 { - Vec::new() - } else { - self.records.into_iter().map(|r| r.into()).collect() - } - } - - // TODO: Ensure no files duplicated - // https://github.com/apache/horaedb/issues/1608 - pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> { - self.records - .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from)); - self.records - .retain(|record| !update.to_deletes.contains(&record.id)); - - self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH) as u64; - Ok(()) - } - - pub fn into_bytes(self) -> Result { - let buf = Vec::with_capacity(self.header.length as usize + SnapshotHeader::LENGTH); - let mut cursor = Cursor::new(buf); - - self.header.write_to(&mut cursor)?; - for record in self.records { - record.write_to(&mut cursor).unwrap(); - } - Ok(Bytes::from(cursor.into_inner())) - } -} - enum MergeType { Hard, Soft, @@ -605,7 +351,6 @@ async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result().unwrap() - ); - assert_eq!( - 1, // version - cursor.read_u8().unwrap() - ); - assert_eq!( - 0, // flag - cursor.read_u8().unwrap() - ); - assert_eq!( - 257, // length - cursor.read_u64::().unwrap() - ); - } - - #[test] - fn test_snapshot_record() { - let sstfile = SstFile::new( - 99, - FileMeta { - max_sequence: 99, - num_rows: 100, - size: 938, - time_range: (100..200).into(), - }, - ); - let record: SnapshotRecordV1 = sstfile.into(); - let mut vec: Vec = vec![0u8; SnapshotRecordV1::LENGTH]; - let mut writer = vec.as_mut_slice(); - record.write_to(&mut writer).unwrap(); - - assert!(writer.is_empty()); - let mut cursor = Cursor::new(vec); - - assert_eq!( - 99, // id - cursor.read_u64::().unwrap() - ); - assert_eq!( - 100, // start range - cursor.read_i64::().unwrap() - ); - assert_eq!( - 200, // end range - cursor.read_i64::().unwrap() - ); - assert_eq!( - 938, // size - cursor.read_u32::().unwrap() - ); - assert_eq!( - 100, // num rows - cursor.read_u32::().unwrap() - ); - } }