diff --git a/Cargo.lock b/Cargo.lock index 2c660fc33a..add6e1533a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.18" @@ -436,6 +442,21 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "benchmarks" +version = "2.2.0-alpha" +dependencies = [ + "bytes", + "criterion", + "metric_engine", + "pb_types", + "prost", + "serde", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -545,6 +566,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.1.18" @@ -595,6 +622,58 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "colorchoice" version = "1.0.3" @@ -662,6 +741,61 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -1374,6 +1508,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -1458,6 +1598,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1470,6 +1621,15 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b73f1c685cfd8ff8d75698ed87e6188cd09944b30c0863d45c2c3699d1da0c" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1695,7 +1855,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", "wasi", "windows-sys 0.52.0", @@ -1797,7 +1957,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -1837,6 +1997,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "ordered-float" version = "2.10.1" @@ -2026,6 +2192,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2146,6 +2340,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -2298,6 +2512,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2548,6 +2771,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -2605,6 +2838,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tracing" version = "0.1.40" @@ -2969,6 +3236,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + [[package]] name = "xz2" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 4d121b5030..d81bca03d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ license = "Apache-2.0" [workspace] resolver = "2" -members = ["src/metric_engine", "src/pb_types", "src/server"] +members = ["src/benchmarks", "src/metric_engine", "src/pb_types", "src/server"] [workspace.dependencies] anyhow = { version = "1.0" } @@ -50,7 +50,10 @@ tracing = "0.1" tracing-subscriber = "0.3" async-scoped = { version = "0.9.0", features = ["use-tokio"] } test-log = "0.2" -uuid = { version = "1" } +uuid = "1" +criterion = "0.5" +serde = { version = "1.0", features = ["derive"] } +toml = "0.8" # This profile optimizes for good runtime performance. [profile.release] diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml new file mode 100644 index 0000000000..788c56a3c1 --- /dev/null +++ b/src/benchmarks/Cargo.toml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "benchmarks" + +[package.license] +workspace = true + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +bytes = { workspace = true } +metric_engine = { workspace = true } +pb_types = { workspace = true } +prost = { workspace = true } +serde = { workspace = true } +toml = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } + +[dev-dependencies] +criterion = { workspace = true } + +[[bench]] +name = "bench" +harness = false diff --git a/src/benchmarks/README.md b/src/benchmarks/README.md new file mode 100644 index 0000000000..73cc9d2def --- /dev/null +++ b/src/benchmarks/README.md @@ -0,0 +1,15 @@ +# Benchmarks + +A config template can be found in [here](config.toml). + +## How to run +```bash +ANALYTIC_BENCH_CONFIG_PATH=/path/to/config cargo bench -p benchmarks +``` + +Set `RUST_LOG=debug` to enable verbose log. + +## Run specific bench: +```bash +ANALYTIC_BENCH_CONFIG_PATH=/path/to/config cargo bench --bench bench -p benchmarks -- bench_encoding +``` diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs new file mode 100644 index 0000000000..5ce94444e8 --- /dev/null +++ b/src/benchmarks/benches/bench.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks + +use std::{cell::RefCell, sync::Once}; + +use benchmarks::{ + config::{self, BenchConfig}, + encoding_bench::EncodingBench, +}; +use criterion::*; + +static INIT_LOG: Once = Once::new(); + +pub fn init_bench() -> BenchConfig { + INIT_LOG.call_once(|| { + // install global collector configured based on RUST_LOG env var. + tracing_subscriber::fmt::init(); + }); + + config::config_from_env() +} + +fn bench_manifest_encoding(c: &mut Criterion) { + let config = init_bench(); + + let mut group = c.benchmark_group("manifest_encoding"); + + group.measurement_time(config.manifest.bench_measurement_time.0); + group.sample_size(config.manifest.bench_sample_size); + + let bench = RefCell::new(EncodingBench::new(config.manifest)); + group.bench_with_input( + BenchmarkId::new("snapshot_encoding", 0), + &bench, + |b, bench| { + let mut bench = bench.borrow_mut(); + b.iter(|| bench.raw_bytes_bench()) + }, + ); + group.finish(); +} + +criterion_group!( + name = benches; + config = Criterion::default(); + targets = bench_manifest_encoding, +); + +criterion_main!(benches); diff --git a/src/benchmarks/config.toml b/src/benchmarks/config.toml new file mode 100644 index 0000000000..1859f30310 --- /dev/null +++ b/src/benchmarks/config.toml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[manifest] +record_count = 1000 +append_count = 100 +bench_measurement_time = "5s" +bench_sample_size = 10 diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs new file mode 100644 index 0000000000..4da2939623 --- /dev/null +++ b/src/benchmarks/src/config.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark configs. + +use std::{env, fs}; + +use serde::Deserialize; +use tracing::info; + +use crate::util::ReadableDuration; +const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH"; + +#[derive(Debug, Deserialize)] +pub struct BenchConfig { + pub manifest: ManifestConfig, +} + +pub fn config_from_env() -> BenchConfig { + let path = env::var(BENCH_CONFIG_PATH_KEY) + .expect("Env {BENCH_CONFIG_PATH_KEY} is required to run benches"); + + info!(config_path = ?path.as_str(), "Load bench config"); + + let toml_str = fs::read_to_string(&path).expect("read bench config file failed"); + let config = toml::from_str(&toml_str).expect("parse bench config file failed"); + info!(config = ?config, "Bench config"); + config +} + +#[derive(Deserialize, Debug)] +pub struct ManifestConfig { + pub record_count: usize, + pub append_count: usize, + pub bench_measurement_time: ReadableDuration, + pub bench_sample_size: usize, +} diff --git a/src/benchmarks/src/encoding_bench.rs b/src/benchmarks/src/encoding_bench.rs new file mode 100644 index 0000000000..956925bbff --- /dev/null +++ b/src/benchmarks/src/encoding_bench.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! encoding bench. + +use bytes::Bytes; +use metric_engine::{ + manifest::{ManifestUpdate, Snapshot}, + sst::{FileMeta, SstFile}, +}; + +use crate::config::ManifestConfig; + +pub struct EncodingBench { + raw_bytes: Bytes, + to_append: Vec, +} + +impl EncodingBench { + pub fn new(config: ManifestConfig) -> Self { + let sstfile = SstFile::new( + 1, + FileMeta { + max_sequence: 1, + num_rows: 1, + time_range: (1..2).into(), + size: 1, + }, + ); + let sstfiles = vec![sstfile.clone(); config.record_count]; + let mut snapshot = Snapshot::try_from(Bytes::new()).unwrap(); + let update = ManifestUpdate { + to_adds: sstfiles, + to_deletes: vec![], + }; + let _ = snapshot.merge_update(update); + + EncodingBench { + raw_bytes: snapshot.into_bytes().unwrap(), + to_append: vec![sstfile; config.append_count], + } + } + + pub fn raw_bytes_bench(&mut self) { + // mock do_merge procedure + // first decode snapshot and then append with delta sstfiles, serialize to bytes + // at last + let mut snapshot = Snapshot::try_from(self.raw_bytes.clone()).unwrap(); + let update = ManifestUpdate { + to_adds: self.to_append.clone(), + to_deletes: vec![], + }; + let _ = snapshot.merge_update(update); + let _ = snapshot.into_bytes(); + } +} diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs new file mode 100644 index 0000000000..180d1b8a38 --- /dev/null +++ b/src/benchmarks/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! lib for benchmarks. + +pub mod config; +pub mod encoding_bench; +mod util; diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs new file mode 100644 index 0000000000..623b5b0679 --- /dev/null +++ b/src/benchmarks/src/util.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for benchmarks. + +use std::{ + fmt::{self, Write}, + str::FromStr, + time::Duration, +}; + +use serde::{ + de::{self, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, +}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)] +pub struct ReadableDuration(pub Duration); +impl From for ReadableDuration { + fn from(t: Duration) -> ReadableDuration { + ReadableDuration(t) + } +} +const TIME_MAGNITUDE_1: u64 = 1000; +const TIME_MAGNITUDE_2: u64 = 60; +const TIME_MAGNITUDE_3: u64 = 24; +const UNIT: u64 = 1; +const MS: u64 = UNIT; +const SECOND: u64 = MS * TIME_MAGNITUDE_1; +const MINUTE: u64 = SECOND * TIME_MAGNITUDE_2; +const HOUR: u64 = MINUTE * TIME_MAGNITUDE_2; +const DAY: u64 = HOUR * TIME_MAGNITUDE_3; + +impl FromStr for ReadableDuration { + type Err = String; + + fn from_str(dur_str: &str) -> std::result::Result { + let dur_str = dur_str.trim(); + if !dur_str.is_ascii() { + return Err(format!("unexpected ascii string: {dur_str}")); + } + let err_msg = "valid duration, only d, h, m, s, ms are supported.".to_owned(); + let mut left = dur_str.as_bytes(); + let mut last_unit = DAY + 1; + let mut dur = 0f64; + while let Some(idx) = left.iter().position(|c| b"dhms".contains(c)) { + let (first, second) = left.split_at(idx); + let unit = if second.starts_with(b"ms") { + left = &left[idx + 2..]; + MS + } else { + let u = match second[0] { + b'd' => DAY, + b'h' => HOUR, + b'm' => MINUTE, + b's' => SECOND, + _ => return Err(err_msg), + }; + left = &left[idx + 1..]; + u + }; + if unit >= last_unit { + return Err("d, h, m, s, ms should occur in given order.".to_owned()); + } + // do we need to check 12h360m? + let number_str = unsafe { std::str::from_utf8_unchecked(first) }; + dur += match number_str.trim().parse::() { + Ok(n) => n * unit as f64, + Err(_) => return Err(err_msg), + }; + last_unit = unit; + } + if !left.is_empty() { + return Err(err_msg); + } + if dur.is_sign_negative() { + return Err("duration should be positive.".to_owned()); + } + let secs = dur as u64 / SECOND; + let millis = (dur as u64 % SECOND) as u32 * 1_000_000; + Ok(ReadableDuration(Duration::new(secs, millis))) + } +} + +impl fmt::Display for ReadableDuration { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut dur = self.0.as_millis() as u64; + let mut written = false; + if dur >= DAY { + written = true; + write!(f, "{}d", dur / DAY)?; + dur %= DAY; + } + if dur >= HOUR { + written = true; + write!(f, "{}h", dur / HOUR)?; + dur %= HOUR; + } + if dur >= MINUTE { + written = true; + write!(f, "{}m", dur / MINUTE)?; + dur %= MINUTE; + } + if dur >= SECOND { + written = true; + write!(f, "{}s", dur / SECOND)?; + dur %= SECOND; + } + if dur > 0 { + written = true; + write!(f, "{dur}ms")?; + } + if !written { + write!(f, "0s")?; + } + Ok(()) + } +} + +impl Serialize for ReadableDuration { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + let mut buffer = String::new(); + write!(buffer, "{self}").unwrap(); + serializer.serialize_str(&buffer) + } +} + +impl<'de> Deserialize<'de> for ReadableDuration { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + struct DurVisitor; + + impl Visitor<'_> for DurVisitor { + type Value = ReadableDuration; + + fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("valid duration") + } + + fn visit_str(self, dur_str: &str) -> std::result::Result + where + E: de::Error, + { + dur_str.parse().map_err(E::custom) + } + } + + deserializer.deserialize_str(DurVisitor) + } +} diff --git a/src/metric_engine/src/compaction/executor.rs b/src/metric_engine/src/compaction/executor.rs index c5cf83e2f9..0e5f9935f7 100644 --- a/src/metric_engine/src/compaction/executor.rs +++ b/src/metric_engine/src/compaction/executor.rs @@ -60,6 +60,7 @@ struct Inner { } impl Executor { + #[allow(clippy::too_many_arguments)] pub fn new( runtime: RuntimeRef, store: ObjectStoreRef, diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index 4832bf96dc..3aff52c972 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -44,6 +44,7 @@ pub struct Scheduler { } impl Scheduler { + #[allow(clippy::too_many_arguments)] pub fn new( runtime: RuntimeRef, manifest: ManifestRef, diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs index c9e1235913..3c2334ded6 100644 --- a/src/metric_engine/src/lib.rs +++ b/src/metric_engine/src/lib.rs @@ -21,10 +21,10 @@ mod compaction; pub mod error; mod macros; -mod manifest; +pub mod manifest; pub mod operator; mod read; -mod sst; +pub mod sst; pub mod storage; #[cfg(test)] mod test_util; diff --git a/src/metric_engine/src/manifest/mod.rs b/src/metric_engine/src/manifest/mod.rs index ffc4556762..3489172e17 100644 --- a/src/metric_engine/src/manifest/mod.rs +++ b/src/metric_engine/src/manifest/mod.rs @@ -27,8 +27,7 @@ use std::{ use anyhow::Context; use async_scoped::TokioScope; use bytes::Bytes; -pub use encoding::ManifestUpdate; -use encoding::Snapshot; +pub use encoding::{ManifestUpdate, Snapshot}; use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, PutPayload}; use prost::Message;