diff --git a/Cargo.lock b/Cargo.lock
index b1de9d06d6..8cace345bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2174,6 +2174,7 @@ dependencies = [
  "parquet",
  "pb_types",
  "prost",
+ "serde",
  "temp-dir",
  "test-log",
  "thiserror",
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index ba6d2daae4..4286610366 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -48,6 +48,7 @@ object_store = { workspace = true }
 parquet = { workspace = true, features = ["object_store"] }
 pb_types = { workspace = true }
 prost = { workspace = true }
+serde = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
diff --git a/src/metric_engine/src/compaction/picker.rs b/src/metric_engine/src/compaction/picker.rs
index 58337ec7cc..79c91ce8a6 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -51,8 +51,8 @@ impl Picker {
     /// This function picks a candidate for compaction.
     /// Note: It can only execute sequentially, otherwise a SST may be picked by
-    /// multiple threads.
-    pub async fn pick_candidate(&self) -> Option<Task> {
+    /// multiple threads (that's why it takes a mutable `self`).
+    pub async fn pick_candidate(&mut self) -> Option<Task> {
         let ssts = self.manifest.all_ssts().await;
         let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as i64).into());
         self.strategy.pick_candidate(ssts, expire_time)
diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs
index 25f994fe79..d947096e93 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -18,6 +18,7 @@ use std::{sync::Arc, time::Duration};
 
 use anyhow::Context;
+use parquet::file::properties::WriterProperties;
 use tokio::{
     sync::mpsc::{self, Receiver, Sender},
     task::JoinHandle,
@@ -56,13 +57,13 @@ impl Scheduler {
         sst_path_gen: Arc<SstPathGenerator>,
         parquet_reader: Arc<ParquetReader>,
         config: SchedulerConfig,
+        write_props: WriterProperties,
     ) -> Self {
         let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks);
         let (trigger_tx, trigger_rx) = mpsc::channel::<()>(1);
         let task_handle = {
             let store = store.clone();
             let manifest = manifest.clone();
-            let write_props = config.write_props.clone();
             let trigger_tx = trigger_tx.clone();
             let executor = Executor::new(
                 runtime.clone(),
@@ -121,7 +122,7 @@ impl Scheduler {
     async fn generate_task_loop(
         task_tx: Sender<Task>,
         mut trigger_rx: Receiver<()>,
-        picker: Picker,
+        mut picker: Picker,
         schedule_interval: Duration,
     ) {
         info!(
@@ -130,7 +131,7 @@ impl Scheduler {
         );
         let send_task = |task| {
             if let Err(e) = task_tx.try_send(task) {
-                warn!("Send task failed, err:{e}");
+                warn!("Send task failed, err:{e:?}");
             }
         };
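The `&mut self` on `pick_candidate` is doing real work: the scheduler moves the picker into `generate_task_loop`, so every pick needs an exclusive borrow and concurrent picking becomes a compile-time error instead of a convention. A minimal, synchronous sketch of that idea (stand-in `Picker`/`Task` types, not the engine's actual ones):

```rust
// Stand-in types; the real Picker/Task live in metric_engine's compaction module.
use std::{thread, time::Duration};

struct Task(u64);

#[derive(Default)]
struct Picker {
    last_picked: Option<u64>,
}

impl Picker {
    // Requiring `&mut self` means the caller needs exclusive access for every pick,
    // so the same SST cannot be handed out by two pickers running at once.
    fn pick_candidate(&mut self) -> Option<Task> {
        let next = self.last_picked.map_or(0, |id| id + 1);
        self.last_picked = Some(next);
        Some(Task(next))
    }
}

fn generate_task_loop(mut picker: Picker, schedule_interval: Duration, rounds: usize) {
    // The loop owns the picker, mirroring how the scheduler moves it into the task loop.
    for _ in 0..rounds {
        if let Some(Task(id)) = picker.pick_candidate() {
            println!("picked task {id}");
        }
        thread::sleep(schedule_interval);
    }
}

fn main() {
    generate_task_loop(Picker::default(), Duration::from_millis(10), 3);
}
```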
diff --git a/src/metric_engine/src/config.rs b/src/metric_engine/src/config.rs
index 5b2d7497f7..819e45e668 100644
--- a/src/metric_engine/src/config.rs
+++ b/src/metric_engine/src/config.rs
@@ -17,20 +17,16 @@
 
 use std::{collections::HashMap, time::Duration};
 
-use parquet::{
-    basic::{Compression, Encoding, ZstdLevel},
-    file::properties::WriterProperties,
-};
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use serde::{Deserialize, Serialize};
 
-use crate::types::UpdateMode;
-
-#[derive(Clone)]
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
 pub struct SchedulerConfig {
     pub schedule_interval: Duration,
     pub max_pending_compaction_tasks: usize,
     // Runner config
     pub memory_limit: u64,
-    pub write_props: WriterProperties,
     // Picker config
     pub ttl: Option<Duration>,
     pub new_sst_max_size: u64,
@@ -43,8 +39,7 @@ impl Default for SchedulerConfig {
         Self {
             schedule_interval: Duration::from_secs(10),
             max_pending_compaction_tasks: 10,
-            memory_limit: bytesize::gb(3_u64),
-            write_props: WriterProperties::default(),
+            memory_limit: bytesize::gb(20_u64),
             ttl: None,
             new_sst_max_size: bytesize::gb(1_u64),
             input_sst_max_num: 30,
@@ -53,29 +48,75 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum ParquetEncoding {
+    #[default]
+    Plain,
+    Rle,
+    DeltaBinaryPacked,
+    DeltaLengthByteArray,
+    DeltaByteArray,
+    RleDictionary,
+}
+
+impl From<ParquetEncoding> for Encoding {
+    fn from(value: ParquetEncoding) -> Self {
+        match value {
+            ParquetEncoding::Plain => Encoding::PLAIN,
+            ParquetEncoding::Rle => Encoding::RLE,
+            ParquetEncoding::DeltaBinaryPacked => Encoding::DELTA_BINARY_PACKED,
+            ParquetEncoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            ParquetEncoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY,
+            ParquetEncoding::RleDictionary => Encoding::RLE_DICTIONARY,
+        }
+    }
+}
+
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum ParquetCompression {
+    #[default]
+    Uncompressed,
+    Snappy,
+    Zstd,
+}
+
+impl From<ParquetCompression> for Compression {
+    fn from(value: ParquetCompression) -> Self {
+        match value {
+            ParquetCompression::Uncompressed => Compression::UNCOMPRESSED,
+            ParquetCompression::Snappy => Compression::SNAPPY,
+            ParquetCompression::Zstd => Compression::ZSTD(ZstdLevel::default()),
+        }
+    }
+}
+
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
 pub struct ColumnOptions {
     pub enable_dict: Option<bool>,
     pub enable_bloom_filter: Option<bool>,
-    pub encoding: Option<Encoding>,
-    pub compression: Option<Compression>,
+    pub encoding: Option<ParquetEncoding>,
+    pub compression: Option<ParquetCompression>,
 }
 
-#[derive(Debug)]
-pub struct WriteOptions {
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct WriteConfig {
     pub max_row_group_size: usize,
     pub write_bacth_size: usize,
     pub enable_sorting_columns: bool,
     // use to set column props with default value
     pub enable_dict: bool,
     pub enable_bloom_filter: bool,
-    pub encoding: Encoding,
-    pub compression: Compression,
+    pub encoding: ParquetEncoding,
+    pub compression: ParquetCompression,
     // use to set column props with column name
     pub column_options: Option<HashMap<String, ColumnOptions>>,
 }
 
-impl Default for WriteOptions {
+impl Default for WriteConfig {
     fn default() -> Self {
         Self {
             max_row_group_size: 8192,
@@ -83,15 +124,16 @@
             enable_sorting_columns: true,
             enable_dict: false,
             enable_bloom_filter: false,
-            encoding: Encoding::PLAIN,
-            compression: Compression::ZSTD(ZstdLevel::default()),
+            encoding: ParquetEncoding::Plain,
+            compression: ParquetCompression::Snappy,
             column_options: None,
         }
     }
 }
 
-#[derive(Debug)]
-pub struct ManifestMergeOptions {
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct ManifestConfig {
     pub channel_size: usize,
     pub merge_interval_seconds: usize,
     pub min_merge_threshold: usize,
@@ -99,7 +141,7 @@
     pub soft_merge_threshold: usize,
 }
 
-impl Default for ManifestMergeOptions {
+impl Default for ManifestConfig {
     fn default() -> Self {
         Self {
             channel_size: 3,
@@ -111,9 +153,19 @@
     }
 }
 
-#[derive(Debug, Default)]
-pub struct StorageOptions {
-    pub write_opts: WriteOptions,
-    pub manifest_merge_opts: ManifestMergeOptions,
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct StorageConfig {
+    pub write: WriteConfig,
+    pub manifest: ManifestConfig,
+    pub scheduler: SchedulerConfig,
     pub update_mode: UpdateMode,
 }
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum UpdateMode {
+    #[default]
+    Overwrite,
+    Append,
+}
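With `Deserialize`/`Serialize` derived and `#[serde(default, deny_unknown_fields)]` applied, the whole storage configuration can now be read from TOML. A rough sketch of what that enables, assuming a `toml` dependency is available; the keys simply mirror the structs above, and anything omitted falls back to the `Default` impls:

```rust
// Hypothetical round trip through the new serde-enabled config types.
use metric_engine::config::StorageConfig;

fn main() {
    let cfg: StorageConfig = toml::from_str(
        r#"
        update_mode = "Append"

        [write]
        enable_dict = true
        compression = "Zstd"

        [manifest]
        merge_interval_seconds = 5

        [scheduler]
        memory_limit = 1073741824
        "#,
    )
    .expect("parse storage config");

    // Unset fields come from the Default impls via #[serde(default)];
    // a typo in a key would be rejected by deny_unknown_fields.
    assert_eq!(cfg.scheduler.max_pending_compaction_tasks, 10);
    assert!(cfg.write.enable_dict);
}
```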
diff --git a/src/metric_engine/src/manifest/mod.rs b/src/metric_engine/src/manifest/mod.rs
index d2e6ebc75c..a646c32956 100644
--- a/src/metric_engine/src/manifest/mod.rs
+++ b/src/metric_engine/src/manifest/mod.rs
@@ -39,7 +39,7 @@ use tokio::sync::{
 use tracing::{debug, error, info, trace};
 
 use crate::{
-    config::ManifestMergeOptions,
+    config::ManifestConfig,
     sst::{FileId, FileMeta, SstFile},
     types::{ObjectStoreRef, RuntimeRef, TimeRange},
     AnyhowError, Result,
@@ -77,7 +77,7 @@ impl Manifest {
         root_dir: String,
         store: ObjectStoreRef,
         runtime: RuntimeRef,
-        merge_options: ManifestMergeOptions,
+        merge_options: ManifestConfig,
     ) -> Result<Self> {
         let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
@@ -188,7 +188,7 @@ struct ManifestMerger {
     sender: Sender,
     receiver: RwLock>,
     deltas_num: AtomicUsize,
-    merge_options: ManifestMergeOptions,
+    merge_options: ManifestConfig,
 }
 
 impl ManifestMerger {
@@ -196,7 +196,7 @@ impl ManifestMerger {
         snapshot_path: Path,
         delta_dir: Path,
         store: ObjectStoreRef,
-        merge_options: ManifestMergeOptions,
+        merge_options: ManifestConfig,
     ) -> Result<Arc<Self>> {
         let (tx, rx) = mpsc::channel(merge_options.channel_size);
         let merger = Self {
@@ -414,7 +414,7 @@ mod tests {
             root_dir.path().to_string_lossy().to_string(),
             store,
             runtime.clone(),
-            ManifestMergeOptions::default(),
+            ManifestConfig::default(),
         )
         .await
         .unwrap();
@@ -471,7 +471,7 @@ mod tests {
             root_dir,
             store.clone(),
             runtime.clone(),
-            ManifestMergeOptions {
+            ManifestConfig {
                 merge_interval_seconds: 1,
                 ..Default::default()
             },
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 07d7de29a3..1dea5c6ca4 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -54,9 +54,10 @@ use parquet::arrow::async_reader::ParquetObjectReader;
 
 use crate::{
     compare_primitive_columns,
+    config::UpdateMode,
     operator::{BytesMergeOperator, LastValueOperator, MergeOperator, MergeOperatorRef},
     sst::{SstFile, SstPathGenerator},
-    types::{ObjectStoreRef, StorageSchema, UpdateMode, SEQ_COLUMN_NAME},
+    types::{ObjectStoreRef, StorageSchema, SEQ_COLUMN_NAME},
     Result,
 };
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index a85da85033..b86ad80382 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -50,7 +50,7 @@ use tokio::runtime::Runtime;
 
 use crate::{
     compaction::CompactionScheduler,
-    config::{SchedulerConfig, StorageOptions, WriteOptions},
+    config::{StorageConfig, WriteConfig},
     ensure,
     manifest::{Manifest, ManifestRef},
     read::ParquetReader,
@@ -144,7 +144,7 @@ impl CloudObjectStorage {
         store: ObjectStoreRef,
         arrow_schema: SchemaRef,
         num_primary_keys: usize,
-        storage_opts: StorageOptions,
+        storage_opts: StorageConfig,
         runtimes: StorageRuntimes,
     ) -> Result<Self> {
         let schema = {
@@ -175,18 +175,17 @@ impl CloudObjectStorage {
             path.clone(),
             store.clone(),
             runtimes.manifest_compact_runtime.clone(),
-            storage_opts.manifest_merge_opts,
+            storage_opts.manifest,
         )
         .await?;
         let manifest = Arc::new(manifest);
-        let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys);
+        let write_props = Self::build_write_props(storage_opts.write, num_primary_keys);
         let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
         let parquet_reader = Arc::new(ParquetReader::new(
             store.clone(),
             schema.clone(),
             sst_path_gen.clone(),
         ));
-
         let compact_scheduler = CompactionScheduler::new(
             runtimes.sst_compact_runtime.clone(),
             manifest.clone(),
@@ -195,10 +194,8 @@ impl CloudObjectStorage {
             segment_duration,
             sst_path_gen.clone(),
             parquet_reader.clone(),
-            SchedulerConfig {
-                write_props: write_props.clone(),
-                ..Default::default()
-            },
+            storage_opts.scheduler,
+            write_props.clone(),
         );
         Ok(Self {
             path,
@@ -288,7 +285,7 @@ impl CloudObjectStorage {
         Ok(res)
     }
 
-    fn build_write_props(write_options: WriteOptions, num_primary_key: usize) -> WriterProperties {
+    fn build_write_props(write_options: WriteConfig, num_primary_key: usize) -> WriterProperties {
         let sorting_columns = write_options.enable_sorting_columns.then(|| {
             (0..num_primary_key)
                 .map(|i| {
@@ -303,8 +300,8 @@ impl CloudObjectStorage {
             .set_sorting_columns(sorting_columns)
             .set_dictionary_enabled(write_options.enable_dict)
             .set_bloom_filter_enabled(write_options.enable_bloom_filter)
-            .set_encoding(write_options.encoding)
-            .set_compression(write_options.compression);
+            .set_encoding(write_options.encoding.into())
+            .set_compression(write_options.compression.into());
 
         if write_options.column_options.is_none() {
             return builder.build();
@@ -320,10 +317,10 @@ impl CloudObjectStorage {
                 builder.set_column_bloom_filter_enabled(col_path.clone(), enable_bloom_filter);
             }
             if let Some(encoding) = col_opt.encoding {
-                builder = builder.set_column_encoding(col_path.clone(), encoding);
+                builder = builder.set_column_encoding(col_path.clone(), encoding.into());
            }
             if let Some(compression) = col_opt.compression {
-                builder = builder.set_column_compression(col_path, compression);
+                builder = builder.set_column_compression(col_path, compression.into());
             }
         }
 
@@ -431,7 +428,7 @@ mod tests {
             store,
             schema.clone(),
             2, // num_primary_keys
-            StorageOptions::default(),
+            StorageConfig::default(),
             runtimes,
         )
         .await
@@ -507,7 +504,7 @@ mod tests {
             store,
             schema.clone(),
             1,
-            StorageOptions::default(),
+            StorageConfig::default(),
             runtimes,
         )
         .await
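`WriterProperties` cannot derive serde, which is presumably why it left `SchedulerConfig`: the scheduler now takes it as a separate argument, and the config keeps only plain data that converts on demand through the `From` impls added in config.rs. A trimmed-down sketch of that conversion path, using the same parquet builder calls as `build_write_props` above:

```rust
// Illustrative only: turning the serde-friendly WriteConfig into parquet writer
// settings via the From impls in config.rs (ParquetEncoding -> Encoding,
// ParquetCompression -> Compression).
use metric_engine::config::WriteConfig;
use parquet::file::properties::WriterProperties;

fn demo_write_props(cfg: WriteConfig) -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(cfg.max_row_group_size)
        .set_dictionary_enabled(cfg.enable_dict)
        .set_bloom_filter_enabled(cfg.enable_bloom_filter)
        // `.into()` resolves to the conversions defined in config.rs.
        .set_encoding(cfg.encoding.into())
        .set_compression(cfg.compression.into())
        .build()
}

fn main() {
    let _props = demo_write_props(WriteConfig::default());
    println!("built writer properties from the default WriteConfig");
}
```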
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 9cbd735395..3cd70764c8 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
 use object_store::ObjectStore;
 use tokio::runtime::Runtime;
 
-use crate::sst::FileId;
+use crate::{config::UpdateMode, sst::FileId};
 
 // Seq column is a builtin column, and it will be appended to the end of
 // user-defined schema.
@@ -132,13 +132,6 @@ pub struct WriteResult {
     pub size: usize,
 }
 
-#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
-pub enum UpdateMode {
-    #[default]
-    Overwrite,
-    Append,
-}
-
 #[derive(Debug, Clone)]
 pub struct StorageSchema {
     pub arrow_schema: SchemaRef,
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index fb63324672..062da9444a 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -18,7 +18,7 @@
 use common::ReadableDuration;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct Config {
     pub port: u16,
@@ -56,51 +56,51 @@ impl Default for TestConfig {
     }
 }
 
-#[derive(Default, Debug, Clone, Deserialize, Serialize)]
+#[derive(Default, Debug, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct MetricEngineConfig {
-    pub manifest: ManifestConfig,
-    pub sst: SstConfig,
+    pub threads: ThreadConfig,
     pub storage: StorageConfig,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
-pub struct ManifestConfig {
-    pub background_thread_num: usize,
+pub struct ThreadConfig {
+    #[serde(default = "default_thread_num")]
+    pub manifest_thread_num: usize,
+    #[serde(default = "default_thread_num")]
+    pub sst_thread_num: usize,
+}
+
+fn default_thread_num() -> usize {
+    2
 }
 
-impl Default for ManifestConfig {
+impl Default for ThreadConfig {
     fn default() -> Self {
         Self {
-            background_thread_num: 2,
+            manifest_thread_num: 2,
+            sst_thread_num: 2,
         }
     }
 }
 
-#[derive(Debug, Clone, Deserialize, Serialize)]
+#[derive(Debug, Default, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
-pub struct SstConfig {
-    pub background_thread_num: usize,
-}
-
-impl Default for SstConfig {
-    fn default() -> Self {
-        Self {
-            background_thread_num: 2,
-        }
-    }
+pub struct StorageConfig {
+    pub object_store: ObjectStorageConfig,
+    pub time_merge_storage: metric_engine::config::StorageConfig,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(tag = "type", deny_unknown_fields)]
 #[allow(clippy::large_enum_variant)]
-pub enum StorageConfig {
+pub enum ObjectStorageConfig {
     Local(LocalStorageConfig),
     S3Like(S3LikeStorageConfig),
 }
 
-impl Default for StorageConfig {
+impl Default for ObjectStorageConfig {
     fn default() -> Self {
         Self::Local(LocalStorageConfig::default())
     }
 }
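On the server side, `[threads]` replaces the old per-component `manifest`/`sst` sections, and `[storage]` now nests both the object store settings and the engine's own `StorageConfig` under `time_merge_storage`. A hypothetical test inside the server crate sketching the TOML shape; only `data_dir` comes from this diff, any other `LocalStorageConfig` fields and its serde defaults are assumptions:

```rust
// Hypothetical: exercises the reshaped MetricEngineConfig; not part of this patch.
use crate::config::{MetricEngineConfig, ObjectStorageConfig};

#[test]
fn parse_metric_engine_config() {
    let cfg: MetricEngineConfig = toml::from_str(
        r#"
        [threads]
        manifest_thread_num = 4
        sst_thread_num = 8

        [storage.object_store]
        type = "Local"
        data_dir = "/tmp/horaedb"

        [storage.time_merge_storage]
        update_mode = "Overwrite"

        [storage.time_merge_storage.write]
        enable_bloom_filter = true
        "#,
    )
    .expect("parse metric engine config");

    assert_eq!(cfg.threads.sst_thread_num, 8);
    assert!(matches!(
        cfg.storage.object_store,
        ObjectStorageConfig::Local(_)
    ));
}
```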
diff --git a/src/server/src/main.rs b/src/server/src/main.rs
index 7abf3db225..055aa4a0d1 100644
--- a/src/server/src/main.rs
+++ b/src/server/src/main.rs
@@ -37,9 +37,8 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
 };
 use clap::Parser;
-use config::{Config, StorageConfig};
+use config::{Config, ObjectStorageConfig};
 use metric_engine::{
-    config::StorageOptions,
     storage::{
         CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
     },
@@ -97,23 +96,22 @@ pub fn main() {
     let args = Args::parse();
     let config_body = fs::read_to_string(args.config).expect("read config file failed");
     let config: Config = toml::from_str(&config_body).unwrap();
-    info!(config = ?config, "Config loaded");
+    info!("Config loaded: \n{:#?}", config);
     let port = config.port;
     let rt = build_multi_runtime("main", 1);
     let manifest_compact_runtime = build_multi_runtime(
         "manifest-compact",
-        config.metric_engine.manifest.background_thread_num,
-    );
-    let sst_compact_runtime = build_multi_runtime(
-        "sst-compact",
-        config.metric_engine.sst.background_thread_num,
+        config.metric_engine.threads.manifest_thread_num,
     );
+    let sst_compact_runtime =
+        build_multi_runtime("sst-compact", config.metric_engine.threads.sst_thread_num);
     let runtimes = StorageRuntimes::new(manifest_compact_runtime, sst_compact_runtime);
-    let storage_config = match config.metric_engine.storage {
-        StorageConfig::Local(v) => v,
-        StorageConfig::S3Like(_) => panic!("S3 not support yet"),
+    let object_store_config = match config.metric_engine.storage.object_store {
+        ObjectStorageConfig::Local(v) => v,
+        ObjectStorageConfig::S3Like(_) => panic!("S3 not support yet"),
     };
+    let time_merge_storage_config = config.metric_engine.storage.time_merge_storage;
     let write_worker_num = config.test.write_worker_num;
     let write_interval = config.test.write_interval.0;
     let segment_duration = config.test.segment_duration.0;
@@ -124,12 +122,12 @@ pub fn main() {
     let store = Arc::new(LocalFileSystem::new());
     let storage = Arc::new(
         CloudObjectStorage::try_new(
-            storage_config.data_dir,
+            object_store_config.data_dir,
             segment_duration,
             store,
             build_schema(),
             3,
-            StorageOptions::default(),
+            time_merge_storage_config,
             runtimes,
         )
         .await