diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock index 1ab7b2bc66..1d68e84993 100644 --- a/horaedb/Cargo.lock +++ b/horaedb/Cargo.lock @@ -329,6 +329,17 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-scoped" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740" +dependencies = [ + "futures", + "pin-project", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.82" @@ -1530,7 +1541,7 @@ dependencies = [ [[package]] name = "macros" -version = "2.1.0" +version = "2.2.0-dev" [[package]] name = "md-5" @@ -1555,6 +1566,7 @@ dependencies = [ "anyhow", "arrow", "arrow-schema", + "async-scoped", "async-trait", "bytes", "datafusion", @@ -1880,6 +1892,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.14" diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml index 6f56f09091..d40bdafd97 100644 --- a/horaedb/Cargo.toml +++ b/horaedb/Cargo.toml @@ -47,6 +47,7 @@ itertools = "0.3" lazy_static = "1" tracing = "0.1" tracing-subscriber = "0.3" +async-scoped = { version = "0.9.0", features = ["use-tokio"] } # This profile optimizes for good runtime performance. [profile.release] diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml index ed237fcb23..7235fc6837 100644 --- a/horaedb/metric_engine/Cargo.toml +++ b/horaedb/metric_engine/Cargo.toml @@ -34,6 +34,7 @@ workspace = true anyhow = { workspace = true } arrow = { workspace = true } arrow-schema = { workspace = true } +async-scoped = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } datafusion = { workspace = true } diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs index f9e6cef324..e435e6998b 100644 --- a/horaedb/metric_engine/src/manifest.rs +++ b/horaedb/metric_engine/src/manifest.rs @@ -25,19 +25,23 @@ use std::{ }; use anyhow::Context; +use async_scoped::TokioScope; use bytes::Bytes; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, PutPayload}; use prost::Message; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - RwLock, +use tokio::{ + runtime::Runtime, + sync::{ + mpsc::{self, Receiver, Sender}, + RwLock, + }, }; use tracing::error; use crate::{ sst::{FileId, FileMeta, SstFile}, - types::{ObjectStoreRef, TimeRange}, + types::{ManifestMergeOptions, ObjectStoreRef, TimeRange}, AnyhowError, Error, Result, }; @@ -96,6 +100,7 @@ impl Manifest { pub async fn try_new( root_dir: String, store: ObjectStoreRef, + runtime: Arc, merge_options: ManifestMergeOptions, ) -> Result { let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}")); @@ -105,6 +110,7 @@ impl Manifest { snapshot_path.clone(), delta_dir.clone(), store.clone(), + runtime.clone(), merge_options, ) .await?; @@ -112,7 +118,7 @@ impl Manifest { { let merger = merger.clone(); // Start merger in background - tokio::spawn(async move { + runtime.spawn(async move { merger.run().await; }); } @@ -175,31 +181,11 @@ enum MergeType { Soft, } -#[derive(Clone)] -pub struct ManifestMergeOptions { - channel_size: usize, - merge_interval_seconds: usize, - min_merge_threshold: usize, - hard_merge_threshold: usize, - soft_merge_threshold: usize, -} - -impl Default for ManifestMergeOptions { - fn default() -> Self { - Self { - channel_size: 10, - merge_interval_seconds: 5, - min_merge_threshold: 10, - soft_merge_threshold: 50, - hard_merge_threshold: 90, - } - } -} - struct ManifestMerger { snapshot_path: Path, delta_dir: Path, store: ObjectStoreRef, + runtime: Arc, sender: Sender, receiver: RwLock>, deltas_num: AtomicUsize, @@ -211,6 +197,7 @@ impl ManifestMerger { snapshot_path: Path, delta_dir: Path, store: ObjectStoreRef, + runtime: Arc, merge_options: ManifestMergeOptions, ) -> Result> { let (tx, rx) = mpsc::channel(merge_options.channel_size); @@ -218,6 +205,7 @@ impl ManifestMerger { snapshot_path, delta_dir, store, + runtime, sender: tx, receiver: RwLock::new(rx), deltas_num: AtomicUsize::new(0), @@ -279,31 +267,21 @@ impl ManifestMerger { } async fn do_merge(&self) -> Result<()> { - let paths = self - .store - .list(Some(&self.delta_dir)) - .map(|value| { - value - .map(|v| v.location) - .context("failed to get delta file path") - }) - .try_collect::>() - .await?; + let paths = list_delta_paths(&self.store, &self.delta_dir).await?; if paths.is_empty() { return Ok(()); } - let mut stream_read = FuturesUnordered::new(); - for path in paths.clone() { - let store = self.store.clone(); - // TODO: use thread pool to read manifest files - let handle = tokio::spawn(async move { read_delta_file(store, path).await }); - stream_read.push(handle); - } - let mut delta_files = Vec::with_capacity(stream_read.len()); - while let Some(res) = stream_read.next().await { - let res = res.context("Failed to join read delta task")??; - delta_files.push(res); + let (_, results) = TokioScope::scope_and_block(|scope| { + for path in &paths { + scope.spawn(async { read_delta_file(&self.store, path).await }); + } + }); + + let mut delta_files = Vec::with_capacity(results.len()); + for res in results { + let sst_file = res.context("Failed to join read delta files task")??; + delta_files.push(sst_file); } let mut payload = read_snapshot(&self.store, &self.snapshot_path).await?; @@ -327,21 +305,19 @@ impl ManifestMerger { self.store .put(&self.snapshot_path, put_payload) .await - .context("Failed to update manifest")?; + .with_context(|| format!("Failed to update manifest, path:{}", self.snapshot_path))?; // 2. Delete the merged manifest files - let mut stream_delete = FuturesUnordered::new(); - for path in paths { - let store = self.store.clone(); - // TODO: use thread pool to delete sst files - let handle = tokio::spawn(async move { delete_delta_file(store, path).await }); - stream_delete.push(handle); - } + let (_, results) = TokioScope::scope_and_block(|scope| { + for path in &paths { + scope.spawn(async { delete_delta_file(&self.store, path).await }); + } + }); - while let Some(res) = stream_delete.next().await { + for res in results { match res { Err(e) => { - error!("Failed to join delete delta task, err:{e}") + error!("Failed to join delete delta files task, err:{e}") } Ok(v) => { if let Err(e) = v { @@ -363,25 +339,25 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result { let bytes = v .bytes() .await - .context("failed to read manifest snapshot")?; - let pb_payload = - pb_types::Manifest::decode(bytes).context("failed to decode manifest snapshot")?; + .with_context(|| format!("Failed to read manifest snapshot, path:{path}"))?; + let pb_payload = pb_types::Manifest::decode(bytes) + .with_context(|| format!("Failed to decode manifest snapshot, path:{path}"))?; Payload::try_from(pb_payload) } Err(err) => { if err.to_string().contains("not found") { Ok(Payload { files: vec![] }) } else { - let context = format!("Failed to get manifest snapshot, path:{path}"); + let context = format!("Failed to read manifest snapshot, path:{path}"); Err(AnyhowError::new(err).context(context).into()) } } } } -async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result { +async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result { let bytes = store - .get(&sst_path) + .get(sst_path) .await .with_context(|| format!("failed to get delta file, path:{sst_path}"))? .bytes() @@ -396,11 +372,136 @@ async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result Result<()> { +async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> { store - .delete(&path) + .delete(path) .await .with_context(|| format!("Failed to delete delta files, path:{path}"))?; Ok(()) } + +async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result> { + let paths = store + .list(Some(delta_dir)) + .map(|value| { + value + .map(|v| v.location) + .with_context(|| format!("Failed to list delta paths, delta dir:{}", delta_dir)) + }) + .try_collect::>() + .await?; + + Ok(paths) +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, thread::sleep}; + + use object_store::local::LocalFileSystem; + + use super::*; + + #[tokio::test] + async fn test_find_manifest() { + let root_dir = temp_dir::TempDir::new().unwrap(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + let store = Arc::new(LocalFileSystem::new()); + + let manifest = Manifest::try_new( + root_dir.path().to_string_lossy().to_string(), + store, + Arc::new(runtime), + ManifestMergeOptions::default(), + ) + .await + .unwrap(); + + for i in 0..20 { + let time_range = (i..i + 1).into(); + let meta = FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: i as u32, + time_range, + }; + manifest.add_file(i as u64, meta).await.unwrap(); + } + + let find_range = (10..15).into(); + let mut ssts = manifest.find_ssts(&find_range).await; + + let mut expected_ssts = (10..15) + .map(|i| { + let id = i as u64; + let time_range = (i..i + 1).into(); + let meta = FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: i as u32, + time_range, + }; + SstFile { id, meta } + }) + .collect::>(); + + expected_ssts.sort_by(|a, b| a.id.cmp(&b.id)); + ssts.sort_by(|a, b| a.id.cmp(&b.id)); + assert_eq!(expected_ssts, ssts); + } + + #[tokio::test] + async fn test_merge_manifest() { + let root_dir = temp_dir::TempDir::new() + .unwrap() + .path() + .to_string_lossy() + .to_string(); + let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}")); + let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}")); + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap(); + let store: ObjectStoreRef = Arc::new(LocalFileSystem::new()); + + let manifest = Manifest::try_new( + root_dir, + store.clone(), + Arc::new(runtime), + ManifestMergeOptions { + merge_interval_seconds: 1, + ..Default::default() + }, + ) + .await + .unwrap(); + + // Add manifest files + for i in 0..20 { + let time_range = (i..i + 1).into(); + let meta = FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: i as u32, + time_range, + }; + manifest.add_file(i as u64, meta).await.unwrap(); + } + + // Wait for merge manifest to finish + sleep(Duration::from_secs(2)); + + let mut mem_ssts = manifest.payload.read().await.files.clone(); + let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files; + + mem_ssts.sort_by(|a, b| a.id.cmp(&b.id)); + ssts.sort_by(|a, b| a.id.cmp(&b.id)); + assert_eq!(mem_ssts, ssts); + + let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap(); + assert!(delta_paths.is_empty()); + } +} diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs index 703b4bc41f..644988f850 100644 --- a/horaedb/metric_engine/src/sst.rs +++ b/horaedb/metric_engine/src/sst.rs @@ -31,7 +31,7 @@ pub const PREFIX_PATH: &str = "data"; pub type FileId = u64; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct SstFile { pub id: FileId, pub meta: FileMeta, @@ -58,7 +58,7 @@ impl From for pb_types::SstFile { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct FileMeta { pub max_sequence: u64, pub num_rows: u32, diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs index c8c832d2aa..ac37e9c8b0 100644 --- a/horaedb/metric_engine/src/storage.rs +++ b/horaedb/metric_engine/src/storage.rs @@ -54,12 +54,16 @@ use parquet::{ format::SortingColumn, schema::types::ColumnPath, }; +use tokio::runtime::Runtime; use crate::{ - manifest::{Manifest, ManifestMergeOptions}, + manifest::Manifest, read::{DefaultParquetFileReaderFactory, MergeExec}, sst::{allocate_id, FileId, FileMeta, SstFile}, - types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, SEQ_COLUMN_NAME}, + types::{ + ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, WriteOptions, WriteResult, + SEQ_COLUMN_NAME, + }, Result, }; @@ -93,6 +97,25 @@ pub trait TimeMergeStorage { async fn compact(&self, req: CompactRequest) -> Result<()>; } +struct StorageRuntimes { + compact_runtime: Arc, +} + +impl StorageRuntimes { + pub fn new(runtime_opts: RuntimeOptions) -> Result { + let compact_runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("storage-compact") + .worker_threads(runtime_opts.compact_thread_num) + .enable_all() + .build() + .context("build storgae compact runtime")?; + + Ok(Self { + compact_runtime: Arc::new(compact_runtime), + }) + } +} + /// `TimeMergeStorage` implementation using cloud object storage, it will split /// data into different segments(aka `segment_duration`) based time range. /// @@ -105,6 +128,7 @@ pub struct CloudObjectStorage { arrow_schema: SchemaRef, num_primary_keys: usize, manifest: Manifest, + runtimes: StorageRuntimes, df_schema: DFSchema, write_props: WriterProperties, @@ -128,14 +152,16 @@ impl CloudObjectStorage { store: ObjectStoreRef, arrow_schema: SchemaRef, num_primary_keys: usize, - write_options: WriteOptions, - merge_options: ManifestMergeOptions, + storage_opts: StorageOptions, ) -> Result { let manifest_prefix = crate::manifest::PREFIX_PATH; + let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?; + let manifest = Manifest::try_new( format!("{path}/{manifest_prefix}"), store.clone(), - merge_options, + runtimes.compact_runtime.clone(), + storage_opts.manifest_merge_opts, ) .await?; let mut new_fields = arrow_schema.fields.clone().to_vec(); @@ -149,7 +175,7 @@ impl CloudObjectStorage { arrow_schema.metadata.clone(), )); let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?; - let write_props = Self::build_write_props(write_options, num_primary_keys); + let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys); Ok(Self { path, num_primary_keys, @@ -157,6 +183,7 @@ impl CloudObjectStorage { store, arrow_schema, manifest, + runtimes, df_schema, write_props, }) @@ -414,7 +441,7 @@ mod tests { use object_store::local::LocalFileSystem; use super::*; - use crate::{arrow_schema, manifest::ManifestMergeOptions, types::Timestamp}; + use crate::{arrow_schema, types::Timestamp}; #[tokio::test] async fn test_build_scan_plan() { @@ -426,8 +453,7 @@ mod tests { store, schema.clone(), 1, // num_primary_keys - WriteOptions::default(), - ManifestMergeOptions::default(), + StorageOptions::default(), ) .await .unwrap(); @@ -472,8 +498,7 @@ mod tests { store, schema.clone(), 2, // num_primary_keys - WriteOptions::default(), - ManifestMergeOptions::default(), + StorageOptions::default(), ) .await .unwrap(); @@ -549,8 +574,7 @@ mod tests { store, schema.clone(), 1, - WriteOptions::default(), - ManifestMergeOptions::default(), + StorageOptions::default(), ) .await .unwrap(); diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs index 624b8a2e64..98ba9764f8 100644 --- a/horaedb/metric_engine/src/types.rs +++ b/horaedb/metric_engine/src/types.rs @@ -68,7 +68,7 @@ impl Timestamp { pub const MIN: Timestamp = Timestamp(i64::MIN); } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct TimeRange(Range); impl From> for TimeRange { @@ -146,3 +146,42 @@ impl Default for WriteOptions { } } } + +pub struct RuntimeOptions { + pub compact_thread_num: usize, +} + +impl Default for RuntimeOptions { + fn default() -> Self { + Self { + compact_thread_num: 4, + } + } +} + +pub struct ManifestMergeOptions { + pub channel_size: usize, + pub merge_interval_seconds: usize, + pub min_merge_threshold: usize, + pub hard_merge_threshold: usize, + pub soft_merge_threshold: usize, +} + +impl Default for ManifestMergeOptions { + fn default() -> Self { + Self { + channel_size: 10, + merge_interval_seconds: 5, + min_merge_threshold: 10, + soft_merge_threshold: 50, + hard_merge_threshold: 90, + } + } +} + +#[derive(Default)] +pub struct StorageOptions { + pub write_opts: WriteOptions, + pub manifest_merge_opts: ManifestMergeOptions, + pub runtime_opts: RuntimeOptions, +}