From f8bb7262de1b6e0d1c05c8aae1e0ede6e8e4bfd7 Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Mon, 23 Dec 2024 15:37:21 +0800
Subject: [PATCH] fix: manifest delta num may overflow (#1616)

## Rationale
Fix two bugs found in a local write bench.

## Detailed Changes
- When the manifest starts up, the delta num may overflow: the counter is
  initialized to 0, so when the startup merge removes the existing delta files,
  decrementing the counter wraps it around. It is now seeded from the number of
  delta files found on disk.
- When the manifest merges updates, the delta files are read in no particular
  order, so a delete could be applied before its matching add, i.e. we could
  try to delete a file that is not yet in the snapshot. Merging now applies all
  adds first, then all deletes.

## Test Plan
CI

---
 Cargo.lock                                    |  13 ++
 docs/example.toml                             |  13 +-
 src/benchmarks/src/encoding_bench.rs          |  14 +-
 src/metric_engine/Cargo.toml                  |   1 +
 src/metric_engine/src/compaction/executor.rs  |  40 ++--
 src/metric_engine/src/compaction/mod.rs       |   2 +-
 src/metric_engine/src/compaction/picker.rs    |  29 ++-
 src/metric_engine/src/compaction/scheduler.rs |  60 +++---
 src/metric_engine/src/config.rs               | 171 ++++++++++++++++++
 src/metric_engine/src/lib.rs                  |   1 +
 src/metric_engine/src/manifest/encoding.rs    |  35 ++--
 src/metric_engine/src/manifest/mod.rs         | 109 +++++++----
 src/metric_engine/src/read.rs                 |  11 +-
 src/metric_engine/src/sst.rs                  |  47 +++--
 src/metric_engine/src/storage.rs              |  39 ++--
 src/metric_engine/src/types.rs                |  85 +--------
 src/server/Cargo.toml                         |   2 +-
 src/server/src/config.rs                      |  66 ++++---
 src/server/src/main.rs                        |  93 +++++++---
 19 files changed, 537 insertions(+), 294 deletions(-)
 create mode 100644 src/metric_engine/src/config.rs

diff --git a/Cargo.lock b/Cargo.lock
index b5c4a8d227..8cace345bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2174,6 +2174,7 @@ dependencies = [
  "parquet",
  "pb_types",
  "prost",
+ "serde",
  "temp-dir",
  "test-log",
  "thiserror",
@@ -2316,6 +2317,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "num_threads"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "object"
 version = "0.36.4"
@@ -3185,7 +3195,9 @@
 checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
 dependencies = [
  "deranged",
  "itoa",
+ "libc",
  "num-conv",
+ "num_threads",
  "powerfmt",
  "serde",
  "time-core",
@@ -3375,6 +3387,7 @@ dependencies = [
  "sharded-slab",
  "smallvec",
  "thread_local",
+ "time",
  "tracing",
  "tracing-core",
  "tracing-log",
diff --git a/docs/example.toml b/docs/example.toml
index 0fbb0220b3..de35e4bb0a 100644
--- a/docs/example.toml
+++ b/docs/example.toml
@@ -17,8 +17,15 @@
 port = 5000
 
-[metric_engine]
+[test]
+enable_write = true
+write_worker_num = 1
+write_interval = "500ms"
 
-[metric_engine.storage]
+[metric_engine.threads]
+sst_thread_num = 2
+manifest_thread_num = 2
+
+[metric_engine.storage.object_store]
 type = "Local"
-data_dir = "/tmp/horaedb-storage"
\ No newline at end of file
+data_dir = "/tmp/horaedb-storage"
diff --git a/src/benchmarks/src/encoding_bench.rs b/src/benchmarks/src/encoding_bench.rs
index 956925bbff..8a899f616a 100644
--- a/src/benchmarks/src/encoding_bench.rs
+++ b/src/benchmarks/src/encoding_bench.rs
@@ -19,7 +19,7 @@
 use bytes::Bytes;
 use metric_engine::{
-    manifest::{ManifestUpdate, Snapshot},
+    manifest::Snapshot,
     sst::{FileMeta, SstFile},
 };
@@ -43,11 +43,7 @@ impl EncodingBench {
         );
         let sstfiles = vec![sstfile.clone(); config.record_count];
         let mut snapshot = Snapshot::try_from(Bytes::new()).unwrap();
-        let update = ManifestUpdate {
-            to_adds: sstfiles,
-            to_deletes: vec![],
-        };
-        let _ = snapshot.merge_update(update);
+        snapshot.add_records(sstfiles);
 
         EncodingBench {
             raw_bytes: snapshot.into_bytes().unwrap(),
@@ -60,11 +56,7 @@ impl
EncodingBench { // first decode snapshot and then append with delta sstfiles, serialize to bytes // at last let mut snapshot = Snapshot::try_from(self.raw_bytes.clone()).unwrap(); - let update = ManifestUpdate { - to_adds: self.to_append.clone(), - to_deletes: vec![], - }; - let _ = snapshot.merge_update(update); + snapshot.add_records(self.to_append.clone()); let _ = snapshot.into_bytes(); } } diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml index ba6d2daae4..4286610366 100644 --- a/src/metric_engine/Cargo.toml +++ b/src/metric_engine/Cargo.toml @@ -48,6 +48,7 @@ object_store = { workspace = true } parquet = { workspace = true, features = ["object_store"] } pb_types = { workspace = true } prost = { workspace = true } +serde = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/src/metric_engine/src/compaction/executor.rs b/src/metric_engine/src/compaction/executor.rs index 0e5f9935f7..ceca4e8179 100644 --- a/src/metric_engine/src/compaction/executor.rs +++ b/src/metric_engine/src/compaction/executor.rs @@ -30,14 +30,15 @@ use parquet::{ arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter}, file::properties::WriterProperties, }; -use tracing::error; +use tokio::sync::mpsc::Sender; +use tracing::{debug, error, trace}; use crate::{ compaction::Task, ensure, manifest::{ManifestRef, ManifestUpdate}, read::ParquetReader, - sst::{allocate_id, FileMeta, SstFile, SstPathGenerator}, + sst::{FileMeta, SstFile, SstPathGenerator}, types::{ObjectStoreRef, RuntimeRef, StorageSchema}, Result, }; @@ -57,6 +58,7 @@ struct Inner { write_props: WriterProperties, inused_memory: AtomicU64, mem_limit: u64, + trigger_tx: Sender<()>, } impl Executor { @@ -70,6 +72,7 @@ impl Executor { parquet_reader: Arc, write_props: WriterProperties, mem_limit: u64, + trigger_tx: Sender<()>, ) -> Self { let inner = Inner { runtime, @@ -81,6 +84,7 @@ impl Executor { write_props, mem_limit, inused_memory: AtomicU64::new(0), + trigger_tx, }; Self { inner: Arc::new(inner), @@ -141,11 +145,19 @@ impl Executor { runnable.spawn() } + fn trigger_more_task(&self) { + if let Err(e) = self.inner.trigger_tx.try_send(()) { + debug!("Send pick task trigger signal failed, err{e:?}"); + } + } + // TODO: Merge input sst files into one new sst file // and delete the expired sst files pub async fn do_compaction(&self, task: &Task) -> Result<()> { self.pre_check(task)?; + self.trigger_more_task(); + debug!(input_len = task.inputs.len(), "Start do compaction"); let mut time_range = task.inputs[0].meta().time_range.clone(); for f in &task.inputs[1..] 
{
             time_range.merge(&f.meta().time_range);
@@ -157,7 +169,7 @@ impl Executor {
         let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
             .context("execute datafusion plan")?;
 
-        let file_id = allocate_id();
+        let file_id = SstFile::allocate_id();
         let file_path = self.inner.sst_path_gen.generate(file_id);
         let file_path = Path::from(file_path);
         let object_store_writer =
@@ -197,6 +209,7 @@ impl Executor {
             size: object_meta.size as u32,
             time_range: time_range.clone(),
         };
+        debug!(file_meta = ?file_meta, "Compact output new sst");
         // First add new sst to manifest, then delete expired/old sst
         let to_adds = vec![SstFile::new(file_id, file_meta)];
         let to_deletes = task
@@ -204,28 +217,19 @@
             .iter()
             .map(|f| f.id())
             .chain(task.inputs.iter().map(|f| f.id()))
-            .collect();
+            .collect::<Vec<_>>();
         self.inner
             .manifest
-            .update(ManifestUpdate::new(to_adds, to_deletes))
+            .update(ManifestUpdate::new(to_adds, to_deletes.clone()))
             .await?;
 
         // From now on, no error should be returned!
         // Because we have already updated manifest.
         let (_, results) = TokioScope::scope_and_block(|scope| {
-            for file in &task.expireds {
-                let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
-                scope.spawn(async move {
-                    self.inner
-                        .store
-                        .delete(&path)
-                        .await
-                        .with_context(|| format!("failed to delete file, path:{path}"))
-                });
-            }
-            for file in &task.inputs {
-                let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
+            for id in to_deletes {
+                let path = Path::from(self.inner.sst_path_gen.generate(id));
+                trace!(id, "Delete sst file");
                 scope.spawn(async move {
                     self.inner
                         .store
@@ -262,7 +266,7 @@ impl Runnable {
         let rt = self.executor.inner.runtime.clone();
         rt.spawn(async move {
             if let Err(e) = self.executor.do_compaction(&self.task).await {
-                error!("Do compaction failed, err:{e}");
+                error!("Do compaction failed, err:{e:?}");
                 self.executor.on_failure(&self.task);
             } else {
                 self.executor.on_success(&self.task);
diff --git a/src/metric_engine/src/compaction/mod.rs b/src/metric_engine/src/compaction/mod.rs
index 38a37ebec4..fe8c0ddd25 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/compaction/mod.rs
@@ -19,7 +19,7 @@ mod executor;
 mod picker;
 mod scheduler;
 
-pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
+pub use scheduler::Scheduler as CompactionScheduler;
 
 use crate::sst::SstFile;
 
diff --git a/src/metric_engine/src/compaction/picker.rs b/src/metric_engine/src/compaction/picker.rs
index 28e086884b..79c91ce8a6 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -18,7 +18,7 @@
 use std::{collections::BTreeMap, time::Duration};
 
 use common::now;
-use tracing::debug;
+use tracing::trace;
 
 use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, types::Timestamp};
 
@@ -35,6 +35,7 @@ impl Picker {
         segment_duration: Duration,
         new_sst_max_size: u64,
         input_sst_max_num: usize,
+        input_sst_min_num: usize,
     ) -> Self {
         Self {
             manifest,
@@ -43,11 +44,15 @@
             segment_duration,
             new_sst_max_size,
             input_sst_max_num,
+            input_sst_min_num,
         ),
         }
     }
 
-    pub async fn pick_candidate(&self) -> Option<Task> {
+    /// This function picks a candidate for compaction.
+    /// Note: it can only execute sequentially, otherwise an SST may be picked
+    /// by multiple threads (that's why it takes `&mut self`).
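+    /// The scheduler upholds this by owning the picker inside a single
+    /// `generate_task_loop` task, so the `&mut` borrow is never contended.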
+    pub async fn pick_candidate(&mut self) -> Option<Task> {
         let ssts = self.manifest.all_ssts().await;
         let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as i64).into());
         self.strategy.pick_candidate(ssts, expire_time)
     }
@@ -58,6 +63,7 @@ pub struct TimeWindowCompactionStrategy {
     segment_duration: Duration,
     new_sst_max_size: u64,
     input_sst_max_num: usize,
+    input_sst_min_num: usize,
 }
 
 impl TimeWindowCompactionStrategy {
@@ -65,11 +71,13 @@
         segment_duration: Duration,
         new_sst_max_size: u64,
         input_sst_max_num: usize,
+        input_sst_min_num: usize,
     ) -> Self {
         Self {
             segment_duration,
             new_sst_max_size,
             input_sst_max_num,
+            input_sst_min_num,
         }
     }
 
@@ -80,7 +88,7 @@ impl TimeWindowCompactionStrategy {
     ) -> Option<Task> {
         let (uncompacted_files, expired_files) =
             Self::find_uncompacted_and_expired_files(ssts, expire_time);
-        debug!(uncompacted_files = ?uncompacted_files, expired_files = ?expired_files, "Begin pick candidate");
+        trace!(uncompacted_files = ?uncompacted_files, expired_files = ?expired_files, "Begin pick candidate");
 
         let files_by_segment = self.files_by_segment(uncompacted_files);
         let compaction_files = self.pick_compaction_files(files_by_segment)?;
@@ -101,7 +109,7 @@
             expireds: expired_files,
         };
 
-        debug!(task = ?task, "End pick candidate");
+        trace!(task = ?task, "End pick candidate");
         Some(task)
     }
 
@@ -130,14 +138,14 @@
         let segment_duration = self.segment_duration;
         for file in files {
             let segment = file.meta().time_range.start.truncate_by(segment_duration);
-            debug!(segment = ?segment, file = ?file);
+            trace!(segment = ?segment, file = ?file);
             files_by_segment
                 .entry(segment)
                 .or_insert_with(Vec::new)
                 .push(file);
         }
-        debug!(
+        trace!(
             files = ?files_by_segment,
             "Group files of similar timestamp into segment"
         );
@@ -149,13 +157,14 @@
         files_by_segment: BTreeMap<Timestamp, Vec<SstFile>>,
     ) -> Option<Vec<SstFile>> {
         for (segment, mut files) in files_by_segment.into_iter().rev() {
-            debug!(segment = ?segment, files = ?files, "Loop segment for pick files");
-            if files.len() < 2 {
+            trace!(segment = ?segment, files = ?files.len(), "Loop segment for pick files");
+            if files.len() < self.input_sst_min_num {
                 continue;
             }
 
             // Prefer to compact smaller files first.
             files.sort_unstable_by_key(SstFile::size);
+            trace!(sorted_files = ?files, "Sort files by size");
 
             let mut input_size = 0;
             // Suppose the compaction will reduce the size of files by 10%.
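Taken together, the thresholds above reduce to a simple rule: skip a segment until it has at least input_sst_min_num files, take at most input_sst_max_num, and stop accumulating once the estimated output (90% of the input bytes, per the comment above) would exceed new_sst_max_size. The following free-standing sketch restates that arithmetic; the function and parameter names are illustrative, not the engine's API:

```rust
/// Illustrative sketch of the size-based picking rule described above.
/// `sorted_sizes` must be ascending, since the strategy sorts files by size.
fn pick_by_size(
    sorted_sizes: &[u64],
    max_size: u64,  // plays the role of new_sst_max_size
    min_num: usize, // input_sst_min_num
    max_num: usize, // input_sst_max_num
) -> Option<Vec<u64>> {
    let mut input_size = 0u64;
    let mut picked = Vec::new();
    for &size in sorted_sizes.iter().take(max_num) {
        // Assume compaction shrinks its inputs by ~10%.
        if (input_size + size) * 9 / 10 > max_size {
            break;
        }
        input_size += size;
        picked.push(size);
    }
    // A segment is only worth compacting once it has enough candidates.
    (picked.len() >= min_num).then_some(picked)
}
```

For example, `pick_by_size(&[10, 20, 30, 1000], 100, 2, 10)` returns `Some(vec![10, 20, 30])`: the three small files are merged and the large one is left for a later round.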
@@ -170,7 +179,7 @@ impl TimeWindowCompactionStrategy { }) .collect::>(); - if compaction_files.len() >= 2 { + if compaction_files.len() >= self.input_sst_min_num { return Some(compaction_files); } } @@ -192,7 +201,7 @@ mod tests { #[test] fn test_pick_candidate() { let segment_duration = Duration::from_millis(20); - let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10); + let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10, 2); let ssts = (0_i64..5_i64) .map(|i| { diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index a3d286a2e9..d947096e93 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -29,6 +29,7 @@ use tracing::{info, warn}; use super::{executor::Executor, picker::Picker}; use crate::{ compaction::Task, + config::SchedulerConfig, manifest::ManifestRef, read::ParquetReader, sst::SstPathGenerator, @@ -56,13 +57,14 @@ impl Scheduler { sst_path_gen: Arc, parquet_reader: Arc, config: SchedulerConfig, + write_props: WriterProperties, ) -> Self { let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks); let (trigger_tx, trigger_rx) = mpsc::channel::<()>(1); let task_handle = { let store = store.clone(); let manifest = manifest.clone(); - let write_props = config.write_props.clone(); + let trigger_tx = trigger_tx.clone(); let executor = Executor::new( runtime.clone(), store, @@ -72,6 +74,7 @@ impl Scheduler { parquet_reader, write_props, config.memory_limit, + trigger_tx.clone(), ); runtime.spawn(async move { @@ -86,6 +89,7 @@ impl Scheduler { segment_duration, config.new_sst_max_size, config.input_sst_max_num, + config.input_sst_min_num, ); Self::generate_task_loop(task_tx, trigger_rx, picker, config.schedule_interval) .await; @@ -109,6 +113,7 @@ impl Scheduler { } async fn recv_task_loop(mut task_rx: Receiver, executor: Executor) { + info!("Scheduler receive task started"); while let Some(task) = task_rx.recv().await { executor.submit(task); } @@ -117,61 +122,40 @@ impl Scheduler { async fn generate_task_loop( task_tx: Sender, mut trigger_rx: Receiver<()>, - picker: Picker, + mut picker: Picker, schedule_interval: Duration, ) { info!( schedule_interval = ?schedule_interval, - "Scheduler generate task started" + "Scheduler generate task loop started" ); + let send_task = |task| { + if let Err(e) = task_tx.try_send(task) { + warn!("Send task failed, err:{e:?}"); + } + }; + + // Generate one task immediately + if let Some(task) = picker.pick_candidate().await { + send_task(task); + } loop { tokio::select! 
{ _ = sleep(schedule_interval) => { if let Some(task) = picker.pick_candidate().await { - if let Err(e) = task_tx.try_send(task) { - warn!("Send task failed, err:{e}"); - } + send_task(task); } } signal = trigger_rx.recv() => { if signal.is_none() { - info!("Scheduler generate task stopped"); - break; + info!("Scheduler generate task loop stopped"); + return; } if let Some(task) = picker.pick_candidate().await { - if let Err(e) = task_tx.try_send(task) { - warn!("Send task failed, err:{e}"); - } + send_task(task); } } } } } } - -#[derive(Clone)] -pub struct SchedulerConfig { - pub schedule_interval: Duration, - pub max_pending_compaction_tasks: usize, - // Runner config - pub memory_limit: u64, - pub write_props: WriterProperties, - // Picker config - pub ttl: Option, - pub new_sst_max_size: u64, - pub input_sst_max_num: usize, -} - -impl Default for SchedulerConfig { - fn default() -> Self { - Self { - schedule_interval: Duration::from_secs(30), - max_pending_compaction_tasks: 10, - memory_limit: bytesize::gb(3_u64), - write_props: WriterProperties::default(), - ttl: None, - new_sst_max_size: bytesize::gb(1_u64), - input_sst_max_num: 10, - } - } -} diff --git a/src/metric_engine/src/config.rs b/src/metric_engine/src/config.rs new file mode 100644 index 0000000000..819e45e668 --- /dev/null +++ b/src/metric_engine/src/config.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{collections::HashMap, time::Duration}; + +use parquet::basic::{Compression, Encoding, ZstdLevel}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct SchedulerConfig { + pub schedule_interval: Duration, + pub max_pending_compaction_tasks: usize, + // Runner config + pub memory_limit: u64, + // Picker config + pub ttl: Option, + pub new_sst_max_size: u64, + pub input_sst_max_num: usize, + pub input_sst_min_num: usize, +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + schedule_interval: Duration::from_secs(10), + max_pending_compaction_tasks: 10, + memory_limit: bytesize::gb(20_u64), + ttl: None, + new_sst_max_size: bytesize::gb(1_u64), + input_sst_max_num: 30, + input_sst_min_num: 5, + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub enum ParquetEncoding { + #[default] + Plain, + Rle, + DeltaBinaryPacked, + DeltaLengthByteArray, + DeltaByteArray, + RleDictionary, +} + +impl From for Encoding { + fn from(value: ParquetEncoding) -> Self { + match value { + ParquetEncoding::Plain => Encoding::PLAIN, + ParquetEncoding::Rle => Encoding::RLE, + ParquetEncoding::DeltaBinaryPacked => Encoding::DELTA_BINARY_PACKED, + ParquetEncoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY, + ParquetEncoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY, + ParquetEncoding::RleDictionary => Encoding::RLE_DICTIONARY, + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub enum ParquetCompression { + #[default] + Uncompressed, + Snappy, + Zstd, +} + +impl From for Compression { + fn from(value: ParquetCompression) -> Self { + match value { + ParquetCompression::Uncompressed => Compression::UNCOMPRESSED, + ParquetCompression::Snappy => Compression::SNAPPY, + ParquetCompression::Zstd => Compression::ZSTD(ZstdLevel::default()), + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct ColumnOptions { + pub enable_dict: Option, + pub enable_bloom_filter: Option, + pub encoding: Option, + pub compression: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct WriteConfig { + pub max_row_group_size: usize, + pub write_bacth_size: usize, + pub enable_sorting_columns: bool, + // use to set column props with default value + pub enable_dict: bool, + pub enable_bloom_filter: bool, + pub encoding: ParquetEncoding, + pub compression: ParquetCompression, + // use to set column props with column name + pub column_options: Option>, +} + +impl Default for WriteConfig { + fn default() -> Self { + Self { + max_row_group_size: 8192, + write_bacth_size: 1024, + enable_sorting_columns: true, + enable_dict: false, + enable_bloom_filter: false, + encoding: ParquetEncoding::Plain, + compression: ParquetCompression::Snappy, + column_options: None, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct ManifestConfig { + pub channel_size: usize, + pub merge_interval_seconds: usize, + pub min_merge_threshold: usize, + pub hard_merge_threshold: usize, + pub soft_merge_threshold: usize, +} + +impl Default for ManifestConfig { + fn default() -> Self { + Self { + channel_size: 3, + merge_interval_seconds: 5, + min_merge_threshold: 10, + soft_merge_threshold: 50, + hard_merge_threshold: 90, + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] 
+#[serde(default, deny_unknown_fields)] +pub struct StorageConfig { + pub write: WriteConfig, + pub manifest: ManifestConfig, + pub scheduler: SchedulerConfig, + pub update_mode: UpdateMode, +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub enum UpdateMode { + #[default] + Overwrite, + Append, +} diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs index 2bbcdd6b19..a993fb74b7 100644 --- a/src/metric_engine/src/lib.rs +++ b/src/metric_engine/src/lib.rs @@ -19,6 +19,7 @@ #![feature(duration_constructors)] mod compaction; +pub mod config; pub mod error; mod macros; pub mod manifest; diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs index 9ec0ecd6d1..db3e0d8e80 100644 --- a/src/metric_engine/src/manifest/encoding.rs +++ b/src/metric_engine/src/manifest/encoding.rs @@ -99,12 +99,12 @@ impl SnapshotHeader { pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/; pub const MAGIC: u32 = 0xCAFE_1234; - pub fn new(length: u64) -> Self { + pub fn new() -> Self { Self { magic: SnapshotHeader::MAGIC, version: SnapshotRecord::VERSION, flag: 0, - length, + length: 0, } } @@ -251,13 +251,13 @@ impl From for SstFile { pub struct Snapshot { header: SnapshotHeader, - records: Vec, + pub records: Vec, } impl Default for Snapshot { // create an empty Snapshot fn default() -> Self { - let header = SnapshotHeader::new(0); + let header = SnapshotHeader::new(); Self { header, records: Vec::new(), @@ -303,14 +303,26 @@ impl Snapshot { // TODO: Ensure no files duplicated // https://github.com/apache/horaedb/issues/1608 - pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> { - self.records - .extend(update.to_adds.into_iter().map(SnapshotRecord::from)); + pub fn add_records(&mut self, ssts: Vec) { self.records - .retain(|record| !update.to_deletes.contains(&record.id)); + .extend(ssts.into_iter().map(SnapshotRecord::from)); + self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as u64; + } + pub fn delete_records(&mut self, to_deletes: Vec) { + // Since this may hurt performance, we only do this in debug mode. 
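+        // An id that was never added means a delete was applied before its
+        // matching add, which is exactly the reordering bug fixed by this patch.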
+ if cfg!(debug_assertions) { + for id in &to_deletes { + assert!( + self.records.iter().any(|r| r.id == *id), + "File not found in snapshot, id:{id}" + ); + } + } + + self.records + .retain(|record| !to_deletes.contains(&record.id)); self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as u64; - Ok(()) } pub fn into_bytes(self) -> Result { @@ -327,11 +339,12 @@ impl Snapshot { #[cfg(test)] mod tests { + use super::*; #[test] fn test_snapshot_header() { - let header = SnapshotHeader::new(257); + let header = SnapshotHeader::new(); let mut vec = vec![0u8; SnapshotHeader::LENGTH]; let mut writer = vec.as_mut_slice(); header.write_to(&mut writer).unwrap(); @@ -344,7 +357,7 @@ mod tests { magic: SnapshotHeader::MAGIC, version: 1, flag: 0, - length: 257 + length: 0 }, header ); diff --git a/src/metric_engine/src/manifest/mod.rs b/src/metric_engine/src/manifest/mod.rs index 3489172e17..a646c32956 100644 --- a/src/metric_engine/src/manifest/mod.rs +++ b/src/metric_engine/src/manifest/mod.rs @@ -18,10 +18,10 @@ mod encoding; use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, LazyLock, }, - time::Duration, + time::{Duration, SystemTime}, }; use anyhow::Context; @@ -29,18 +29,19 @@ use async_scoped::TokioScope; use bytes::Bytes; pub use encoding::{ManifestUpdate, Snapshot}; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use object_store::{path::Path, PutPayload}; use prost::Message; use tokio::sync::{ mpsc::{self, Receiver, Sender}, RwLock, }; -use tracing::error; -use uuid::Uuid; +use tracing::{debug, error, info, trace}; use crate::{ + config::ManifestConfig, sst::{FileId, FileMeta, SstFile}, - types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange}, + types::{ObjectStoreRef, RuntimeRef, TimeRange}, AnyhowError, Result, }; @@ -48,6 +49,19 @@ pub const PREFIX_PATH: &str = "manifest"; pub const SNAPSHOT_FILENAME: &str = "snapshot"; pub const DELTA_PREFIX: &str = "delta"; +// Used for manifest delta filename +// This number mustn't go backwards on restarts, otherwise file id +// collisions are possible. So don't change time on the server +// between server restarts. 
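+// Seeded once per process from the unix time in nanoseconds; the same
+// allocator pattern is used for sst file ids in sst.rs.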
+static NEXT_ID: LazyLock = LazyLock::new(|| { + AtomicU64::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64, + ) +}); + pub type ManifestRef = Arc; pub struct Manifest { @@ -63,7 +77,7 @@ impl Manifest { root_dir: String, store: ObjectStoreRef, runtime: RuntimeRef, - merge_options: ManifestMergeOptions, + merge_options: ManifestConfig, ) -> Result { let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}")); let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}")); @@ -75,7 +89,13 @@ impl Manifest { merge_options, ) .await?; - + let snapshot = read_snapshot(&store, &snapshot_path).await?; + let ssts = snapshot.into_ssts(); + debug!( + sst_len = ssts.len(), + first_100 = ?ssts.iter().take(100), + "Load manifest snapshot when startup" + ); { let merger = merger.clone(); // Start merger in background @@ -84,9 +104,6 @@ impl Manifest { }); } - let snapshot = read_snapshot(&store, &snapshot_path).await?; - let ssts = snapshot.into_ssts(); - Ok(Self { delta_dir, store, @@ -102,7 +119,17 @@ impl Manifest { pub async fn update(&self, update: ManifestUpdate) -> Result<()> { self.merger.maybe_schedule_merge().await?; - let path = Path::from(format!("{}/{}", self.delta_dir, Uuid::new_v4())); + self.merger.inc_delta_num(); + let res = self.update_inner(update).await; + if res.is_err() { + self.merger.dec_delta_num(); + } + + res + } + + pub async fn update_inner(&self, update: ManifestUpdate) -> Result<()> { + let path = Path::from(format!("{}/{}", self.delta_dir, Self::allocate_id())); let pb_update = pb_types::ManifestUpdate::from(update.clone()); let mut buf: Vec = Vec::with_capacity(pb_update.encoded_len()); pb_update @@ -126,9 +153,6 @@ impl Manifest { ssts.retain(|file| !update.to_deletes.contains(&file.id())); } - // 3. Update delta files num - self.merger.add_delta_num(); - Ok(()) } @@ -146,6 +170,10 @@ impl Manifest { .cloned() .collect() } + + fn allocate_id() -> u64 { + NEXT_ID.fetch_add(1, Ordering::SeqCst) + } } enum MergeType { @@ -160,7 +188,7 @@ struct ManifestMerger { sender: Sender, receiver: RwLock>, deltas_num: AtomicUsize, - merge_options: ManifestMergeOptions, + merge_options: ManifestConfig, } impl ManifestMerger { @@ -168,7 +196,7 @@ impl ManifestMerger { snapshot_path: Path, delta_dir: Path, store: ObjectStoreRef, - merge_options: ManifestMergeOptions, + merge_options: ManifestConfig, ) -> Result> { let (tx, rx) = mpsc::channel(merge_options.channel_size); let merger = Self { @@ -177,11 +205,12 @@ impl ManifestMerger { store, sender: tx, receiver: RwLock::new(rx), + // Init this to 0, because we will merge all delta files when startup. deltas_num: AtomicUsize::new(0), merge_options, }; // Merge all delta files when startup - merger.do_merge().await?; + merger.do_merge(true /* first_run */).await?; Ok(Arc::new(merger)) } @@ -189,18 +218,19 @@ impl ManifestMerger { async fn run(&self) { let merge_interval = Duration::from_secs(self.merge_options.merge_interval_seconds as u64); let mut receiver = self.receiver.write().await; + info!(merge_interval = ?merge_interval, "Start manifest merge background job"); loop { tokio::select! 
{ _ = tokio::time::sleep(merge_interval) => { if self.deltas_num.load(Ordering::Relaxed) > self.merge_options.min_merge_threshold { - if let Err(err) = self.do_merge().await { + if let Err(err) = self.do_merge(false /*first_run*/).await { error!("Failed to merge delta, err:{err}"); } } } _merge_type = receiver.recv() => { if self.deltas_num.load(Ordering::Relaxed) > self.merge_options.min_merge_threshold { - if let Err(err) = self.do_merge().await { + if let Err(err) = self.do_merge(false /*first_run*/).await { error!("Failed to merge delta, err:{err}"); } } @@ -211,17 +241,17 @@ impl ManifestMerger { fn schedule_merge(&self, task: MergeType) { if let Err(err) = self.sender.try_send(task) { - error!("Failed to send merge task, err:{err}"); + trace!("Failed to send merge task, err:{err}"); } } async fn maybe_schedule_merge(&self) -> Result<()> { let current_num = self.deltas_num.load(Ordering::Relaxed); - - if current_num > self.merge_options.hard_merge_threshold { + let hard_limit = self.merge_options.hard_merge_threshold; + if current_num > hard_limit { self.schedule_merge(MergeType::Hard); return Err(AnyhowError::msg(format!( - "Manifest has too many delta files, value:{current_num}" + "Manifest has too many delta files, value:{current_num}, hard_limit:{hard_limit}" )) .into()); } else if current_num > self.merge_options.soft_merge_threshold { @@ -231,15 +261,24 @@ impl ManifestMerger { Ok(()) } - fn add_delta_num(&self) { - self.deltas_num.fetch_add(1, Ordering::Relaxed); + fn inc_delta_num(&self) { + let prev = self.deltas_num.fetch_add(1, Ordering::Relaxed); + trace!(prev, "inc delta num"); + } + + fn dec_delta_num(&self) { + let prev = self.deltas_num.fetch_sub(1, Ordering::Relaxed); + trace!(prev, "dec delta num"); } - async fn do_merge(&self) -> Result<()> { + async fn do_merge(&self, first_run: bool) -> Result<()> { let paths = list_delta_paths(&self.store, &self.delta_dir).await?; if paths.is_empty() { return Ok(()); } + if first_run { + self.deltas_num.store(paths.len(), Ordering::Relaxed); + } let (_, results) = TokioScope::scope_and_block(|scope| { for path in &paths { @@ -248,10 +287,17 @@ impl ManifestMerger { }); let mut snapshot = read_snapshot(&self.store, &self.snapshot_path).await?; + trace!(sst_ids = ?snapshot.records.iter().map(|r| r.id()).collect_vec(), "Before snapshot merge deltas"); + // Since the deltas is unsorted, so we have to first add all new files, then + // delete old files. + let mut to_deletes = Vec::new(); for res in results { let manifest_update = res.context("Failed to join read delta files task")??; - snapshot.merge_update(manifest_update)?; + snapshot.add_records(manifest_update.to_adds); + to_deletes.extend(manifest_update.to_deletes); } + snapshot.delete_records(to_deletes); + trace!(sst_ids = ?snapshot.records.iter().map(|r| r.id()).collect_vec(), "After snapshot merge deltas"); let snapshot_bytes = snapshot.into_bytes()?; let put_payload = PutPayload::from_bytes(snapshot_bytes); // 1. Persist the snapshot @@ -263,6 +309,7 @@ impl ManifestMerger { // 2. 
Delete the merged manifest files let (_, results) = TokioScope::scope_and_block(|scope| { for path in &paths { + trace!(path = ?path, "delete delta file"); scope.spawn(async { delete_delta_file(&self.store, path).await }); } }); @@ -276,7 +323,7 @@ impl ManifestMerger { if let Err(e) = v { error!("Failed to delete delta, err:{e}") } else { - self.deltas_num.fetch_sub(1, Ordering::Relaxed); + self.dec_delta_num(); } } } @@ -367,7 +414,7 @@ mod tests { root_dir.path().to_string_lossy().to_string(), store, runtime.clone(), - ManifestMergeOptions::default(), + ManifestConfig::default(), ) .await .unwrap(); @@ -424,7 +471,7 @@ mod tests { root_dir, store.clone(), runtime.clone(), - ManifestMergeOptions { + ManifestConfig { merge_interval_seconds: 1, ..Default::default() }, diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs index 1a5abc48e9..1dea5c6ca4 100644 --- a/src/metric_engine/src/read.rs +++ b/src/metric_engine/src/read.rs @@ -51,13 +51,13 @@ use datafusion::{ use futures::{Stream, StreamExt}; use itertools::Itertools; use parquet::arrow::async_reader::ParquetObjectReader; -use tracing::debug; use crate::{ compare_primitive_columns, + config::UpdateMode, operator::{BytesMergeOperator, LastValueOperator, MergeOperator, MergeOperatorRef}, sst::{SstFile, SstPathGenerator}, - types::{ObjectStoreRef, StorageSchema, UpdateMode, SEQ_COLUMN_NAME}, + types::{ObjectStoreRef, StorageSchema, SEQ_COLUMN_NAME}, Result, }; @@ -266,8 +266,6 @@ impl MergeStream { return Ok(None); } - debug!(pending_batch = ?self.pending_batch, "Merge batch"); - // Group rows with the same primary keys let mut groupby_pk_batches = Vec::new(); let mut start_idx = 0; @@ -280,7 +278,6 @@ impl MergeStream { } groupby_pk_batches.push(batch.slice(start_idx, end_idx - start_idx)); start_idx = end_idx; - debug!(end_idx = end_idx, "Group rows with the same primary keys"); } let rows_with_same_primary_keys = &groupby_pk_batches[0]; @@ -443,8 +440,6 @@ impl ParquetReader { // when convert between arrow and parquet. let parquet_exec = builder.build(); let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec)) - // TODO: make fetch size configurable. - .with_fetch(Some(1024)) .with_round_robin_repartition(true); let merge_exec = MergeExec::new( @@ -571,7 +566,7 @@ mod tests { .indent(true); assert_eq!( r#"MergeExec: [primary_keys: 1, seq_idx: 2] - SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024 + SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC] ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]] "#, format!("{display_plan}") diff --git a/src/metric_engine/src/sst.rs b/src/metric_engine/src/sst.rs index 070a62ed27..e756c1e1c3 100644 --- a/src/metric_engine/src/sst.rs +++ b/src/metric_engine/src/sst.rs @@ -16,6 +16,7 @@ // under the License. use std::{ + fmt, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, LazyLock, @@ -31,13 +32,36 @@ use crate::{ const PREFIX_PATH: &str = "data"; +// Used for sst file id allocation. +// This number mustn't go backwards on restarts, otherwise file id +// collisions are possible. So don't change time on the server +// between server restarts. 
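+// fetch_add keeps ids strictly increasing within one process; across restarts
+// monotonicity relies on the wall clock only moving forward.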
+static NEXT_ID: LazyLock = LazyLock::new(|| { + AtomicU64::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64, + ) +}); + pub type FileId = u64; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct SstFile { inner: Arc, } +impl fmt::Debug for SstFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SstFile") + .field("id", &self.id()) + .field("meta", &self.meta()) + .field("in_compaction", &self.is_compaction()) + .finish() + } +} + #[derive(Debug)] struct Inner { id: FileId, @@ -92,6 +116,10 @@ impl SstFile { pub fn size(&self) -> u32 { self.meta().size } + + pub fn allocate_id() -> FileId { + NEXT_ID.fetch_add(1, Ordering::SeqCst) + } } impl TryFrom for SstFile { @@ -161,23 +189,6 @@ impl From for pb_types::SstMeta { } } -// Used for sst file id allocation. -// This number mustn't go backwards on restarts, otherwise file id -// collisions are possible. So don't change time on the server -// between server restarts. -static NEXT_ID: LazyLock = LazyLock::new(|| { - AtomicU64::new( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_nanos() as u64, - ) -}); - -pub fn allocate_id() -> u64 { - NEXT_ID.fetch_add(1, Ordering::SeqCst) -} - #[derive(Debug, Clone)] pub struct SstPathGenerator { prefix: String, diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs index 72107abd43..b86ad80382 100644 --- a/src/metric_engine/src/storage.rs +++ b/src/metric_engine/src/storage.rs @@ -49,15 +49,13 @@ use parquet::{ use tokio::runtime::Runtime; use crate::{ - compaction::{CompactionScheduler, SchedulerConfig}, + compaction::CompactionScheduler, + config::{StorageConfig, WriteConfig}, ensure, manifest::{Manifest, ManifestRef}, read::ParquetReader, - sst::{allocate_id, FileMeta, SstPathGenerator}, - types::{ - ObjectStoreRef, StorageOptions, StorageSchema, TimeRange, WriteOptions, WriteResult, - SEQ_COLUMN_NAME, - }, + sst::{FileMeta, SstFile, SstPathGenerator}, + types::{ObjectStoreRef, StorageSchema, TimeRange, WriteResult, SEQ_COLUMN_NAME}, Result, }; @@ -146,7 +144,7 @@ impl CloudObjectStorage { store: ObjectStoreRef, arrow_schema: SchemaRef, num_primary_keys: usize, - storage_opts: StorageOptions, + storage_opts: StorageConfig, runtimes: StorageRuntimes, ) -> Result { let schema = { @@ -177,18 +175,17 @@ impl CloudObjectStorage { path.clone(), store.clone(), runtimes.manifest_compact_runtime.clone(), - storage_opts.manifest_merge_opts, + storage_opts.manifest, ) .await?; let manifest = Arc::new(manifest); - let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys); + let write_props = Self::build_write_props(storage_opts.write, num_primary_keys); let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone())); let parquet_reader = Arc::new(ParquetReader::new( store.clone(), schema.clone(), sst_path_gen.clone(), )); - let compact_scheduler = CompactionScheduler::new( runtimes.sst_compact_runtime.clone(), manifest.clone(), @@ -197,10 +194,8 @@ impl CloudObjectStorage { segment_duration, sst_path_gen.clone(), parquet_reader.clone(), - SchedulerConfig { - write_props: write_props.clone(), - ..Default::default() - }, + storage_opts.scheduler, + write_props.clone(), ); Ok(Self { path, @@ -217,7 +212,7 @@ impl CloudObjectStorage { } async fn write_batch(&self, batch: RecordBatch) -> Result { - let file_id = allocate_id(); + let file_id = SstFile::allocate_id(); let file_path = self.sst_path_gen.generate(file_id); let file_path = 
Path::from(file_path); let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone()); @@ -290,7 +285,7 @@ impl CloudObjectStorage { Ok(res) } - fn build_write_props(write_options: WriteOptions, num_primary_key: usize) -> WriterProperties { + fn build_write_props(write_options: WriteConfig, num_primary_key: usize) -> WriterProperties { let sorting_columns = write_options.enable_sorting_columns.then(|| { (0..num_primary_key) .map(|i| { @@ -305,8 +300,8 @@ impl CloudObjectStorage { .set_sorting_columns(sorting_columns) .set_dictionary_enabled(write_options.enable_dict) .set_bloom_filter_enabled(write_options.enable_bloom_filter) - .set_encoding(write_options.encoding) - .set_compression(write_options.compression); + .set_encoding(write_options.encoding.into()) + .set_compression(write_options.compression.into()); if write_options.column_options.is_none() { return builder.build(); @@ -322,10 +317,10 @@ impl CloudObjectStorage { builder.set_column_bloom_filter_enabled(col_path.clone(), enable_bloom_filter); } if let Some(encoding) = col_opt.encoding { - builder = builder.set_column_encoding(col_path.clone(), encoding); + builder = builder.set_column_encoding(col_path.clone(), encoding.into()); } if let Some(compression) = col_opt.compression { - builder = builder.set_column_compression(col_path, compression); + builder = builder.set_column_compression(col_path, compression.into()); } } @@ -433,7 +428,7 @@ mod tests { store, schema.clone(), 2, // num_primary_keys - StorageOptions::default(), + StorageConfig::default(), runtimes, ) .await @@ -509,7 +504,7 @@ mod tests { store, schema.clone(), 1, - StorageOptions::default(), + StorageConfig::default(), runtimes, ) .await diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs index 107a363a22..3cd70764c8 100644 --- a/src/metric_engine/src/types.rs +++ b/src/metric_engine/src/types.rs @@ -16,7 +16,7 @@ // under the License. use std::{ - collections::HashMap, + fmt, ops::{Add, Deref, Range}, sync::Arc, time::Duration, @@ -24,10 +24,9 @@ use std::{ use arrow_schema::SchemaRef; use object_store::ObjectStore; -use parquet::basic::{Compression, Encoding, ZstdLevel}; use tokio::runtime::Runtime; -use crate::sst::FileId; +use crate::{config::UpdateMode, sst::FileId}; // Seq column is a builtin column, and it will be appended to the end of // user-defined schema. 
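The next hunk replaces the derived Debug for TimeRange with a compact rendering that makes the end-exclusive bound visible in logs. Restated as a standalone sketch, with plain i64 standing in for Timestamp:

```rust
use std::{fmt, ops::Range};

// Newtype over a half-open range, printed as "[start, end)" instead of the
// derived form, which would render as `TimeRange(100..200)`.
struct TimeRange(Range<i64>);

impl fmt::Debug for TimeRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{}, {})", self.0.start, self.0.end)
    }
}

fn main() {
    assert_eq!(format!("{:?}", TimeRange(100..200)), "[100, 200)");
}
```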
@@ -78,9 +77,15 @@ impl Timestamp { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct TimeRange(Range); +impl fmt::Debug for TimeRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[{}, {})", self.0.start.0, self.0.end.0) + } +} + impl From> for TimeRange { fn from(value: Range) -> Self { Self(value) @@ -127,78 +132,6 @@ pub struct WriteResult { pub size: usize, } -#[derive(Debug)] -pub struct ColumnOptions { - pub enable_dict: Option, - pub enable_bloom_filter: Option, - pub encoding: Option, - pub compression: Option, -} - -#[derive(Debug)] -pub struct WriteOptions { - pub max_row_group_size: usize, - pub write_bacth_size: usize, - pub enable_sorting_columns: bool, - // use to set column props with default value - pub enable_dict: bool, - pub enable_bloom_filter: bool, - pub encoding: Encoding, - pub compression: Compression, - // use to set column props with column name - pub column_options: Option>, -} - -impl Default for WriteOptions { - fn default() -> Self { - Self { - max_row_group_size: 8192, - write_bacth_size: 1024, - enable_sorting_columns: true, - enable_dict: false, - enable_bloom_filter: false, - encoding: Encoding::PLAIN, - compression: Compression::ZSTD(ZstdLevel::default()), - column_options: None, - } - } -} - -#[derive(Debug)] -pub struct ManifestMergeOptions { - pub channel_size: usize, - pub merge_interval_seconds: usize, - pub min_merge_threshold: usize, - pub hard_merge_threshold: usize, - pub soft_merge_threshold: usize, -} - -impl Default for ManifestMergeOptions { - fn default() -> Self { - Self { - channel_size: 10, - merge_interval_seconds: 5, - min_merge_threshold: 10, - soft_merge_threshold: 50, - hard_merge_threshold: 90, - } - } -} - -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub enum UpdateMode { - #[default] - Overwrite, - Append, -} - -#[derive(Debug, Default)] -pub struct StorageOptions { - pub write_opts: WriteOptions, - pub manifest_merge_opts: ManifestMergeOptions, - pub update_mode: UpdateMode, -} - #[derive(Debug, Clone)] pub struct StorageSchema { pub arrow_schema: SchemaRef, diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index 19acaa23e4..94617fa98c 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -43,4 +43,4 @@ serde = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["local-time", "env-filter"] } diff --git a/src/server/src/config.rs b/src/server/src/config.rs index 41f39d2555..57ebc18b3e 100644 --- a/src/server/src/config.rs +++ b/src/server/src/config.rs @@ -18,11 +18,11 @@ use common::ReadableDuration; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(default)] +#[derive(Debug, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] pub struct Config { pub port: u16, - pub write_worker_num: usize, // for test + pub test: TestConfig, // for test pub metric_engine: MetricEngineConfig, } @@ -30,57 +30,71 @@ impl Default for Config { fn default() -> Self { Self { port: 5000, - write_worker_num: 4, + test: TestConfig::default(), metric_engine: MetricEngineConfig::default(), } } } -#[derive(Default, Debug, Clone, Deserialize, Serialize)] -#[serde(default)] -pub struct MetricEngineConfig { - pub manifest: ManifestConfig, - pub sst: SstConfig, - pub storage: StorageConfig, -} - #[derive(Debug, Clone, Deserialize, 
Serialize)] -#[serde(default)] -pub struct ManifestConfig { - pub background_thread_num: usize, +#[serde(default, deny_unknown_fields)] +pub struct TestConfig { + pub enable_write: bool, + pub write_worker_num: usize, + pub segment_duration: ReadableDuration, + pub write_interval: ReadableDuration, } -impl Default for ManifestConfig { +impl Default for TestConfig { fn default() -> Self { Self { - background_thread_num: 2, + enable_write: true, + write_worker_num: 1, + segment_duration: ReadableDuration::hours(12), + write_interval: ReadableDuration::millis(500), } } } +#[derive(Default, Debug, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct MetricEngineConfig { + pub threads: ThreadConfig, + pub storage: StorageConfig, +} + #[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(default)] -pub struct SstConfig { - pub background_thread_num: usize, +#[serde(default, deny_unknown_fields)] +pub struct ThreadConfig { + pub manifest_thread_num: usize, + pub sst_thread_num: usize, } -impl Default for SstConfig { +impl Default for ThreadConfig { fn default() -> Self { Self { - background_thread_num: 2, + manifest_thread_num: 2, + sst_thread_num: 2, } } } +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] +pub struct StorageConfig { + pub object_store: ObjectStorageConfig, + pub time_merge_storage: metric_engine::config::StorageConfig, +} + #[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(tag = "type")] +#[serde(tag = "type", deny_unknown_fields)] #[allow(clippy::large_enum_variant)] -pub enum StorageConfig { +pub enum ObjectStorageConfig { Local(LocalStorageConfig), S3Like(S3LikeStorageConfig), } -impl Default for StorageConfig { +impl Default for ObjectStorageConfig { fn default() -> Self { Self::Local(LocalStorageConfig::default()) } @@ -100,6 +114,7 @@ impl Default for LocalStorageConfig { } #[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] pub struct S3LikeStorageConfig { pub region: String, pub key_id: String, @@ -116,6 +131,7 @@ pub struct S3LikeStorageConfig { } #[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(default, deny_unknown_fields)] pub struct HttpOptions { pub pool_max_idle_per_host: usize, pub timeout: ReadableDuration, diff --git a/src/server/src/main.rs b/src/server/src/main.rs index 0644f5a50c..055aa4a0d1 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -17,7 +17,15 @@ #![feature(duration_constructors)] mod config; -use std::{fs, iter::repeat_with, sync::Arc, time::Duration}; +use std::{ + fs, + iter::repeat_with, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use actix_web::{ get, @@ -29,15 +37,16 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, }; use clap::Parser; -use config::{Config, StorageConfig}; +use config::{Config, ObjectStorageConfig}; use metric_engine::{ storage::{ CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest, }, - types::{RuntimeRef, StorageOptions}, + types::RuntimeRef, }; use object_store::local::LocalFileSystem; use tracing::{error, info}; +use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] #[command(version, about, long_about)] @@ -52,6 +61,16 @@ async fn hello() -> impl Responder { HttpResponse::Ok().body("Hello world!") } +#[get("/toggle")] +async fn toggle(data: web::Data) -> impl Responder { + let prev = data.keep_writing.fetch_not(Ordering::Relaxed); + if prev { + HttpResponse::Ok().body("Stop!") + } else { + 
HttpResponse::Ok().body("Start write again!") + } +} + #[get("/compact")] async fn compact(data: web::Data) -> impl Responder { if let Err(e) = data.storage.compact(CompactRequest::default()).await { @@ -62,59 +81,80 @@ async fn compact(data: web::Data) -> impl Responder { struct AppState { storage: TimeMergeStorageRef, + keep_writing: Arc, } pub fn main() { - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt::init(); + tracing_subscriber::fmt() + .with_file(true) + .with_line_number(true) + .with_target(false) + .with_env_filter(EnvFilter::from_default_env()) + .with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339()) + .init(); let args = Args::parse(); let config_body = fs::read_to_string(args.config).expect("read config file failed"); let config: Config = toml::from_str(&config_body).unwrap(); - info!(config = ?config, "Config loaded"); + info!("Config loaded: \n{:#?}", config); let port = config.port; let rt = build_multi_runtime("main", 1); let manifest_compact_runtime = build_multi_runtime( "manifest-compact", - config.metric_engine.manifest.background_thread_num, - ); - let sst_compact_runtime = build_multi_runtime( - "sst-compact", - config.metric_engine.sst.background_thread_num, + config.metric_engine.threads.manifest_thread_num, ); + let sst_compact_runtime = + build_multi_runtime("sst-compact", config.metric_engine.threads.sst_thread_num); let runtimes = StorageRuntimes::new(manifest_compact_runtime, sst_compact_runtime); - let storage_config = match config.metric_engine.storage { - StorageConfig::Local(v) => v, - StorageConfig::S3Like(_) => panic!("S3 not support yet"), + let object_store_config = match config.metric_engine.storage.object_store { + ObjectStorageConfig::Local(v) => v, + ObjectStorageConfig::S3Like(_) => panic!("S3 not support yet"), }; - let write_worker_num = config.write_worker_num; + let time_merge_storage_config = config.metric_engine.storage.time_merge_storage; + let write_worker_num = config.test.write_worker_num; + let write_interval = config.test.write_interval.0; + let segment_duration = config.test.segment_duration.0; + let enable_write = config.test.enable_write; let write_rt = build_multi_runtime("write", write_worker_num); + let keep_writing = Arc::new(AtomicBool::new(true)); let _ = rt.block_on(async move { let store = Arc::new(LocalFileSystem::new()); let storage = Arc::new( CloudObjectStorage::try_new( - storage_config.data_dir, - Duration::from_mins(10), + object_store_config.data_dir, + segment_duration, store, build_schema(), 3, - StorageOptions::default(), + time_merge_storage_config, runtimes, ) .await .unwrap(), ); - bench_write(write_rt.clone(), write_worker_num, storage.clone()); + if enable_write { + bench_write( + storage.clone(), + write_rt.clone(), + write_worker_num, + write_interval, + keep_writing.clone(), + ); + } - let app_state = Data::new(AppState { storage }); + let app_state = Data::new(AppState { + storage, + keep_writing, + }); info!(port, "Start HoraeDB http server..."); HttpServer::new(move || { App::new() .app_data(app_state.clone()) .service(hello) .service(compact) + .service(toggle) }) .workers(4) .bind(("127.0.0.1", port)) @@ -144,7 +184,13 @@ fn build_schema() -> SchemaRef { ])) } -fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) { +fn bench_write( + storage: TimeMergeStorageRef, + rt: RuntimeRef, + workers: usize, + interval: Duration, + keep_writing: Arc, +) { let schema = Arc::new(Schema::new(vec![ Field::new("pk1", DataType::Int64, 
true), Field::new("pk2", DataType::Int64, true), @@ -154,8 +200,13 @@ fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) { for _ in 0..workers { let storage = storage.clone(); let schema = schema.clone(); + let keep_writing = keep_writing.clone(); rt.spawn(async move { loop { + tokio::time::sleep(interval).await; + if !keep_writing.load(Ordering::Relaxed) { + continue; + } let pk1: Int64Array = repeat_with(rand::random::).take(1000).collect(); let pk2: Int64Array = repeat_with(rand::random::).take(1000).collect(); let pk3: Int64Array = repeat_with(rand::random::).take(1000).collect();
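The new /toggle endpoint and the bench writers coordinate through a shared AtomicBool: the handler flips the flag with fetch_not (which returns the previous value), and each writer polls it on every tick. A condensed sketch of that handshake, using a plain thread instead of actix, purely illustrative:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

fn main() {
    let keep_writing = Arc::new(AtomicBool::new(true));

    let flag = keep_writing.clone();
    let writer = thread::spawn(move || {
        for _ in 0..4 {
            thread::sleep(Duration::from_millis(100));
            // Paused writers keep ticking but skip the work, mirroring the
            // `continue` in bench_write above.
            if !flag.load(Ordering::Relaxed) {
                continue;
            }
            println!("write one batch");
        }
    });

    thread::sleep(Duration::from_millis(150));
    // Equivalent of one GET /toggle call: flip and report the old state.
    let was_writing = keep_writing.fetch_not(Ordering::Relaxed);
    println!("was writing: {was_writing}");

    writer.join().unwrap();
}
```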