From 3bd961a83ee44587b1145281b13ef4b8ca9139f5 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 11 Dec 2024 23:05:52 +0800 Subject: [PATCH] refactor --- horaedb/Cargo.lock | 149 ++++++++++++++- horaedb/Cargo.toml | 1 + horaedb/metric_engine/Cargo.toml | 1 + .../metric_engine/src/compaction/picker.rs | 171 +++++++----------- horaedb/metric_engine/src/types.rs | 22 +++ 5 files changed, 230 insertions(+), 114 deletions(-) diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock index f2f8003b3c..581e0f7522 100644 --- a/horaedb/Cargo.lock +++ b/horaedb/Cargo.lock @@ -76,6 +76,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" +dependencies = [ + "anstyle", + "windows-sys 0.59.0", +] + [[package]] name = "anyhow" version = "1.0.87" @@ -308,7 +357,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.4", ] [[package]] @@ -546,6 +595,12 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "comfy-table" version = "7.1.1" @@ -955,7 +1010,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "regex-syntax", + "regex-syntax 0.8.4", ] [[package]] @@ -1086,6 +1141,27 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", +] + +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1382,6 +1458,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.3.25" @@ -1549,6 +1631,15 @@ dependencies = [ name = "macros" version = "2.2.0-dev" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1586,6 +1677,7 @@ dependencies = [ "pb_types", "prost", "temp-dir", + "test-log", "thiserror", "tokio", "tracing", @@ -2074,8 +2166,17 @@ checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2086,9 +2187,15 @@ checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.4", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.4" @@ -2372,6 +2479,28 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "test-log" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thiserror" version = "1.0.63" @@ -2528,10 +2657,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -2596,6 +2729,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.10.0" diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml index 863c789f08..6335c51267 100644 --- a/horaedb/Cargo.toml +++ b/horaedb/Cargo.toml @@ -49,6 +49,7 @@ lazy_static = "1" tracing = "0.1" tracing-subscriber = "0.3" async-scoped = { version = "0.9.0", features = ["use-tokio"] } +test-log = "0.2" # This profile optimizes for good runtime performance. [profile.release] diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml index d0e31a044f..3be3cdde92 100644 --- a/horaedb/metric_engine/Cargo.toml +++ b/horaedb/metric_engine/Cargo.toml @@ -53,3 +53,4 @@ tracing = { workspace = true } [dev-dependencies] temp-dir = { workspace = true } +test-log = { workspace = true, features = ["trace"] } diff --git a/horaedb/metric_engine/src/compaction/picker.rs b/horaedb/metric_engine/src/compaction/picker.rs index fc0b6c0fc3..e15f8150ec 100644 --- a/horaedb/metric_engine/src/compaction/picker.rs +++ b/horaedb/metric_engine/src/compaction/picker.rs @@ -16,7 +16,7 @@ // under the License. use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, time::Duration, }; @@ -45,13 +45,10 @@ impl TimeWindowCompactionStrategy { ) -> Option { let (uncompacted_files, expired_files) = Self::find_uncompacted_and_expired_files(ssts, expire_time); - debug!( - "uncompacted_files: {:?}, expired_files: {:?}", - uncompacted_files, expired_files - ); + debug!(uncompacted_files = ?uncompacted_files, expired_files = ?expired_files, "Begin pick candidate"); let files_by_segment = self.files_by_segment(uncompacted_files); - let compaction_files = self.pick_compaction_files(files_by_segment); + let compaction_files = self.pick_compaction_files(files_by_segment)?; if compaction_files.is_empty() && expired_files.is_empty() { return None; @@ -60,12 +57,16 @@ impl TimeWindowCompactionStrategy { for f in &compaction_files { f.mark_compaction(); } + for f in &expired_files { + f.mark_compaction(); + } let task = Task { inputs: compaction_files, expireds: expired_files, }; - debug!("Pick compaction task: {:?}", task); + + debug!(task = ?task, "End pick candidate"); Some(task) } @@ -89,11 +90,12 @@ impl TimeWindowCompactionStrategy { (uncompacted_files, expired_files) } - fn files_by_segment(&self, files: Vec) -> HashMap> { - let mut files_by_segment = HashMap::new(); + fn files_by_segment(&self, files: Vec) -> BTreeMap> { + let mut files_by_segment = BTreeMap::new(); let segment_duration = self.segment_duration; for file in files { - let segment = file.meta().time_range.end.truncate_by(segment_duration); + let segment = file.meta().time_range.start.truncate_by(segment_duration); + debug!(segment = ?segment, file = ?file); files_by_segment .entry(segment) .or_insert_with(Vec::new) @@ -101,51 +103,44 @@ impl TimeWindowCompactionStrategy { } debug!( - "Group files of similar timestamp into same segment: {:?}", - files_by_segment + files = ?files_by_segment, + "Group files of similar timestamp into segment" ); files_by_segment } fn pick_compaction_files( &self, - mut files_by_segment: HashMap>, - ) -> Vec { - let all_segments: BTreeSet<_> = files_by_segment.keys().copied().collect(); - - for segment in all_segments { - if let Some(mut files) = files_by_segment.remove(&segment) { - if files.len() < 2 { - debug!( - "No compaction necessary for files size {} , segment {:?}", - files.len(), - segment, - ); - continue; - } + files_by_segment: BTreeMap>, + ) -> Option> { + for (segment, mut files) in files_by_segment.into_iter().rev() { + debug!(segment = ?segment, files = ?files, "Loop segment for pick files"); + if files.len() < 2 { + continue; + } - // Pick files for compaction - files.sort_unstable_by_key(SstFile::size); + // Prefer to compact smaller files first. + files.sort_unstable_by_key(SstFile::size); - let mut input_size = 0; - let memory_limit = self.config.memory_limit; - let compaction_files_limit = self.config.compaction_files_limit; + let mut input_size = 0; + let memory_limit = self.config.memory_limit; + let compaction_files_limit = self.config.compaction_files_limit; - let compaction_files = files - .into_iter() - .take(compaction_files_limit) - .take_while(|f| { - input_size += f.size() as u64; - input_size <= memory_limit - }) - .collect::>(); + let compaction_files = files + .into_iter() + .take(compaction_files_limit) + .take_while(|f| { + input_size += f.size() as u64; + input_size <= memory_limit + }) + .collect::>(); - if compaction_files.len() >= 2 { - return compaction_files; - } + if compaction_files.len() >= 2 { + return Some(compaction_files); } } - Vec::new() + + None } } @@ -153,6 +148,9 @@ impl TimeWindowCompactionStrategy { mod tests { use std::time::Duration; + use itertools::Itertools; + use test_log::test; + use super::*; use crate::sst::FileMeta; @@ -162,80 +160,35 @@ mod tests { let config = SchedulerConfig::default(); let strategy = TimeWindowCompactionStrategy::new(segment_duration, config); - let sst1 = SstFile::new( - 1, - FileMeta { - time_range: (0..10).into(), - size: 1, - max_sequence: 1, - num_rows: 1, - }, - ); - let sst2 = SstFile::new( - 2, - FileMeta { - time_range: (10..20).into(), - size: 2, - max_sequence: 2, - num_rows: 2, - }, - ); - let sst3 = SstFile::new( - 3, - FileMeta { - time_range: (20..30).into(), - size: 3, - max_sequence: 3, - num_rows: 3, - }, - ); - let sst4 = SstFile::new( - 4, - FileMeta { - time_range: (30..40).into(), - size: 4, - max_sequence: 4, - num_rows: 4, - }, - ); - let sst5 = SstFile::new( - 5, - FileMeta { - time_range: (40..50).into(), - size: 5, - max_sequence: 5, - num_rows: 5, - }, - ); - - let ssts = vec![ - sst1.clone(), - sst2.clone(), - sst3.clone(), - sst4.clone(), - sst5.clone(), - ]; - + let ssts = (0_i64..5_i64) + .map(|i| { + SstFile::new( + i as u64, + FileMeta { + max_sequence: i as u64, + num_rows: i as u32, + size: (100 - i) as u32, // size desc + time_range: (i * 10..(i * 10 + 10)).into(), + }, + ) + }) + .collect_vec(); let task = strategy .pick_candidate(ssts.clone(), Some(15.into())) .unwrap(); + // ssts should be grouped by tree segments: + // | 0 1 | 2 3 | 4 | let excepted_task = Task { - inputs: vec![sst2, sst3], - expireds: vec![sst1], + inputs: vec![ssts[3].clone(), ssts[2].clone()], + expireds: vec![ssts[0].clone()], }; - assert_eq!(task.inputs.len(), 2); - assert_eq!(task.expireds.len(), 1); assert_eq!(task, excepted_task); - let task = strategy.pick_candidate(ssts, None).unwrap(); - let excepted_task = Task { - inputs: vec![sst4, sst5], - expireds: vec![], - }; - assert_eq!(task.inputs.len(), 2); - assert_eq!(task.expireds.len(), 0); - assert_eq!(task, excepted_task); + // sst1, sst3, ss4 are in compaction, so it should not be picked again. + // sst2, sst5 are in different segment, so it also should not be picked. + let task = strategy.pick_candidate(ssts, None); + assert!(task.is_none()); } } diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs index 3c49af76cf..fbb956c43f 100644 --- a/horaedb/metric_engine/src/types.rs +++ b/horaedb/metric_engine/src/types.rs @@ -196,3 +196,25 @@ pub struct StorageOptions { pub manifest_merge_opts: ManifestMergeOptions, pub runtime_opts: RuntimeOptions, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp_truncate_by() { + let testcases = [ + // ts, segment, expected + (0, 20, 0), + (10, 20, 0), + (20, 20, 20), + (30, 20, 20), + (40, 20, 40), + (41, 20, 40), + ]; + for (ts, segment, expected) in testcases { + let actual = Timestamp::from(ts).truncate_by(Duration::from_millis(segment)); + assert_eq!(actual.0, expected); + } + } +}