From 6cdf2e66a04f11c60c04cacae0111a14d344a501 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Fri, 29 Nov 2024 16:41:23 +0800 Subject: [PATCH] feat: init compact scheduler (#1601) ## Rationale Setup the basic structure for compaction ## Detailed Changes ## Test Plan Old CI, not scheduler has no tests now. --- horaedb/Cargo.lock | 7 + horaedb/Cargo.toml | 1 + horaedb/metric_engine/Cargo.toml | 1 + horaedb/metric_engine/src/compaction/mod.rs | 34 ++++ .../metric_engine/src/compaction/picker.rs | 34 ++++ .../metric_engine/src/compaction/scheduler.rs | 166 ++++++++++++++++++ horaedb/metric_engine/src/lib.rs | 1 + horaedb/metric_engine/src/manifest.rs | 36 ++-- horaedb/metric_engine/src/sst.rs | 77 +++++++- horaedb/metric_engine/src/storage.rs | 93 ++++++---- horaedb/metric_engine/src/types.rs | 9 +- 11 files changed, 398 insertions(+), 61 deletions(-) create mode 100644 horaedb/metric_engine/src/compaction/mod.rs create mode 100644 horaedb/metric_engine/src/compaction/picker.rs create mode 100644 horaedb/metric_engine/src/compaction/scheduler.rs diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock index 1d68e84993..f2f8003b3c 100644 --- a/horaedb/Cargo.lock +++ b/horaedb/Cargo.lock @@ -469,6 +469,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +[[package]] +name = "bytesize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" + [[package]] name = "bzip2" version = "0.4.4" @@ -1569,6 +1575,7 @@ dependencies = [ "async-scoped", "async-trait", "bytes", + "bytesize", "datafusion", "futures", "itertools 0.3.25", diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml index d40bdafd97..863c789f08 100644 --- a/horaedb/Cargo.toml +++ b/horaedb/Cargo.toml @@ -37,6 +37,7 @@ macros = { path = "../src/components/macros" } pb_types = { path = "pb_types" } prost = { version = "0.13" } arrow = { version = "53", features = ["prettyprint"] } +bytesize = "1" arrow-schema = "53" tokio = { version = "1", features = ["full"] } async-trait = "0.1" diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml index 7235fc6837..d0e31a044f 100644 --- a/horaedb/metric_engine/Cargo.toml +++ b/horaedb/metric_engine/Cargo.toml @@ -37,6 +37,7 @@ arrow-schema = { workspace = true } async-scoped = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +bytesize = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/horaedb/metric_engine/src/compaction/mod.rs b/horaedb/metric_engine/src/compaction/mod.rs new file mode 100644 index 0000000000..a661c48bac --- /dev/null +++ b/horaedb/metric_engine/src/compaction/mod.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod picker; +mod scheduler; + +pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig}; + +use crate::sst::{FileId, SstFile}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Input { + files: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Task { + pub inputs: Vec, + pub expireds: Vec, +} diff --git a/horaedb/metric_engine/src/compaction/picker.rs b/horaedb/metric_engine/src/compaction/picker.rs new file mode 100644 index 0000000000..9e41acf8ea --- /dev/null +++ b/horaedb/metric_engine/src/compaction/picker.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::time::Duration; + +use crate::{compaction::Task, sst::SstFile}; + +pub struct TimeWindowCompactionStrategy { + segment_duration: Duration, +} + +impl TimeWindowCompactionStrategy { + pub fn new(segment_duration: Duration) -> Self { + Self { segment_duration } + } + + pub fn pick_candidate(&self, _ssts: Vec) -> Option { + todo!() + } +} diff --git a/horaedb/metric_engine/src/compaction/scheduler.rs b/horaedb/metric_engine/src/compaction/scheduler.rs new file mode 100644 index 0000000000..4d311e49c2 --- /dev/null +++ b/horaedb/metric_engine/src/compaction/scheduler.rs @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; + +use anyhow::Context; +use tokio::{ + sync::mpsc::{self, Receiver, Sender}, + task::JoinHandle, + time::sleep, +}; +use tracing::warn; + +use crate::{ + compaction::{picker::TimeWindowCompactionStrategy, Task}, + manifest::ManifestRef, + sst::SstPathGenerator, + types::{ObjectStoreRef, RuntimeRef}, + Result, +}; + +pub struct Scheduler { + runtime: RuntimeRef, + + task_tx: Sender, + inused_memory: AtomicU64, + task_handle: JoinHandle<()>, + picker_handle: JoinHandle<()>, +} + +impl Scheduler { + pub fn new( + runtime: RuntimeRef, + manifest: ManifestRef, + store: ObjectStoreRef, + segment_duration: Duration, + sst_path_gen: Arc, + config: SchedulerConfig, + ) -> Self { + let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks); + let task_handle = { + let rt = runtime.clone(); + let store = store.clone(); + let manifest = manifest.clone(); + runtime.spawn(async move { + Self::recv_task_loop( + rt, + task_rx, + store, + manifest, + sst_path_gen, + config.memory_limit, + ) + .await; + }) + }; + let picker_handle = { + let task_tx = task_tx.clone(); + let interval = config.schedule_interval; + runtime.spawn(async move { + Self::generate_task_loop(manifest, task_tx, interval, segment_duration).await; + }) + }; + + Self { + runtime, + task_tx, + task_handle, + picker_handle, + inused_memory: AtomicU64::new(0), + } + } + + pub fn try_send(&self, task: Task) -> Result<()> { + self.task_tx + .try_send(task) + .context("failed to send task to scheduler")?; + + Ok(()) + } + + async fn recv_task_loop( + rt: RuntimeRef, + mut task_rx: Receiver, + store: ObjectStoreRef, + manifest: ManifestRef, + _sst_path_gen: Arc, + _mem_limit: u64, + ) { + while let Some(task) = task_rx.recv().await { + let store = store.clone(); + let manifest = manifest.clone(); + rt.spawn(async move { + let runner = Runner { store, manifest }; + if let Err(e) = runner.do_compaction(task).await { + warn!("Do compaction failed, err:{e}"); + } + }); + } + } + + async fn generate_task_loop( + manifest: ManifestRef, + task_tx: Sender, + schedule_interval: Duration, + segment_duration: Duration, + ) { + let compactor = TimeWindowCompactionStrategy::new(segment_duration); + loop { + let ssts = manifest.all_ssts().await; + if let Some(task) = compactor.pick_candidate(ssts) { + if let Err(e) = task_tx.try_send(task) { + warn!("Send task failed, err:{e}"); + } + } + + sleep(schedule_interval).await; + } + } +} + +pub struct SchedulerConfig { + pub schedule_interval: Duration, + pub memory_limit: u64, + pub max_pending_compaction_tasks: usize, +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + schedule_interval: Duration::from_secs(30), + memory_limit: bytesize::gb(2_u64), + max_pending_compaction_tasks: 10, + } + } +} + +pub struct Runner { + store: ObjectStoreRef, + manifest: ManifestRef, +} + +impl Runner { + // TODO: Merge input sst files into one new sst file + // and delete the expired sst files + async fn do_compaction(&self, _task: Task) -> Result<()> { + todo!() + } +} diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs index e0dfdb3cf6..c77cdbb34e 100644 --- a/horaedb/metric_engine/src/lib.rs +++ b/horaedb/metric_engine/src/lib.rs @@ -18,6 +18,7 @@ //! Storage Engine for metrics. #![feature(duration_constructors)] +mod compaction; pub mod error; mod macros; mod manifest; diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs index e435e6998b..3764634847 100644 --- a/horaedb/metric_engine/src/manifest.rs +++ b/horaedb/metric_engine/src/manifest.rs @@ -41,7 +41,7 @@ use tracing::error; use crate::{ sst::{FileId, FileMeta, SstFile}, - types::{ManifestMergeOptions, ObjectStoreRef, TimeRange}, + types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange}, AnyhowError, Error, Result, }; @@ -49,6 +49,8 @@ pub const PREFIX_PATH: &str = "manifest"; pub const SNAPSHOT_FILENAME: &str = "snapshot"; pub const DELTA_PREFIX: &str = "delta"; +pub type ManifestRef = Arc; + pub struct Manifest { delta_dir: Path, store: ObjectStoreRef, @@ -66,7 +68,7 @@ impl Payload { // efficient pub fn dedup_files(&mut self) { let mut seen = HashSet::with_capacity(self.files.len()); - self.files.retain(|file| seen.insert(file.id)); + self.files.retain(|file| seen.insert(file.id())); } } @@ -103,8 +105,8 @@ impl Manifest { runtime: Arc, merge_options: ManifestMergeOptions, ) -> Result { - let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}")); - let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}")); + let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}")); + let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}")); let merger = ManifestMerger::try_new( snapshot_path.clone(), @@ -137,7 +139,7 @@ impl Manifest { self.merger.maybe_schedule_merge().await?; let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir)); - let new_sst = SstFile { id, meta }; + let new_sst = SstFile::new(id, meta); let new_sst_payload = pb_types::SstFile::from(new_sst.clone()); let mut buf: Vec = Vec::with_capacity(new_sst_payload.encoded_len()); @@ -164,13 +166,19 @@ impl Manifest { Ok(()) } + // TODO: avoid clone + pub async fn all_ssts(&self) -> Vec { + let payload = self.payload.read().await; + payload.files.clone() + } + pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec { let payload = self.payload.read().await; payload .files .iter() - .filter(move |f| f.meta.time_range.overlaps(time_range)) + .filter(move |f| f.meta().time_range.overlaps(time_range)) .cloned() .collect() } @@ -185,7 +193,7 @@ struct ManifestMerger { snapshot_path: Path, delta_dir: Path, store: ObjectStoreRef, - runtime: Arc, + runtime: RuntimeRef, sender: Sender, receiver: RwLock>, deltas_num: AtomicUsize, @@ -442,12 +450,12 @@ mod tests { size: i as u32, time_range, }; - SstFile { id, meta } + SstFile::new(id, meta) }) .collect::>(); - expected_ssts.sort_by(|a, b| a.id.cmp(&b.id)); - ssts.sort_by(|a, b| a.id.cmp(&b.id)); + expected_ssts.sort_by_key(|a| a.id()); + ssts.sort_by_key(|a| a.id()); assert_eq!(expected_ssts, ssts); } @@ -458,8 +466,8 @@ mod tests { .path() .to_string_lossy() .to_string(); - let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}")); - let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}")); + let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}")); + let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}")); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .enable_all() @@ -497,8 +505,8 @@ mod tests { let mut mem_ssts = manifest.payload.read().await.files.clone(); let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files; - mem_ssts.sort_by(|a, b| a.id.cmp(&b.id)); - ssts.sort_by(|a, b| a.id.cmp(&b.id)); + mem_ssts.sort_by_key(|a| a.id()); + ssts.sort_by_key(|a| a.id()); assert_eq!(mem_ssts, ssts); let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap(); diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs index 644988f850..56c3a2250a 100644 --- a/horaedb/metric_engine/src/sst.rs +++ b/horaedb/metric_engine/src/sst.rs @@ -17,8 +17,8 @@ use std::{ sync::{ - atomic::{AtomicU64, Ordering}, - LazyLock, + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, LazyLock, }, time::SystemTime, }; @@ -27,14 +27,50 @@ use macros::ensure; use crate::{types::TimeRange, Error}; -pub const PREFIX_PATH: &str = "data"; +const PREFIX_PATH: &str = "data"; pub type FileId = u64; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub struct SstFile { - pub id: FileId, - pub meta: FileMeta, + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + id: FileId, + meta: FileMeta, + + in_compaction: AtomicBool, +} + +impl Inner { + pub fn new(id: FileId, meta: FileMeta) -> Self { + Self { + id, + meta, + in_compaction: AtomicBool::new(false), + } + } +} + +impl SstFile { + pub fn new(id: FileId, meta: FileMeta) -> Self { + let inner = Arc::new(Inner::new(id, meta)); + Self { inner } + } + + pub fn id(&self) -> FileId { + self.inner.id + } + + pub fn meta(&self) -> &FileMeta { + &self.inner.meta + } + + pub fn mark_compaction(&self) { + self.inner.in_compaction.store(true, Ordering::Relaxed); + } } impl TryFrom for SstFile { @@ -45,19 +81,27 @@ impl TryFrom for SstFile { let meta = value.meta.unwrap(); let meta = meta.try_into()?; - Ok(Self { id: value.id, meta }) + Ok(Self::new(value.id, meta)) } } impl From for pb_types::SstFile { fn from(value: SstFile) -> Self { pb_types::SstFile { - id: value.id, - meta: Some(value.meta.into()), + id: value.id(), + meta: Some(value.meta().clone().into()), } } } +impl PartialEq for SstFile { + fn eq(&self, other: &Self) -> bool { + self.id() == other.id() && self.meta() == self.meta() + } +} + +impl Eq for SstFile {} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct FileMeta { pub max_sequence: u64, @@ -112,3 +156,18 @@ static NEXT_ID: LazyLock = LazyLock::new(|| { pub fn allocate_id() -> u64 { NEXT_ID.fetch_add(1, Ordering::SeqCst) } + +#[derive(Debug, Clone)] +pub struct SstPathGenerator { + prefix: String, +} + +impl SstPathGenerator { + pub fn new(prefix: String) -> Self { + Self { prefix } + } + + pub fn generate(&self, id: FileId) -> String { + format!("{}/{}/{}.sst", self.prefix, PREFIX_PATH, id) + } +} diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs index ac37e9c8b0..e30cb2a8fb 100644 --- a/horaedb/metric_engine/src/storage.rs +++ b/horaedb/metric_engine/src/storage.rs @@ -57,9 +57,10 @@ use parquet::{ use tokio::runtime::Runtime; use crate::{ - manifest::Manifest, + compaction::{CompactionScheduler, SchedulerConfig}, + manifest::{Manifest, ManifestRef}, read::{DefaultParquetFileReaderFactory, MergeExec}, - sst::{allocate_id, FileId, FileMeta, SstFile}, + sst::{allocate_id, FileMeta, SstFile, SstPathGenerator}, types::{ ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, WriteOptions, WriteResult, SEQ_COLUMN_NAME, @@ -97,21 +98,31 @@ pub trait TimeMergeStorage { async fn compact(&self, req: CompactRequest) -> Result<()>; } +#[derive(Clone)] struct StorageRuntimes { - compact_runtime: Arc, + manifest_compact_runtime: Arc, + sst_compact_runtime: Arc, } impl StorageRuntimes { pub fn new(runtime_opts: RuntimeOptions) -> Result { - let compact_runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("storage-compact") - .worker_threads(runtime_opts.compact_thread_num) + let manifest_compact_runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("man-compact") + .worker_threads(runtime_opts.manifest_compact_thread_num) + .enable_all() + .build() + .context("build storgae compact runtime")?; + + let sst_compact_runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("sst-compact") + .worker_threads(runtime_opts.sst_compact_thread_num) .enable_all() .build() .context("build storgae compact runtime")?; Ok(Self { - compact_runtime: Arc::new(compact_runtime), + manifest_compact_runtime: Arc::new(manifest_compact_runtime), + sst_compact_runtime: Arc::new(sst_compact_runtime), }) } } @@ -127,11 +138,13 @@ pub struct CloudObjectStorage { store: ObjectStoreRef, arrow_schema: SchemaRef, num_primary_keys: usize, - manifest: Manifest, + manifest: ManifestRef, runtimes: StorageRuntimes, df_schema: DFSchema, write_props: WriterProperties, + sst_path_gen: Arc, + compact_scheduler: CompactionScheduler, } /// It will organize the data in the following way: @@ -154,16 +167,17 @@ impl CloudObjectStorage { num_primary_keys: usize, storage_opts: StorageOptions, ) -> Result { - let manifest_prefix = crate::manifest::PREFIX_PATH; let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?; - let manifest = Manifest::try_new( - format!("{path}/{manifest_prefix}"), - store.clone(), - runtimes.compact_runtime.clone(), - storage_opts.manifest_merge_opts, - ) - .await?; + let manifest = Arc::new( + Manifest::try_new( + path.clone(), + store.clone(), + runtimes.manifest_compact_runtime.clone(), + storage_opts.manifest_merge_opts, + ) + .await?, + ); let mut new_fields = arrow_schema.fields.clone().to_vec(); new_fields.push(Arc::new(Field::new( SEQ_COLUMN_NAME, @@ -176,6 +190,15 @@ impl CloudObjectStorage { )); let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?; let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys); + let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone())); + let compact_scheduler = CompactionScheduler::new( + runtimes.sst_compact_runtime.clone(), + manifest.clone(), + store.clone(), + segment_duration, + sst_path_gen.clone(), + SchedulerConfig::default(), + ); Ok(Self { path, num_primary_keys, @@ -186,18 +209,14 @@ impl CloudObjectStorage { runtimes, df_schema, write_props, + sst_path_gen, + compact_scheduler, }) } - fn build_file_path(&self, id: FileId) -> String { - let root = &self.path; - let prefix = crate::sst::PREFIX_PATH; - format!("{root}/{prefix}/{id}") - } - async fn write_batch(&self, batch: RecordBatch) -> Result { let file_id = allocate_id(); - let file_path = self.build_file_path(file_id); + let file_path = self.sst_path_gen.generate(file_id); let file_path = Path::from(file_path); let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone()); let mut writer = AsyncArrowWriter::try_new( @@ -324,8 +343,8 @@ impl CloudObjectStorage { .into_iter() .map(|f| { vec![PartitionedFile::new( - self.build_file_path(f.id), - f.meta.size as u64, + self.sst_path_gen.generate(f.id()), + f.meta().size as u64, )] }) .collect::>(); @@ -405,7 +424,7 @@ impl TimeMergeStorage for CloudObjectStorage { } let ssts_by_segment = total_ssts.into_iter().group_by(|file| { - file.meta.time_range.start.0 / self.segment_duration.as_millis() as i64 + file.meta().time_range.start.0 / self.segment_duration.as_millis() as i64 }); let mut plan_for_all_segments = Vec::new(); @@ -429,7 +448,7 @@ impl TimeMergeStorage for CloudObjectStorage { return Ok(res); } - async fn compact(&self, req: CompactRequest) -> Result<()> { + async fn compact(&self, _req: CompactRequest) -> Result<()> { todo!() } } @@ -460,14 +479,16 @@ mod tests { let plan = storage .build_scan_plan( (100..103) - .map(|id| SstFile { - id, - meta: FileMeta { - max_sequence: id, - num_rows: 1, - size: 1, - time_range: (1..10).into(), - }, + .map(|id| { + SstFile::new( + id, + FileMeta { + max_sequence: id, + num_rows: 1, + size: 1, + time_range: (1..10).into(), + }, + ) }) .collect(), None, @@ -481,7 +502,7 @@ mod tests { assert_eq!( r#"MergeExec: [primary_keys: 1, seq_idx: 1] SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024 - ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101], [mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]] + ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]] "#, format!("{display_plan}") ); diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs index 98ba9764f8..8e7c994e88 100644 --- a/horaedb/metric_engine/src/types.rs +++ b/horaedb/metric_engine/src/types.rs @@ -23,6 +23,7 @@ use std::{ use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; +use tokio::runtime::Runtime; use crate::sst::FileId; @@ -30,6 +31,8 @@ use crate::sst::FileId; // user-defined schema. pub const SEQ_COLUMN_NAME: &str = "__seq__"; +pub type RuntimeRef = Arc; + #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Timestamp(pub i64); @@ -148,13 +151,15 @@ impl Default for WriteOptions { } pub struct RuntimeOptions { - pub compact_thread_num: usize, + pub manifest_compact_thread_num: usize, + pub sst_compact_thread_num: usize, } impl Default for RuntimeOptions { fn default() -> Self { Self { - compact_thread_num: 4, + manifest_compact_thread_num: 2, + sst_compact_thread_num: 4, } } }