diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml
index 93d8c569c95d..eaf0032c7715 100644
--- a/.github/actions/setup-greptimedb-cluster/action.yml
+++ b/.github/actions/setup-greptimedb-cluster/action.yml
@@ -57,6 +57,7 @@ runs:
           greptime/greptimedb-cluster \
           --create-namespace \
           -n my-greptimedb \
+          --values ./.github/actions/setup-greptimedb-cluster/values.yaml \
           --wait \
           --wait-for-jobs
     - name: Wait for GreptimeDB
diff --git a/.github/actions/setup-greptimedb-cluster/values.yaml b/.github/actions/setup-greptimedb-cluster/values.yaml
new file mode 100644
index 000000000000..b7ac1eb86e17
--- /dev/null
+++ b/.github/actions/setup-greptimedb-cluster/values.yaml
@@ -0,0 +1,18 @@
+meta:
+  config: |-
+    [runtime]
+    read_rt_size = 8
+    write_rt_size = 8
+    bg_rt_size = 8
+datanode:
+  config: |-
+    [runtime]
+    read_rt_size = 8
+    write_rt_size = 8
+    bg_rt_size = 8
+frontend:
+  config: |-
+    [runtime]
+    read_rt_size = 8
+    write_rt_size = 8
+    bg_rt_size = 8
\ No newline at end of file
diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index b397a0fd6046..deecdc454a95 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -518,6 +518,9 @@ jobs:
       - name: Setup kafka server
         working-directory: tests-integration/fixtures/kafka
         run: docker compose -f docker-compose-standalone.yml up -d --wait
+      - name: Setup minio
+        working-directory: tests-integration/fixtures/minio
+        run: docker compose -f docker-compose-standalone.yml up -d --wait
       - name: Run nextest cases
         run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
         env:
@@ -528,6 +531,11 @@ jobs:
           GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
           GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
           GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
+          GT_MINIO_BUCKET: greptime
+          GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
+          GT_MINIO_ACCESS_KEY: superpower_password
+          GT_MINIO_REGION: us-west-2
+          GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
           GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
           GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
           UNITTEST_LOG_DIR: "__unittest_logs"
diff --git a/Cargo.lock b/Cargo.lock
index 6961827e690f..9b85c1a797fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1774,20 +1774,24 @@ dependencies = [
  "async-compression 0.3.15",
  "async-trait",
  "bytes",
+ "common-base",
  "common-error",
  "common-macro",
  "common-recordbatch",
  "common-runtime",
+ "common-telemetry",
  "common-test-util",
  "datafusion 38.0.0",
  "datatypes",
  "derive_builder 0.12.0",
+ "dotenv",
  "futures",
  "lazy_static",
  "object-store",
  "orc-rust",
  "parquet",
  "paste",
+ "rand",
  "regex",
  "serde",
"serde_json", "serde_with", @@ -8330,6 +8343,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "rend" version = "0.4.2" @@ -8596,9 +8615,9 @@ dependencies = [ [[package]] name = "rstest" -version = "0.17.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962" +checksum = "9afd55a67069d6e434a95161415f5beeada95a01c7b815508a82dcb0e1593682" dependencies = [ "futures", "futures-timer", @@ -8608,28 +8627,31 @@ dependencies = [ [[package]] name = "rstest_macros" -version = "0.17.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290ca1a1c8ca7edb7c3283bd44dc35dd54fdec6253a3912e201ba1072018fca8" +checksum = "4165dfae59a39dd41d8dec720d3cbfbc71f69744efb480a3920f5d4e0cc6798d" dependencies = [ "cfg-if", + "glob", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", + "regex", + "relative-path", "rustc_version", - "syn 1.0.109", + "syn 2.0.66", "unicode-ident", ] [[package]] name = "rstest_reuse" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073" +checksum = "b3a8fb4672e840a587a66fc577a5491375df51ddb88f2a2c2a792598c326fe14" dependencies = [ "quote", "rand", - "rustc_version", - "syn 1.0.109", + "syn 2.0.66", ] [[package]] @@ -11106,8 +11128,7 @@ dependencies = [ [[package]] name = "tokio-metrics-collector" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767da47381602cc481653456823b3ebb600e83d5dd4e0293da9b5566c6c00f0" +source = "git+https://github.com/MichaelScofield/tokio-metrics-collector.git?rev=89d692d5753d28564a7aac73c6ac5aba22243ba0#89d692d5753d28564a7aac73c6ac5aba22243ba0" dependencies = [ "lazy_static", "parking_lot 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index b33a899daabd..84bb497a024a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,6 +154,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "multipart", ] } rskafka = "0.5" +rstest = "0.21" +rstest_reuse = "0.7" rust_decimal = "1.33" schemars = "0.8" serde = { version = "1.0", features = ["derive"] } diff --git a/config/config.md b/config/config.md index 912e8ca7508c..5c4878c9d37d 100644 --- a/config/config.md +++ b/config/config.md @@ -13,6 +13,10 @@ | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | | `default_timezone` | String | `None` | The default timezone of the server. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `30s` | HTTP request timeout. 
| @@ -154,6 +158,10 @@ | --- | -----| ------- | ----------- | | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. | | `default_timezone` | String | `None` | The default timezone of the server. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | @@ -240,6 +248,10 @@ | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `procedure` | -- | -- | Procedure storage options. | | `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. | | `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially | @@ -300,6 +312,10 @@ | `rpc_max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `rpc_max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index d1849048778c..3a20d3ac5f16 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -32,6 +32,15 @@ rpc_max_send_message_size = "512MB" ## Enable telemetry to collect anonymous usage data. enable_telemetry = true +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The heartbeat options. 
[heartbeat] ## Interval for sending heartbeat messages to the metasrv. diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 728a3099f837..4f4bd5bf3d3d 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -5,6 +5,15 @@ mode = "standalone" ## +toml2docs:none-default default_timezone = "UTC" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The heartbeat options. [heartbeat] ## Interval for sending heartbeat messages to the metasrv. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index bc6a5d119342..239533bd5886 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -25,6 +25,15 @@ enable_telemetry = true ## If it's not empty, the metasrv will store all data with this key prefix. store_key_prefix = "" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## Procedure storage options. [procedure] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 8386c7e1e61a..d6fcc3e8943e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -8,6 +8,15 @@ enable_telemetry = true ## +toml2docs:none-default default_timezone = "UTC" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The HTTP server options. [http] ## The address to bind the HTTP server. 
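The `[runtime]` table added to the example TOML files above is deserialized into the `RuntimeOptions` struct that this patch introduces in `src/common/runtime/src/global.rs`. Below is a minimal sketch of how such a fragment maps onto that shape, assuming only the `serde` and `toml` crates; the field names come from the patch, while the wrapper struct and fixed default values are illustrative (the patch defaults each pool to the CPU count via `num_cpus::get()`).

```rust
use serde::Deserialize;

/// Shape of the `[runtime]` section (field names as in the patch).
#[derive(Debug, Deserialize, PartialEq)]
#[serde(default)]
struct RuntimeOptions {
    read_rt_size: usize,
    write_rt_size: usize,
    bg_rt_size: usize,
}

impl Default for RuntimeOptions {
    fn default() -> Self {
        // A fixed fallback keeps this sketch dependency-free; the real
        // implementation derives the default from the number of CPUs.
        Self { read_rt_size: 8, write_rt_size: 8, bg_rt_size: 8 }
    }
}

/// Illustrative top-level wrapper holding the section.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct SketchOptions {
    runtime: RuntimeOptions,
}

fn main() {
    // The same fragment that the example TOML files above add under `[runtime]`.
    let cfg = "[runtime]\nread_rt_size = 8\nwrite_rt_size = 8\nbg_rt_size = 8\n";
    let opts: SketchOptions = toml::from_str(cfg).expect("valid TOML");
    assert_eq!(opts.runtime, RuntimeOptions::default());
}
```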
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 3c189f2c3d07..d8680ed5294e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -23,7 +23,6 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; -use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::service::DatanodeServiceBuilder; use meta_client::MetaClientOptions; @@ -34,11 +33,13 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; pub const APP_NAME: &str = "greptime-datanode"; +type DatanodeOptions = GreptimeOptions; + pub struct Instance { datanode: Datanode, @@ -97,7 +98,9 @@ impl Command { } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.subcmd.load_options(global_options) + match &self.subcmd { + SubCommand::Start(cmd) => cmd.load_options(global_options), + } } } @@ -112,12 +115,6 @@ impl SubCommand { SubCommand::Start(cmd) => cmd.build(opts).await, } } - - fn load_options(&self, global_options: &GlobalOptions) -> Result { - match self { - SubCommand::Start(cmd) => cmd.load_options(global_options), - } - } } #[derive(Debug, Parser, Default)] @@ -146,22 +143,25 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - DatanodeOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = DatanodeOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: DatanodeOptions, - ) -> Result { + opts: &mut DatanodeOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -231,25 +231,28 @@ impl StartCommand { // Disable dashboard in datanode. 
opts.http.disable_dashboard = true; - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: DatanodeOptions) -> Result { + async fn build(&self, opts: DatanodeOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + let guard = common_telemetry::init_global_logging( APP_NAME, - &opts.logging, - &opts.tracing, - opts.node_id.map(|x| x.to_string()), + &opts.component.logging, + &opts.component.tracing, + opts.component.node_id.map(|x| x.to_string()), ); log_versions(version!(), short_version!()); + info!("Datanode start command: {:#?}", self); + info!("Datanode options: {:#?}", opts); + + let mut opts = opts.component; let plugins = plugins::setup_datanode_plugins(&mut opts) .await .context(StartDatanodeSnafu)?; - info!("Datanode start command: {:#?}", self); - info!("Datanode options: {:#?}", opts); - let node_id = opts .node_id .context(MissingConfigSnafu { msg: "'node_id'" })?; @@ -353,7 +356,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); assert_eq!(Some(42), options.node_id); @@ -414,7 +417,8 @@ mod tests { fn test_try_from_cmd() { let opt = StartCommand::default() .load_options(&GlobalOptions::default()) - .unwrap(); + .unwrap() + .component; assert_eq!(Mode::Standalone, opt.mode); let opt = (StartCommand { @@ -423,7 +427,8 @@ mod tests { ..Default::default() }) .load_options(&GlobalOptions::default()) - .unwrap(); + .unwrap() + .component; assert_eq!(Mode::Distributed, opt.mode); assert!((StartCommand { @@ -454,7 +459,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -536,7 +542,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else { @@ -562,7 +568,10 @@ mod tests { assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir"); // Should be default value. 
- assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr); + assert_eq!( + opts.http.addr, + DatanodeOptions::default().component.http.addr + ); }, ); } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index a3e744e9c7ec..a7781e37a2ed 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -29,7 +29,6 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; -use frontend::frontend::FrontendOptions; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; @@ -44,9 +43,11 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; +type FrontendOptions = GreptimeOptions; + pub struct Instance { frontend: FeInstance, @@ -164,22 +165,25 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - FrontendOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = FrontendOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: FrontendOptions, - ) -> Result { + opts: &mut FrontendOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -242,26 +246,29 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: FrontendOptions) -> Result { + async fn build(&self, opts: FrontendOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + let guard = common_telemetry::init_global_logging( APP_NAME, - &opts.logging, - &opts.tracing, - opts.node_id.clone(), + &opts.component.logging, + &opts.component.tracing, + opts.component.node_id.clone(), ); log_versions(version!(), short_version!()); + info!("Frontend start command: {:#?}", self); + info!("Frontend options: {:#?}", opts); + + let mut opts = opts.component; #[allow(clippy::unnecessary_mut_passed)] let plugins = plugins::setup_frontend_plugins(&mut opts) .await .context(StartFrontendSnafu)?; - info!("Frontend start command: {:#?}", self); - info!("Frontend options: {:#?}", opts); - set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?; let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu { @@ -380,14 +387,14 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; assert_eq!(opts.http.addr, "127.0.0.1:1234"); assert_eq!(ReadableSize::mb(64), opts.http.body_limit); assert_eq!(opts.mysql.addr, "127.0.0.1:5678"); assert_eq!(opts.postgres.addr, "127.0.0.1:5432"); - let default_opts = 
FrontendOptions::default(); + let default_opts = FrontendOptions::default().component; assert_eq!(opts.grpc.addr, default_opts.grpc.addr); assert!(opts.mysql.enable); @@ -428,7 +435,8 @@ mod tests { ..Default::default() }; - let fe_opts = command.load_options(&GlobalOptions::default()).unwrap(); + let fe_opts = command.load_options(&Default::default()).unwrap().component; + assert_eq!(Mode::Distributed, fe_opts.mode); assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr); assert_eq!(Duration::from_secs(30), fe_opts.http.timeout); @@ -442,7 +450,7 @@ mod tests { #[tokio::test] async fn test_try_from_start_command_to_anymap() { - let mut fe_opts = FrontendOptions { + let mut fe_opts = frontend::frontend::FrontendOptions { http: HttpOptions { disable_dashboard: false, ..Default::default() @@ -479,7 +487,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -557,7 +566,7 @@ mod tests { ..Default::default() }; - let fe_opts = command.load_options(&GlobalOptions::default()).unwrap(); + let fe_opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(fe_opts.mysql.runtime_size, 11); diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 8648f220f3ac..3b89fdce112e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -21,14 +21,15 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use meta_srv::bootstrap::MetasrvInstance; -use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; +type MetasrvOptions = GreptimeOptions; + pub const APP_NAME: &str = "greptime-metasrv"; pub struct Instance { @@ -139,22 +140,25 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - MetasrvOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = MetasrvOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: MetasrvOptions, - ) -> Result { + opts: &mut MetasrvOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -217,21 +221,28 @@ impl StartCommand { // Disable dashboard in metasrv. 
opts.http.disable_dashboard = true; - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: MetasrvOptions) -> Result { - let guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + async fn build(&self, opts: MetasrvOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + + let guard = common_telemetry::init_global_logging( + APP_NAME, + &opts.component.logging, + &opts.component.tracing, + None, + ); log_versions(version!(), short_version!()); + info!("Metasrv start command: {:#?}", self); + info!("Metasrv options: {:#?}", opts); + + let mut opts = opts.component; let plugins = plugins::setup_metasrv_plugins(&mut opts) .await .context(StartMetaServerSnafu)?; - info!("Metasrv start command: {:#?}", self); - info!("Metasrv options: {:#?}", opts); - let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) .await .context(error::BuildMetaServerSnafu)?; @@ -266,7 +277,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs); assert_eq!(SelectorType::LoadBased, options.selector); @@ -299,7 +310,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr); assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs); @@ -349,7 +360,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -406,7 +418,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(opts.bind_addr, "127.0.0.1:14002"); diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 03ccbc536247..26ac9203a225 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -13,6 +13,9 @@ // limitations under the License. use clap::Parser; +use common_config::Configurable; +use common_runtime::global::RuntimeOptions; +use serde::{Deserialize, Serialize}; #[derive(Parser, Default, Debug, Clone)] pub struct GlobalOptions { @@ -29,3 +32,22 @@ pub struct GlobalOptions { #[arg(global = true)] pub tokio_console_addr: Option, } + +// TODO(LFC): Move logging and tracing options into global options, like the runtime options. +/// All the options of GreptimeDB. +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(default)] +pub struct GreptimeOptions { + /// The runtime options. + pub runtime: RuntimeOptions, + + /// The options of each component (like Datanode or Standalone) of GreptimeDB. 
+ #[serde(flatten)] + pub component: T, +} + +impl Configurable for GreptimeOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { + T::env_list_keys() + } +} diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 90958baf1048..e1ac35c98b06 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -67,7 +67,7 @@ use crate::error::{ ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; pub const APP_NAME: &str = "greptime-standalone"; @@ -79,11 +79,14 @@ pub struct Command { } impl Command { - pub async fn build(&self, opts: StandaloneOptions) -> Result { + pub async fn build(&self, opts: GreptimeOptions) -> Result { self.subcmd.build(opts).await } - pub fn load_options(&self, global_options: &GlobalOptions) -> Result { + pub fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { self.subcmd.load_options(global_options) } } @@ -94,20 +97,23 @@ enum SubCommand { } impl SubCommand { - async fn build(&self, opts: StandaloneOptions) -> Result { + async fn build(&self, opts: GreptimeOptions) -> Result { match self { SubCommand::Start(cmd) => cmd.build(opts).await, } } - fn load_options(&self, global_options: &GlobalOptions) -> Result { + fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { match self { SubCommand::Start(cmd) => cmd.load_options(global_options), } } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct StandaloneOptions { pub mode: Mode, @@ -161,7 +167,7 @@ impl Default for StandaloneOptions { } } -impl Configurable<'_> for StandaloneOptions { +impl Configurable for StandaloneOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["wal.broker_endpoints"]) } @@ -291,23 +297,27 @@ pub struct StartCommand { } impl StartCommand { - fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - StandaloneOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { + let mut opts = GreptimeOptions::::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts.component)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. pub fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: StandaloneOptions, - ) -> Result { + opts: &mut StandaloneOptions, + ) -> Result<()> { // Should always be standalone mode. 
opts.mode = Mode::Standalone; @@ -369,20 +379,27 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(opts) + Ok(()) } #[allow(unreachable_code)] #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] - async fn build(&self, opts: StandaloneOptions) -> Result { - let guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + async fn build(&self, opts: GreptimeOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + + let guard = common_telemetry::init_global_logging( + APP_NAME, + &opts.component.logging, + &opts.component.tracing, + None, + ); log_versions(version!(), short_version!()); info!("Standalone start command: {:#?}", self); - info!("Building standalone instance with {opts:#?}"); + info!("Standalone options: {opts:#?}"); + let opts = opts.component; let mut fe_opts = opts.frontend_options(); #[allow(clippy::unnecessary_mut_passed)] let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it @@ -664,7 +681,10 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd + .load_options(&GlobalOptions::default()) + .unwrap() + .component; let fe_opts = options.frontend_options(); let dn_opts = options.datanode_options(); let logging_opts = options.logging; @@ -725,7 +745,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir); assert_eq!("debug", opts.logging.level.unwrap()); @@ -787,7 +808,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(opts.logging.dir, "/other/log/dir"); diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs new file mode 100644 index 000000000000..80075b846e51 --- /dev/null +++ b/src/cmd/tests/load_config_test.rs @@ -0,0 +1,218 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Duration; + +use cmd::options::GreptimeOptions; +use cmd::standalone::StandaloneOptions; +use common_config::Configurable; +use common_runtime::global::RuntimeOptions; +use common_telemetry::logging::LoggingOptions; +use common_wal::config::raft_engine::RaftEngineConfig; +use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig}; +use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; +use frontend::frontend::FrontendOptions; +use frontend::service_config::datanode::DatanodeClientOptions; +use meta_client::MetaClientOptions; +use meta_srv::metasrv::MetasrvOptions; +use meta_srv::selector::SelectorType; +use mito2::config::MitoConfig; +use servers::export_metrics::ExportMetricsOption; + +#[test] +fn test_load_datanode_example_config() { + let example_config = common_test_util::find_workspace_path("config/datanode.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: DatanodeOptions { + node_id: Some(42), + rpc_hostname: Some("127.0.0.1".to_string()), + meta_client: Some(MetaClientOptions { + metasrv_addrs: vec!["127.0.0.1:3002".to_string()], + timeout: Duration::from_secs(3), + heartbeat_timeout: Duration::from_millis(500), + ddl_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + tcp_nodelay: true, + metadata_cache_max_capacity: 100000, + metadata_cache_ttl: Duration::from_secs(600), + metadata_cache_tti: Duration::from_secs(300), + }), + wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { + dir: Some("/tmp/greptimedb/wal".to_string()), + sync_period: Some(Duration::from_secs(10)), + ..Default::default() + }), + storage: StorageConfig { + data_home: "/tmp/greptimedb/".to_string(), + ..Default::default() + }, + region_engine: vec![RegionEngineConfig::Mito(MitoConfig { + num_workers: 8, + auto_flush_interval: Duration::from_secs(3600), + scan_parallelism: 0, + ..Default::default() + })], + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + + assert_eq!(options, expected); +} + +#[test] +fn test_load_frontend_example_config() { + let example_config = common_test_util::find_workspace_path("config/frontend.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: FrontendOptions { + default_timezone: Some("UTC".to_string()), + meta_client: Some(MetaClientOptions { + metasrv_addrs: vec!["127.0.0.1:3002".to_string()], + timeout: Duration::from_secs(3), + heartbeat_timeout: Duration::from_millis(500), + ddl_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + tcp_nodelay: true, + metadata_cache_max_capacity: 100000, + metadata_cache_ttl: Duration::from_secs(600), + metadata_cache_tti: Duration::from_secs(300), + }), + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + 
datanode: frontend::service_config::DatanodeOptions { + client: DatanodeClientOptions { + connect_timeout: Duration::from_secs(10), + tcp_nodelay: true, + }, + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} + +#[test] +fn test_load_metasrv_example_config() { + let example_config = common_test_util::find_workspace_path("config/metasrv.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: MetasrvOptions { + selector: SelectorType::LeaseBased, + data_home: "/tmp/metasrv/".to_string(), + logging: LoggingOptions { + dir: "/tmp/greptimedb/logs".to_string(), + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} + +#[test] +fn test_load_standalone_example_config() { + let example_config = common_test_util::find_workspace_path("config/standalone.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: StandaloneOptions { + default_timezone: Some("UTC".to_string()), + wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig { + dir: Some("/tmp/greptimedb/wal".to_string()), + sync_period: Some(Duration::from_secs(10)), + ..Default::default() + }), + region_engine: vec![RegionEngineConfig::Mito(MitoConfig { + num_workers: 8, + auto_flush_interval: Duration::from_secs(3600), + scan_parallelism: 0, + ..Default::default() + })], + storage: StorageConfig { + data_home: "/tmp/greptimedb/".to_string(), + ..Default::default() + }, + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs index c21735a059ea..e0816fbd5671 100644 --- a/src/common/config/src/config.rs +++ b/src/common/config/src/config.rs @@ -13,7 +13,8 @@ // limitations under the License. use config::{Environment, File, FileFormat}; -use serde::{Deserialize, Serialize}; +use serde::de::DeserializeOwned; +use serde::Serialize; use snafu::ResultExt; use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu}; @@ -25,7 +26,7 @@ pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; /// Configuration trait defines the common interface for configuration that can be loaded from multiple sources and serialized to TOML. 
-pub trait Configurable<'de>: Serialize + Deserialize<'de> + Default + Sized { +pub trait Configurable: Serialize + DeserializeOwned + Default + Sized { /// Load the configuration from multiple sources and merge them. /// The precedence order is: config file > environment variables > default values. /// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx". @@ -128,7 +129,7 @@ mod tests { } } - impl Configurable<'_> for TestDatanodeConfig { + impl Configurable for TestDatanodeConfig { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs"]) } diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index ece0edd9fe0d..e4bdec2c70dc 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -20,6 +20,7 @@ async-compression = { version = "0.3", features = [ ] } async-trait.workspace = true bytes.workspace = true +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-recordbatch.workspace = true @@ -33,6 +34,7 @@ object-store.workspace = true orc-rust = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "502217315726314c4008808fe169764529640599" } parquet.workspace = true paste = "1.0" +rand.workspace = true regex = "1.7" serde.workspace = true snafu.workspace = true @@ -42,4 +44,7 @@ tokio-util.workspace = true url = "2.3" [dev-dependencies] +common-telemetry.workspace = true common-test-util.workspace = true +dotenv.workspace = true +uuid.workspace = true diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 5bb9258ad3d0..c555f763b59b 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -46,6 +46,7 @@ use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter}; use crate::compression::CompressionType; use crate::error::{self, Result}; use crate::share_buffer::SharedBuffer; +use crate::DEFAULT_WRITE_BUFFER_SIZE; pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type"; pub const FORMAT_DELIMITER: &str = "delimiter"; @@ -204,6 +205,7 @@ pub async fn stream_to_file T>( store .writer_with(&path) .concurrent(concurrency) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .await .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 2e887ac2f7c3..f5125757b956 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -39,6 +39,7 @@ use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBuffer use crate::error::{self, Result}; use crate::file_format::FileFormat; use crate::share_buffer::SharedBuffer; +use crate::DEFAULT_WRITE_BUFFER_SIZE; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct ParquetFormat {} @@ -197,6 +198,7 @@ impl BufferedWriter { store .writer_with(&path) .concurrent(concurrency) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .await .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) @@ -276,9 +278,19 @@ pub async fn stream_to_parquet( #[cfg(test)] mod tests { + use std::env; + use std::sync::Arc; + + use common_telemetry::warn; use common_test_util::find_workspace_path; + use datatypes::arrow::array::{ArrayRef, Int64Array, RecordBatch}; + use datatypes::arrow::datatypes::{DataType, Field, 
Schema}; + use object_store::services::S3; + use object_store::ObjectStore; + use rand::{thread_rng, Rng}; use super::*; + use crate::file_format::parquet::BufferedWriter; use crate::test_util::{format_schema, test_store}; fn test_data_root() -> String { @@ -296,4 +308,64 @@ mod tests { assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted); } + + #[tokio::test] + async fn test_parquet_writer() { + common_telemetry::init_default_ut_logging(); + let _ = dotenv::dotenv(); + let Ok(bucket) = env::var("GT_MINIO_BUCKET") else { + warn!("ignoring test parquet writer"); + return; + }; + + let mut builder = S3::default(); + let _ = builder + .root(&uuid::Uuid::new_v4().to_string()) + .access_key_id(&env::var("GT_MINIO_ACCESS_KEY_ID").unwrap()) + .secret_access_key(&env::var("GT_MINIO_ACCESS_KEY").unwrap()) + .bucket(&bucket) + .region(&env::var("GT_MINIO_REGION").unwrap()) + .endpoint(&env::var("GT_MINIO_ENDPOINT_URL").unwrap()); + + let object_store = ObjectStore::new(builder).unwrap().finish(); + let file_path = uuid::Uuid::new_v4().to_string(); + let fields = vec![ + Field::new("field1", DataType::Int64, true), + Field::new("field0", DataType::Int64, true), + ]; + let arrow_schema = Arc::new(Schema::new(fields)); + let mut buffered_writer = BufferedWriter::try_new( + file_path.clone(), + object_store.clone(), + arrow_schema.clone(), + None, + // Sets a small value. + 128, + 8, + ) + .await + .unwrap(); + let rows = 200000; + let generator = || { + let columns: Vec = vec![ + Arc::new(Int64Array::from( + (0..rows) + .map(|_| thread_rng().gen::()) + .collect::>(), + )), + Arc::new(Int64Array::from( + (0..rows) + .map(|_| thread_rng().gen::()) + .collect::>(), + )), + ]; + RecordBatch::try_new(arrow_schema.clone(), columns).unwrap() + }; + let batch = generator(); + // Writes about ~30Mi + for _ in 0..10 { + buffered_writer.write(&batch).await.unwrap(); + } + buffered_writer.close().await.unwrap(); + } } diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs index 8cb8756e06f8..5d24b1cdf49d 100644 --- a/src/common/datasource/src/lib.rs +++ b/src/common/datasource/src/lib.rs @@ -27,3 +27,8 @@ pub mod test_util; #[cfg(test)] pub mod tests; pub mod util; + +use common_base::readable_size::ReadableSize; + +/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb). 
+pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index a6da1f571fc2..e5fa276c4bf1 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -13,13 +13,15 @@ common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true lazy_static.workspace = true +num_cpus.workspace = true once_cell.workspace = true paste.workspace = true prometheus.workspace = true +serde.workspace = true snafu.workspace = true tokio.workspace = true tokio-metrics = "0.3" -tokio-metrics-collector = "0.2" +tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" } tokio-util.workspace = true [dev-dependencies] diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 51bad13107c7..6b21851e1680 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -19,6 +19,7 @@ use std::sync::{Mutex, Once}; use common_telemetry::info; use once_cell::sync::Lazy; use paste::paste; +use serde::{Deserialize, Serialize}; use crate::{Builder, JoinHandle, Runtime}; @@ -26,6 +27,28 @@ const READ_WORKERS: usize = 8; const WRITE_WORKERS: usize = 8; const BG_WORKERS: usize = 8; +/// The options for the global runtimes. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct RuntimeOptions { + /// The number of threads to execute the runtime for global read operations. + pub read_rt_size: usize, + /// The number of threads to execute the runtime for global write operations. + pub write_rt_size: usize, + /// The number of threads to execute the runtime for global background operations. + pub bg_rt_size: usize, +} + +impl Default for RuntimeOptions { + fn default() -> Self { + let cpus = num_cpus::get(); + Self { + read_rt_size: cpus, + write_rt_size: cpus, + bg_rt_size: cpus, + } + } +} + pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime { info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."); Builder::default() @@ -112,18 +135,26 @@ static CONFIG_RUNTIMES: Lazy> = /// # Panics /// Panics when the global runtimes are already initialized. /// You should call this function before using any runtime functions. -pub fn init_global_runtimes( - read: Option, - write: Option, - background: Option, -) { +pub fn init_global_runtimes(options: &RuntimeOptions) { static START: Once = Once::new(); START.call_once(move || { let mut c = CONFIG_RUNTIMES.lock().unwrap(); assert!(!c.already_init, "Global runtimes already initialized"); - c.read_runtime = read; - c.write_runtime = write; - c.bg_runtime = background; + c.read_runtime = Some(create_runtime( + "global-read", + "global-read-worker", + options.read_rt_size, + )); + c.write_runtime = Some(create_runtime( + "global-write", + "global-write-worker", + options.write_rt_size, + )); + c.bg_runtime = Some(create_runtime( + "global-bg", + "global-bg-worker", + options.bg_rt_size, + )); }); } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 08baed46cbd3..ba6f74c96cc6 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -13,7 +13,7 @@ // limitations under the License. 
pub mod error; -mod global; +pub mod global; mod metrics; mod repeated_task; pub mod runtime; diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index 2b66dd45ce3a..b8084a2a8b3e 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true workspace = true [dependencies] -client.workspace = true +client = { workspace = true, features = ["testing"] } common-query.workspace = true common-recordbatch.workspace = true once_cell.workspace = true diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs index 47c949d40715..eb666e167a31 100644 --- a/src/common/test-util/src/recordbatch.rs +++ b/src/common/test-util/src/recordbatch.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use client::Database; use common_query::OutputData; use common_recordbatch::util; @@ -29,3 +30,25 @@ pub async fn check_output_stream(output: OutputData, expected: &str) { let pretty_print = recordbatches.pretty_print().unwrap(); assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print); } + +pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { + let output = db.sql(sql).await.unwrap(); + let output = output.data; + + match (&output, expected) { + (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { + assert_eq!( + *x, y, + r#" +expected: {y} +actual: {x} +"# + ) + } + (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) + | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { + check_output_stream(output, x).await + } + _ => panic!(), + } +} diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index ec278d3c4247..7e76c7d68169 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -15,7 +15,7 @@ //! 
Datanode configurations use common_base::readable_size::ReadableSize; -use common_base::secrets::SecretString; +use common_base::secrets::{ExposeSecret, SecretString}; use common_config::Configurable; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, @@ -38,7 +38,7 @@ pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; /// Object storage config -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] pub enum ObjectStoreConfig { File(FileConfig), @@ -61,7 +61,7 @@ impl ObjectStoreConfig { } /// Storage engine config -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct StorageConfig { /// The working directory of database @@ -85,7 +85,7 @@ impl Default for StorageConfig { #[serde(default)] pub struct FileConfig {} -#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] #[serde(default)] pub struct ObjectStorageCacheConfig { /// The local file cache directory @@ -109,6 +109,18 @@ pub struct S3Config { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for S3Config { + fn eq(&self, other: &Self) -> bool { + self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret() + && self.endpoint == other.endpoint + && self.region == other.region + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct OssConfig { @@ -123,6 +135,17 @@ pub struct OssConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for OssConfig { + fn eq(&self, other: &Self) -> bool { + self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct AzblobConfig { @@ -138,6 +161,18 @@ pub struct AzblobConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for AzblobConfig { + fn eq(&self, other: &Self) -> bool { + self.container == other.container + && self.root == other.root + && self.account_name.expose_secret() == other.account_name.expose_secret() + && self.account_key.expose_secret() == other.account_key.expose_secret() + && self.endpoint == other.endpoint + && self.sas_token == other.sas_token + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct GcsConfig { @@ -151,6 +186,17 @@ pub struct GcsConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for GcsConfig { + fn eq(&self, other: &Self) -> bool { + self.root == other.root + && self.bucket == other.bucket + && self.scope == other.scope + && self.credential_path.expose_secret() == other.credential_path.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + } +} + impl Default for S3Config { fn default() -> Self { Self { @@ -211,7 +257,7 @@ impl Default for ObjectStoreConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, 
Deserialize, PartialEq)]
 #[serde(default)]
 pub struct DatanodeOptions {
     pub mode: Mode,
@@ -267,7 +313,7 @@ impl Default for DatanodeOptions {
     }
 }
 
-impl Configurable<'_> for DatanodeOptions {
+impl Configurable for DatanodeOptions {
     fn env_list_keys() -> Option<&'static [&'static str]> {
         Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
     }
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 2e66f3850be8..31f4fadf0326 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -64,8 +64,6 @@ mod table_source;
 
 use error::Error;
 
-pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
-
 // TODO: replace this with `GREPTIME_TIMESTAMP` before v0.9
 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index 50bd48f5fb70..91cb37c6cf93 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -113,9 +113,21 @@ fn mfp_subgraph(
     scheduler: &Scheduler,
     send: &PortCtx,
 ) {
+    // all updates that should be sent immediately
+    let mut output_now = vec![];
     let run_mfp = || {
-        let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
-        arrange.write().apply_updates(now, all_updates)?;
+        let mut all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
+        all_updates.retain(|(kv, ts, d)| {
+            if *ts > now {
+                true
+            } else {
+                output_now.push((kv.clone(), *ts, *d));
+                false
+            }
+        });
+        let future_updates = all_updates;
+
+        arrange.write().apply_updates(now, future_updates)?;
         Ok(())
     };
     err_collector.run(run_mfp);
@@ -130,13 +142,19 @@ fn mfp_subgraph(
         std::ops::Bound::Excluded(from),
         std::ops::Bound::Included(now),
     );
+
+    // find all updates that need to be sent from the arrangement
     let output_kv = arrange.read().get_updates_in_range(range);
+    // the output is expected to be key -> empty val
     let output = output_kv
         .into_iter()
+        .chain(output_now) // chain the updates collected above for immediate sending
         .map(|((key, _v), ts, diff)| (key, ts, diff))
         .collect_vec();
+    // send output
     send.give(output);
+
     let run_compaction = || {
         arrange.write().compact_to(now)?;
         Ok(())
@@ -305,4 +323,42 @@ mod test {
         ]);
         run_and_check(&mut state, &mut df, 1..5, expected, output);
     }
+
+    /// test if the mfp operator can run multiple times within the same tick
+    #[test]
+    fn test_render_mfp_multiple_times() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let (sender, recv) = tokio::sync::broadcast::channel(1000);
+        let collection = ctx.render_source(recv).unwrap();
+        ctx.insert_global(GlobalId::User(1), collection);
+        let input_plan = Plan::Get {
+            id: expr::Id::Global(GlobalId::User(1)),
+        };
+        let typ = RelationType::new(vec![ColumnType::new_nullable(
+            ConcreteDataType::int64_datatype(),
+        )]);
+        // filter: col(0)>1
+        let mfp = MapFilterProject::new(1)
+            .filter(vec![ScalarExpr::Column(0).call_binary(
+                ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
+                BinaryFunc::Gt,
+            )])
+            .unwrap();
+        let bundle = ctx
+            .render_mfp(Box::new(input_plan.with_types(typ)), mfp)
+            .unwrap();
+
+        let output = get_output_handle(&mut ctx, bundle);
+        drop(ctx);
+        sender.send((Row::new(vec![2.into()]), 0, 1)).unwrap();
+        state.run_available_with_schedule(&mut df);
+        assert_eq!(output.borrow().len(), 1);
+        output.borrow_mut().clear();
+        sender.send((Row::new(vec![3.into()]), 0, 1)).unwrap();
+        state.run_available_with_schedule(&mut df);
+        assert_eq!(output.borrow().len(), 1);
+    }
 }
diff --git
a/src/flow/src/repr.rs b/src/flow/src/repr.rs index e918044c0d91..e28689be4008 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -53,7 +53,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff); /// broadcast channel capacity, can be important to memory consumption, since this influence how many /// updates can be buffered in memory in the entire dataflow -pub const BROADCAST_CAP: usize = 8192; +/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this +pub const BROADCAST_CAP: usize = 65535; /// Convert a value that is or can be converted to Datetime to internal timestamp /// diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a359a56702b5..d5489028a916 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -63,7 +63,7 @@ toml.workspace = true tonic.workspace = true [dev-dependencies] -catalog.workspace = true +catalog = { workspace = true, features = ["testing"] } common-test-util.workspace = true datanode.workspace = true futures = "0.3" @@ -71,3 +71,5 @@ meta-srv = { workspace = true, features = ["mock"] } strfmt = "0.2" tower.workspace = true uuid.workspace = true +datatypes.workspace = true +serde_json.workspace = true diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index f0dfac1c7d5c..7907ff20ffe0 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -74,7 +74,7 @@ impl Default for FrontendOptions { } } -impl Configurable<'_> for FrontendOptions { +impl Configurable for FrontendOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs"]) } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 47eabbe551fe..2d4434222cc5 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -191,7 +191,7 @@ impl Instance { pub fn build_servers( &mut self, - opts: impl Into + for<'de> Configurable<'de>, + opts: impl Into + Configurable, servers: ServerHandlers, ) -> Result<()> { let opts: FrontendOptions = opts.into(); diff --git a/src/frontend/src/pipeline.rs b/src/frontend/src/pipeline.rs index 92fc4efb460a..e94252395ddf 100644 --- a/src/frontend/src/pipeline.rs +++ b/src/frontend/src/pipeline.rs @@ -85,7 +85,7 @@ impl PipelineOperator { ); } - pub async fn create_pipeline_table_if_not_exists(&self, catalog: &str) -> Result<()> { + async fn create_pipeline_table_if_not_exists(&self, catalog: &str) -> Result<()> { if self.get_pipeline_table_from_cache(catalog).is_some() { return Ok(()); } @@ -145,24 +145,21 @@ impl PipelineOperator { } pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option { - // FIXME (qtang): we should impl this self.tables.read().unwrap().get(catalog).cloned() } - pub async fn insert_and_compile( + async fn insert_and_compile( &self, catalog: &str, schema: &str, name: &str, content_type: &str, pipeline: &str, - ) -> Result<()> { - let _compiled_pipeline = PipelineTable::compile_pipeline(pipeline) - .map_err(BoxedError::new) - .context(InsertPipelineSnafu { name })?; + ) -> Result> { self.get_pipeline_table_from_cache(catalog) - // FIXME (qtang): we should add error handling here - .unwrap() + .with_context(|| TableNotFoundSnafu { + table_name: PIPELINE_TABLE_NAME, + })? 
.insert_and_compile(schema, name, content_type, pipeline) .await .map_err(|e| { @@ -171,8 +168,7 @@ impl PipelineOperator { } BoxedError::new(e) }) - .context(InsertPipelineSnafu { name })?; - Ok(()) + .context(InsertPipelineSnafu { name }) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index e2f0fdb56254..89e819e78d91 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -39,7 +39,7 @@ use crate::service_config::GrpcOptions; pub struct Services where - T: Into + for<'de> Configurable<'de> + Clone, + T: Into + Configurable + Clone, U: FrontendInstance, { opts: T, @@ -51,7 +51,7 @@ where impl Services where - T: Into + for<'de> Configurable<'de> + Clone, + T: Into + Configurable + Clone, U: FrontendInstance, { pub fn new(opts: T, instance: Arc, plugins: Plugins) -> Self { diff --git a/src/frontend/src/service_config/datanode.rs b/src/frontend/src/service_config/datanode.rs index ccf2b2ebf4c7..3b4de67b48c1 100644 --- a/src/frontend/src/service_config/datanode.rs +++ b/src/frontend/src/service_config/datanode.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct DatanodeOptions { - client: DatanodeClientOptions, + pub client: DatanodeClientOptions, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index c25b411665b3..e714088d89ec 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -13,10 +13,13 @@ // limitations under the License. use std::path::Path; +use std::time::Duration; use common_base::readable_size::ReadableSize; +use common_wal::config::kafka::DatanodeKafkaConfig; use common_wal::config::raft_engine::RaftEngineConfig; +use crate::kafka::log_store::KafkaLogStore; use crate::raft_engine::log_store::RaftEngineLogStore; /// Create a write log for the provided path, used for test. @@ -28,3 +31,14 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng }; RaftEngineLogStore::try_new(path, cfg).await.unwrap() } + +/// Create a [KafkaLogStore]. 
+pub async fn create_kafka_log_store(broker_endpoints: Vec) -> KafkaLogStore { + KafkaLogStore::try_new(&DatanodeKafkaConfig { + broker_endpoints, + linger: Duration::from_millis(1), + ..Default::default() + }) + .await + .unwrap() +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 76fb794f797c..ce812cfba80f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -148,7 +148,7 @@ impl Default for MetasrvOptions { } } -impl Configurable<'_> for MetasrvOptions { +impl Configurable for MetasrvOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["wal.broker_endpoints"]) } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index c71375299c38..dad22c72f9ff 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -38,9 +38,7 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; -use crate::engine::options::{ - set_index_options_for_data_region, set_memtable_options_for_data_region, -}; +use crate::engine::options::set_data_region_options; use crate::engine::MetricEngineInner; use crate::error::{ AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, @@ -478,11 +476,8 @@ impl MetricEngineInner { data_region_request.column_metadatas.push(tsid_col); data_region_request.primary_key = primary_key; - // set index options - set_index_options_for_data_region(&mut data_region_request.options); - - // Set memtable options. - set_memtable_options_for_data_region(&mut data_region_request.options); + // set data region options + set_data_region_options(&mut data_region_request.options); data_region_request } diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 952c923487bf..c42e0376562c 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -26,7 +26,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use super::MetricEngineInner; -use crate::engine::options::set_index_options_for_data_region; +use crate::engine::options::set_data_region_options; use crate::error::{OpenMitoRegionSnafu, Result}; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils; @@ -80,7 +80,7 @@ impl MetricEngineInner { }; let mut data_region_options = request.options; - set_index_options_for_data_region(&mut data_region_options); + set_data_region_options(&mut data_region_options); let open_data_region_request = RegionOpenRequest { region_dir: data_region_dir, options: data_region_options, diff --git a/src/metric-engine/src/engine/options.rs b/src/metric-engine/src/engine/options.rs index 56981329db29..f22f39271c2e 100644 --- a/src/metric-engine/src/engine/options.rs +++ b/src/metric-engine/src/engine/options.rs @@ -30,20 +30,17 @@ const IGNORE_COLUMN_IDS_FOR_DATA_REGION: [ColumnId; 1] = [ReservedColumnId::tsid /// value and appropriately increasing the size of the index, it results in an improved indexing effect. const SEG_ROW_COUNT_FOR_DATA_REGION: u32 = 256; -/// Set the index options for the data region. -pub fn set_index_options_for_data_region(options: &mut HashMap) { +/// Sets data region specific options. +pub fn set_data_region_options(options: &mut HashMap) { + // Set the index options for the data region. 
options.insert( "index.inverted_index.ignore_column_ids".to_string(), IGNORE_COLUMN_IDS_FOR_DATA_REGION.iter().join(","), ); - options.insert( "index.inverted_index.segment_row_count".to_string(), SEG_ROW_COUNT_FOR_DATA_REGION.to_string(), ); -} - -/// Set memtable options for the data region. -pub fn set_memtable_options_for_data_region(options: &mut HashMap) { + // Set memtable options for the data region. options.insert("memtable.type".to_string(), "partition_tree".to_string()); } diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index f9fdb5b574a4..3994ebb43985 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] default = [] -test = ["common-test-util", "log-store"] +test = ["common-test-util", "log-store", "rstest", "rstest_reuse", "rskafka"] [lints] workspace = true @@ -37,6 +37,7 @@ datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true +dotenv.workspace = true futures.workspace = true humantime-serde.workspace = true index.workspace = true @@ -54,6 +55,9 @@ prost.workspace = true puffin.workspace = true rand.workspace = true regex = "1.5" +rskafka = { workspace = true, optional = true } +rstest = { workspace = true, optional = true } +rstest_reuse = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true serde_with.workspace = true @@ -71,8 +75,12 @@ uuid.workspace = true common-procedure-test.workspace = true common-test-util.workspace = true criterion = "0.4" +dotenv.workspace = true log-store.workspace = true object-store = { workspace = true, features = ["services-memory"] } +rskafka.workspace = true +rstest.workspace = true +rstest_reuse.workspace = true toml.workspace = true [[bench]] diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 9a5cca209b7a..439b3a2fe0d3 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -22,8 +22,11 @@ use common_base::readable_size::ReadableSize; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; +use common_wal::options::WAL_OPTIONS_KEY; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; +use rstest::rstest; +use rstest_reuse::{self, apply}; use store_api::metadata::ColumnMetadata; use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; @@ -32,7 +35,9 @@ use super::*; use crate::region::version::VersionControlData; use crate::test_util::{ build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema, - flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv, + flush_region, kafka_log_store_factory, multiple_log_store_factories, + prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, reopen_region, + rows_schema, CreateRequestBuilder, LogStoreFactory, TestEnv, }; #[tokio::test] @@ -83,14 +88,24 @@ async fn test_write_to_region() { put_rows(&engine, region_id, rows).await; } -#[tokio::test] -async fn test_region_replay() { +#[apply(multiple_log_store_factories)] + +async fn test_region_replay(factory: Option) { + use common_wal::options::{KafkaWalOptions, WalOptions}; + common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::with_prefix("region-replay"); + let Some(factory) = factory else { + return; + }; + let mut env = 
TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); + let topic = prepare_test_for_kafka_log_store(&factory).await; + let request = CreateRequestBuilder::new() + .kafka_topic(topic.clone()) + .build(); + let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); @@ -113,13 +128,24 @@ async fn test_region_replay() { let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let mut options = HashMap::new(); + if let Some(topic) = &topic { + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap(), + ); + }; + let result = engine .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), region_dir, - options: HashMap::default(), + options, skip_wal_replay: false, }), ) diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 89d44dc76129..52fb46dfab6a 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -21,6 +21,9 @@ use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; use common_time::util::current_time_millis; +use common_wal::options::WAL_OPTIONS_KEY; +use rstest::rstest; +use rstest_reuse::{self, apply}; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -28,8 +31,10 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::listener::{FlushListener, StallListener}; use crate::test_util::{ - build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema, - CreateRequestBuilder, MockWriteBufferManager, TestEnv, + build_rows, build_rows_for_key, flush_region, kafka_log_store_factory, + multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows, + raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder, + LogStoreFactory, MockWriteBufferManager, TestEnv, }; use crate::time_provider::TimeProvider; use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS; @@ -231,13 +236,25 @@ async fn test_flush_empty() { assert_eq!(expected, batches.pretty_print().unwrap()); } -#[tokio::test] -async fn test_flush_reopen_region() { - let mut env = TestEnv::new(); +#[apply(multiple_log_store_factories)] +async fn test_flush_reopen_region(factory: Option) { + use std::collections::HashMap; + + use common_wal::options::{KafkaWalOptions, WalOptions}; + + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + + let mut env = TestEnv::new().with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); + let topic = prepare_test_for_kafka_log_store(&factory).await; + let request = CreateRequestBuilder::new() + .kafka_topic(topic.clone()) + .build(); let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); @@ -263,7 +280,17 @@ async fn test_flush_reopen_region() { }; check_region(); - reopen_region(&engine, region_id, region_dir, true, Default::default()).await; + let mut options = HashMap::new(); + if let Some(topic) = &topic { + options.insert( + WAL_OPTIONS_KEY.to_string(), + 
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap(), + ); + }; + reopen_region(&engine, region_id, region_dir, true, options).await; check_region(); // Puts again. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index d7c671962c03..3ceff8a297b2 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -33,16 +33,23 @@ use api::v1::value::ValueData; use api::v1::{OpType, Row, Rows, SemanticType}; use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; +use common_telemetry::warn; use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; +use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::services::Fs; use object_store::util::join_dir; use object_store::ObjectStore; +use rskafka::client::partition::{Compression, UnknownTopicHandling}; +use rskafka::client::{Client, ClientBuilder}; +use rskafka::record::Record; +use rstest_reuse::template; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::region_engine::RegionEngine; use store_api::region_request::{ @@ -75,11 +82,110 @@ pub(crate) fn new_noop_file_purger() -> FilePurgerRef { Arc::new(NoopFilePurger {}) } +pub(crate) fn raft_engine_log_store_factory() -> Option { + Some(LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory)) +} + +pub(crate) fn kafka_log_store_factory() -> Option { + let _ = dotenv::dotenv(); + let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else { + warn!("env GT_KAFKA_ENDPOINTS not found"); + return None; + }; + + let broker_endpoints = broker_endpoints + .split(',') + .map(|s| s.trim().to_string()) + .collect::>(); + + Some(LogStoreFactory::Kafka(KafkaLogStoreFactory { + broker_endpoints, + })) +} + +#[template] +#[rstest] +#[case::with_raft_engine(raft_engine_log_store_factory())] +#[case::with_kafka(kafka_log_store_factory())] +#[tokio::test] +pub(crate) fn multiple_log_store_factories(#[case] factory: Option) {} + +#[derive(Clone)] +pub(crate) struct RaftEngineLogStoreFactory; + +impl RaftEngineLogStoreFactory { + async fn create_log_store>(&self, wal_path: P) -> RaftEngineLogStore { + log_store_util::create_tmp_local_file_log_store(wal_path).await + } +} + +pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory) -> Option { + if let LogStoreFactory::Kafka(factory) = factory { + let topic = uuid::Uuid::new_v4().to_string(); + let client = factory.client().await; + append_noop_record(&client, &topic).await; + + Some(topic) + } else { + None + } +} + +pub(crate) async fn append_noop_record(client: &Client, topic: &str) { + let partition_client = client + .partition_client(topic, 0, UnknownTopicHandling::Retry) + .await + .unwrap(); + + partition_client + .produce( + vec![Record { + key: None, + value: None, + timestamp: rskafka::chrono::Utc::now(), + headers: Default::default(), + }], + Compression::NoCompression, + ) + .await + .unwrap(); +} +#[derive(Clone)] +pub(crate) struct KafkaLogStoreFactory { + broker_endpoints: Vec, +} + +impl KafkaLogStoreFactory { + async fn create_log_store(&self) -> 
KafkaLogStore { + log_store_util::create_kafka_log_store(self.broker_endpoints.clone()).await + } + + pub(crate) async fn client(&self) -> Client { + ClientBuilder::new(self.broker_endpoints.clone()) + .build() + .await + .unwrap() + } +} + +#[derive(Clone)] +pub(crate) enum LogStoreFactory { + RaftEngine(RaftEngineLogStoreFactory), + Kafka(KafkaLogStoreFactory), +} + +#[derive(Clone)] +pub(crate) enum LogStoreImpl { + RaftEngine(Arc), + Kafka(Arc), +} + /// Env to test mito engine. pub struct TestEnv { /// Path to store data. data_home: TempDir, - logstore: Option>, + log_store: Option, + log_store_factory: LogStoreFactory, object_store_manager: Option, } @@ -94,7 +200,8 @@ impl TestEnv { pub fn new() -> TestEnv { TestEnv { data_home: create_temp_dir(""), - logstore: None, + log_store: None, + log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, } } @@ -103,7 +210,8 @@ impl TestEnv { pub fn with_prefix(prefix: &str) -> TestEnv { TestEnv { data_home: create_temp_dir(prefix), - logstore: None, + log_store: None, + log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, } } @@ -112,13 +220,16 @@ impl TestEnv { pub fn with_data_home(data_home: TempDir) -> TestEnv { TestEnv { data_home, - logstore: None, + log_store: None, + log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, } } - pub fn get_logstore(&self) -> Option> { - self.logstore.clone() + /// Overwrites the original `log_store_factory`. + pub(crate) fn with_log_store_factory(mut self, log_store_factory: LogStoreFactory) -> TestEnv { + self.log_store_factory = log_store_factory; + self } pub fn get_object_store(&self) -> Option { @@ -139,24 +250,41 @@ impl TestEnv { pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - let logstore = Arc::new(log_store); let object_store_manager = Arc::new(object_store_manager); - self.logstore = Some(logstore.clone()); + self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); let data_home = self.data_home().display().to_string(); - MitoEngine::new(&data_home, config, logstore, object_store_manager) - .await - .unwrap() + + match log_store { + LogStoreImpl::RaftEngine(log_store) => { + MitoEngine::new(&data_home, config, log_store, object_store_manager) + .await + .unwrap() + } + LogStoreImpl::Kafka(log_store) => { + MitoEngine::new(&data_home, config, log_store, object_store_manager) + .await + .unwrap() + } + } } /// Creates a new engine with specific config and existing logstore and object store manager. pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { - let logstore = self.logstore.as_ref().unwrap().clone(); let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); let data_home = self.data_home().display().to_string(); - MitoEngine::new(&data_home, config, logstore, object_store_manager) - .await - .unwrap() + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => { + MitoEngine::new(&data_home, config, log_store, object_store_manager) + .await + .unwrap() + } + LogStoreImpl::Kafka(log_store) => { + MitoEngine::new(&data_home, config, log_store, object_store_manager) + .await + .unwrap() + } + } } /// Creates a new engine with specific config and manager/listener/purge_scheduler under this env. 
@@ -168,24 +296,36 @@ impl TestEnv { ) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - let logstore = Arc::new(log_store); let object_store_manager = Arc::new(object_store_manager); - self.logstore = Some(logstore.clone()); + self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); let data_home = self.data_home().display().to_string(); - MitoEngine::new_for_test( - &data_home, - config, - logstore, - object_store_manager, - manager, - listener, - Arc::new(StdTimeProvider), - ) - .await - .unwrap() + match log_store { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + Arc::new(StdTimeProvider), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + Arc::new(StdTimeProvider), + ) + .await + .unwrap(), + } } pub async fn create_engine_with_multiple_object_stores( @@ -195,7 +335,8 @@ impl TestEnv { listener: Option, custom_storage_names: &[&str], ) -> MitoEngine { - let (logstore, mut object_store_manager) = self.create_log_and_object_store_manager().await; + let (log_store, mut object_store_manager) = + self.create_log_and_object_store_manager().await; for storage_name in custom_storage_names { let data_path = self .data_home @@ -210,23 +351,35 @@ impl TestEnv { let object_store = ObjectStore::new(builder).unwrap().finish(); object_store_manager.add(storage_name, object_store); } - let logstore = Arc::new(logstore); let object_store_manager = Arc::new(object_store_manager); - self.logstore = Some(logstore.clone()); + self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); let data_home = self.data_home().display().to_string(); - MitoEngine::new_for_test( - &data_home, - config, - logstore, - object_store_manager, - manager, - listener, - Arc::new(StdTimeProvider), - ) - .await - .unwrap() + match log_store { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + Arc::new(StdTimeProvider), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + Arc::new(StdTimeProvider), + ) + .await + .unwrap(), + } } /// Creates a new engine with specific config and manager/listener/time provider under this env. 
@@ -239,50 +392,82 @@ impl TestEnv { ) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - let logstore = Arc::new(log_store); let object_store_manager = Arc::new(object_store_manager); - self.logstore = Some(logstore.clone()); + self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); let data_home = self.data_home().display().to_string(); - MitoEngine::new_for_test( - &data_home, - config, - logstore, - object_store_manager, - manager, - listener, - time_provider.clone(), - ) - .await - .unwrap() + match log_store { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + time_provider.clone(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + time_provider.clone(), + ) + .await + .unwrap(), + } } /// Reopen the engine. pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine { engine.stop().await.unwrap(); - MitoEngine::new( - &self.data_home().display().to_string(), - config, - self.logstore.clone().unwrap(), - self.object_store_manager.clone().unwrap(), - ) - .await - .unwrap() + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + } } /// Open the engine. pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine { - MitoEngine::new( - &self.data_home().display().to_string(), - config, - self.logstore.clone().unwrap(), - self.object_store_manager.clone().unwrap(), - ) - .await - .unwrap() + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + } } /// Only initializes the object store manager, returns the default object store. @@ -297,25 +482,44 @@ impl TestEnv { let data_home = self.data_home().display().to_string(); config.sanitize(&data_home).unwrap(); - WorkerGroup::start( - Arc::new(config), - Arc::new(log_store), - Arc::new(object_store_manager), - ) - .await - .unwrap() + + match log_store { + LogStoreImpl::RaftEngine(log_store) => { + WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager)) + .await + .unwrap() + } + LogStoreImpl::Kafka(log_store) => { + WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager)) + .await + .unwrap() + } + } } /// Returns the log store and object store manager. 
- async fn create_log_and_object_store_manager( - &self, - ) -> (RaftEngineLogStore, ObjectStoreManager) { + async fn create_log_and_object_store_manager(&self) -> (LogStoreImpl, ObjectStoreManager) { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); - let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await; - let object_store_manager = self.create_object_store_manager(); - (log_store, object_store_manager) + + match &self.log_store_factory { + LogStoreFactory::RaftEngine(factory) => { + let log_store = factory.create_log_store(wal_path).await; + ( + LogStoreImpl::RaftEngine(Arc::new(log_store)), + object_store_manager, + ) + } + LogStoreFactory::Kafka(factory) => { + let log_store = factory.create_log_store().await; + + ( + LogStoreImpl::Kafka(Arc::new(log_store)), + object_store_manager, + ) + } + } } fn create_object_store_manager(&self) -> ObjectStoreManager { @@ -399,6 +603,8 @@ pub struct CreateRequestBuilder { all_not_null: bool, engine: String, ts_type: ConcreteDataType, + /// kafka topic name + kafka_topic: Option, } impl Default for CreateRequestBuilder { @@ -412,6 +618,7 @@ impl Default for CreateRequestBuilder { all_not_null: false, engine: MITO_ENGINE_NAME.to_string(), ts_type: ConcreteDataType::timestamp_millisecond_datatype(), + kafka_topic: None, } } } @@ -464,6 +671,12 @@ impl CreateRequestBuilder { self } + #[must_use] + pub fn kafka_topic(mut self, topic: Option) -> Self { + self.kafka_topic = topic; + self + } + pub fn build(&self) -> RegionCreateRequest { let mut column_id = 0; let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); @@ -504,12 +717,21 @@ impl CreateRequestBuilder { semantic_type: SemanticType::Timestamp, column_id, }); - + let mut options = self.options.clone(); + if let Some(topic) = &self.kafka_topic { + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + }); + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + } RegionCreateRequest { engine: self.engine.to_string(), column_metadatas, primary_key: self.primary_key.clone().unwrap_or(primary_key), - options: self.options.clone(), + options, region_dir: self.region_dir.clone(), } } diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 6735ccf6d9f1..28cda6a817b4 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -63,7 +63,7 @@ impl Display for PartitionBound { match self { Self::Value(v) => write!(f, "{}", v), Self::MaxValue => write!(f, "MAXVALUE"), - Self::Expr(e) => write!(f, "{:?}", e), + Self::Expr(e) => write!(f, "{}", e), } } } @@ -72,8 +72,7 @@ impl Display for PartitionDef { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "({}) VALUES LESS THAN ({})", - self.partition_columns.iter().join(", "), + "{}", self.partition_bounds .iter() .map(|b| format!("{b}")) @@ -188,7 +187,7 @@ mod tests { PartitionBound::Value(1_i32.into()), ], }; - assert_eq!("(a, b) VALUES LESS THAN (MAXVALUE, 1)", def.to_string()); + assert_eq!("MAXVALUE, 1", def.to_string()); let partition: MetaPartition = def.try_into().unwrap(); assert_eq!( diff --git a/src/pipeline/src/mng/table.rs b/src/pipeline/src/mng/table.rs index 7863982a66b0..6034b85205ab 100644 --- a/src/pipeline/src/mng/table.rs +++ b/src/pipeline/src/mng/table.rs @@ -300,7 +300,7 @@ impl PipelineTable { name: &str, content_type: &str, pipeline: &str, - ) -> Result<()> { + ) -> Result> { let compiled_pipeline = 
Self::compile_pipeline(pipeline)?; self.insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline) @@ -308,10 +308,10 @@ impl PipelineTable { self.pipelines.write().unwrap().insert( Self::generate_pipeline_cache_key(schema, name), - compiled_pipeline, + compiled_pipeline.clone(), ); - Ok(()) + Ok(compiled_pipeline) } async fn find_pipeline_by_name(&self, schema: &str, name: &str) -> Result { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 2473cfc320ad..887f04a3b218 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -53,8 +53,8 @@ object-store.workspace = true operator.workspace = true prost.workspace = true query.workspace = true -rstest = "0.17" -rstest_reuse = "0.5" +rstest.workspace = true +rstest_reuse.workspace = true serde_json.workspace = true servers = { workspace = true, features = ["testing"] } session.workspace = true diff --git a/tests-integration/fixtures/minio/docker-compose-standalone.yml b/tests-integration/fixtures/minio/docker-compose-standalone.yml new file mode 100644 index 000000000000..139cb916a950 --- /dev/null +++ b/tests-integration/fixtures/minio/docker-compose-standalone.yml @@ -0,0 +1,18 @@ +version: '3.8' +services: + minio: + image: bitnami/minio:2024 + ports: + - '9000:9000' + - '9001:9001' + environment: + - MINIO_ROOT_USER=superpower_ci_user + - MINIO_ROOT_PASSWORD=superpower_password + - MINIO_DEFAULT_BUCKETS=greptime + - BITNAMI_DEBUG=true + volumes: + - 'minio_data:/bitnami/minio/data' + +volumes: + minio_data: + driver: local diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index d3e700151345..5def9351d0c9 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -26,8 +26,3 @@ pub mod test_util; pub mod standalone; #[cfg(test)] mod tests; - -#[cfg(test)] -// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate -#[allow(clippy::single_component_path_imports)] -use rstest_reuse; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 7cbd640820b1..4c1b4641c9f4 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -21,16 +21,13 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; -use client::Database; use common_base::secrets::ExposeSecret; use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; -use common_query::OutputData; use common_runtime::Builder as RuntimeBuilder; use common_telemetry::warn; use common_test_util::ports; -use common_test_util::recordbatch::{check_output_stream, ExpectedOutput}; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use common_wal::config::DatanodeWalConfig; use datanode::config::{ @@ -690,25 +687,3 @@ where test(endpoints).await } - -pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { - let output = db.sql(sql).await.unwrap(); - let output = output.data; - - match (&output, expected) { - (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { - assert_eq!( - *x, y, - r#" -expected: {y} -actual: {x} -"# - ) - } - (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) - | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { - check_output_stream(output, x).await - } - _ => panic!(), - } -} diff --git a/tests-integration/tests/grpc.rs 
b/tests-integration/tests/grpc.rs index 7d1f9d57768f..33332170db16 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -28,6 +28,7 @@ use common_grpc::channel_manager::ClientTlsOption; use common_query::Output; use common_recordbatch::RecordBatches; use common_runtime::Runtime; +use common_test_util::find_workspace_path; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::GrpcServerConfig; use servers::http::prometheus::{ @@ -732,10 +733,7 @@ async fn to_batch(output: Output) -> String { } pub async fn test_grpc_tls_config(store_type: StorageType) { - let comm_dir = std::path::PathBuf::from_iter([ - std::env!("CARGO_RUSTC_CURRENT_DIR"), - "src/common/grpc/tests/tls", - ]); + let comm_dir = find_workspace_path("/src/common/grpc/tests/tls"); let ca_path = comm_dir.join("ca.pem").to_str().unwrap().to_string(); let server_cert_path = comm_dir.join("server.pem").to_str().unwrap().to_string(); let server_key_path = comm_dir.join("server.key").to_str().unwrap().to_string(); diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index 9c76a87df100..53b30056b879 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -14,13 +14,13 @@ Affected Rows: 0 -- SQLNESS REPLACE (\d{13}) ID SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; -+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ -| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | -+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ -| greptime | public | my_table | p0 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(1000)) }) | ID | -| greptime | public | my_table | p1 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(2000)) }) | ID | -| greptime | public | my_table | p2 | (a) VALUES LESS THAN (PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(1000)) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(2000)) }) }) | ID | -+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ ++---------------+--------------+------------+----------------+------------------------+-----------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | ++---------------+--------------+------------+----------------+------------------------+-----------------------+ +| greptime | public | my_table | p0 | a < 1000 | ID | +| greptime | public | 
my_table | p1 | a >= 2000 | ID | +| greptime | public | my_table | p2 | a >= 1000 AND a < 2000 | ID | ++---------------+--------------+------------+----------------+------------------------+-----------------------+ -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID @@ -120,11 +120,11 @@ Affected Rows: 0 -- SQLNESS REPLACE (\d{13}) ID SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ -| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ -| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID | -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ ++---------------+--------------+------------+----------------+----------------------+-----------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | ++---------------+--------------+------------+----------------+----------------------+-----------------------+ +| greptime | public | my_table | p0 | MAXVALUE | ID | ++---------------+--------------+------------+----------------+----------------------+-----------------------+ -- SQLNESS REPLACE (\d{13}) REGION_ID -- SQLNESS REPLACE (\d{1}) PEER_ID