Commit: refactor config

jiacai2050 committed Dec 23, 2024
1 parent 3f9e107 · commit 0b085a5

Showing 11 changed files with 141 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/metric_engine/Cargo.toml
@@ -48,6 +48,7 @@ object_store = { workspace = true }
 parquet = { workspace = true, features = ["object_store"] }
 pb_types = { workspace = true }
 prost = { workspace = true }
+serde = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
4 changes: 2 additions & 2 deletions src/metric_engine/src/compaction/picker.rs
@@ -51,8 +51,8 @@ impl Picker {
 
     /// This function picks a candidate for compaction.
     /// Note: It can only execute sequentially, otherwise a SST may be picked by
-    /// multiple threads.
-    pub async fn pick_candidate(&self) -> Option<Task> {
+    /// multiple threads (that's why it takes a mutable self).
+    pub async fn pick_candidate(&mut self) -> Option<Task> {
         let ssts = self.manifest.all_ssts().await;
         let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as i64).into());
         self.strategy.pick_candidate(ssts, expire_time)
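
The switch from &self to &mut self turns the "sequential only" caveat from a comment into a compiler-checked guarantee: exclusive borrows cannot overlap, so two callers can no longer pick candidates concurrently. A minimal standalone sketch of the idea (simplified stand-in types, not the engine's own):

struct Picker;

#[derive(Debug)]
struct Task;

impl Picker {
    // Requires exclusive access, mirroring pick_candidate(&mut self) above.
    fn pick_candidate(&mut self) -> Option<Task> {
        Some(Task)
    }
}

fn main() {
    let mut picker = Picker;
    let first = picker.pick_candidate();
    // Two live mutable borrows would be rejected at compile time:
    // let a = &mut picker;
    // let b = &mut picker; // error[E0499]: cannot borrow `picker` as mutable more than once
    let second = picker.pick_candidate(); // fine: the borrows are sequential
    println!("{first:?} {second:?}");
}
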
7 changes: 4 additions & 3 deletions src/metric_engine/src/compaction/scheduler.rs
@@ -18,6 +18,7 @@
 use std::{sync::Arc, time::Duration};
 
 use anyhow::Context;
+use parquet::file::properties::WriterProperties;
 use tokio::{
     sync::mpsc::{self, Receiver, Sender},
     task::JoinHandle,
@@ -56,13 +57,13 @@ impl Scheduler {
         sst_path_gen: Arc<SstPathGenerator>,
         parquet_reader: Arc<ParquetReader>,
         config: SchedulerConfig,
+        write_props: WriterProperties,
     ) -> Self {
         let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks);
         let (trigger_tx, trigger_rx) = mpsc::channel::<()>(1);
         let task_handle = {
             let store = store.clone();
             let manifest = manifest.clone();
-            let write_props = config.write_props.clone();
             let trigger_tx = trigger_tx.clone();
             let executor = Executor::new(
                 runtime.clone(),
@@ -121,7 +122,7 @@ impl Scheduler {
     async fn generate_task_loop(
         task_tx: Sender<Task>,
         mut trigger_rx: Receiver<()>,
-        picker: Picker,
+        mut picker: Picker,
         schedule_interval: Duration,
     ) {
         info!(
@@ -130,7 +131,7 @@
         );
         let send_task = |task| {
            if let Err(e) = task_tx.try_send(task) {
-                warn!("Send task failed, err:{e}");
+                warn!("Send task failed, err:{e:?}");
            }
        };

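
With write_props removed from SchedulerConfig, the caller now builds parquet's WriterProperties itself and passes it into Scheduler::new. The call site is not part of this diff; a plausible sketch, assuming a hypothetical helper that uses the WriteConfig fields and the From impls added in config.rs below:

use parquet::file::properties::WriterProperties;

use crate::config::WriteConfig;

// Hypothetical helper, not from this commit: translate the serde-friendly
// WriteConfig into parquet's WriterProperties before constructing Scheduler.
fn build_write_props(write: WriteConfig) -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(write.max_row_group_size)
        .set_dictionary_enabled(write.enable_dict)
        .set_bloom_filter_enabled(write.enable_bloom_filter)
        // From<ParquetEncoding> / From<ParquetCompression> do the translation.
        .set_encoding(write.encoding.into())
        .set_compression(write.compression.into())
        .build()
}

Decoupling the scheduler from WriterProperties is what lets SchedulerConfig derive Deserialize in this commit: WriterProperties is a parquet builder type with no serde support, so it had to leave the config struct.
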
106 changes: 79 additions & 27 deletions src/metric_engine/src/config.rs
@@ -17,20 +17,16 @@
 
 use std::{collections::HashMap, time::Duration};
 
-use parquet::{
-    basic::{Compression, Encoding, ZstdLevel},
-    file::properties::WriterProperties,
-};
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use serde::{Deserialize, Serialize};
 
-use crate::types::UpdateMode;
-
-#[derive(Clone)]
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
 pub struct SchedulerConfig {
     pub schedule_interval: Duration,
     pub max_pending_compaction_tasks: usize,
     // Runner config
     pub memory_limit: u64,
-    pub write_props: WriterProperties,
     // Picker config
     pub ttl: Option<Duration>,
     pub new_sst_max_size: u64,
@@ -43,8 +39,7 @@ impl Default for SchedulerConfig {
         Self {
             schedule_interval: Duration::from_secs(10),
             max_pending_compaction_tasks: 10,
-            memory_limit: bytesize::gb(3_u64),
-            write_props: WriterProperties::default(),
+            memory_limit: bytesize::gb(20_u64),
             ttl: None,
             new_sst_max_size: bytesize::gb(1_u64),
             input_sst_max_num: 30,
@@ -53,53 +48,100 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum ParquetEncoding {
+    #[default]
+    Plain,
+    Rle,
+    DeltaBinaryPacked,
+    DeltaLengthByteArray,
+    DeltaByteArray,
+    RleDictionary,
+}
+
+impl From<ParquetEncoding> for Encoding {
+    fn from(value: ParquetEncoding) -> Self {
+        match value {
+            ParquetEncoding::Plain => Encoding::PLAIN,
+            ParquetEncoding::Rle => Encoding::RLE,
+            ParquetEncoding::DeltaBinaryPacked => Encoding::DELTA_BINARY_PACKED,
+            ParquetEncoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            ParquetEncoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY,
+            ParquetEncoding::RleDictionary => Encoding::RLE_DICTIONARY,
+        }
+    }
+}
+
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum ParquetCompression {
+    #[default]
+    Uncompressed,
+    Snappy,
+    Zstd,
+}
+
+impl From<ParquetCompression> for Compression {
+    fn from(value: ParquetCompression) -> Self {
+        match value {
+            ParquetCompression::Uncompressed => Compression::UNCOMPRESSED,
+            ParquetCompression::Snappy => Compression::SNAPPY,
+            ParquetCompression::Zstd => Compression::ZSTD(ZstdLevel::default()),
+        }
+    }
+}
+
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
 pub struct ColumnOptions {
     pub enable_dict: Option<bool>,
     pub enable_bloom_filter: Option<bool>,
-    pub encoding: Option<Encoding>,
-    pub compression: Option<Compression>,
+    pub encoding: Option<ParquetEncoding>,
+    pub compression: Option<ParquetCompression>,
 }
 
-#[derive(Debug)]
-pub struct WriteOptions {
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct WriteConfig {
     pub max_row_group_size: usize,
     pub write_bacth_size: usize,
     pub enable_sorting_columns: bool,
     // use to set column props with default value
     pub enable_dict: bool,
     pub enable_bloom_filter: bool,
-    pub encoding: Encoding,
-    pub compression: Compression,
+    pub encoding: ParquetEncoding,
+    pub compression: ParquetCompression,
     // use to set column props with column name
     pub column_options: Option<HashMap<String, ColumnOptions>>,
 }
 
-impl Default for WriteOptions {
+impl Default for WriteConfig {
     fn default() -> Self {
         Self {
             max_row_group_size: 8192,
             write_bacth_size: 1024,
             enable_sorting_columns: true,
             enable_dict: false,
             enable_bloom_filter: false,
-            encoding: Encoding::PLAIN,
-            compression: Compression::ZSTD(ZstdLevel::default()),
+            encoding: ParquetEncoding::Plain,
+            compression: ParquetCompression::Snappy,
             column_options: None,
         }
     }
 }
 
-#[derive(Debug)]
-pub struct ManifestMergeOptions {
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct ManifestConfig {
     pub channel_size: usize,
     pub merge_interval_seconds: usize,
     pub min_merge_threshold: usize,
     pub hard_merge_threshold: usize,
     pub soft_merge_threshold: usize,
 }
 
-impl Default for ManifestMergeOptions {
+impl Default for ManifestConfig {
     fn default() -> Self {
         Self {
             channel_size: 3,
@@ -111,9 +153,19 @@ impl Default for ManifestMergeOptions {
     }
 }
 
-#[derive(Debug, Default)]
-pub struct StorageOptions {
-    pub write_opts: WriteOptions,
-    pub manifest_merge_opts: ManifestMergeOptions,
+#[derive(Debug, Default, Deserialize, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct StorageConfig {
+    pub write: WriteConfig,
+    pub manifest: ManifestConfig,
+    pub scheduler: SchedulerConfig,
+    pub update_mode: UpdateMode,
 }
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub enum UpdateMode {
+    #[default]
+    Overwrite,
+    Append,
+}
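
Every struct above now derives Deserialize/Serialize, with #[serde(default)] supplying the Default values for missing keys and deny_unknown_fields rejecting typos. A sketch of what this enables, loading a partial TOML file into StorageConfig (the toml crate and the import path are assumptions, not shown in this commit):

use metric_engine::config::StorageConfig; // path assumed; adjust to the crate layout

fn main() {
    // Only overridden keys need to appear; the rest fall back to Default.
    let cfg: StorageConfig = toml::from_str(
        r#"
update_mode = "Append"

[write]
max_row_group_size = 4096
compression = "Zstd"

[scheduler]
max_pending_compaction_tasks = 32
"#,
    )
    .expect("valid config");
    assert_eq!(cfg.write.max_row_group_size, 4096);

    // With deny_unknown_fields, a misspelled key such as max_row_goup_size
    // makes toml::from_str return Err instead of being silently ignored.
}
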
12 changes: 6 additions & 6 deletions src/metric_engine/src/manifest/mod.rs
@@ -39,7 +39,7 @@ use tokio::sync::{
 use tracing::{debug, error, info, trace};
 
 use crate::{
-    config::ManifestMergeOptions,
+    config::ManifestConfig,
     sst::{FileId, FileMeta, SstFile},
     types::{ObjectStoreRef, RuntimeRef, TimeRange},
     AnyhowError, Result,
@@ -77,7 +77,7 @@ impl Manifest {
         root_dir: String,
         store: ObjectStoreRef,
         runtime: RuntimeRef,
-        merge_options: ManifestMergeOptions,
+        merge_options: ManifestConfig,
     ) -> Result<Self> {
         let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
@@ -188,15 +188,15 @@ struct ManifestMerger {
     sender: Sender<MergeType>,
     receiver: RwLock<Receiver<MergeType>>,
     deltas_num: AtomicUsize,
-    merge_options: ManifestMergeOptions,
+    merge_options: ManifestConfig,
 }
 
 impl ManifestMerger {
     async fn try_new(
         snapshot_path: Path,
         delta_dir: Path,
         store: ObjectStoreRef,
-        merge_options: ManifestMergeOptions,
+        merge_options: ManifestConfig,
     ) -> Result<Arc<Self>> {
         let (tx, rx) = mpsc::channel(merge_options.channel_size);
         let merger = Self {
@@ -414,7 +414,7 @@ mod tests {
             root_dir.path().to_string_lossy().to_string(),
             store,
             runtime.clone(),
-            ManifestMergeOptions::default(),
+            ManifestConfig::default(),
         )
         .await
         .unwrap();
@@ -471,7 +471,7 @@ mod tests {
             root_dir,
             store.clone(),
             runtime.clone(),
-            ManifestMergeOptions {
+            ManifestConfig {
                 merge_interval_seconds: 1,
                 ..Default::default()
             },
3 changes: 2 additions & 1 deletion src/metric_engine/src/read.rs
@@ -54,9 +54,10 @@ use parquet::arrow::async_reader::ParquetObjectReader;
 
 use crate::{
     compare_primitive_columns,
+    config::UpdateMode,
     operator::{BytesMergeOperator, LastValueOperator, MergeOperator, MergeOperatorRef},
     sst::{SstFile, SstPathGenerator},
-    types::{ObjectStoreRef, StorageSchema, UpdateMode, SEQ_COLUMN_NAME},
+    types::{ObjectStoreRef, StorageSchema, SEQ_COLUMN_NAME},
     Result,
 };

(The remaining 4 changed files are not rendered here.)
