diff --git a/.asf.yaml b/.asf.yaml index 5a35a869d9..0040c45289 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -39,7 +39,7 @@ github: protected_branches: main: required_pull_request_reviews: - dismiss_stale_reviews: true + dismiss_stale_reviews: false required_approving_review_count: 1 protected_tags: [] diff --git a/.github/workflows/check-pr-title.yml b/.github/workflows/check-pr-title.yml index d955dee5e9..fd60a1bda9 100644 --- a/.github/workflows/check-pr-title.yml +++ b/.github/workflows/check-pr-title.yml @@ -30,7 +30,7 @@ jobs: with: script: | const title = context.payload.pull_request.title; - const regex = /^(feat|fix|docs|refactor|chore)(\(.+\))?!?: .+$/; + const regex = /^(feat|fix|docs|refactor|chore|test|style|build)(\(.+\))?!?: .+$/; if (!regex.test(title)) { core.setFailed('PR title does not follow the convention, the pattern: ^(feat|fix|docs|refactor|chore)(\(.+\))?!?: .+$'); } diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b7bf8ea22..5e4b6cc4a9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,6 +39,10 @@ on: - 'Cargo.lock' - '.github/workflows/ci.yml' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + # Common environment variables env: RUSTFLAGS: "-C debuginfo=1" @@ -71,11 +75,13 @@ jobs: rustup component add clippy rustup component add rustfmt cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked + curl --proto '=https' --tlsv1.2 -LsSf https://github.com/korandoru/hawkeye/releases/download/v5.8.1/hawkeye-installer.sh | sh - name: Run Style Check run: | - make clippy make fmt make check-cargo-toml + make check-asf-header + make clippy unit-test: name: unit-test diff --git a/.github/workflows/links.yml b/.github/workflows/links.yml new file mode 100644 index 0000000000..f77fe20b42 --- /dev/null +++ b/.github/workflows/links.yml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor 
license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Check markdown links + +on: + merge_group: + workflow_dispatch: + schedule: + - cron: '2 0 * * *' + push: + branches: + - main + - dev + pull_request: + +jobs: + url-check: + name: url-check + runs-on: ubuntu-latest + timeout-minutes: 20 + steps: + - uses: actions/checkout@v4 + - name: Install deps + run: | + pip3 install urlchecker + - name: Check markdown links + run: | + urlchecker check --file-types '*.md' \ + --exclude-urls 'http://127.0.0.1:5440/sql,https://github.com/apache/horaedb/issues/new' \ + . 
diff --git a/.github/workflows/metric-engine-ci.yml b/.github/workflows/metric-engine-ci.yml index db3bf5bddc..be80ebe8fb 100644 --- a/.github/workflows/metric-engine-ci.yml +++ b/.github/workflows/metric-engine-ci.yml @@ -53,6 +53,10 @@ jobs: - name: Release Disk Quota run: | sudo make ensure-disk-quota + - name: Setup Build Environment + run: | + sudo apt update + sudo apt install --yes protobuf-compiler - name: Install check binaries run: | rustup component add clippy diff --git a/.github/workflows/tsbs.yml b/.github/workflows/tsbs.yml index a0a88d56b3..7216edbc44 100644 --- a/.github/workflows/tsbs.yml +++ b/.github/workflows/tsbs.yml @@ -46,7 +46,7 @@ jobs: - name: Setup Build Environment run: | sudo apt update - sudo apt install --yes protobuf-compiler + sudo apt install --yes protobuf-compiler liblzma-dev - name: Build server run: | make build diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2f9bd85713..8492c80a2d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -24,7 +24,7 @@ For features, we don't plan to support we will close the feature request ticket ## Contributing Changes -HoraeDB is written mostly in idiomatic Rust—please see the [Style Guide](https://horaedb.apache.org/dev/style_guide.html) for more details. +HoraeDB is written mostly in idiomatic Rust—please see the [Style Guide](https://horaedb.apache.org/docs/dev/style_guide/) for more details. All code must adhere to the `rustfmt` format, and pass all of the `clippy` checks we run in CI (there are more details further down this README). ### Making a PR @@ -38,7 +38,7 @@ PR title. 
For PRs that you consider ready for review, verify the following locally before you submit it: -* you have a coherent set of logical commits, with messages conforming to the [Conventional Commits](https://horaedb.apache.org/docs/dev/conventional_commit/) specification; +* you have a coherent set of logical commits, with messages conforming to the [Conventional Commits](https://horaedb.apache.org/docs/dev/conventional_commit/) specification; * all the tests and/or benchmarks pass, including documentation tests; * the code is correctly formatted and all `clippy` checks pass; and * you haven't left any "code cruft" (commented out code blocks etc). diff --git a/Cargo.lock b/Cargo.lock index e243deace0..a3ff304ee3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,7 +67,7 @@ dependencies = [ [[package]] name = "alloc_tracker" -version = "2.0.0" +version = "2.1.0" [[package]] name = "allocator-api2" @@ -77,12 +77,13 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "analytic_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "arc-swap 1.6.0", "arena", "arrow 49.0.0", + "async-scoped", "async-stream", "async-trait", "atomic_enum", @@ -107,7 +108,7 @@ dependencies = [ "macros", "message_queue", "metric_ext", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "pin-project-lite", @@ -236,7 +237,7 @@ checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "arena" -version = "2.0.0" +version = "2.1.0" [[package]] name = "array-init" @@ -681,7 +682,7 @@ dependencies = [ [[package]] name = "arrow_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "serde", @@ -764,6 +765,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-scoped" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740" +dependencies = [ + 
"futures 0.3.28", + "pin-project", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.4" @@ -939,7 +951,7 @@ dependencies = [ [[package]] name = "benchmarks" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arena", @@ -954,7 +966,7 @@ dependencies = [ "generic_error", "logger", "macros", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "pprof", @@ -1092,12 +1104,12 @@ dependencies = [ [[package]] name = "borsh" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" +checksum = "115e54d64eb62cdebad391c19efc9dce4981c690c85a33a12199d99bb9546fee" dependencies = [ "borsh-derive", - "hashbrown 0.13.2", + "hashbrown 0.12.3", ] [[package]] @@ -1226,7 +1238,7 @@ checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "bytes_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "bytes", "snafu 0.6.10", @@ -1292,7 +1304,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "catalog" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "common_types", @@ -1307,7 +1319,7 @@ dependencies = [ [[package]] name = "catalog_impls" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "async-trait", @@ -1478,7 +1490,7 @@ checksum = "b8191fa7302e03607ff0e237d4246cc043ff5b3cb9409d995172ba3bea16b807" [[package]] name = "cluster" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "bytes_ext", @@ -1513,7 +1525,7 @@ dependencies = [ [[package]] name = "codec" -version = "2.0.0" +version = "2.1.0" dependencies = [ "bytes_ext", "common_types", @@ -1562,7 +1574,7 @@ dependencies = [ [[package]] name = "common_types" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -2346,7 +2358,7 @@ dependencies = [ [[package]] name = 
"df_engine_extensions" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-recursion", @@ -2371,7 +2383,7 @@ dependencies = [ [[package]] name = "df_operator" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "base64 0.13.1", @@ -2731,7 +2743,7 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "future_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "futures 0.3.28", "lazy_static", @@ -2903,7 +2915,7 @@ dependencies = [ [[package]] name = "generic_error" -version = "2.0.0" +version = "2.1.0" [[package]] name = "getrandom" @@ -2986,7 +2998,7 @@ dependencies = [ [[package]] name = "hash_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "ahash 0.8.3", "byteorder", @@ -3116,7 +3128,7 @@ dependencies = [ [[package]] name = "horaectl" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "chrono", @@ -3131,7 +3143,7 @@ dependencies = [ [[package]] name = "horaedb" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "catalog", @@ -3182,7 +3194,7 @@ dependencies = [ [[package]] name = "horaedb-test" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", "async-trait", @@ -3447,7 +3459,7 @@ dependencies = [ [[package]] name = "id_allocator" -version = "2.0.0" +version = "2.1.0" dependencies = [ "generic_error", "tokio", @@ -3562,7 +3574,7 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "interpreters" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arrow 49.0.0", @@ -3876,7 +3888,7 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f" +source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac" dependencies = [ "bindgen 0.65.1", "bzip2-sys", @@ -3893,7 +3905,7 @@ dependencies = [ [[package]] name = "libtitan_sys" version = "0.0.1" -source = "git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f" +source = "git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac" dependencies = [ "bzip2-sys", "cc", @@ -3971,7 +3983,7 @@ dependencies = [ [[package]] name = "logger" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", "log", @@ -4052,7 +4064,7 @@ dependencies = [ [[package]] name = "macros" -version = "2.0.0" +version = "2.1.0" [[package]] name = "matchers" @@ -4129,7 +4141,7 @@ dependencies = [ [[package]] name = "message_queue" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "chrono", @@ -4146,7 +4158,7 @@ dependencies = [ [[package]] name = "meta_client" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "common_types", @@ -4169,7 +4181,7 @@ dependencies = [ [[package]] name = "metric_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "crossbeam-utils", "serde", @@ -4498,7 +4510,7 @@ dependencies = [ [[package]] name = "notifier" -version = "2.0.0" +version = "2.1.0" dependencies = [ "tokio", ] @@ -4681,7 +4693,7 @@ dependencies = [ [[package]] name = "object_store" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "bytes", @@ -4866,7 +4878,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "panic_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "backtrace", "gag", @@ -4967,7 +4979,7 @@ dependencies = [ [[package]] name = "parquet_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -4977,7 
+4989,7 @@ dependencies = [ "futures 0.3.28", "generic_error", "logger", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "tokio", ] @@ -4993,7 +5005,7 @@ dependencies = [ [[package]] name = "partition_table_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arrow 49.0.0", @@ -5014,7 +5026,7 @@ dependencies = [ [[package]] name = "partitioned_lock" -version = "2.0.0" +version = "2.1.0" dependencies = [ "hash_ext", "tokio", @@ -5433,7 +5445,7 @@ dependencies = [ [[package]] name = "profile" -version = "2.0.0" +version = "2.1.0" dependencies = [ "jemalloc-ctl", "jemalloc-sys", @@ -5656,7 +5668,7 @@ checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804" [[package]] name = "proxy" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -5757,7 +5769,7 @@ dependencies = [ [[package]] name = "query_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -5787,7 +5799,7 @@ dependencies = [ [[package]] name = "query_frontend" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -5981,9 +5993,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -5991,9 +6003,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -6086,7 +6098,7 @@ checksum = 
"c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "remote_engine_client" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow_ext", "async-trait", @@ -6295,7 +6307,7 @@ dependencies = [ [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f" +source = "git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac" dependencies = [ "libc", "librocksdb_sys", @@ -6303,7 +6315,7 @@ dependencies = [ [[package]] name = "router" -version = "2.0.0" +version = "2.1.0" dependencies = [ "async-trait", "cluster", @@ -6346,7 +6358,7 @@ dependencies = [ [[package]] name = "runtime" -version = "2.0.0" +version = "2.1.0" dependencies = [ "lazy_static", "macros", @@ -6383,7 +6395,7 @@ dependencies = [ [[package]] name = "rust-sdk-test" -version = "2.0.0" +version = "2.1.0" dependencies = [ "horaedb-client", "tokio", @@ -6580,7 +6592,7 @@ dependencies = [ [[package]] name = "sampling_cache" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", ] @@ -6725,7 +6737,7 @@ dependencies = [ [[package]] name = "server" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "arc-swap 1.6.0", @@ -6888,7 +6900,7 @@ dependencies = [ [[package]] name = "size_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "serde", "toml 0.7.3", @@ -6911,7 +6923,7 @@ dependencies = [ [[package]] name = "skiplist" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arena", "bytes", @@ -7030,7 +7042,7 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "snappy-sys" version = "0.1.0" -source = "git+https://github.com/busyjay/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44" +source = 
"git+https://github.com/tikv/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44" dependencies = [ "cmake", "libc", @@ -7306,7 +7318,7 @@ dependencies = [ [[package]] name = "system_catalog" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "async-trait", @@ -7328,7 +7340,7 @@ dependencies = [ [[package]] name = "system_stats" -version = "2.0.0" +version = "2.1.0" dependencies = [ "sysinfo", "tokio", @@ -7336,7 +7348,7 @@ dependencies = [ [[package]] name = "table_engine" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "arrow_ext", @@ -7368,7 +7380,7 @@ dependencies = [ [[package]] name = "table_kv" -version = "2.0.0" +version = "2.1.0" dependencies = [ "lazy_static", "logger", @@ -7447,7 +7459,7 @@ dependencies = [ [[package]] name = "test_util" -version = "2.0.0" +version = "2.1.0" dependencies = [ "arrow 49.0.0", "chrono", @@ -7537,7 +7549,7 @@ dependencies = [ [[package]] name = "time_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "chrono", "common_types", @@ -7551,7 +7563,7 @@ dependencies = [ [[package]] name = "timed_task" -version = "2.0.0" +version = "2.1.0" dependencies = [ "logger", "runtime", @@ -7761,7 +7773,7 @@ dependencies = [ [[package]] name = "toml_ext" -version = "2.0.0" +version = "2.1.0" dependencies = [ "macros", "serde", @@ -7847,7 +7859,7 @@ dependencies = [ [[package]] name = "tools" -version = "2.0.0" +version = "2.1.0" dependencies = [ "analytic_engine", "anyhow", @@ -7856,7 +7868,7 @@ dependencies = [ "futures 0.3.28", "generic_error", "num_cpus", - "object_store 2.0.0", + "object_store 2.1.0", "parquet", "parquet_ext", "runtime", @@ -7899,14 +7911,14 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "trace_metric" -version = "2.0.0" +version = "2.1.0" dependencies = [ "trace_metric_derive", ] [[package]] name = "trace_metric_derive" -version = "2.0.0" +version = "2.1.0" dependencies = [ "proc-macro2", 
"quote", @@ -7915,7 +7927,7 @@ dependencies = [ [[package]] name = "trace_metric_derive_tests" -version = "2.0.0" +version = "2.1.0" dependencies = [ "trace_metric", ] @@ -8006,7 +8018,7 @@ dependencies = [ [[package]] name = "tracing_util" -version = "2.0.0" +version = "2.1.0" dependencies = [ "console-subscriber", "lazy_static", @@ -8068,7 +8080,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "rand 0.8.5", "static_assertions", ] @@ -8217,9 +8229,10 @@ checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" [[package]] name = "wal" -version = "2.0.0" +version = "2.1.0" dependencies = [ "anyhow", + "async-scoped", "async-trait", "bytes_ext", "chrono", @@ -8237,6 +8250,7 @@ dependencies = [ "prometheus 0.12.0", "prost 0.11.8", "rand 0.8.5", + "rayon", "rocksdb", "runtime", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8bfdea8402..cfe6589815 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ # under the License. 
[workspace.package] -version = "2.0.0" +version = "2.1.0" authors = ["HoraeDB Authors"] edition = "2021" license = "Apache-2.0" @@ -84,6 +84,8 @@ members = [ "src/wal" ] +default-members = ["src/horaedb"] + [workspace.dependencies] alloc_tracker = { path = "src/components/alloc_tracker" } arrow = { version = "49.0.0", features = ["prettyprint"] } diff --git a/Makefile b/Makefile index 4480eede75..c58cfb5e75 100644 --- a/Makefile +++ b/Makefile @@ -85,6 +85,9 @@ fmt: check-cargo-toml: cd $(DIR); cargo sort --workspace --check +check-asf-header: + cd $(DIR); hawkeye check + udeps: cd $(DIR); cargo udeps --all-targets --all-features --workspace diff --git a/README-CN.md b/README-CN.md index 81d1bad725..1cf9bba2e7 100644 --- a/README-CN.md +++ b/README-CN.md @@ -79,11 +79,7 @@ Drop TABLE `demo` 与来自世界各地的用户和开发人员一起在 Apache HoraeDB (incubating) 社区中茁壮成长。 -- [订阅邮箱参与讨论](mailto:dev-subscribe@horaedb.apache.org) ([订阅](mailto:dev-subscribe@horaedb.apache.org?subject=(send%20this%20email%20to%20subscribe)) / [取消订阅](mailto:dev-unsubscribe@horaedb.apache.org?subject=(send%20this%20email%20to%20unsubscribe)) / [查看邮件历史记录](https://lists.apache.org/list.html?dev@horaedb.apache.org)) -- 发送 [请求](mailto:dev@horaedb.apache.org?subject=(Request%to%20join%20HoraeDB%20slack)) 至 `dev@horaedb.apache.org` 加入HoraeDB Slack -- 通过[这里的链接](http://horaedb.apache.org/community/),加入我们的社区。 - -[如何参与贡献](CONTRIBUTING.md) +- https://horaedb.apache.org/community/ ## 致谢 diff --git a/README.md b/README.md index 26ef706bc9..423838c192 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ ![HoraeDB](docs/logo/horaedb-banner-white-small.jpg) ![License](https://img.shields.io/badge/license-Apache--2.0-green.svg) -[![CI](https://github.com/apache/incubator-horaedb/actions/workflows/ci.yml/badge.svg)](https://github.com/apache/incubator-horaedb/actions/workflows/ci.yml) -[![OpenIssue](https://img.shields.io/github/issues/apache/incubator-horaedb)](https://github.com/apache/incubator-horaedb/issues) 
+[![CI](https://github.com/apache/horaedb/actions/workflows/ci.yml/badge.svg)](https://github.com/apache/horaedb/actions/workflows/ci.yml) +[![OpenIssue](https://img.shields.io/github/issues/apache/horaedb)](https://github.com/apache/horaedb/issues) [![HoraeDB Docker](https://img.shields.io/docker/v/apache/horaedb-server?logo=docker&label=horaedb-server)](https://hub.docker.com/r/apache/horaedb-server) [![HoraeMeta Docker](https://img.shields.io/docker/v/apache/horaemeta-server?logo=docker&label=horaemeta-server)](https://hub.docker.com/r/apache/horaemeta-server) @@ -43,7 +43,7 @@ docker compose -f docker/docker-compose.yaml up ### Run from source code -See details [here](https://horaedb.apache.org/dev/compile_run.html). +Please read the [development guide](https://horaedb.apache.org/docs/dev/compile_run/) for instructions on how to build. ### Create Table and Write/Read data Create Table. @@ -95,11 +95,11 @@ Drop TABLE `demo` Thrive together in Apache HoraeDB (incubating) community with users and developers from all around the world. -- Discuss at [dev mailing list](mailto:dev-subscribe@horaedb.apache.org) ([subscribe](mailto:dev-subscribe@horaedb.apache.org?subject=(send%20this%20email%20to%20subscribe)) / [unsubscribe](mailto:dev-unsubscribe@horaedb.apache.org?subject=(send%20this%20email%20to%20unsubscribe)) / [archives](https://lists.apache.org/list.html?dev@horaedb.apache.org)) -- Send [request](mailto:dev@horaedb.apache.org?subject=(Request%to%20join%20HoraeDB%20slack)) to `dev@horaedb.apache.org` to join HoraeDB slack channel -- Or you can join our community [here](http://horaedb.apache.org/community/) +- Ask questions on [GitHub Discussion](https://github.com/apache/horaedb/discussions). +- Chat with developers/users on [Discord](https://discord.gg/h5r4kVMRYN) or [DingTalk](https://horaedb.apache.org/images/dingtalk.jpg). +- Mailing lists are a form of communication used by the Apache community. 
See guide [here](http://horaedb.apache.org/community/) to subscribe to our list. -Read our [Contributing Guide](CONTRIBUTING.md) and make your first contribution! +> Read our [Contributing Guide](CONTRIBUTING.md) and make your first contribution! ## Acknowledgment diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml index edc809117b..169a9630af 100644 --- a/docs/example-cluster-0.toml +++ b/docs/example-cluster-0.toml @@ -37,7 +37,7 @@ type = "Local" data_dir = "/tmp/horaedb0" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb0" [cluster_deployment] diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml index 7d312bec9b..e293f7cc26 100644 --- a/docs/example-cluster-1.toml +++ b/docs/example-cluster-1.toml @@ -38,7 +38,7 @@ type = "Local" data_dir = "/tmp/horaedb1" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb1" [cluster_deployment] diff --git a/docs/example-standalone-static-routing.toml b/docs/example-standalone-static-routing.toml index e519ea5a3a..7c7d77caf2 100644 --- a/docs/example-standalone-static-routing.toml +++ b/docs/example-standalone-static-routing.toml @@ -36,7 +36,7 @@ max_replay_tables_per_batch = 1024 write_group_command_channel_cap = 1024 [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb1" [analytic.storage] @@ -91,4 +91,3 @@ shards = [ 1 ] [limiter] write_block_list = ['mytable1'] read_block_list = ['mytable1'] - diff --git a/docs/minimal.toml b/docs/minimal.toml index f66046e7de..de11aae7b5 100644 --- a/docs/minimal.toml +++ b/docs/minimal.toml @@ -32,7 +32,7 @@ type = "Local" data_dir = "/tmp/horaedb" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb" [analytic] diff --git a/docs/rfcs/20220702-prometheus-read-extension.md b/docs/rfcs/20220702-prometheus-read-extension.md index b35c1dc2d1..e1044f95d4 100644 --- a/docs/rfcs/20220702-prometheus-read-extension.md +++ b/docs/rfcs/20220702-prometheus-read-extension.md 
@@ -1,18 +1,18 @@ -Prometheus read extension for HoraeDB +Prometheus read extension for HoraeDB --------------------------- - Feature Name: prometheus-read-extension -- Tracking Issue: https://github.com/apache/incubator-horaedb/issues/90 +- Tracking Issue: https://github.com/apache/horaedb/issues/90 # Summary Drop-in and full-featured Prometheus read extension for HoraeDB # Motivation -Prometheus and PromQL are wide used in monitoring scenarios. It would be great if HoraeDB can be queried using PromQL. +Prometheus and PromQL are widely used in monitoring scenarios. It would be great if HoraeDB can be queried using PromQL. HoraeDB has the ability to store and compute a large amount of data. But PromQL contains some specific operators. Though HoraeDB supports a subset, it is hard and infeasible to implement all of them. -There are some brilliant distributed solutions like `Thanos` and `Cortex`. But the computation ability is limited in aspects of distributed execution or extensible (`Thanos` supports split query on time range (https://thanos.io/tip/components/query-frontend.md/#splitting). Combining `Prometheus` with `HoraeDB` can gain both high performance computation and the ability to query in other forms like SQL. +There are some brilliant distributed solutions like `Thanos` and `Cortex`. But the computation ability is limited in aspects of distributed execution or extensible (`Thanos` supports split query on time range (https://thanos.io/tip/components/query-frontend.md/#splitting)). Combining `Prometheus` with `HoraeDB` can gain both high performance computation and the ability to query in other forms like SQL. This proposal aims to provide a way that: @@ -110,7 +110,7 @@ Query Frontend has to feed PromQL and SQL to servers separately because this int 1. `Query Frontend` accepts a PromQL. 2. `Query Frontend` splits the original PromQL into two sub queries and assigns a `TaskID`. -3. `Query Frontend` sends sub PromQL to `Prometheus` and sub SQL to HoraeDB. +3. 
`Query Frontend` sends sub PromQL to `Prometheus` and sub SQL to HoraeDB. 4. `Prometheus` processes the sub PromQL. It will query the data source (HoraeDB) for data. 5. `HoraeDB` receives a request from `Prometheus`, and a sub-SQL with the same `TaskID` from `Query Frontend`. 6. `HoraeDB` processes and returns result to `Prometheus`. @@ -137,48 +137,48 @@ Query_Frontend -> Client : response PromQL request --> ```plaintext - ,.-^^-._ - ,-. |-.____.-| - `-' | | - /|\ | | - | ,--------------. ,----------. | | - / \ |Query_Frontend| |Prometheus| '-.____.-' - Client `------+-------' `----+-----' HoraeDB - | PromQL request | | | - | -----------------------> | | - | | | | - | | sub PromQL request with TaskID| | - | | ------------------------------> | - | | | | - | | sub SQL request with TaskID | - | | ---------------------------------------------------------------> - | | | | - | | | remote storage read with TaskID| - | | | -------------------------------> - | | | | - | | | |----. + ,.-^^-._ + ,-. |-.____.-| + `-' | | + /|\ | | + | ,--------------. ,----------. | | + / \ |Query_Frontend| |Prometheus| '-.____.-' + Client `------+-------' `----+-----' HoraeDB + | PromQL request | | | + | -----------------------> | | + | | | | + | | sub PromQL request with TaskID| | + | | ------------------------------> | + | | | | + | | sub SQL request with TaskID | + | | ---------------------------------------------------------------> + | | | | + | | | remote storage read with TaskID| + | | | -------------------------------> + | | | | + | | | |----. | | | | | pull data and compute - | | | |<---' - | | | | - | | | response remote read request | - | | | <------------------------------- - | | | | - | | |----. | - | | | | compute | - | | |<---' | - | | | | - | | response PromQL request | | - | | <------------------------------ | - | | | | - | response PromQL request| | | - | <----------------------- | | - Client ,------+-------. ,----+-----. HoraeDB - ,-. 
|Query_Frontend| |Prometheus| ,.-^^-._ - `-' `--------------' `----------' |-.____.-| - /|\ | | - | | | - / \ | | - '-.____.-' + | | | |<---' + | | | | + | | | response remote read request | + | | | <------------------------------- + | | | | + | | |----. | + | | | | compute | + | | |<---' | + | | | | + | | response PromQL request | | + | | <------------------------------ | + | | | | + | response PromQL request| | | + | <----------------------- | | + Client ,------+-------. ,----+-----. HoraeDB + ,-. |Query_Frontend| |Prometheus| ,.-^^-._ + `-' `--------------' `----------' |-.____.-| + /|\ | | + | | | + / \ | | + '-.____.-' ``` ### Separated HoraeDB cluster @@ -216,50 +216,50 @@ Query_Frontend -> Client : response PromQL request --> ```plaintext - ,.-^^-._ + ,.-^^-._ ,-. |-.____.-| `-' | | /|\ | | | ,--------------. ,----------. | | / \ |Query_Frontend| |Prometheus| '-.____.-' - Client `------+-------' `----+-----' HoraeDB - | PromQL request | | | - | -----------------------> | | - | | | | - | | sub PromQL request | | - | | ----------------------------> | - | | | | - | |----. | | - | | | store the sub SQL | | - | |<---' | | - | | | | - | | remote storage read | | - | | <---------------------------- | - | | | | - | | query sub SQL using HoraeDB Client | - | | ------------------------------------------------>| - | | | | - | | sub SQL query result | - | | <------------------------------------------------| - | | | | - | |----. | | - | | | transform data format | | - | |<---' | | - | | | | - | | response remote read request| | - | | ----------------------------> | - | | | | - | | |----. | - | | | | compute | - | | |<---' | - | | | | - | | response sub PromQL request | | - | | <---------------------------- | - | | | | - | response PromQL request| | | - | <----------------------- | | - Client ,------+-------. ,----+-----. HoraeDB - ,-. 
|Query_Frontend| |Prometheus| ,.-^^-._ + Client `------+-------' `----+-----' HoraeDB + | PromQL request | | | + | -----------------------> | | + | | | | + | | sub PromQL request | | + | | ----------------------------> | + | | | | + | |----. | | + | | | store the sub SQL | | + | |<---' | | + | | | | + | | remote storage read | | + | | <---------------------------- | + | | | | + | | query sub SQL using HoraeDB Client | + | | ------------------------------------------------>| + | | | | + | | sub SQL query result | + | | <------------------------------------------------| + | | | | + | |----. | | + | | | transform data format | | + | |<---' | | + | | | | + | | response remote read request| | + | | ----------------------------> | + | | | | + | | |----. | + | | | | compute | + | | |<---' | + | | | | + | | response sub PromQL request | | + | | <---------------------------- | + | | | | + | response PromQL request| | | + | <----------------------- | | + Client ,------+-------. ,----+-----. HoraeDB + ,-. |Query_Frontend| |Prometheus| ,.-^^-._ `-' `--------------' `----------' |-.____.-| /|\ | | | | | @@ -268,13 +268,13 @@ Query_Frontend -> Client : response PromQL request ``` ## Comparison -Both ways can achieve our initial requirements and are able to implement distributed execution in the future. +Both ways can achieve our initial requirements and are able to implement distributed execution in the future. - Embedded `HoraeDB` - Pros. - `HoraeDB` feeds data to `Prometheus` directly, reducing some computation and transmission. - Cons. - - Need to customize a `Prometheus` specific interface in `HoraeDB`. + - Need to customize a `Prometheus` specific interface in `HoraeDB`. - The deployment may requires all three components bound together for simplicity. - Separated `HoraeDB` cluster - Pros. 
@@ -282,7 +282,7 @@ Both ways can achieve our initial requirements and are able to implement distrib - The deployment only requires one `Query Frontend` along with `Prometheus` which is more lightweight and less invasive. - States of `HoraeDB` and `Query Frontend` are simple and clear. - Cons. - - One more data transforming and forwarding in `Query Frontend` (pass results from `HoraeDB` to `Prometheus`). + - One more data transforming and forwarding in `Query Frontend` (pass results from `HoraeDB` to `Prometheus`). # Drawbacks Detailed in the "Comparison" section above. diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock index df1d9e0d23..632b74b743 100644 --- a/horaedb/Cargo.lock +++ b/horaedb/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -122,17 +122,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45aef0d9cf9a039bf6cd1acc451b137aca819977b0928dece52bd92811b640ba" dependencies = [ "arrow-arith 53.0.0", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", "arrow-csv 53.0.0", - "arrow-data 53.0.0", - "arrow-ipc 53.0.0", + "arrow-data 53.1.0", + "arrow-ipc 53.1.0", "arrow-json 53.0.0", "arrow-ord 53.0.0", "arrow-row 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "arrow-string 53.0.0", ] @@ -157,10 +157,10 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03675e42d1560790f3524800e41403b40d0da1c793fe9528929fde06d8c7649a" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "num", @@ -185,14 +185,14 @@ dependencies = [ [[package]] name 
= "arrow-array" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f" +checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da" dependencies = [ "ahash", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "hashbrown", @@ -212,9 +212,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d" +checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80" dependencies = [ "bytes", "half", @@ -237,28 +237,28 @@ dependencies = [ "chrono", "comfy-table", "half", - "lexical-core", + "lexical-core 0.8.5", "num", "ryu", ] [[package]] name = "arrow-cast" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785" +checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "atoi", "base64", "chrono", "comfy-table", "half", - "lexical-core", + "lexical-core 1.0.2", "num", "ryu", ] @@ -278,7 +278,7 @@ dependencies = [ "csv", "csv-core", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "regex", ] @@ -288,16 +288,16 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd178575f45624d045e4ebee714e246a05d9652e41363ee3f57ec18cca97f740" dependencies = [ - "arrow-array 53.0.0", - 
"arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "csv", "csv-core", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "regex", ] @@ -315,12 +315,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65" +checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634" dependencies = [ - "arrow-buffer 53.0.0", - "arrow-schema 53.0.0", + "arrow-buffer 53.1.0", + "arrow-schema 53.1.0", "half", "num", ] @@ -342,15 +342,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a" +checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "flatbuffers", ] @@ -368,7 +368,7 @@ dependencies = [ "chrono", "half", "indexmap", - "lexical-core", + "lexical-core 0.8.5", "num", "serde", "serde_json", @@ -380,15 +380,15 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d24805ba326758effdd6f2cbdd482fcfab749544f21b134701add25b33f474e6" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "indexmap", - 
"lexical-core", + "lexical-core 0.8.5", "num", "serde", "serde_json", @@ -415,11 +415,11 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "644046c479d80ae8ed02a7f1e1399072ea344ca6a7b0e293ab2d5d9ed924aa3b" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "half", "num", ] @@ -445,10 +445,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a29791f8eb13b340ce35525b723f5f0df17ecb955599e11f65c2a94ab34e2efb" dependencies = [ "ahash", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "half", ] @@ -460,9 +460,9 @@ checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" [[package]] name = "arrow-schema" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858" +checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794" [[package]] name = "arrow-select" @@ -480,15 +480,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba" +checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a" dependencies = [ "ahash", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "num", ] @@ -515,11 +515,11 @@ version = "53.0.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "0775b6567c66e56ded19b87a954b6b1beffbdd784ef95a3a2b03f59570c1d230" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "memchr", "num", "regex", @@ -552,7 +552,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -905,7 +905,7 @@ dependencies = [ "num_cpus", "object_store 0.10.2", "parking_lot", - "parquet", + "parquet 52.2.0", "paste", "pin-project-lite", "rand", @@ -951,7 +951,7 @@ dependencies = [ "libc", "num_cpus", "object_store 0.10.2", - "parquet", + "parquet 52.2.0", "sqlparser", ] @@ -1334,7 +1334,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -1583,11 +1583,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" +dependencies = [ + "lexical-parse-float 1.0.2", + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", + "lexical-write-float 1.0.2", + "lexical-write-integer 1.0.2", ] [[package]] @@ -1596,8 +1609,19 @@ version = "0.8.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" +dependencies = [ + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", "static_assertions", ] @@ -1607,7 +1631,17 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -1620,14 +1654,34 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-util" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" +dependencies = [ + "static_assertions", +] + [[package]] name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", - "lexical-write-integer", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" 
+dependencies = [ + "lexical-util 1.0.3", + "lexical-write-integer 1.0.2", "static_assertions", ] @@ -1637,7 +1691,17 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -1695,6 +1759,10 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "macros" +version = "2.1.0" + [[package]] name = "md-5" version = "0.10.6" @@ -1718,11 +1786,16 @@ dependencies = [ "anyhow", "arrow 53.0.0", "async-trait", + "bytes", "datafusion", "futures", "itertools 0.3.25", "lazy_static", + "macros", "object_store 0.11.0", + "parquet 53.1.0", + "pb_types", + "prost", "thiserror", "tokio", ] @@ -1748,6 +1821,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1973,6 +2052,42 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "parquet" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e" +dependencies = [ + "ahash", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-ipc 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", + "base64", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half", + "hashbrown", + "lz4_flex", + "num", + "num-bigint", + "object_store 0.11.0", + "paste", + 
"seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", + "zstd-sys", +] + [[package]] name = "parse-zoneinfo" version = "0.3.1" @@ -1988,6 +2103,14 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pb_types" +version = "2.0.0" +dependencies = [ + "prost", + "prost-build", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2069,6 +2192,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "910d41a655dac3b764f1ade94821093d3610248694320cd072303a8eedcf221d" +dependencies = [ + "proc-macro2", + "syn 2.0.82", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -2078,6 +2211,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.82", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.37" @@ -2239,7 +2425,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2350,7 +2536,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2387,7 +2573,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2415,7 +2601,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2437,9 +2623,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.77" +version = "2.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" +checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021" dependencies = [ "proc-macro2", "quote", @@ -2476,7 +2662,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2550,7 +2736,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2585,7 +2771,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] @@ -2742,7 +2928,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", "wasm-bindgen-shared", ] @@ -2764,7 +2950,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 
2.0.82", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2934,7 +3120,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.82", ] [[package]] diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml index b304860201..ee231cbea3 100644 --- a/horaedb/Cargo.toml +++ b/horaedb/Cargo.toml @@ -23,15 +23,19 @@ license = "Apache-2.0" [workspace] resolver = "2" -members = ["metric_engine", "server"] +members = ["metric_engine", "pb_types", "server"] [workspace.dependencies] anyhow = { version = "1.0" } metric_engine = { path = "metric_engine" } thiserror = "1" +bytes = "1" datafusion = "41" parquet = { version = "53" } object_store = { version = "0.11" } +macros = { path = "../src/components/macros" } +pb_types = { path = "pb_types" } +prost = { version = "0.13" } arrow = { version = "53", features = ["prettyprint"] } tokio = { version = "1", features = ["full"] } async-trait = "0.1" diff --git a/horaedb/Makefile b/horaedb/Makefile index 908a8cb872..72736b0af4 100644 --- a/horaedb/Makefile +++ b/horaedb/Makefile @@ -19,10 +19,14 @@ SHELL = /bin/bash clippy: cargo clippy --all-targets --all-features -- -D warnings \ - -A dead_code -A unused_variables # Remove these once we have a clean build + -A dead_code -A unused_variables -A clippy::unreachable # Remove these once we have a clean build sort: cargo sort --workspace --check fmt: cargo fmt -- --check + +fix: + cargo fmt + cargo sort --workspace diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml index a29abb8c6d..d2ea85c8c9 100644 --- a/horaedb/metric_engine/Cargo.toml +++ b/horaedb/metric_engine/Cargo.toml @@ -34,10 +34,15 @@ workspace = true anyhow = { workspace = true } arrow = { workspace = true } async-trait = { workspace = true } +bytes = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } 
+macros = { workspace = true } object_store = { workspace = true } +parquet = { workspace = true, features = ["object_store"] } +pb_types = { workspace = true } +prost = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/horaedb/metric_engine/src/error.rs b/horaedb/metric_engine/src/error.rs index 08e720f499..35b8e9aef5 100644 --- a/horaedb/metric_engine/src/error.rs +++ b/horaedb/metric_engine/src/error.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub use anyhow::Error as AnyhowError; use thiserror::Error; #[derive(Error, Debug)] diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs index 1a0149dba2..8a05223ca3 100644 --- a/horaedb/metric_engine/src/lib.rs +++ b/horaedb/metric_engine/src/lib.rs @@ -23,4 +23,4 @@ mod sst; pub mod storage; pub mod types; -pub use error::{Error, Result}; +pub use error::{AnyhowError, Error, Result}; diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs index 4839126765..aceac3daba 100644 --- a/horaedb/metric_engine/src/manifest.rs +++ b/horaedb/metric_engine/src/manifest.rs @@ -15,11 +15,119 @@ // specific language governing permissions and limitations // under the License. 
-pub struct Manifest {} +use anyhow::Context; +use bytes::Bytes; +use object_store::{path::Path, PutPayload}; +use prost::Message; +use tokio::sync::RwLock; + +use crate::{ + sst::{FileId, FileMeta, SstFile}, + types::ObjectStoreRef, + AnyhowError, Error, Result, +}; + +pub const PREFIX_PATH: &str = "manifest"; +pub const SNAPSHOT_FILENAME: &str = "snapshot"; + +pub struct Manifest { + path: String, + snapshot_path: Path, + store: ObjectStoreRef, + + payload: RwLock, +} + +pub struct Payload { + files: Vec, +} + +impl TryFrom for Payload { + type Error = Error; + + fn try_from(value: pb_types::Manifest) -> Result { + let files = value + .files + .into_iter() + .map(SstFile::try_from) + .collect::>>()?; + + Ok(Self { files }) + } +} impl Manifest { - pub fn new(id: u64) -> Self { - // Recover the manifest using the id from storage. - Self {} + pub async fn try_new(path: String, store: ObjectStoreRef) -> Result { + let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}")); + let payload = match store.get(&snapshot_path).await { + Ok(v) => { + let bytes = v + .bytes() + .await + .context("failed to read manifest snapshot")?; + let pb_payload = pb_types::Manifest::decode(bytes) + .context("failed to decode manifest snapshot")?; + Payload::try_from(pb_payload)? + } + Err(err) => { + if err.to_string().contains("not found") { + Payload { files: vec![] } + } else { + let context = format!("Failed to get manifest snapshot, path:{snapshot_path}"); + return Err(AnyhowError::new(err).context(context).into()); + } + } + }; + + Ok(Self { + path, + snapshot_path, + store, + payload: RwLock::new(payload), + }) + } + + // TODO: Now this functions is poorly implemented, we concat new_sst to + // snapshot, and upload it back in a whole. + // In more efficient way, we can create a new diff file, and do compaction in + // background to merge them to `snapshot`. 
+ pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> { + let mut payload = self.payload.write().await; + let mut tmp_ssts = payload.files.clone(); + let new_sst = SstFile { id, meta }; + tmp_ssts.push(new_sst.clone()); + let pb_manifest = pb_types::Manifest { + files: tmp_ssts + .into_iter() + .map(|f| pb_types::SstFile { + id: f.id, + meta: Some(pb_types::SstMeta { + max_sequence: f.meta.max_sequence, + num_rows: f.meta.num_rows, + time_range: Some(pb_types::TimeRange { + start: f.meta.time_range.start, + end: f.meta.time_range.end, + }), + }), + }) + .collect::>(), + }; + + let mut buf = Vec::with_capacity(pb_manifest.encoded_len()); + pb_manifest + .encode(&mut buf) + .context("failed to encode manifest")?; + let put_payload = PutPayload::from_bytes(Bytes::from(buf)); + + // 1. Persist the snapshot + self.store + .put(&self.snapshot_path, put_payload) + .await + .context("Failed to update manifest")?; + + // 2. Update cached payload + payload.files.push(new_sst); + + Ok(()) } } diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs index 37cc8f11d8..5eb96867ad 100644 --- a/horaedb/metric_engine/src/sst.rs +++ b/horaedb/metric_engine/src/sst.rs @@ -15,6 +15,78 @@ // specific language governing permissions and limitations // under the License. 
-pub struct SSTable { - pub id: u64, +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + LazyLock, + }, + time::SystemTime, +}; + +use macros::ensure; + +use crate::{types::TimeRange, Error}; + +pub const PREFIX_PATH: &str = "data"; + +pub type FileId = u64; + +#[derive(Clone, Debug)] +pub struct SstFile { + pub id: FileId, + pub meta: FileMeta, +} + +impl TryFrom for SstFile { + type Error = Error; + + fn try_from(value: pb_types::SstFile) -> Result { + ensure!(value.meta.is_some(), "file meta is missing"); + let meta = value.meta.unwrap(); + let meta = meta.try_into()?; + + Ok(Self { id: value.id, meta }) + } +} + +#[derive(Clone, Debug)] +pub struct FileMeta { + pub max_sequence: u64, + pub num_rows: u32, + pub time_range: TimeRange, +} + +impl TryFrom for FileMeta { + type Error = Error; + + fn try_from(value: pb_types::SstMeta) -> Result { + ensure!(value.time_range.is_some(), "time range is missing"); + let time_range = value.time_range.unwrap(); + + Ok(Self { + max_sequence: value.max_sequence, + num_rows: value.num_rows, + time_range: TimeRange { + start: time_range.start, + end: time_range.end, + }, + }) + } +} + +// Used for sst file id allocation. +// This number mustn't go backwards on restarts, otherwise file id +// collisions are possible. So don't change time on the server +// between server restarts. +static NEXT_ID: LazyLock = LazyLock::new(|| { + AtomicU64::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64, + ) +}); + +pub fn allocate_id() -> u64 { + NEXT_ID.fetch_add(1, Ordering::SeqCst) } diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs index 1cae2bb9c0..4c5b2667e6 100644 --- a/horaedb/metric_engine/src/storage.rs +++ b/horaedb/metric_engine/src/storage.rs @@ -15,19 +15,30 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::{array::RecordBatch, datatypes::Schema}; +use anyhow::Context; +use arrow::{ + array::{Int64Array, RecordBatch}, + datatypes::SchemaRef, +}; use async_trait::async_trait; use datafusion::logical_expr::Expr; +use macros::ensure; +use object_store::path::Path; +use parquet::{ + arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter}, + file::properties::WriterProperties, +}; use crate::{ manifest::Manifest, - sst::SSTable, - types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange}, + sst::{allocate_id, FileId, FileMeta}, + types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp}, Result, }; pub struct WriteRequest { batch: RecordBatch, + props: Option, } pub struct ScanRequest { @@ -42,7 +53,7 @@ pub struct CompactRequest {} /// Time-aware merge storage interface. #[async_trait] pub trait TimeMergeStorage { - fn schema(&self) -> Result<&Schema>; + fn schema(&self) -> &SchemaRef; async fn write(&self, req: WriteRequest) -> Result<()>; @@ -53,35 +64,106 @@ pub trait TimeMergeStorage { async fn compact(&self, req: CompactRequest) -> Result<()>; } -/// TMStorage implementation using cloud object storage. +/// `TimeMergeStorage` implementation using cloud object storage. pub struct CloudObjectStorage { - name: String, - id: u64, + path: String, store: ObjectStoreRef, - sstables: Vec, + arrow_schema: SchemaRef, + timestamp_index: usize, manifest: Manifest, } +/// It will organize the data in the following way: +/// ```plaintext +/// {root_path}/manifest/snapshot +/// {root_path}/manifest/timestamp1 +/// {root_path}/manifest/timestamp2 +/// {root_path}/manifest/... +/// {root_path}/data/timestamp_a.sst +/// {root_path}/data/timestamp_b.sst +/// {root_path}/data/... 
+/// ``` impl CloudObjectStorage { - pub fn new(name: String, id: u64, store: ObjectStoreRef) -> Self { - Self { - name, - id, + pub async fn try_new( + root_path: String, + store: ObjectStoreRef, + arrow_schema: SchemaRef, + timestamp_index: usize, + ) -> Result { + let manifest_prefix = crate::manifest::PREFIX_PATH; + let manifest = + Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?; + Ok(Self { + path: root_path, + timestamp_index, store, - sstables: Vec::new(), - manifest: Manifest::new(id), - } + arrow_schema, + manifest, + }) + } + + fn build_file_path(&self, id: FileId) -> String { + let root = &self.path; + let prefix = crate::sst::PREFIX_PATH; + format!("{root}/{prefix}/{id}") + } + + async fn write_batch(&self, req: WriteRequest) -> Result { + let file_id = allocate_id(); + let file_path = self.build_file_path(file_id); + let object_store_writer = + ParquetObjectWriter::new(self.store.clone(), Path::from(file_path)); + let mut writer = + AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props) + .context("create arrow writer")?; + + // TODO: sort record batch according to primary key columns. 
+ writer + .write(&req.batch) + .await + .context("write arrow batch")?; + writer.close().await.context("close arrow writer")?; + + Ok(file_id) } } #[async_trait] impl TimeMergeStorage for CloudObjectStorage { - fn schema(&self) -> Result<&Schema> { - todo!() + fn schema(&self) -> &SchemaRef { + &self.arrow_schema } async fn write(&self, req: WriteRequest) -> Result<()> { - todo!() + ensure!(req.batch.schema_ref().eq(self.schema()), "schema not match"); + + let num_rows = req.batch.num_rows(); + let time_column = req + .batch + .column(self.timestamp_index) + .as_any() + .downcast_ref::() + .context("timestamp column should be int64")?; + + let mut start = Timestamp::MAX; + let mut end = Timestamp::MIN; + for v in time_column.values() { + start = start.min(*v); + end = end.max(*v); + } + let time_range = TimeRange { + start, + end: end + 1, + }; + let file_id = self.write_batch(req).await?; + let file_meta = FileMeta { + max_sequence: file_id, // Since file_id in increasing order, we can use it as sequence. + num_rows: num_rows as u32, + time_range, + }; + self.manifest.add_file(file_id, file_meta).await?; + + Ok(()) } async fn scan(&self, req: ScanRequest) -> Result { diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs index 08d42fcdcb..96a4b74ad4 100644 --- a/horaedb/metric_engine/src/types.rs +++ b/horaedb/metric_engine/src/types.rs @@ -23,7 +23,8 @@ use object_store::ObjectStore; use crate::error::Result; -pub type TimeRange = Range; +pub type Timestamp = i64; +pub type TimeRange = Range; pub type ObjectStoreRef = Arc; diff --git a/horaedb/pb_types/Cargo.toml b/horaedb/pb_types/Cargo.toml new file mode 100644 index 0000000000..e6929fa018 --- /dev/null +++ b/horaedb/pb_types/Cargo.toml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "pb_types" + +[package.license] +workspace = true + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +prost = { workspace = true } + +[build-dependencies] +prost-build = { version = "0.13" } diff --git a/horaedb/pb_types/build.rs b/horaedb/pb_types/build.rs new file mode 100644 index 0000000000..7eb68464b0 --- /dev/null +++ b/horaedb/pb_types/build.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::io::Result; + +fn main() -> Result<()> { + prost_build::compile_protos(&["protos/sst.proto"], &["protos/"])?; + Ok(()) +} diff --git a/horaedb/pb_types/protos/sst.proto b/horaedb/pb_types/protos/sst.proto new file mode 100644 index 0000000000..ce3db30169 --- /dev/null +++ b/horaedb/pb_types/protos/sst.proto @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +syntax = "proto3"; + +package pb_types.sst; + +// Time range of [start, end) +message TimeRange { + // inclusive + int64 start = 1; + // exclusive + int64 end = 2; +} + +message SstMeta { + uint64 max_sequence = 1; + uint32 num_rows = 2; + TimeRange time_range = 3; +} + +message SstFile { + uint64 id = 1; + SstMeta meta = 2; +} + +message Manifest { + repeated SstFile files = 1; +} + +message MetaUpdate { + repeated SstFile to_adds = 1; + repeated uint64 to_removes = 2; +} diff --git a/horaedb/pb_types/src/lib.rs b/horaedb/pb_types/src/lib.rs new file mode 100644 index 0000000000..bfa215b02c --- /dev/null +++ b/horaedb/pb_types/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod pb_types { + include!(concat!(env!("OUT_DIR"), "/pb_types.sst.rs")); +} + +pub use pb_types::*; diff --git a/horaedb/rust-toolchain.toml b/horaedb/rust-toolchain.toml new file mode 100644 index 0000000000..4c621ca810 --- /dev/null +++ b/horaedb/rust-toolchain.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[toolchain] +channel = "nightly-2024-10-15" +components = [ "rustfmt", "clippy" ] diff --git a/horaemeta/CONTRIBUTING.md b/horaemeta/CONTRIBUTING.md index 97aa18b210..e361cf0b59 100644 --- a/horaemeta/CONTRIBUTING.md +++ b/horaemeta/CONTRIBUTING.md @@ -5,14 +5,14 @@ To make the process easier and more valuable for everyone involved we have a few ## Submitting Issues and Feature Requests -Before you file an [issue](https://github.com/apache/incubator-horaedb-meta/issues/new), please search existing issues in case the same or similar issues have already been filed. +Before you file an [issue](https://github.com/apache/horaedb/issues/new), please search existing issues in case the same or similar issues have already been filed. If you find an existing open ticket covering your issue then please avoid adding "👍" or "me too" comments; Github notifications can cause a lot of noise for the project maintainers who triage the back-log. However, if you have a new piece of information for an existing ticket and you think it may help the investigation or resolution, then please do add it as a comment! You can signal to the team that you're experiencing an existing issue with one of Github's emoji reactions (these are a good way to add "weight" to an issue from a prioritisation perspective). ### Submitting an Issue -The [New Issue]((https://github.com/apache/incubator-horaedb-meta/issues/new)) page has templates for both bug reports and feature requests. +The [New Issue]((https://github.com/apache/horaedb/issues/new)) page has templates for both bug reports and feature requests. Please fill one of them out! The issue templates provide details on what information we will find useful to help us fix an issue. In short though, the more information you can provide us about your environment and what behaviour you're seeing, the easier we can fix the issue. @@ -30,14 +30,14 @@ Please see the [Style Guide](docs/style_guide.md) for more details. 
To open a PR you will need to have a Github account. Fork the `horaemeta` repo and work on a branch on your fork. -When you have completed your changes, or you want some incremental feedback make a Pull Request to HoraeDB [here](https://github.com/apache/incubator-horaedb-meta/compare). +When you have completed your changes, or you want some incremental feedback make a Pull Request to HoraeDB [here](https://github.com/apache/horaedb/compare). If you want to discuss some work in progress then please prefix `[WIP]` to the PR title. For PRs that you consider ready for review, verify the following locally before you submit it: -* you have a coherent set of logical commits, with messages conforming to the [Conventional Commits](https://github.com/apache/incubator-horaedb-docs/blob/main/docs/src/en/dev/conventional_commit.md) specification; +* you have a coherent set of logical commits, with messages conforming to the [Conventional Commits](https://horaedb.apache.org/docs/dev/conventional_commit/) specification; * all the tests and/or benchmarks pass, including documentation tests; * the code is correctly formatted and all linter checks pass; and * you haven't left any "code cruft" (commented out code blocks etc). diff --git a/horaemeta/go.mod b/horaemeta/go.mod index 056f06910c..86af45c8c6 100644 --- a/horaemeta/go.mod +++ b/horaemeta/go.mod @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + module github.com/apache/incubator-horaedb-meta go 1.21 diff --git a/integration_tests/Makefile b/integration_tests/Makefile index e6ce21bdfa..fe7fbcdcb8 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -54,7 +54,7 @@ build-meta: ./build_meta.sh build-horaedb: - cd .. && cargo build --bin horaedb-server --features wal-table-kv,wal-message-queue,wal-rocksdb,wal-local-storage + cd .. && make build-debug build-test: cargo build diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result index 87f0708ade..980a7bc1e1 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/ddl/partition_table.result @@ -100,10 +100,11 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx @@ -111,10 +112,11 @@ String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:f -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), ALTER TABLE partition_table_t ADD COLUMN (b string); diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql index 9b056de7b5..e1f32de515 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.sql +++ 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql @@ -58,6 +58,7 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx @@ -65,6 +66,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); ALTER TABLE partition_table_t ADD COLUMN (b string); diff --git a/integration_tests/cases/env/local/ddl/query-plan.result b/integration_tests/cases/env/local/ddl/query-plan.result index 11b19c14c2..ee1e27c0d0 100644 --- a/integration_tests/cases/env/local/ddl/query-plan.result +++ b/integration_tests/cases/env/local/ddl/query-plan.result @@ -50,7 +50,7 @@ explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, 
fetched_columns:[tsid,t]:\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_164, fetched_columns:[tsid,t]:\n=0]\n"), -- This query should have higher priority @@ -60,7 +60,7 @@ explain analyze select t from `03_dml_select_real_time_range` where t >= 1695348001000 and t < 1695348002000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n 
merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_164, fetched_columns:[tsid,t]:\n=0]\n"), -- This query should have higher priority @@ -70,7 +70,7 @@ explain analyze select name from `03_dml_select_real_time_range` where t >= 1695348001000 and t < 1695348002000; plan_type,plan, -String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name], metrics=xx\n ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t,name]:\n=0]\n"), +String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name], metrics=xx\n ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_164, fetched_columns:[tsid,t,name]:\n=0]\n"), -- This query should not include memtable @@ -135,7 +135,7 @@ 
explain analyze select t from `03_append_mode_table` where t >= 1695348001000 and name = 'ceresdb'; plan_type,plan, -String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n num_ssts=0\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_memtable_1, fetched_columns:[t,name]:\n=0]\n"), +String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n num_ssts=0\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_memtable_166, fetched_columns:[t,name]:\n=0]\n"), -- Should just fetch projected columns from SST diff --git a/integration_tests/config/horaedb-cluster-0.toml b/integration_tests/config/horaedb-cluster-0.toml index da6170bc86..3a585bdfda 100644 --- a/integration_tests/config/horaedb-cluster-0.toml +++ b/integration_tests/config/horaedb-cluster-0.toml @@ -37,7 +37,7 @@ type = "Local" data_dir = "/tmp/horaedb0" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb0" [cluster_deployment] diff --git a/integration_tests/config/horaedb-cluster-1.toml b/integration_tests/config/horaedb-cluster-1.toml index 
f1f7173256..e3943a6983 100644 --- a/integration_tests/config/horaedb-cluster-1.toml +++ b/integration_tests/config/horaedb-cluster-1.toml @@ -38,7 +38,7 @@ type = "Local" data_dir = "/tmp/horaedb1" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb1" [cluster_deployment] diff --git a/integration_tests/config/shard-based-recovery.toml b/integration_tests/config/shard-based-recovery.toml index 78b09a8fa9..92e56f6a18 100644 --- a/integration_tests/config/shard-based-recovery.toml +++ b/integration_tests/config/shard-based-recovery.toml @@ -34,5 +34,5 @@ type = "Local" data_dir = "/tmp/horaedb" [analytic.wal] -type = "RocksDB" +type = "Local" data_dir = "/tmp/horaedb" diff --git a/integration_tests/sdk/go/go.mod b/integration_tests/sdk/go/go.mod index b50e5d5734..8bae76d8c9 100644 --- a/integration_tests/sdk/go/go.mod +++ b/integration_tests/sdk/go/go.mod @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ module go-sdk-test go 1.21 diff --git a/licenserc.toml b/licenserc.toml index e08f40f95d..9bace81eb3 100644 --- a/licenserc.toml +++ b/licenserc.toml @@ -34,5 +34,12 @@ excludes = [ "src/components/future_ext/src/cancel.rs", "src/components/tracing_util/src/lib.rs", "src/components/tracing_util/src/logging.rs", - "src/components/tracing_util/Cargo.toml" + "src/components/tracing_util/Cargo.toml", + "DISCLAIMER", + "NOTICE", + "horaemeta/DEPENDENCIES.csv", + "DEPENDENCIES.tsv", + # Test files + "*snap", + "*result" ] diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml index 8197b4eeb1..09ff47af21 100644 --- a/src/analytic_engine/Cargo.toml +++ b/src/analytic_engine/Cargo.toml @@ -43,6 +43,7 @@ anyhow = { workspace = true } arc-swap = "1.4.0" arena = { workspace = true } arrow = { workspace = true } +async-scoped = { version = "0.9.0", features = ["use-tokio"] } async-stream = { workspace = true } async-trait = { workspace = true } atomic_enum = { workspace = true } diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs index f782895145..6c67414037 100644 --- a/src/analytic_engine/src/instance/wal_replayer.rs +++ b/src/analytic_engine/src/instance/wal_replayer.rs @@ -30,14 +30,13 @@ use common_types::{ schema::{IndexInWriterSchema, Schema}, table::ShardId, }; -use futures::StreamExt; use generic_error::BoxError; use lazy_static::lazy_static; use logger::{debug, error, info, trace, warn}; use prometheus::{exponential_buckets, register_histogram, Histogram}; use snafu::ResultExt; use table_engine::table::TableId; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::{Mutex, MutexGuard, Semaphore}; use wal::{ log_batch::LogEntry, manager::{ @@ -74,6 +73,8 @@ lazy_static! { .unwrap(); } +const MAX_REPLAY_TASK_NUM: usize = 20; + /// Wal replayer supporting both table based and region based // TODO: limit the memory usage in `RegionBased` mode. 
pub struct WalReplayer<'a> { @@ -189,22 +190,23 @@ impl Replay for TableBasedReplay { ..Default::default() }; - let mut tasks = futures::stream::iter( - table_datas - .iter() - .map(|table_data| { - let table_id = table_data.id; - let read_ctx = &read_ctx; - async move { - let ret = Self::recover_table_logs(context, table_data, read_ctx).await; - (table_id, ret) - } - }) - .collect::>(), - ) - .buffer_unordered(20); - while let Some((table_id, ret)) = tasks.next().await { - if let Err(e) = ret { + let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| { + // Limit the maximum number of concurrent tasks. + let semaphore = Arc::new(Semaphore::new(MAX_REPLAY_TASK_NUM)); + for table_data in table_datas { + let table_id = table_data.id; + let read_ctx = &read_ctx; + let semaphore = semaphore.clone(); + scope.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + let ret = Self::recover_table_logs(context, table_data, read_ctx).await; + (table_id, ret) + }); + } + }); + + for result in results.into_iter().flatten() { + if let (table_id, Err(e)) = result { // If occur error, mark this table as failed and store the cause. failed_tables.insert(table_id, e); } @@ -345,7 +347,7 @@ impl RegionBasedReplay { table_data: table_data.clone(), serial_exec, }; - serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + serial_exec_ctxs.insert(table_data.id, Mutex::new(serial_exec_ctx)); table_datas_by_id.insert(table_data.id.as_u64(), table_data.clone()); } @@ -353,7 +355,7 @@ impl RegionBasedReplay { let schema_provider = TableSchemaProviderAdapter { table_datas: table_datas_by_id.clone(), }; - let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs)); + let serial_exec_ctxs = serial_exec_ctxs; // Split and replay logs. 
loop { let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer(); @@ -381,49 +383,53 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &Arc>>>, + serial_exec_ctxs: &HashMap>>, failed_tables: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); // TODO: No `group_by` method in `VecDeque`, so implement it manually here... Self::split_log_batch_by_table(log_batch, &mut table_batches); - // TODO: Replay logs of different tables in parallel. - let mut replay_tasks = Vec::with_capacity(table_batches.len()); - for table_batch in table_batches { - // Some tables may have failed in previous replay, ignore them. - if failed_tables.contains_key(&table_batch.table_id) { - continue; - } - let log_entries: Vec<_> = table_batch - .ranges - .iter() - .flat_map(|range| log_batch.range(range.clone())) - .collect(); - - let serial_exec_ctxs = serial_exec_ctxs.clone(); - replay_tasks.push(async move { - // Some tables may have been moved to other shards or dropped, ignore such logs. - if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) { - let result = replay_table_log_entries( - &context.flusher, - context.max_retry_flush_limit, - &mut ctx.serial_exec, - &ctx.table_data, - log_entries.into_iter(), - ) - .await; - (table_batch.table_id, Some(result)) - } else { - (table_batch.table_id, None) + let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| { + // Limit the maximum number of concurrent tasks. + let semaphore = Arc::new(Semaphore::new(MAX_REPLAY_TASK_NUM)); + + for table_batch in table_batches { + // Some tables may have failed in previous replay, ignore them. 
+ if failed_tables.contains_key(&table_batch.table_id) { + continue; } - }); - } + let log_entries: Vec<_> = table_batch + .ranges + .iter() + .flat_map(|range| log_batch.range(range.clone())) + .collect(); + let semaphore = semaphore.clone(); + + scope.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + // Some tables may have been moved to other shards or dropped, ignore such logs. + if let Some(ctx) = serial_exec_ctxs.get(&table_batch.table_id) { + let mut ctx = ctx.lock().await; + let table_data = ctx.table_data.clone(); + let result = replay_table_log_entries( + &context.flusher, + context.max_retry_flush_limit, + &mut ctx.serial_exec, + &table_data, + log_entries.into_iter(), + ) + .await; + (table_batch.table_id, Some(result)) + } else { + (table_batch.table_id, None) + } + }); + } + }); - // Run at most 20 tasks in parallel - let mut replay_tasks = futures::stream::iter(replay_tasks).buffer_unordered(20); - while let Some((table_id, ret)) = replay_tasks.next().await { - if let Some(Err(e)) = ret { + for result in results.into_iter().flatten() { + if let (table_id, Some(Err(e))) = result { // If occur error, mark this table as failed and store the cause. 
failed_tables.insert(table_id, e); } diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml index 2abfa49e17..ce505105f2 100644 --- a/src/horaedb/Cargo.toml +++ b/src/horaedb/Cargo.toml @@ -31,7 +31,7 @@ workspace = true workspace = true [features] -default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue", "wal-local-storage"] +default = ["wal-table-kv", "wal-message-queue", "wal-local-storage"] wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"] wal-message-queue = ["wal/wal-message-queue", "analytic_engine/wal-message-queue"] wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"] diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml index 30a5b00461..ce59ae5554 100644 --- a/src/wal/Cargo.toml +++ b/src/wal/Cargo.toml @@ -32,7 +32,7 @@ workspace = true [dependencies.rocksdb] git = "https://github.com/tikv/rust-rocksdb.git" -rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f" +rev = "85e79e52c6ad80b8c547fcb90b3cade64f141fac" features = ["portable"] optional = true @@ -48,6 +48,7 @@ required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", "wal-lo [dependencies] anyhow = { workspace = true } +async-scoped = { version = "0.9.0", features = ["use-tokio"] } async-trait = { workspace = true } bytes_ext = { workspace = true } chrono = { workspace = true } @@ -64,6 +65,7 @@ memmap2 = { version = "0.9.4", optional = true } message_queue = { workspace = true, optional = true } prometheus = { workspace = true } prost = { workspace = true } +rayon = "1.10.0" runtime = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index b3e70e9b5c..5a88131371 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -16,12 +16,13 @@ // under the License. 
use serde::{Deserialize, Serialize}; +use size_ext::ReadableSize; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct LocalStorageConfig { pub data_dir: String, - pub segment_size: usize, + pub segment_size: ReadableSize, pub cache_size: usize, } @@ -29,7 +30,7 @@ impl Default for LocalStorageConfig { fn default() -> Self { Self { data_dir: "/tmp/horaedb".to_string(), - segment_size: 64 * 1024 * 1024, // 64MB + segment_size: ReadableSize::mb(64), cache_size: 3, } } diff --git a/src/wal/src/local_storage_impl/record_encoding.rs b/src/wal/src/local_storage_impl/record_encoding.rs index 6e8011c2e1..f7b570c795 100644 --- a/src/wal/src/local_storage_impl/record_encoding.rs +++ b/src/wal/src/local_storage_impl/record_encoding.rs @@ -17,6 +17,7 @@ use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut}; use codec::Encoder; +use crc32fast::Hasher; use generic_error::GenericError; use macros::define_result; use snafu::{ensure, Backtrace, ResultExt, Snafu}; @@ -26,7 +27,11 @@ pub const NEWEST_RECORD_ENCODING_VERSION: u8 = RECORD_ENCODING_V0; pub const VERSION_SIZE: usize = 1; pub const CRC_SIZE: usize = 4; -pub const RECORD_LENGTH_SIZE: usize = 4; +pub const TABLE_ID_SIZE: usize = 8; +pub const SEQUENCE_NUM_SIZE: usize = 8; +pub const VALUE_LENGTH_SIZE: usize = 4; +pub const RECORD_HEADER_SIZE: usize = + VERSION_SIZE + CRC_SIZE + TABLE_ID_SIZE + SEQUENCE_NUM_SIZE + VALUE_LENGTH_SIZE; #[derive(Debug, Snafu)] pub enum Error { @@ -57,22 +62,19 @@ define_result!(Error); /// Record format: /// /// ```text -/// +---------+--------+--------+------------+--------------+--------------+-------+ -/// | version | crc | length | table id | sequence num | value length | value | -/// | (u8) | (u32) | (u32) | (u64) | (u64) | (u32) | | -/// +---------+--------+--------+------------+--------------+--------------+-------+ +/// +---------+--------+------------+--------------+--------------+-------+ +/// | version | crc | table id | sequence num | value length | value | +/// | 
(u8) | (u32) | (u64) | (u64) | (u32) |(bytes)| +/// +---------+--------+------------+--------------+--------------+-------+ /// ``` #[derive(Debug)] -pub struct Record<'a> { +pub struct Record { /// The version number of the record. pub version: u8, /// The CRC checksum of the record. pub crc: u32, - /// The length of the record (excluding version, crc and length). - pub length: u32, - /// Identifier for tables. pub table_id: u64, @@ -83,71 +85,59 @@ pub struct Record<'a> { pub value_length: u32, /// Common log value. - pub value: &'a [u8], + pub value: Vec, } -impl<'a> Record<'a> { - pub fn new(table_id: u64, sequence_num: u64, value: &'a [u8]) -> Result { - let mut record = Record { +impl Record { + pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Self { + Record { version: NEWEST_RECORD_ENCODING_VERSION, - crc: 0, - length: (8 + 8 + 4 + value.len()) as u32, + crc: compute_crc32(table_id, sequence_num, value), table_id, sequence_num, value_length: value.len() as u32, - value, - }; - - // Calculate CRC - let mut buf = Vec::new(); - buf.try_put_u64(table_id).context(Encoding)?; - buf.try_put_u64(sequence_num).context(Encoding)?; - buf.try_put_u32(record.value_length).context(Encoding)?; - buf.extend_from_slice(value); - record.crc = crc32fast::hash(&buf); - - Ok(record) + value: value.to_vec(), + } } // Return the length of the record pub fn len(&self) -> usize { - VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE + self.length as usize + RECORD_HEADER_SIZE + self.value_length as usize } } #[derive(Clone, Debug)] pub struct RecordEncoding { - version: u8, + expected_version: u8, } impl RecordEncoding { pub fn newest() -> Self { Self { - version: NEWEST_RECORD_ENCODING_VERSION, + expected_version: NEWEST_RECORD_ENCODING_VERSION, } } } -impl Encoder> for RecordEncoding { +impl Encoder for RecordEncoding { type Error = Error; fn encode(&self, buf: &mut B, record: &Record) -> Result<()> { // Verify version ensure!( - record.version == self.version, + 
record.version == self.expected_version, Version { - expected: self.version, + expected: self.expected_version, given: record.version } ); buf.try_put_u8(record.version).context(Encoding)?; buf.try_put_u32(record.crc).context(Encoding)?; - buf.try_put_u32(record.length).context(Encoding)?; buf.try_put_u64(record.table_id).context(Encoding)?; buf.try_put_u64(record.sequence_num).context(Encoding)?; buf.try_put_u32(record.value_length).context(Encoding)?; - buf.try_put(record.value).context(Encoding)?; + buf.try_put(record.value.as_slice()).context(Encoding)?; Ok(()) } @@ -160,9 +150,9 @@ impl RecordEncoding { pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result { // Ensure that buf is not shorter than the shortest record. ensure!( - buf.remaining() >= VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE, + buf.remaining() >= RECORD_HEADER_SIZE, LengthMismatch { - expected: VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE, + expected: RECORD_HEADER_SIZE, actual: buf.remaining() } ); @@ -172,38 +162,22 @@ impl RecordEncoding { // Verify version ensure!( - version == self.version, + version == self.expected_version, Version { - expected: self.version, + expected: self.expected_version, given: version } ); - // Read CRC let crc = buf.try_get_u32().context(Decoding)?; - - // Read length - let length = buf.try_get_u32().context(Decoding)?; - ensure!( - length > 0, - LengthMismatch { - expected: 1usize, - actual: 0usize - } - ); - - // Ensure the buf is long enough - ensure!( - buf.remaining() >= length as usize, - LengthMismatch { - expected: length as usize, - actual: buf.remaining() - } - ); + let table_id = buf.try_get_u64().context(Decoding)?; + let sequence_num = buf.try_get_u64().context(Decoding)?; + let value_length = buf.try_get_u32().context(Decoding)?; + let mut value = vec![0; value_length as usize]; + buf.try_copy_to_slice(&mut value).context(Decoding)?; // Verify CRC - let data = &buf[0..length as usize]; - let computed_crc = crc32fast::hash(data); + let 
computed_crc = compute_crc32(table_id, sequence_num, &value); ensure!( computed_crc == crc, ChecksumMismatch { @@ -212,23 +186,9 @@ impl RecordEncoding { } ); - // Read table id - let table_id = buf.try_get_u64().context(Decoding)?; - - // Read sequence number - let sequence_num = buf.try_get_u64().context(Decoding)?; - - // Read value length - let value_length = buf.try_get_u32().context(Decoding)?; - - // Read value - let value = &buf[0..value_length as usize]; - buf.advance(value_length as usize); - Ok(Record { version, crc, - length, table_id, sequence_num, value_length, @@ -237,6 +197,18 @@ impl RecordEncoding { } } +/// The crc32 checksum is calculated over the table_id, sequence_num, +/// value_length and value. +// This function does the same with `crc32fast::hash`. +fn compute_crc32(table_id: u64, seq_num: u64, value: &[u8]) -> u32 { + let mut h = Hasher::new(); + h.update(&table_id.to_le_bytes()); + h.update(&seq_num.to_le_bytes()); + h.update(&value.len().to_le_bytes()); + h.update(value); + h.finalize() +} + #[cfg(test)] mod tests { use bytes_ext::BytesMut; @@ -245,11 +217,11 @@ mod tests { use crate::local_storage_impl::record_encoding::{Record, RecordEncoding}; #[test] - fn test_record_encoding() { + fn test_local_wal_record_encoding() { let table_id = 1; let sequence_num = 2; let value = b"test_value"; - let record = Record::new(table_id, sequence_num, value).unwrap(); + let record = Record::new(table_id, sequence_num, value); let encoder = RecordEncoding::newest(); let mut buf = BytesMut::new(); @@ -260,11 +232,11 @@ mod tests { } #[test] - fn test_record_decoding() { + fn test_local_wal_record_decoding() { let table_id = 1; let sequence_num = 2; let value = b"test_value"; - let record = Record::new(table_id, sequence_num, value).unwrap(); + let record = Record::new(table_id, sequence_num, value); let encoder = RecordEncoding::newest(); let mut buf = BytesMut::new(); @@ -274,7 +246,6 @@ mod tests { assert_eq!(decoded_record.version, record.version); 
assert_eq!(decoded_record.crc, record.crc); - assert_eq!(decoded_record.length, record.length); assert_eq!(decoded_record.table_id, record.table_id); assert_eq!(decoded_record.sequence_num, record.sequence_num); assert_eq!(decoded_record.value_length, record.value_length); diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 02bd1d13e0..8fa0915b12 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -33,7 +33,7 @@ use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQU use generic_error::{BoxError, GenericError}; use macros::define_result; use memmap2::{MmapMut, MmapOptions}; -use runtime::Runtime; +use runtime::{JoinHandle, Runtime}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use crate::{ @@ -134,6 +134,7 @@ pub enum Error { define_result!(Error); +const SEGMENT_NAME_PREFIX: &str = "seg_"; const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL"; const WAL_SEGMENT_V0: u8 = 0; const NEWEST_WAL_SEGMENT_VERSION: u8 = WAL_SEGMENT_V0; @@ -653,61 +654,43 @@ impl Region { let mut all_segments = HashMap::new(); // Scan the directory for existing WAL files - let mut max_segment_id: i32 = -1; + let mut max_segment_id: u64 = 0; let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1; - // Segment file naming convention: segment_.wal + // Segment file naming convention: {SEGMENT_NAME_PREFIX}{id} for entry in fs::read_dir(®ion_dir).context(FileOpen)? 
{ let entry = entry.context(FileOpen)?; - - let path = entry.path(); - - if !path.is_file() { - continue; - } - - match path.extension() { - Some(ext) if ext == "wal" => ext, - _ => continue, - }; - - let file_name = match path.file_name().and_then(|name| name.to_str()) { - Some(name) => name, - None => continue, - }; - - let segment_id = match file_name - .trim_start_matches("segment_") - .trim_end_matches(".wal") - .parse::() - .ok() - { + let filename = entry.file_name(); + let filename = filename.to_string_lossy(); + let segment_id = match filename.strip_prefix(SEGMENT_NAME_PREFIX) { Some(id) => id, None => continue, }; + let segment_id = segment_id + .parse::() + .map_err(anyhow::Error::new) + .context(Internal)?; - let segment = - Segment::new(path.to_string_lossy().to_string(), segment_id, segment_size)?; + let segment = Segment::new( + entry.path().to_string_lossy().to_string(), + segment_id, + segment_size, + )?; next_sequence_num = next_sequence_num.max(segment.max_seq + 1); + max_segment_id = max_segment_id.max(segment_id); let segment = Arc::new(Mutex::new(segment)); - - if segment_id as i32 > max_segment_id { - max_segment_id = segment_id as i32; - } all_segments.insert(segment_id, segment); } // If no existing segments, create a new one - if max_segment_id == -1 { - max_segment_id = 0; - let path = format!("{}/segment_{}.wal", region_dir, max_segment_id); - let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?; - let new_segment = Arc::new(Mutex::new(new_segment)); - all_segments.insert(0, new_segment); + if all_segments.is_empty() { + all_segments.insert( + max_segment_id, + Self::create_new_segment(®ion_dir, max_segment_id, segment_size)?, + ); } - let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone(); - + let latest_segment = all_segments.get(&max_segment_id).unwrap().clone(); let segment_manager = SegmentManager { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), @@ -727,17 
+710,9 @@ impl Region { }) } - fn create_new_segment(&self, id: u64) -> Result>> { - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, id), - id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager.add_segment(id, new_segment.clone())?; - - Ok(new_segment) + fn create_new_segment(dir: &str, id: u64, size: usize) -> Result>> { + let new_segment = Segment::new(format!("{dir}/{SEGMENT_NAME_PREFIX}{id}"), id, size)?; + Ok(Arc::new(Mutex::new(new_segment))) } pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { @@ -760,9 +735,7 @@ impl Region { for entry in &batch.entries { // Encode the record - let record = Record::new(table_id, next_sequence_num, &entry.payload) - .box_err() - .context(Encoding)?; + let record = Record::new(table_id, next_sequence_num, &entry.payload); self.record_encoding .encode(&mut data, &record) .box_err() @@ -784,7 +757,10 @@ impl Region { let new_segment_id = guard.id + 1; drop(guard); - *current_segment = self.create_new_segment(new_segment_id)?; + *current_segment = + Self::create_new_segment(&self.region_dir, new_segment_id, self.segment_size)?; + self.segment_manager + .add_segment(new_segment_id, current_segment.clone())?; } } @@ -832,6 +808,7 @@ impl Region { Some(req.location.table_id), start, end, + self.runtime.clone(), )?; Ok(BatchLogIteratorAdapter::new_with_sync( @@ -849,6 +826,7 @@ impl Region { None, MIN_SEQUENCE_NUMBER, MAX_SEQUENCE_NUMBER, + self.runtime.clone(), )?; Ok(BatchLogIteratorAdapter::new_with_sync( Box::new(iter), @@ -1006,19 +984,37 @@ impl RegionManager { } } +fn decode_segment_content( + segment_content: &[u8], + record_positions: &[Position], + record_encoding: &RecordEncoding, +) -> Result> { + let mut records = Vec::with_capacity(record_positions.len()); + + for pos in record_positions { + // Extract the record data from the segment content + let record_data = 
&segment_content[pos.start..pos.end]; + + // Decode the record + let record = record_encoding + .decode(record_data) + .box_err() + .context(InvalidRecord)?; + records.push(record); + } + Ok(records) +} + #[derive(Debug)] struct SegmentLogIterator { /// Encoding method for common log. log_encoding: CommonLogEncoding, /// Encoding method for records. - record_encoding: RecordEncoding, + _record_encoding: RecordEncoding, - /// Raw content of the segment. - segment_content: Vec, - - /// Positions of records within the segment content. - record_positions: Vec, + /// Decoded log records in the segment. + records: Vec, /// Optional identifier for the table, which is used to filter logs. table_id: Option, @@ -1040,27 +1036,19 @@ struct SegmentLogIterator { } impl SegmentLogIterator { - pub fn new( + pub fn new_with_records( log_encoding: CommonLogEncoding, record_encoding: RecordEncoding, - segment: Arc>, - segment_manager: Arc, + records: Vec, + table_ranges: HashMap, table_id: Option, start: SequenceNumber, end: SequenceNumber, ) -> Result { - let mut guard = segment.lock().unwrap(); - // Open the segment if it is not open - segment_manager.open_segment(&mut guard, segment.clone())?; - let segment_content = guard.read(0, guard.current_size)?; - let record_positions = guard.record_position.clone(); - let table_ranges = guard.table_ranges.clone(); - Ok(Self { log_encoding, - record_encoding, - segment_content, - record_positions, + _record_encoding: record_encoding, + records, table_id, table_ranges, start, @@ -1076,24 +1064,14 @@ impl SegmentLogIterator { } loop { - // Get the next record position - let Some(pos) = self.record_positions.get(self.current_record_idx) else { + // Get the next record + let Some(record) = self.records.get(self.current_record_idx) else { self.no_more_data = true; return Ok(None); }; self.current_record_idx += 1; - // Extract the record data from the segment content - let record_data = &self.segment_content[pos.start..pos.end]; - - // Decode 
the record - let record = self - .record_encoding - .decode(record_data) - .box_err() - .context(InvalidRecord)?; - // Filter by sequence number if record.sequence_num < self.start { continue; @@ -1122,7 +1100,7 @@ impl SegmentLogIterator { // Decode the value let value = self .log_encoding - .decode_value(record.value) + .decode_value(&record.value) .box_err() .context(InvalidRecord)?; @@ -1150,6 +1128,9 @@ pub struct MultiSegmentLogIterator { /// Current segment iterator. current_iterator: Option, + /// Future iterator for preloading the next segment. + next_segment_iterator: Option>>, + /// Encoding method for common log. log_encoding: CommonLogEncoding, @@ -1167,6 +1148,9 @@ pub struct MultiSegmentLogIterator { /// The raw payload data of the current record. current_payload: Vec, + + /// Runtime for preloading segments + runtime: Arc, } impl MultiSegmentLogIterator { @@ -1177,6 +1161,7 @@ impl MultiSegmentLogIterator { table_id: Option, start: SequenceNumber, end: SequenceNumber, + runtime: Arc, ) -> Result { let relevant_segments = segment_manager.get_relevant_segments(table_id, start, end)?; @@ -1185,12 +1170,14 @@ impl MultiSegmentLogIterator { segments: relevant_segments, current_segment_idx: 0, current_iterator: None, + next_segment_iterator: None, log_encoding, record_encoding, table_id, start, end, current_payload: Vec::new(), + runtime, }; // Load the first segment iterator @@ -1199,25 +1186,88 @@ impl MultiSegmentLogIterator { Ok(iter) } + fn preload_next_segment(&mut self) { + assert!(self.next_segment_iterator.is_none()); + if self.current_segment_idx >= self.segments.len() { + return; + } + + let next_segment_idx = self.current_segment_idx; + let segment = self.segments[next_segment_idx].clone(); + let segment_manager = self.segment_manager.clone(); + let log_encoding = self.log_encoding.clone(); + let record_encoding = self.record_encoding.clone(); + let table_id = self.table_id; + let start = self.start; + let end = self.end; + + // Spawn an async 
task to preload the next SegmentLogIterator + let handle = self.runtime.spawn(async move { + let mut guard = segment.lock().unwrap(); + // Open the segment if it is not open + segment_manager.open_segment(&mut guard, segment.clone())?; + let segment_content = guard.read(0, guard.current_size)?; + let table_ranges = guard.table_ranges.clone(); + let records = + decode_segment_content(&segment_content, &guard.record_position, &record_encoding)?; + let iterator = SegmentLogIterator::new_with_records( + log_encoding, + record_encoding, + records, + table_ranges, + table_id, + start, + end, + )?; + Ok(iterator) + }); + + self.next_segment_iterator = Some(handle); + } + fn load_next_segment_iterator(&mut self) -> Result { if self.current_segment_idx >= self.segments.len() { self.current_iterator = None; return Ok(false); } - let segment = self.segments[self.current_segment_idx].clone(); - let iterator = SegmentLogIterator::new( - self.log_encoding.clone(), - self.record_encoding.clone(), - segment, - self.segment_manager.clone(), - self.table_id, - self.start, - self.end, - )?; + if let Some(handle) = self.next_segment_iterator.take() { + // Wait for the future to complete + let iterator = self + .runtime + .block_on(handle) + .map_err(anyhow::Error::new) + .context(Internal)??; + self.current_iterator = Some(iterator); + self.current_segment_idx += 1; + } else { + // Preload was not set, load synchronously + let segment = self.segments[self.current_segment_idx].clone(); + let mut guard = segment.lock().unwrap(); + self.segment_manager + .open_segment(&mut guard, segment.clone())?; + let segment_content = guard.read(0, guard.current_size)?; + let table_ranges = guard.table_ranges.clone(); + let records = decode_segment_content( + &segment_content, + &guard.record_position, + &self.record_encoding, + )?; + let iterator = SegmentLogIterator::new_with_records( + self.log_encoding.clone(), + self.record_encoding.clone(), + records, + table_ranges, + self.table_id, + 
self.start, + self.end, + )?; + self.current_iterator = Some(iterator); + self.current_segment_idx += 1; + } - self.current_iterator = Some(iterator); - self.current_segment_idx += 1; + // Preload the next segment + self.preload_next_segment(); Ok(true) } diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs index 694831eae1..2c4494654d 100644 --- a/src/wal/src/local_storage_impl/wal_manager.rs +++ b/src/wal/src/local_storage_impl/wal_manager.rs @@ -57,11 +57,11 @@ impl LocalStorageImpl { segment_size, .. } = config.clone(); - let wal_path_str = wal_path.to_str().unwrap().to_string(); + let wal_path_str = wal_path.to_string_lossy().to_string(); let region_manager = RegionManager::new( wal_path_str.clone(), cache_size, - segment_size, + segment_size.as_byte() as usize, runtime.clone(), ) .box_err() diff --git a/src/wal/src/manager.rs b/src/wal/src/manager.rs index 9c4a960b51..fcd017dc25 100644 --- a/src/wal/src/manager.rs +++ b/src/wal/src/manager.rs @@ -27,6 +27,7 @@ use common_types::{ }; pub use error::*; use generic_error::BoxError; +use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator}; use runtime::Runtime; use snafu::ResultExt; @@ -428,13 +429,29 @@ impl BatchLogIteratorAdapter { let batch_size = self.batch_size; let (log_entries, iter_opt) = runtime .spawn_blocking(move || { - while buffer.len() < batch_size { + let mut raw_entries = Vec::new(); + + while raw_entries.len() < batch_size { if let Some(raw_log_entry) = iter.next_log_entry()? { if !filter(raw_log_entry.table_id) { continue; } - let mut raw_payload = raw_log_entry.payload; + raw_entries.push(LogEntry { + table_id: raw_log_entry.table_id, + sequence: raw_log_entry.sequence, + payload: raw_log_entry.payload.to_vec(), + }); + } else { + break; + } + } + + // Decoding is time-consuming, so we do it in parallel. 
+ let result: Result> = raw_entries + .into_par_iter() + .map(|raw_log_entry| { + let mut raw_payload = raw_log_entry.payload.as_slice(); let ctx = PayloadDecodeContext { table_id: raw_log_entry.table_id, }; @@ -442,18 +459,20 @@ impl BatchLogIteratorAdapter { .decode(&ctx, &mut raw_payload) .box_err() .context(error::Decoding)?; - let log_entry = LogEntry { + Ok(LogEntry { table_id: raw_log_entry.table_id, sequence: raw_log_entry.sequence, payload, - }; - buffer.push_back(log_entry); - } else { - return Ok((buffer, None)); - } - } + }) + }) + .collect(); - Ok((buffer, Some(iter))) + let log_entries = result?; + if log_entries.len() < batch_size { + Ok((log_entries, None)) + } else { + Ok((log_entries, Some(iter))) + } }) .await .context(RuntimeExec)??;