diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock index 8f2e800f4a..2edef3a4e5 100644 --- a/horaedb/Cargo.lock +++ b/horaedb/Cargo.lock @@ -122,17 +122,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45aef0d9cf9a039bf6cd1acc451b137aca819977b0928dece52bd92811b640ba" dependencies = [ "arrow-arith 53.0.0", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", "arrow-csv 53.0.0", - "arrow-data 53.0.0", - "arrow-ipc 53.0.0", + "arrow-data 53.1.0", + "arrow-ipc 53.1.0", "arrow-json 53.0.0", "arrow-ord 53.0.0", "arrow-row 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "arrow-string 53.0.0", ] @@ -157,10 +157,10 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03675e42d1560790f3524800e41403b40d0da1c793fe9528929fde06d8c7649a" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "num", @@ -185,14 +185,14 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f" +checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da" dependencies = [ "ahash", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "hashbrown", @@ -212,9 +212,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d" +checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80" dependencies = [ "bytes", "half", @@ -237,28 +237,28 @@ dependencies = [ "chrono", "comfy-table", "half", - "lexical-core", + "lexical-core 0.8.5", "num", "ryu", ] [[package]] name = "arrow-cast" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785" +checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "atoi", "base64", "chrono", "comfy-table", "half", - "lexical-core", + "lexical-core 1.0.2", "num", "ryu", ] @@ -278,7 +278,7 @@ dependencies = [ "csv", "csv-core", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "regex", ] @@ -288,16 +288,16 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd178575f45624d045e4ebee714e246a05d9652e41363ee3f57ec18cca97f740" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "csv", "csv-core", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "regex", ] @@ -315,12 +315,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65" +checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634" dependencies = [ - "arrow-buffer 53.0.0", - "arrow-schema 53.0.0", + "arrow-buffer 53.1.0", + "arrow-schema 53.1.0", "half", "num", ] @@ -342,15 +342,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a" +checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "flatbuffers", ] @@ -368,7 +368,7 @@ dependencies = [ "chrono", "half", "indexmap", - "lexical-core", + "lexical-core 0.8.5", "num", "serde", "serde_json", @@ -380,15 +380,15 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d24805ba326758effdd6f2cbdd482fcfab749544f21b134701add25b33f474e6" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-cast 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "chrono", "half", "indexmap", - "lexical-core", + "lexical-core 0.8.5", "num", "serde", "serde_json", @@ -415,11 +415,11 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "644046c479d80ae8ed02a7f1e1399072ea344ca6a7b0e293ab2d5d9ed924aa3b" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "half", "num", ] @@ -445,10 +445,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a29791f8eb13b340ce35525b723f5f0df17ecb955599e11f65c2a94ab34e2efb" dependencies = [ "ahash", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "half", ] @@ -460,9 +460,9 @@ checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" [[package]] name = "arrow-schema" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858" +checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794" [[package]] name = "arrow-select" @@ -480,15 +480,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "53.0.0" +version = "53.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba" +checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a" dependencies = [ "ahash", - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", "num", ] @@ -515,11 +515,11 @@ version = "53.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0775b6567c66e56ded19b87a954b6b1beffbdd784ef95a3a2b03f59570c1d230" dependencies = [ - "arrow-array 53.0.0", - "arrow-buffer 53.0.0", - "arrow-data 53.0.0", - "arrow-schema 53.0.0", - "arrow-select 53.0.0", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-data 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", "memchr", "num", "regex", @@ -905,7 +905,7 @@ dependencies = [ "num_cpus", "object_store 0.10.2", "parking_lot", - "parquet", + "parquet 52.2.0", "paste", "pin-project-lite", "rand", @@ -951,7 +951,7 @@ dependencies = [ "libc", "num_cpus", "object_store 0.10.2", - "parquet", + "parquet 52.2.0", "sqlparser", ] @@ -1583,11 +1583,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" +dependencies = [ + "lexical-parse-float 1.0.2", + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", + "lexical-write-float 1.0.2", + "lexical-write-integer 1.0.2", ] [[package]] @@ -1596,8 +1609,19 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" +dependencies = [ + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", "static_assertions", ] @@ -1607,7 +1631,17 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -1620,14 +1654,34 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-util" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" +dependencies = [ + "static_assertions", +] + [[package]] name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", - "lexical-write-integer", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" +dependencies = [ + "lexical-util 1.0.3", + "lexical-write-integer 1.0.2", "static_assertions", ] @@ -1637,7 +1691,17 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -1728,6 +1792,7 @@ dependencies = [ "lazy_static", "macros", "object_store 0.11.0", + "parquet 53.1.0", "thiserror", "tokio", ] @@ -1978,6 +2043,42 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "parquet" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e" +dependencies = [ + "ahash", + "arrow-array 53.1.0", + "arrow-buffer 53.1.0", + "arrow-cast 53.1.0", + "arrow-data 53.1.0", + "arrow-ipc 53.1.0", + "arrow-schema 53.1.0", + "arrow-select 53.1.0", + "base64", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half", + "hashbrown", + "lz4_flex", + "num", + "num-bigint", + "object_store 0.11.0", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", + "zstd-sys", +] + [[package]] name = "parse-zoneinfo" version = "0.3.1" diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml index 076a911a59..3c1f1ec3d6 100644 --- a/horaedb/metric_engine/Cargo.toml +++ b/horaedb/metric_engine/Cargo.toml @@ -37,8 +37,9 @@ async-trait = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -macros = { workspace = true } lazy_static = { workspace = true } +macros = { workspace = true } object_store = { workspace = true } +parquet = { workspace = true, features = ["object_store"] } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs index 5024c1cf31..2eac2e1963 100644 --- a/horaedb/metric_engine/src/manifest.rs +++ b/horaedb/metric_engine/src/manifest.rs @@ -15,16 +15,38 @@ // specific language governing permissions and limitations // under the License. -use crate::{sst::FileId, Result}; -pub struct Manifest {} +use anyhow::Context; +use object_store::path::Path; + +use crate::{ + sst::{FileId, FileMeta}, + types::ObjectStoreRef, + Result, +}; + +pub const PREFIX_PATH: &str = "manifest"; +pub const SNAPSHOT_FILENAME: &str = "snapshot"; + +pub struct Manifest { + path: String, + store: ObjectStoreRef, +} + +struct Inner {} impl Manifest { - pub fn new(id: u64) -> Self { - // Recover the manifest using the id from storage. - Self {} + pub async fn try_new(path: String, store: ObjectStoreRef) -> Result { + let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}")); + let bytes = store + .get(&snapshot_path) + .await + .with_context(|| format!("failed to get manifest snapshot, path: {path}"))?; + // TODO: decode bytes into manifest details + Ok(Self { path, store }) } - pub fn allocate_id(&self) -> Result { - todo!() + pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> { + // TODO: implement this later + Ok(()) } } diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs index 88cfc06dc3..a2148619d4 100644 --- a/horaedb/metric_engine/src/sst.rs +++ b/horaedb/metric_engine/src/sst.rs @@ -15,8 +15,42 @@ // specific language governing permissions and limitations // under the License. +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + LazyLock, + }, + time::SystemTime, +}; + +use crate::types::TimeRange; + +pub const PREFIX_PATH: &str = "data"; + pub type FileId = u64; pub struct SSTable { pub id: FileId, } + +pub struct FileMeta { + pub num_row: u32, + pub range: TimeRange, +} + +// Used as base for id allocation +// This number mustn't go backwards on restarts, otherwise file id +// collisions are possible. So don't change time on the server +// between server restarts. +static NEXT_ID: LazyLock = LazyLock::new(|| { + AtomicU64::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64, + ) +}); + +pub fn allocate_id() -> u64 { + NEXT_ID.fetch_add(1, Ordering::SeqCst) +} diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs index fd85884eb9..8a1e3ab746 100644 --- a/horaedb/metric_engine/src/storage.rs +++ b/horaedb/metric_engine/src/storage.rs @@ -15,20 +15,27 @@ // specific language governing permissions and limitations // under the License. +use anyhow::Context; use arrow::{array::RecordBatch, datatypes::SchemaRef}; use async_trait::async_trait; use datafusion::logical_expr::Expr; use macros::ensure; +use object_store::path::Path; +use parquet::{ + arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter}, + file::properties::WriterProperties, +}; use crate::{ manifest::Manifest, - sst::{FileId, SSTable}, + sst::{allocate_id, FileId, FileMeta, SSTable}, types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange}, Result, }; pub struct WriteRequest { batch: RecordBatch, + props: Option, } pub struct ScanRequest { @@ -57,41 +64,76 @@ pub trait TimeMergeStorage { /// `TimeMergeStorage` implementation using cloud object storage. pub struct CloudObjectStorage { path: String, - id: u64, store: ObjectStoreRef, + arrow_schema: SchemaRef, + timestamp_index: usize, sstables: Vec, manifest: Manifest, } impl CloudObjectStorage { - pub fn new(path: String, id: u64, store: ObjectStoreRef) -> Self { - Self { - path, - id, + pub async fn try_new( + root_path: String, + store: ObjectStoreRef, + arrow_schema: SchemaRef, + timestamp_index: usize, + ) -> Result { + let manifest_prefix = crate::manifest::PREFIX_PATH; + let manifest = + Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?; + Ok(Self { + path: root_path, + timestamp_index, store, - sstables: Vec::new(), - manifest: Manifest::new(id), - } + arrow_schema, + sstables: Vec::new(), // TODO: recover sst from manifest + manifest, + }) } fn build_file_path(&self, id: FileId) -> String { let root = &self.path; - let prefix = self.id; + let prefix = crate::sst::PREFIX_PATH; format!("{root}/{prefix}/{id}") } + + async fn write_batch(&self, req: WriteRequest) -> Result { + let file_id = allocate_id(); + let file_path = self.build_file_path(file_id); + let object_store_writer = + ParquetObjectWriter::new(self.store.clone(), Path::from(file_path)); + let mut writer = + AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props) + .context("create arrow writer")?; + writer + .write(&req.batch) + .await + .context("write arrow batch")?; + writer.close().await.context("close arrow writer")?; + + Ok(file_id) + } } #[async_trait] impl TimeMergeStorage for CloudObjectStorage { fn schema(&self) -> &SchemaRef { - todo!() + &self.arrow_schema } async fn write(&self, req: WriteRequest) -> Result<()> { ensure!(req.batch.schema_ref().eq(self.schema()), "schema not match"); - - let id = self.manifest.allocate_id()?; - todo!() + let num_rows = req.batch.num_rows(); + // TODO: extract time range from batch + let time_range = TimeRange { start: 0, end: 1 }; + let file_id = self.write_batch(req).await?; + let file_meta = FileMeta { + num_row: num_rows as u32, + range: time_range, + }; + self.manifest.add_file(file_id, file_meta).await?; + + Ok(()) } async fn scan(&self, req: ScanRequest) -> Result {