From 3865c1b6c93c44dde2e27d2b0c2f576b2092b8c7 Mon Sep 17 00:00:00 2001 From: WEI Xikai Date: Fri, 17 Feb 2023 14:56:27 +0800 Subject: [PATCH] refactor: use enum instead of version for encoding format in pb (#655) * refactor: use enum instead of version for encoding format in pb * fix ut * update package fs_extra to resolve incompact issue reported by cargo * remove wrong exmaple config * fix integration test * rename 'type' of cluster deployment to 'mode' * rename wal_storage to wal --- Cargo.lock | 38 ++++---- Cargo.toml | 2 +- analytic_engine/src/lib.rs | 4 +- analytic_engine/src/setup.rs | 14 +-- analytic_engine/src/sst/parquet/meta_data.rs | 95 ++++++++++---------- analytic_engine/src/table_options.rs | 2 +- analytic_engine/src/tests/util.rs | 8 +- common_types/src/column_schema.rs | 37 +++++--- common_types/src/row/mod.rs | 2 +- docs/example-cluster-0.toml | 37 -------- docs/example-cluster-1.toml | 37 -------- docs/example-standalone-static-routing.toml | 8 +- docs/minimal.toml | 11 +-- integration_tests/cases/local/config.toml | 3 +- sql/src/parser.rs | 8 +- sql/src/planner.rs | 2 +- src/config.rs | 2 +- src/setup.rs | 2 +- table_engine/src/remote/model.rs | 2 +- 19 files changed, 125 insertions(+), 189 deletions(-) delete mode 100644 docs/example-cluster-0.toml delete mode 100644 docs/example-cluster-1.toml diff --git a/Cargo.lock b/Cargo.lock index 0fb7ab5745..bf372496b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,7 +90,7 @@ dependencies = [ "async-trait", "base64 0.13.0", "bytes 1.2.1", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "datafusion", @@ -1111,7 +1111,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1#2d99b441d8dd947caa51019a9d2ff3873aaca6d1" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7#43939799b2e65e3fc5795118fc77593f7c4b19d7" dependencies = [ "prost", "protoc-bin-vendored", @@ -1122,7 +1122,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7#43939799b2e65e3fc5795118fc77593f7c4b19d7" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067#4dbd2b36262affd83f8a2d4a99273e43aebcb067" dependencies = [ "prost", "protoc-bin-vendored", @@ -1261,7 +1261,7 @@ name = "cluster" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "log", @@ -1312,7 +1312,7 @@ dependencies = [ "arrow_ext", "byteorder", "bytes_ext", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "chrono", "datafusion", "murmur3", @@ -1331,7 +1331,7 @@ dependencies = [ "arrow 32.0.0", "avro-rs", "backtrace", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "chrono", "common_types", "crossbeam-utils 0.8.11", @@ -2363,9 +2363,9 @@ dependencies = [ [[package]] name = "fs_extra" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "fuchsia-cprng" @@ -3556,7 +3556,7 @@ name = "meta_client" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "futures 0.3.25", @@ -4051,7 +4051,7 @@ version = "1.0.0-alpha02" dependencies = [ "async-trait", "bytes 1.2.1", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "chrono", "clru", "common_util", @@ -5213,7 +5213,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "clru", "common_types", "common_util", @@ -5336,7 +5336,7 @@ name = "router" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "cluster", "common_types", "common_util", @@ -5675,7 +5675,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "cluster", "common_types", "common_util", @@ -6003,7 +6003,7 @@ dependencies = [ "arrow 32.0.0", "async-trait", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "datafusion", @@ -6265,7 +6265,7 @@ dependencies = [ "arrow 32.0.0", "async-trait", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "futures 0.3.25", @@ -6282,7 +6282,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow 32.0.0", "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "common_types", "common_util", "datafusion", @@ -7026,8 +7026,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", - "rand 0.8.5", + "cfg-if 0.1.10", + "rand 0.3.23", "static_assertions 1.1.0", ] @@ -7204,7 +7204,7 @@ name = "wal" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=2d99b441d8dd947caa51019a9d2ff3873aaca6d1)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=4dbd2b36262affd83f8a2d4a99273e43aebcb067)", "chrono", "common_types", "common_util", diff --git a/Cargo.toml b/Cargo.toml index 830c352b37..67cd049886 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,7 +117,7 @@ zstd = { version = "0.12", default-features = false } [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "2d99b441d8dd947caa51019a9d2ff3873aaca6d1" +rev = "4dbd2b36262affd83f8a2d4a99273e43aebcb067" [dependencies] analytic_engine = { workspace = true } diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 8fff9ef43c..a83300c24b 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -81,7 +81,7 @@ pub struct Config { /// + RocksDB /// + OBKV /// + Kafka - pub wal_storage: WalStorageConfig, + pub wal: WalStorageConfig, pub remote_engine_client: remote_engine_client::config::Config, } @@ -107,7 +107,7 @@ impl Default for Config { db_write_buffer_size: 0, scan_batch_size: 500, sst_background_read_parallelism: 8, - wal_storage: WalStorageConfig::RocksDB(Box::default()), + wal: WalStorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), } } diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index a2a6bf5d26..daaf622111 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -187,13 +187,13 @@ impl EngineBuilder for RocksDBWalEngineBuilder { engine_runtimes: Arc, object_store: ObjectStoreRef, ) -> Result<(WalManagerRef, ManifestRef)> { - let rocksdb_wal_config = match config.wal_storage { + let rocksdb_wal_config = match config.wal { WalStorageConfig::RocksDB(config) => *config, _ => { return InvalidWalConfig { msg: format!( "invalid wal storage config while opening rocksDB wal, config:{:?}", - config.wal_storage + config.wal ), } .fail(); @@ -236,13 +236,13 @@ impl EngineBuilder for ObkvWalEngineBuilder { engine_runtimes: Arc, object_store: ObjectStoreRef, ) -> Result<(WalManagerRef, ManifestRef)> { - let obkv_wal_config = match &config.wal_storage { + let obkv_wal_config = match &config.wal { WalStorageConfig::Obkv(config) => config.clone(), _ => { return InvalidWalConfig { msg: format!( "invalid wal storage config while opening obkv wal, config:{:?}", - config.wal_storage + config.wal ), } .fail(); @@ -285,13 +285,13 @@ impl EngineBuilder for MemWalEngineBuilder { engine_runtimes: Arc, object_store: ObjectStoreRef, ) -> Result<(WalManagerRef, ManifestRef)> { - let obkv_wal_config = match &config.wal_storage { + let obkv_wal_config = match &config.wal { WalStorageConfig::Obkv(config) => config.clone(), _ => { return InvalidWalConfig { msg: format!( "invalid wal storage config while opening memory wal, config:{:?}", - config.wal_storage + config.wal ), } .fail(); @@ -320,7 +320,7 @@ impl EngineBuilder for KafkaWalEngineBuilder { engine_runtimes: Arc, object_store: ObjectStoreRef, ) -> Result<(WalManagerRef, ManifestRef)> { - let kafka_wal_config = match &config.wal_storage { + let kafka_wal_config = match &config.wal { WalStorageConfig::Kafka(config) => config.clone(), _ => { return InvalidWalConfig { diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs index ef95f77829..d5ef18cd16 100644 --- a/analytic_engine/src/sst/parquet/meta_data.rs +++ b/analytic_engine/src/sst/parquet/meta_data.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use ceresdbproto::{schema as schema_pb, sst as sst_pb}; use common_types::{schema::Schema, time::TimeRange, SequenceNumber}; use common_util::define_result; -use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use xorfilter::{Xor8, Xor8Builder}; use crate::sst::writer::MetaData; @@ -42,13 +42,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Unsupported parquet_filter version, version:{}.\nBacktrace\n:{}", - version, - backtrace - ))] - UnsupportedParquetFilter { version: u32, backtrace: Backtrace }, - #[snafu(display("Failed to convert time range, err:{}", source))] ConvertTimeRange { source: common_types::time::Error }, @@ -58,13 +51,13 @@ pub enum Error { define_result!(Error); -const DEFAULT_FILTER_VERSION: u32 = 0; - /// Filter can be used to test whether an element is a member of a set. /// False positive matches are possible if space-efficient probabilistic data /// structure are used. // TODO: move this to sst module, and add a FilterBuild trait trait Filter: fmt::Debug { + fn r#type(&self) -> FilterType; + /// Check the key is in the bitmap index. fn contains(&self, key: &[u8]) -> bool; @@ -77,6 +70,11 @@ trait Filter: fmt::Debug { Self: Sized; } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FilterType { + Xor8, +} + /// Filter based on https://docs.rs/xorfilter-rs/latest/xorfilter/struct.Xor8.html #[derive(Default)] struct Xor8Filter { @@ -90,6 +88,10 @@ impl fmt::Debug for Xor8Filter { } impl Filter for Xor8Filter { + fn r#type(&self) -> FilterType { + FilterType::Xor8 + } + fn contains(&self, key: &[u8]) -> bool { self.xor8.contains(key) } @@ -132,7 +134,7 @@ impl RowGroupFilterBuilder { b.map(|mut b| { b.build() .context(BuildXor8Filter) - .map(|xor8| Box::new(Xor8Filter { xor8 }) as Box<_>) + .map(|xor8| Box::new(Xor8Filter { xor8 }) as _) }) .transpose() }) @@ -228,21 +230,26 @@ impl From for sst_pb::ParquetFilter { let column_filters = row_group_filter .column_filters .into_iter() - .map(|column_filter| { - column_filter - .map(|v| v.to_bytes()) - // If the column filter does not exist, use an empty vector for it. - .unwrap_or_default() + .map(|column_filter| match column_filter { + Some(v) => { + let encoded_filter = v.to_bytes(); + match v.r#type() { + FilterType::Xor8 => sst_pb::ColumnFilter { + filter: Some(sst_pb::column_filter::Filter::Xor( + encoded_filter, + )), + }, + } + } + None => sst_pb::ColumnFilter { filter: None }, }) .collect::>(); - sst_pb::parquet_filter::RowGroupFilter { column_filters } + + sst_pb::RowGroupFilter { column_filters } }) .collect::>(); - sst_pb::ParquetFilter { - version: DEFAULT_FILTER_VERSION, - row_group_filters, - } + sst_pb::ParquetFilter { row_group_filters } } } @@ -250,13 +257,6 @@ impl TryFrom for ParquetFilter { type Error = Error; fn try_from(src: sst_pb::ParquetFilter) -> Result { - ensure!( - src.version == DEFAULT_FILTER_VERSION, - UnsupportedParquetFilter { - version: src.version - } - ); - let row_group_filters = src .row_group_filters .into_iter() @@ -264,16 +264,14 @@ impl TryFrom for ParquetFilter { let column_filters = row_group_filter .column_filters .into_iter() - .map(|encoded_bytes| { - if encoded_bytes.is_empty() { - Ok(None) - } else { - Some( + .map(|column_filter| match column_filter.filter { + Some(v) => match v { + sst_pb::column_filter::Filter::Xor(encoded_bytes) => { Xor8Filter::from_bytes(encoded_bytes) - .map(|e| Box::new(e) as Box<_>), - ) - .transpose() - } + .map(|v| Some(Box::new(v) as _)) + } + }, + None => Ok(None), }) .collect::>>()?; Ok(RowGroupFilter { column_filters }) @@ -399,7 +397,6 @@ mod tests { }; let parquet_filter_pb: sst_pb::ParquetFilter = parquet_filter.clone().into(); - assert_eq!(parquet_filter_pb.version, DEFAULT_FILTER_VERSION); assert_eq!(parquet_filter_pb.row_group_filters.len(), 2); assert_eq!( parquet_filter_pb.row_group_filters[0].column_filters.len(), @@ -409,16 +406,18 @@ mod tests { parquet_filter_pb.row_group_filters[1].column_filters.len(), 2 ); - assert!(parquet_filter_pb.row_group_filters[0].column_filters[0].is_empty()); - assert_eq!( - parquet_filter_pb.row_group_filters[0].column_filters[1].len(), - 24 - ); - assert_eq!( - parquet_filter_pb.row_group_filters[1].column_filters[0].len(), - 24 - ); - assert!(parquet_filter_pb.row_group_filters[1].column_filters[1].is_empty()); + assert!(parquet_filter_pb.row_group_filters[0].column_filters[0] + .filter + .is_none()); + assert!(parquet_filter_pb.row_group_filters[0].column_filters[1] + .filter + .is_some(),); + assert!(parquet_filter_pb.row_group_filters[1].column_filters[0] + .filter + .is_some(),); + assert!(parquet_filter_pb.row_group_filters[1].column_filters[1] + .filter + .is_none()); let decoded_parquet_filter = ParquetFilter::try_from(parquet_filter_pb).unwrap(); assert_eq!(decoded_parquet_filter, parquet_filter); diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index 0faf500bdc..e5738d23f1 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -251,7 +251,7 @@ pub enum StorageFormat { /// Design for time-series data /// Collapsible Columns within same primary key are collapsed - /// into list, other columns are the same format with columar's. + /// into list, other columns are the same format with columnar's. /// /// Whether a column is collapsible is decided by /// `Schema::is_collapsible_column` diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index dcdfa55a6f..46b92ece26 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -417,7 +417,7 @@ impl Builder { data_dir: dir.path().to_str().unwrap().to_string(), }), }, - wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig { + wal: WalStorageConfig::RocksDB(Box::new(RocksDBConfig { data_dir: dir.path().to_str().unwrap().to_string(), })), ..Default::default() @@ -477,7 +477,7 @@ impl Default for RocksDBEngineContext { }), }, - wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig { + wal: WalStorageConfig::RocksDB(Box::new(RocksDBConfig { data_dir: dir.path().to_str().unwrap().to_string(), })), ..Default::default() @@ -504,7 +504,7 @@ impl Clone for RocksDBEngineContext { }; config.storage = storage; - config.wal_storage = WalStorageConfig::RocksDB(Box::new(RocksDBConfig { + config.wal = WalStorageConfig::RocksDB(Box::new(RocksDBConfig { data_dir: dir.path().to_str().unwrap().to_string(), })); @@ -544,7 +544,7 @@ impl Default for MemoryEngineContext { data_dir: dir.path().to_str().unwrap().to_string(), }), }, - wal_storage: WalStorageConfig::Obkv(Box::default()), + wal: WalStorageConfig::Obkv(Box::default()), ..Default::default() }; diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs index 18e27247b7..7184629c9c 100644 --- a/common_types/src/column_schema.rs +++ b/common_types/src/column_schema.rs @@ -62,12 +62,15 @@ pub enum Error { source: Box, backtrace: Backtrace, }, + #[snafu(display( - "Can not deserialize default-value-option from pb data, err:{}.\nBacktrace:\n{}", + "Failed to decode default value, encoded_val:{:?}, err:{}.\nBacktrace:\n{}", + encoded_val, source, backtrace ))] - InvalidDefaultValueData { + DecodeDefaultValue { + encoded_val: Vec, source: serde_json::error::Error, backtrace: Backtrace, }, @@ -252,13 +255,15 @@ impl TryFrom for ColumnSchema { fn try_from(column_schema: schema_pb::ColumnSchema) -> Result { let escaped_name = column_schema.name.escape_debug().to_string(); let data_type = column_schema.data_type(); - let default_value = if column_schema.default_value.is_empty() { - None - } else { - let default_value = serde_json::from_slice::(&column_schema.default_value) - .context(InvalidDefaultValueData)?; - Some(default_value) - }; + let default_value = column_schema + .default_value + .map(|v| match v { + schema_pb::column_schema::DefaultValue::SerdeJson(encoded_val) => { + serde_json::from_slice::(&encoded_val) + .context(DecodeDefaultValue { encoded_val }) + } + }) + .transpose()?; Ok(Self { id: column_schema.id, @@ -441,10 +446,11 @@ impl Builder { impl From for schema_pb::ColumnSchema { fn from(src: ColumnSchema) -> Self { - let default_value = src - .default_value - .map(|v| serde_json::to_vec(&v).unwrap()) - .unwrap_or_default(); + let default_value = src.default_value.map(|v| { + // FIXME: Maybe we should throw this error rather than panic here. + let encoded_value = serde_json::to_vec(&v).unwrap(); + schema_pb::column_schema::DefaultValue::SerdeJson(encoded_value) + }); schema_pb::ColumnSchema { name: src.name, @@ -460,6 +466,8 @@ impl From for schema_pb::ColumnSchema { #[cfg(test)] mod tests { + use sqlparser::ast::Value; + use super::*; /// Create a column schema for test, each field is filled with non-default @@ -470,6 +478,7 @@ mod tests { .is_nullable(true) .is_tag(true) .comment("Comment of this column".to_string()) + .default_value(Some(Expr::Value(Value::Boolean(true)))) .build() .expect("should succeed to build column schema") } @@ -485,7 +494,7 @@ mod tests { is_tag: true, comment: "Comment of this column".to_string(), escaped_name: "test_column_schema".escape_debug().to_string(), - default_value: None, + default_value: Some(Expr::Value(Value::Boolean(true))), }; assert_eq!(&lhs, &rhs); diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index 600052cfcc..fb6572979b 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -274,7 +274,7 @@ impl RowGroup { /// Get the max timestamp of rows #[inline] - pub fn max_timestmap(&self) -> Timestamp { + pub fn max_timestamp(&self) -> Timestamp { self.max_timestamp } } diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml deleted file mode 100644 index 2282a123bb..0000000000 --- a/docs/example-cluster-0.toml +++ /dev/null @@ -1,37 +0,0 @@ -[node] -addr = "127.0.0.1" - -[server] -bind_addr = "0.0.0.0" -http_port = 5440 -grpc_port = 8831 -mysql_port = 3307 -log_level = "info" -deploy_mode = "Cluster" - -[analytic.storage] -mem_cache_capacity = '1G' -mem_cache_partition_bits = 0 - -[analytic.storage.object_store] -type = "Local" -data_dir = "/tmp/ceresdb0" - -[analytic.wal_storage] -type = "RocksDB" -data_dir = "/tmp/ceresdb0" - -[cluster_deployment] -type = "WithMeta" -cmd_channel_buffer_size = 10 - -[cluster_deployment.meta_client] -# Only support "defaultCluster" currently. -cluster_name = "defaultCluster" -meta_addr = "http://127.0.0.1:2379" -lease = "10s" -timeout = "5s" - -[limiter] -write_block_list = ['mytable1'] -read_block_list = ['mytable1'] diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml deleted file mode 100644 index fb57d5c538..0000000000 --- a/docs/example-cluster-1.toml +++ /dev/null @@ -1,37 +0,0 @@ -[node] -addr = "127.0.0.1" - -[server] -bind_addr = "0.0.0.0" -http_port = 5441 -grpc_port = 8832 -mysql_port = 13307 -log_level = "info" -deploy_mode = "Cluster" - -[analytic.storage] -mem_cache_capacity = '1G' -mem_cache_partition_bits = 0 - -[analytic.storage.object_store] -type = "Local" -data_dir = "/tmp/ceresdb1" - -[analytic.wal_storage] -type = "RocksDB" -data_dir = "/tmp/ceresdb1" - -[cluster_deployment] -type = "WithMeta" -cmd_channel_buffer_size = 10 - -[cluster_deployment.meta_client] -# Only support "defaultCluster" currently. -cluster_name = "defaultCluster" -meta_addr = "http://127.0.0.1:2379" -lease = "10s" -timeout = "5s" - -[limiter] -write_block_list = ['mytable1'] -read_block_list = ['mytable1'] diff --git a/docs/example-standalone-static-routing.toml b/docs/example-standalone-static-routing.toml index ea00364a14..a9c59b2c50 100644 --- a/docs/example-standalone-static-routing.toml +++ b/docs/example-standalone-static-routing.toml @@ -2,7 +2,9 @@ bind_addr = "0.0.0.0" http_port = 5440 grpc_port = 8831 -log_level = "info" + +[logger] +level = "info" [runtime] read_thread_num = 4 @@ -15,7 +17,7 @@ replay_batch_size = 500 max_replay_tables_per_batch = 1024 write_group_command_channel_cap = 1024 -[analytic.wal_storage] +[analytic.wal] type = "RocksDB" data_dir = "/tmp/ceresdb1" @@ -33,7 +35,7 @@ schedule_interval = "30m" max_ongoing_tasks = 4 [cluster_deployment] -type = "NoMeta" +mode = "NoMeta" # Route&Shard: public [[cluster_deployment.topology.schema_shards]] diff --git a/docs/minimal.toml b/docs/minimal.toml index 5312d88db4..53b77abe5b 100644 --- a/docs/minimal.toml +++ b/docs/minimal.toml @@ -2,16 +2,17 @@ bind_addr = "0.0.0.0" http_port = 5440 grpc_port = 8831 -log_level = "info" -[analytic.storage] -mem_cache_capacity = '1G' -mem_cache_partition_bits = 0 +[logger] +level = "info" + +[tracing] +dir = "/tmp/ceresdb" [analytic.storage.object_store] type = "Local" data_dir = "/tmp/ceresdb" -[analytic.wal_storage] +[analytic.wal] type = "RocksDB" data_dir = "/tmp/ceresdb" diff --git a/integration_tests/cases/local/config.toml b/integration_tests/cases/local/config.toml index f8a1d05121..c2fd66d8a2 100644 --- a/integration_tests/cases/local/config.toml +++ b/integration_tests/cases/local/config.toml @@ -2,12 +2,11 @@ bind_addr = "127.0.0.1" http_port = 5440 grpc_port = 8831 -log_level = "debug" [query_engine] read_parallelism = 8 -[analytic.wal_storage] +[analytic.wal] type = "RocksDB" data_dir = "/tmp/ceresdb" diff --git a/sql/src/parser.rs b/sql/src/parser.rs index 6a50063200..7be0a131e4 100644 --- a/sql/src/parser.rs +++ b/sql/src/parser.rs @@ -85,7 +85,7 @@ pub fn get_default_value(opt: &ColumnOption) -> Option { } /// Returns true when is a TIMESTAMP KEY table constraint -pub fn is_timestamp_key_constraint(constrait: &TableConstraint) -> bool { +pub fn is_timestamp_key_constraint(constraint: &TableConstraint) -> bool { if let TableConstraint::Unique { name: Some(Ident { value, @@ -93,7 +93,7 @@ pub fn is_timestamp_key_constraint(constrait: &TableConstraint) -> bool { }), columns: _, is_primary: false, - } = constrait + } = constraint { return value == TS_KEY; } @@ -698,8 +698,8 @@ impl<'a> Parser<'a> { } } - fn consume_tokens(&mut self, expecteds: &[&str]) -> bool { - for expected in expecteds { + fn consume_tokens(&mut self, expected_tokens: &[&str]) -> bool { + for expected in expected_tokens { if !self.consume_token(expected) { return false; } diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 68f403a5fd..9a22666096 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -1066,7 +1066,7 @@ fn ensure_column_default_value_valid<'a, P: MetaProvider>( Ok(()) } -// Workaroud for TableReference::from(&str) +// Workaround for TableReference::from(&str) // it will always convert table to lowercase when not quoted // TODO: support catalog/schema pub fn get_table_ref(table_name: &str) -> TableReference { diff --git a/src/config.rs b/src/config.rs index 9fbd0f8f08..a00b59b0ec 100644 --- a/src/config.rs +++ b/src/config.rs @@ -68,7 +68,7 @@ pub struct Config { /// [ClusterDeployment::WithMeta] means to start one or multiple CeresDB /// instance(s) under the control of CeresMeta. #[derive(Clone, Debug, Deserialize)] -#[serde(tag = "type")] +#[serde(tag = "mode")] pub enum ClusterDeployment { NoMeta(StaticRouteConfig), WithMeta(ClusterConfig), diff --git a/src/setup.rs b/src/setup.rs index 51853dc2f7..17bb300a22 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -83,7 +83,7 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) { info!("Server starts up, config:{:#?}", config); runtimes.bg_runtime.block_on(async { - match config.analytic.wal_storage { + match config.analytic.wal { WalStorageConfig::RocksDB(_) => { run_server_with_runtimes::( config, diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 2f04c91b8e..2c3a1655d2 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -179,7 +179,7 @@ impl TryFrom for ceresdbproto::remote_engine::WriteRequest { let row_group = request.write_request.row_group; let table_schema_pb = row_group.schema().into(); let min_timestamp = row_group.min_timestamp().as_i64(); - let max_timestamp = row_group.max_timestmap().as_i64(); + let max_timestamp = row_group.max_timestamp().as_i64(); let avro_rows = avro::row_group_to_avro_rows(row_group).context(WriteRequestToPbWithCause { table_ident: request.table.clone(),