diff --git a/Cargo.lock b/Cargo.lock
index 29ea367b9825..768c50c0d756 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1768,6 +1768,7 @@ dependencies = [
  "arc-swap",
  "async-trait",
  "chrono-tz 0.6.3",
+ "common-base",
  "common-catalog",
  "common-error",
  "common-macro",
@@ -1790,6 +1791,7 @@ dependencies = [
  "session",
  "snafu",
  "statrs",
+ "store-api",
  "table",
 ]
 
@@ -1900,6 +1902,7 @@ dependencies = [
  "base64 0.21.5",
  "bytes",
  "chrono",
+ "common-base",
  "common-catalog",
  "common-error",
  "common-grpc-expr",
@@ -9122,6 +9125,7 @@ dependencies = [
  "common-telemetry",
  "common-time",
  "derive_builder 0.12.0",
+ "snafu",
  "sql",
 ]
 
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index a9a337808e37..819c4453f9a2 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -197,7 +197,7 @@ impl RegionRequester {
 
         check_response_header(header)?;
 
-        Ok(affected_rows)
+        Ok(affected_rows as _)
     }
 
     pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs
index a111bf3c7376..506c273c1e2d 100644
--- a/src/common/base/src/lib.rs
+++ b/src/common/base/src/lib.rs
@@ -21,6 +21,8 @@ pub mod readable_size;
 use core::any::Any;
 use std::sync::{Arc, Mutex, MutexGuard};
 
+pub type AffectedRows = usize;
+
 pub use bit_vec::BitVec;
 
 /// [`Plugins`] is a wrapper of Arc contents.
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml
index 93fef8593cc1..d6570782d489 100644
--- a/src/common/function/Cargo.toml
+++ b/src/common/function/Cargo.toml
@@ -12,6 +12,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
+common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
@@ -33,6 +34,7 @@ serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
+store-api.workspace = true
table.workspace = true
 
[dev-dependencies]
diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs
index 629f55e32235..11175a87bd73 100644
--- a/src/common/function/src/handlers.rs
+++ b/src/common/function/src/handlers.rs
@@ -15,12 +15,12 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_base::AffectedRows;
 use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
 use common_query::error::Result;
 use session::context::QueryContextRef;
-use table::requests::{DeleteRequest, InsertRequest};
-
-pub type AffectedRows = usize;
+use store_api::storage::RegionId;
+use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
 
 /// A trait for handling table mutations in `QueryEngine`.
 #[async_trait]
@@ -30,6 +30,28 @@ pub trait TableMutationHandler: Send + Sync {
 
     /// Delete rows from the table.
     async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
+
+    /// Trigger a flush task for a table.
+    async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
+        -> Result<AffectedRows>;
+
+    /// Trigger a compaction task for a table.
+    async fn compact(
+        &self,
+        request: CompactTableRequest,
+        ctx: QueryContextRef,
+    ) -> Result<AffectedRows>;
+
+    /// Trigger a flush task for a table region.
+    async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
+        -> Result<AffectedRows>;
+
+    /// Trigger a compaction task for a table region.
+    async fn compact_region(
+        &self,
+        region_id: RegionId,
+        ctx: QueryContextRef,
+    ) -> Result<AffectedRows>;
 }
 
 /// A trait for handling procedure service requests in `QueryEngine`.
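The four new trait methods extend the surface that every `TableMutationHandler` implementor must now provide. A minimal no-op sketch of the post-PR trait (it parallels the `MockTableMutationHandler` this diff adds to `state.rs`; the `NoopHandler` name and `Ok(0)` bodies are illustrative placeholders, not the real frontend operator):

```rust
use async_trait::async_trait;
use common_base::AffectedRows;
use common_function::handlers::TableMutationHandler;
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};

struct NoopHandler;

#[async_trait]
impl TableMutationHandler for NoopHandler {
    // Pre-existing row mutations.
    async fn insert(&self, _req: InsertRequest, _ctx: QueryContextRef) -> Result<AffectedRows> {
        Ok(0)
    }
    async fn delete(&self, _req: DeleteRequest, _ctx: QueryContextRef) -> Result<AffectedRows> {
        Ok(0)
    }
    // New in this PR: table-level flush/compaction...
    async fn flush(&self, _req: FlushTableRequest, _ctx: QueryContextRef) -> Result<AffectedRows> {
        Ok(0)
    }
    async fn compact(
        &self,
        _req: CompactTableRequest,
        _ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        Ok(0)
    }
    // ...and region-level flush/compaction addressed by `RegionId`.
    async fn flush_region(&self, _id: RegionId, _ctx: QueryContextRef) -> Result<AffectedRows> {
        Ok(0)
    }
    async fn compact_region(&self, _id: RegionId, _ctx: QueryContextRef) -> Result<AffectedRows> {
        Ok(0)
    }
}
```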
diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs
index 6f549d6619e3..e4a1cd1af8b9 100644
--- a/src/common/function/src/helper.rs
+++ b/src/common/function/src/helper.rs
@@ -12,8 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use common_query::error::{InvalidInputTypeSnafu, Result};
 use common_query::prelude::{Signature, TypeSignature, Volatility};
 use datatypes::prelude::ConcreteDataType;
+use datatypes::types::cast::cast;
+use datatypes::value::ValueRef;
+use snafu::ResultExt;
 
 /// Create a function signature with oneof signatures of interleaving two arguments.
 pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
 
     Signature::one_of(sigs, Volatility::Immutable)
 }
+
+/// Cast a [`ValueRef`] to u64; returns `None` if the value cannot be cast to u64.
+pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
+    cast((*value).into(), &ConcreteDataType::uint64_datatype())
+        .context(InvalidInputTypeSnafu {
+            err_msg: format!(
+                "Failed to cast input into uint64, actual type: {:#?}",
+                value.data_type(),
+            ),
+        })
+        .map(|v| v.as_u64())
+}
diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs
index 418509dc52e9..469f1ffa5ade 100644
--- a/src/common/function/src/state.rs
+++ b/src/common/function/src/state.rs
@@ -32,11 +32,19 @@ impl FunctionState {
         use api::v1::meta::ProcedureStatus;
         use async_trait::async_trait;
+        use common_base::AffectedRows;
         use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
         use common_query::error::Result;
+        use session::context::QueryContextRef;
+        use store_api::storage::RegionId;
+        use table::requests::{
+            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
+        };
 
-        use crate::handlers::ProcedureServiceHandler;
+        use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
 
         struct MockProcedureServiceHandler;
+        struct MockTableMutationHandler;
+        const ROWS: usize = 42;
 
         #[async_trait]
         impl ProcedureServiceHandler for MockProcedureServiceHandler {
@@ -56,8 +64,59 @@ impl FunctionState {
             }
         }
 
+        #[async_trait]
+        impl TableMutationHandler for MockTableMutationHandler {
+            async fn insert(
+                &self,
+                _request: InsertRequest,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
+            async fn delete(
+                &self,
+                _request: DeleteRequest,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
+            async fn flush(
+                &self,
+                _request: FlushTableRequest,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
+            async fn compact(
+                &self,
+                _request: CompactTableRequest,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
+            async fn flush_region(
+                &self,
+                _region_id: RegionId,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+
+            async fn compact_region(
+                &self,
+                _region_id: RegionId,
+                _ctx: QueryContextRef,
+            ) -> Result<AffectedRows> {
+                Ok(ROWS)
+            }
+        }
+
         Self {
-            table_mutation_handler: None,
+            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
             procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
         }
     }
diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs
index 4f6305078465..1eeffe63d468 100644
--- a/src/common/function/src/system/procedure_state.rs
+++ b/src/common/function/src/system/procedure_state.rs
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 use std::fmt;
-use std::sync::Arc;
 
 use api::v1::meta::ProcedureStatus;
+use common_macro::admin_fn;
 use common_meta::rpc::procedure::ProcedureStateResponse;
 use common_query::error::Error::ThreadJoin;
 use common_query::error::{
@@ -25,24 +25,14 @@ use common_query::error::{
 use common_query::prelude::{Signature, Volatility};
 use common_telemetry::error;
 use datatypes::prelude::*;
-use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
+use datatypes::vectors::VectorRef;
 use serde::Serialize;
+use session::context::QueryContextRef;
 use snafu::{ensure, Location, OptionExt};
 
+use crate::ensure_greptime;
 use crate::function::{Function, FunctionContext};
-
-const NAME: &str = "procedure_state";
-
-/// A function to query procedure state by its id.
-/// Such as `procedure_state(pid)`.
-#[derive(Clone, Debug, Default)]
-pub struct ProcedureStateFunction;
-
-impl fmt::Display for ProcedureStateFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "PROCEDURE_STATE")
-    }
-}
+use crate::handlers::ProcedureServiceHandlerRef;
 
 #[derive(Serialize)]
 struct ProcedureStateJson {
@@ -51,105 +41,58 @@ struct ProcedureStateJson {
     error: Option<String>,
 }
 
-impl Function for ProcedureStateFunction {
-    fn name(&self) -> &str {
-        NAME
-    }
-
-    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
-        Ok(ConcreteDataType::string_datatype())
-    }
-
-    fn signature(&self) -> Signature {
-        Signature::uniform(
-            1,
-            vec![ConcreteDataType::string_datatype()],
-            Volatility::Immutable,
-        )
-    }
-
-    fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
-        crate::ensure_greptime!(func_ctx);
-
-        ensure!(
-            columns.len() == 1,
-            InvalidFuncArgsSnafu {
-                err_msg: format!(
-                    "The length of the args is not correct, expect 1, have: {}",
-                    columns.len()
-                ),
-            }
-        );
+/// A function to query procedure state by its id.
+/// Such as `procedure_state(pid)`.
+#[admin_fn(
+    name = "ProcedureStateFunction",
+    display_name = "procedure_state",
+    sig_fn = "signature",
+    ret = "string"
+)]
+pub(crate) async fn procedure_state(
+    procedure_service_handler: &ProcedureServiceHandlerRef,
+    _ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
) -> Result<Value> {
+    ensure!(
+        params.len() == 1,
+        InvalidFuncArgsSnafu {
+            err_msg: format!(
+                "The length of the args is not correct, expect 1, have: {}",
+                params.len()
+            ),
+        }
+    );
 
-        let pids = columns[0].clone();
-        let expect_len = pids.len();
-        let is_const = pids.is_const();
-
-        match pids.data_type() {
-            ConcreteDataType::String(_) => {
-                // TODO(dennis): datafusion UDF doesn't support async function currently
-                std::thread::spawn(move || {
-                    let pids: &StringVector = if is_const {
-                        let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
-                        unsafe { Helper::static_cast(pids.inner()) }
-                    } else {
-                        unsafe { Helper::static_cast(&pids) }
-                    };
-
-                    let procedure_service_handler = func_ctx
-                        .state
-                        .procedure_service_handler
-                        .as_ref()
-                        .context(MissingProcedureServiceHandlerSnafu)?;
-
-                    let states = pids
-                        .iter_data()
-                        .map(|pid| {
-                            if let Some(pid) = pid {
-                                let ProcedureStateResponse { status, error, .. } =
-                                    common_runtime::block_on_read(async move {
-                                        procedure_service_handler.query_procedure_state(pid).await
-                                    })?;
-
-                                let status = ProcedureStatus::try_from(status)
-                                    .map(|v| v.as_str_name())
-                                    .unwrap_or("Unknown");
-
-                                let state = ProcedureStateJson {
-                                    status: status.to_string(),
-                                    error: if error.is_empty() { None } else { Some(error) },
-                                };
-
-                                Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
-                            } else {
-                                Ok(None)
-                            }
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let results: VectorRef = Arc::new(StringVector::from(states));
-
-                    if is_const {
-                        Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
-                    } else {
-                        Ok(results)
-                    }
-                })
-                .join()
-                .map_err(|e| {
-                    error!(e; "Join thread error");
-                    ThreadJoin {
-                        location: Location::default(),
-                    }
-                })?
-            }
-            _ => UnsupportedInputDataTypeSnafu {
-                function: NAME,
-                datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
-            }
-            .fail(),
+    let ValueRef::String(pid) = params[0] else {
+        return UnsupportedInputDataTypeSnafu {
+            function: "procedure_state",
+            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
         }
-    }
+        .fail();
+    };
+
+    let ProcedureStateResponse { status, error, .. } =
+        procedure_service_handler.query_procedure_state(pid).await?;
+
+    let status = ProcedureStatus::try_from(status)
+        .map(|v| v.as_str_name())
+        .unwrap_or("Unknown");
+
+    let state = ProcedureStateJson {
+        status: status.to_string(),
+        error: if error.is_empty() { None } else { Some(error) },
+    };
+    let json = serde_json::to_string(&state).unwrap_or_default();
+
+    Ok(Value::from(json))
+}
+
+fn signature() -> Signature {
+    Signature::uniform(
+        1,
+        vec![ConcreteDataType::string_datatype()],
+        Volatility::Immutable,
+    )
 }
 
 #[cfg(test)]
diff --git a/src/common/function/src/table.rs b/src/common/function/src/table.rs
index 64b8fd20096c..244c395cb4c4 100644
--- a/src/common/function/src/table.rs
+++ b/src/common/function/src/table.rs
@@ -12,10 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod flush_compact_region;
+mod flush_compact_table;
 mod migrate_region;
 
 use std::sync::Arc;
 
+use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
+use flush_compact_table::{CompactTableFunction, FlushTableFunction};
 use migrate_region::MigrateRegionFunction;
 
 use crate::function_registry::FunctionRegistry;
@@ -27,5 +31,9 @@ impl TableFunction {
     /// Register all table functions to [`FunctionRegistry`].
     pub fn register(registry: &FunctionRegistry) {
         registry.register(Arc::new(MigrateRegionFunction));
+        registry.register(Arc::new(FlushRegionFunction));
+        registry.register(Arc::new(CompactRegionFunction));
+        registry.register(Arc::new(FlushTableFunction));
+        registry.register(Arc::new(CompactTableFunction));
     }
 }
diff --git a/src/common/function/src/table/flush_compact_region.rs b/src/common/function/src/table/flush_compact_region.rs
new file mode 100644
index 000000000000..7be1c68a2ce4
--- /dev/null
+++ b/src/common/function/src/table/flush_compact_region.rs
@@ -0,0 +1,148 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+
+use common_macro::admin_fn;
+use common_query::error::Error::ThreadJoin;
+use common_query::error::{
+    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
+};
+use common_query::prelude::{Signature, Volatility};
+use common_telemetry::error;
+use datatypes::prelude::*;
+use datatypes::vectors::VectorRef;
+use session::context::QueryContextRef;
+use snafu::{ensure, Location, OptionExt};
+use store_api::storage::RegionId;
+
+use crate::ensure_greptime;
+use crate::function::{Function, FunctionContext};
+use crate::handlers::TableMutationHandlerRef;
+use crate::helper::cast_u64;
+
+macro_rules! define_region_function {
+    ($name: expr, $display_name_str: expr, $display_name: ident) => {
+        /// A function to $display_name
+        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
+        pub(crate) async fn $display_name(
+            table_mutation_handler: &TableMutationHandlerRef,
+            query_ctx: &QueryContextRef,
+            params: &[ValueRef<'_>],
+        ) -> Result<Value> {
+            ensure!(
+                params.len() == 1,
+                InvalidFuncArgsSnafu {
+                    err_msg: format!(
+                        "The length of the args is not correct, expect 1, have: {}",
+                        params.len()
+                    ),
+                }
+            );
+
+            let Some(region_id) = cast_u64(&params[0])? else {
+                return UnsupportedInputDataTypeSnafu {
+                    function: $display_name_str,
+                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+                }
+                .fail();
+            };
+
+            let affected_rows = table_mutation_handler
+                .$display_name(RegionId::from_u64(region_id), query_ctx.clone())
+                .await?;
+
+            Ok(Value::from(affected_rows as u64))
+        }
+    };
+}
+
+define_region_function!("FlushRegionFunction", "flush_region", flush_region);
+
+define_region_function!("CompactRegionFunction", "compact_region", compact_region);
+
+fn signature() -> Signature {
+    Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use common_query::prelude::TypeSignature;
+    use datatypes::vectors::UInt64Vector;
+
+    use super::*;
+
+    macro_rules! define_region_function_test {
+        ($name: ident, $func: ident) => {
+            paste::paste! {
+                #[test]
+                fn [<test_ $name _misc>]() {
+                    let f = $func;
+                    assert_eq!(stringify!($name), f.name());
+                    assert_eq!(
+                        ConcreteDataType::uint64_datatype(),
+                        f.return_type(&[]).unwrap()
+                    );
+                    assert!(matches!(f.signature(),
+                                     Signature {
+                                         type_signature: TypeSignature::Uniform(1, valid_types),
+                                         volatility: Volatility::Immutable
+                                     } if valid_types == ConcreteDataType::numerics()));
+                }
+
+                #[test]
+                fn [<test_ $name _missing_handler>]() {
+                    let f = $func;
+
+                    let args = vec![99];
+
+                    let args = args
+                        .into_iter()
+                        .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
+                        .collect::<Vec<_>>();
+
+                    let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+                    assert_eq!(
+                        "Missing TableMutationHandler, not expected",
+                        result.to_string()
+                    );
+                }
+
+                #[test]
+                fn [<test_ $name _mock>]() {
+                    let f = $func;
+
+                    let args = vec![99];
+
+                    let args = args
+                        .into_iter()
+                        .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
+                        .collect::<Vec<_>>();
+
+                    let result = f.eval(FunctionContext::mock(), &args).unwrap();
+
+                    let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
+                    assert_eq!(expect, result);
+                }
+            }
+        };
+    }
+
+    define_region_function_test!(flush_region, FlushRegionFunction);
+
+    define_region_function_test!(compact_region, CompactRegionFunction);
+}
diff --git a/src/common/function/src/table/flush_compact_table.rs b/src/common/function/src/table/flush_compact_table.rs
new file mode 100644
index 000000000000..83c0cbd93c80
--- /dev/null
+++ b/src/common/function/src/table/flush_compact_table.rs
@@ -0,0 +1,178 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+
+use common_error::ext::BoxedError;
+use common_macro::admin_fn;
+use common_query::error::Error::ThreadJoin;
+use common_query::error::{
+    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
+    UnsupportedInputDataTypeSnafu,
+};
+use common_query::prelude::{Signature, Volatility};
+use common_telemetry::error;
+use datatypes::prelude::*;
+use datatypes::vectors::VectorRef;
+use session::context::QueryContextRef;
+use session::table_name::table_name_to_full_name;
+use snafu::{ensure, Location, OptionExt, ResultExt};
+use table::requests::{CompactTableRequest, FlushTableRequest};
+
+use crate::ensure_greptime;
+use crate::function::{Function, FunctionContext};
+use crate::handlers::TableMutationHandlerRef;
+
+macro_rules! define_table_function {
+    ($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
+        /// A function to $func table, such as `$display_name(table_name)`.
+        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
+        pub(crate) async fn $display_name(
+            table_mutation_handler: &TableMutationHandlerRef,
+            query_ctx: &QueryContextRef,
+            params: &[ValueRef<'_>],
+        ) -> Result<Value> {
+            ensure!(
+                params.len() == 1,
+                InvalidFuncArgsSnafu {
+                    err_msg: format!(
+                        "The length of the args is not correct, expect 1, have: {}",
+                        params.len()
+                    ),
+                }
+            );
+
+            let ValueRef::String(table_name) = params[0] else {
+                return UnsupportedInputDataTypeSnafu {
+                    function: $display_name_str,
+                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+                }
+                .fail();
+            };
+
+            let (catalog_name, schema_name, table_name) =
+                table_name_to_full_name(table_name, &query_ctx)
+                    .map_err(BoxedError::new)
+                    .context(TableMutationSnafu)?;
+
+            let affected_rows = table_mutation_handler
+                .$func(
+                    $request {
+                        catalog_name,
+                        schema_name,
+                        table_name,
+                    },
+                    query_ctx.clone(),
+                )
+                .await?;
+
+            Ok(Value::from(affected_rows as u64))
+        }
+    };
+}
+
+define_table_function!(
+    "FlushTableFunction",
+    "flush_table",
+    flush_table,
+    flush,
+    FlushTableRequest
+);
+
+define_table_function!(
+    "CompactTableFunction",
+    "compact_table",
+    compact_table,
+    compact,
+    CompactTableRequest
+);
+
+fn signature() -> Signature {
+    Signature::uniform(
+        1,
+        vec![ConcreteDataType::string_datatype()],
+        Volatility::Immutable,
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use common_query::prelude::TypeSignature;
+    use datatypes::vectors::{StringVector, UInt64Vector};
+
+    use super::*;
+
+    macro_rules! define_table_function_test {
+        ($name: ident, $func: ident) => {
+            paste::paste! {
+                #[test]
+                fn [<test_ $name _misc>]() {
+                    let f = $func;
+                    assert_eq!(stringify!($name), f.name());
+                    assert_eq!(
+                        ConcreteDataType::uint64_datatype(),
+                        f.return_type(&[]).unwrap()
+                    );
+                    assert!(matches!(f.signature(),
+                                     Signature {
+                                         type_signature: TypeSignature::Uniform(1, valid_types),
+                                         volatility: Volatility::Immutable
+                                     } if valid_types == vec![ConcreteDataType::string_datatype()]));
+                }
+
+                #[test]
+                fn [<test_ $name _missing_handler>]() {
+                    let f = $func;
+
+                    let args = vec!["test"];
+
+                    let args = args
+                        .into_iter()
+                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
+                        .collect::<Vec<_>>();
+
+                    let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+                    assert_eq!(
+                        "Missing TableMutationHandler, not expected",
+                        result.to_string()
+                    );
+                }
+
+                #[test]
+                fn [<test_ $name _mock>]() {
+                    let f = $func;
+
+                    let args = vec!["test"];
+
+                    let args = args
+                        .into_iter()
+                        .map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
+                        .collect::<Vec<_>>();
+
+                    let result = f.eval(FunctionContext::mock(), &args).unwrap();
+
+                    let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
+                    assert_eq!(expect, result);
+                }
+            }
+        }
+    }
+
+    define_table_function_test!(flush_table, FlushTableFunction);
+
+    define_table_function_test!(compact_table, CompactTableFunction);
+}
diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs
index 6447c6de6b3d..85692792ce3b 100644
--- a/src/common/function/src/table/migrate_region.rs
+++ b/src/common/function/src/table/migrate_region.rs
@@ -15,19 +15,25 @@
 use std::fmt::{self};
 use std::time::Duration;
 
+use common_macro::admin_fn;
 use common_meta::rpc::procedure::MigrateRegionRequest;
 use common_query::error::Error::ThreadJoin;
-use common_query::error::{
-    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingProcedureServiceHandlerSnafu, Result,
-};
+use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
 use common_query::prelude::{Signature, TypeSignature, Volatility};
 use common_telemetry::logging::error;
-use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
-use datatypes::value::Value;
-use datatypes::vectors::{StringVectorBuilder, VectorRef};
-use snafu::{Location, OptionExt, ResultExt};
-
+use datatypes::data_type::DataType;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::value::{Value, ValueRef};
+use datatypes::vectors::VectorRef;
+use session::context::QueryContextRef;
+use snafu::{Location, OptionExt};
+
+use crate::ensure_greptime;
 use crate::function::{Function, FunctionContext};
+use crate::handlers::ProcedureServiceHandlerRef;
+use crate::helper::cast_u64;
+
+const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
 
 /// A function to migrate a region from source peer to target peer.
 /// Returns the submitted procedure id if success. Only available in cluster mode.
@@ -39,137 +45,82 @@ use crate::function::{Function, FunctionContext};
 /// - `region_id`: the region id
 /// - `from_peer`: the source peer id
 /// - `to_peer`: the target peer id
-#[derive(Clone, Debug, Default)]
-pub struct MigrateRegionFunction;
-
-const NAME: &str = "migrate_region";
-const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
-
-fn cast_u64_vector(vector: &VectorRef) -> Result<VectorRef> {
-    vector
-        .cast(&ConcreteDataType::uint64_datatype())
-        .context(InvalidInputTypeSnafu {
-            err_msg: format!(
-                "Failed to cast input into uint64, actual type: {:#?}",
-                vector.data_type(),
-            ),
-        })
-}
-
-impl Function for MigrateRegionFunction {
-    fn name(&self) -> &str {
-        NAME
-    }
-
-    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
-        Ok(ConcreteDataType::string_datatype())
-    }
-
-    fn signature(&self) -> Signature {
-        Signature::one_of(
-            vec![
-                // migrate_region(region_id, from_peer, to_peer)
-                TypeSignature::Uniform(3, ConcreteDataType::numerics()),
-                // migrate_region(region_id, from_peer, to_peer, timeout(secs))
-                TypeSignature::Uniform(4, ConcreteDataType::numerics()),
-            ],
-            Volatility::Immutable,
-        )
-    }
-
-    fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
-        crate::ensure_greptime!(func_ctx);
-
-        let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
-            3 => {
-                let region_ids = cast_u64_vector(&columns[0])?;
-                let from_peers = cast_u64_vector(&columns[1])?;
-                let to_peers = cast_u64_vector(&columns[2])?;
-
-                (region_ids, from_peers, to_peers, None)
+#[admin_fn(
+    name = "MigrateRegionFunction",
+    display_name = "migrate_region",
+    sig_fn = "signature",
+    ret = "string"
+)]
+pub(crate) async fn migrate_region(
+    procedure_service_handler: &ProcedureServiceHandlerRef,
+    _ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    let (region_id, from_peer, to_peer, replay_timeout) = match params.len() {
+        3 => {
+            let region_id = cast_u64(&params[0])?;
+            let from_peer = cast_u64(&params[1])?;
+            let to_peer = cast_u64(&params[2])?;
+
+            (
+                region_id,
+                from_peer,
+                to_peer,
+                Some(DEFAULT_REPLAY_TIMEOUT_SECS),
+            )
+        }
+
+        4 => {
+            let region_id = cast_u64(&params[0])?;
+            let from_peer = cast_u64(&params[1])?;
+            let to_peer = cast_u64(&params[2])?;
+            let replay_timeout = cast_u64(&params[3])?;
+
+            (region_id, from_peer, to_peer, replay_timeout)
+        }
+
+        size => {
+            return InvalidFuncArgsSnafu {
+                err_msg: format!(
+                    "The length of the args is not correct, expect exactly 3 or 4, have: {}",
+                    size
+                ),
             }
-
-            4 => {
-                let region_ids = cast_u64_vector(&columns[0])?;
-                let from_peers = cast_u64_vector(&columns[1])?;
-                let to_peers = cast_u64_vector(&columns[2])?;
-                let replay_timeouts = cast_u64_vector(&columns[3])?;
-
-                (region_ids, from_peers, to_peers, Some(replay_timeouts))
+            .fail();
+        }
+    };
+
+    match (region_id, from_peer, to_peer, replay_timeout) {
+        (Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => {
+            let pid = procedure_service_handler
+                .migrate_region(MigrateRegionRequest {
+                    region_id,
+                    from_peer,
+                    to_peer,
+                    replay_timeout: Duration::from_secs(replay_timeout),
+                })
+                .await?;
+
+            match pid {
+                Some(pid) => Ok(Value::from(pid)),
+                None => Ok(Value::Null),
             }
+        }
 
-            size => {
-                return InvalidFuncArgsSnafu {
-                    err_msg: format!(
-                        "The length of the args is not correct, expect exactly 3 or 4, have: {}",
-                        size
-                    ),
-                }
-                .fail();
-            }
-        };
-
-        // TODO(dennis): datafusion UDF doesn't support async function currently
-        std::thread::spawn(move || {
-            let len = region_ids.len();
-            let mut results = StringVectorBuilder::with_capacity(len);
-            let procedure_service_handler = func_ctx
-                .state
-                .procedure_service_handler
-                .as_ref()
-                .context(MissingProcedureServiceHandlerSnafu)?;
-
-            for index in 0..len {
-                let region_id = region_ids.get(index);
-                let from_peer = from_peers.get(index);
-                let to_peer = to_peers.get(index);
-                let replay_timeout = match &replay_timeouts {
-                    Some(replay_timeouts) => replay_timeouts.get(index),
-                    None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS),
-                };
-
-                match (region_id, from_peer, to_peer, replay_timeout) {
-                    (
-                        Value::UInt64(region_id),
-                        Value::UInt64(from_peer),
-                        Value::UInt64(to_peer),
-                        Value::UInt64(replay_timeout),
-                    ) => {
-                        let pid = common_runtime::block_on_read(async move {
-                            procedure_service_handler
-                                .migrate_region(MigrateRegionRequest {
-                                    region_id,
-                                    from_peer,
-                                    to_peer,
-                                    replay_timeout: Duration::from_secs(replay_timeout),
-                                })
-                                .await
-                        })?;
-
-                        results.push(pid.as_deref())
-                    }
-                    _ => {
-                        results.push(None);
-                    }
-                }
-            }
-
-            Ok(results.to_vector())
-        })
-        .join()
-        .map_err(|e| {
-            error!(e; "Join thread error");
-            ThreadJoin {
-                location: Location::default(),
-            }
-        })?
+        _ => Ok(Value::Null),
     }
 }
 
-impl fmt::Display for MigrateRegionFunction {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "MIGRATE_REGION")
-    }
+fn signature() -> Signature {
+    Signature::one_of(
+        vec![
+            // migrate_region(region_id, from_peer, to_peer)
+            TypeSignature::Uniform(3, ConcreteDataType::numerics()),
+            // migrate_region(region_id, from_peer, to_peer, timeout(secs))
+            TypeSignature::Uniform(4, ConcreteDataType::numerics()),
+        ],
+        Volatility::Immutable,
+    )
 }
 
 #[cfg(test)]
diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs
new file mode 100644
index 000000000000..c6452ec060bf
--- /dev/null
+++ b/src/common/macro/src/admin_fn.rs
@@ -0,0 +1,236 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use proc_macro::TokenStream;
+use quote::quote;
+use syn::spanned::Spanned;
+use syn::{
+    parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypePath,
+    TypeReference, Visibility,
+};
+
+use crate::utils::{extract_arg_map, extract_input_types, get_ident};
+
+/// Internal util macro to early return on error.
+macro_rules! ok {
+    ($item:expr) => {
+        match $item {
+            Ok(item) => item,
+            Err(e) => return e.into_compile_error().into(),
+        }
+    };
+}
+
+/// Internal util macro to create an error.
+macro_rules! error {
+    ($span:expr, $msg: expr) => {
+        Err(syn::Error::new($span, $msg))
+    };
+}
+
+pub(crate) fn process_admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
+    let mut result = TokenStream::new();
+
+    // extract arg map
+    let arg_pairs = parse_macro_input!(args as AttributeArgs);
+    let arg_span = arg_pairs[0].span();
+    let arg_map = ok!(extract_arg_map(arg_pairs));
+
+    // decompose the fn block
+    let compute_fn = parse_macro_input!(input as ItemFn);
+    let ItemFn {
+        attrs,
+        vis,
+        sig,
+        block,
+    } = compute_fn;
+
+    // extract fn arg list
+    let Signature {
+        inputs,
+        ident: fn_name,
+        ..
+    } = &sig;
+
+    let arg_types = ok!(extract_input_types(inputs));
+    if arg_types.len() < 2 {
+        ok!(error!(
+            sig.span(),
+            "Expect at least two arguments for admin fn: (handler, query_ctx)"
+        ));
+    }
+    let handler_type = ok!(extract_handler_type(&arg_types));
+
+    // build the struct and its impl block
+    // only do this when `display_name` is specified
+    if let Ok(display_name) = get_ident(&arg_map, "display_name", arg_span) {
+        let struct_code = build_struct(
+            attrs,
+            vis,
+            fn_name,
+            ok!(get_ident(&arg_map, "name", arg_span)),
+            ok!(get_ident(&arg_map, "sig_fn", arg_span)),
+            ok!(get_ident(&arg_map, "ret", arg_span)),
+            handler_type,
+            display_name,
+        );
+        result.extend(struct_code);
+    }
+
+    // preserve this fn
+    let input_fn_code: TokenStream = quote! {
+        #sig { #block }
+    }
+    .into();
+
+    result.extend(input_fn_code);
+    result
+}
+
+/// Retrieve the handler type, `ProcedureServiceHandlerRef` or `TableMutationHandlerRef`.
+fn extract_handler_type(arg_types: &[Type]) -> Result<&Ident, syn::Error> {
+    match &arg_types[0] {
+        Type::Reference(TypeReference { elem, .. }) => match &**elem {
+            Type::Path(TypePath { path, .. }) => Ok(&path
+                .segments
+                .first()
+                .expect("Expected a reference of handler")
+                .ident),
+            other => {
+                error!(other.span(), "Expected a reference of handler")
+            }
+        },
+        other => {
+            error!(other.span(), "Expected a reference of handler")
+        }
+    }
+}
+
+/// Build the function struct
+#[allow(clippy::too_many_arguments)]
+fn build_struct(
+    attrs: Vec<Attribute>,
+    vis: Visibility,
+    fn_name: &Ident,
+    name: Ident,
+    sig_fn: Ident,
+    ret: Ident,
+    handler_type: &Ident,
+    display_name_ident: Ident,
+) -> TokenStream {
+    let display_name = display_name_ident.to_string();
+    let ret = Ident::new(&format!("{ret}_datatype"), ret.span());
+    let uppercase_display_name = display_name.to_uppercase();
+    // Get the handler name in function state by the argument ident
+    let (handler, snafu_type) = match handler_type.to_string().as_str() {
+        "ProcedureServiceHandlerRef" => (
+            Ident::new("procedure_service_handler", handler_type.span()),
+            Ident::new("MissingProcedureServiceHandlerSnafu", handler_type.span()),
+        ),
+
+        "TableMutationHandlerRef" => (
+            Ident::new("table_mutation_handler", handler_type.span()),
+            Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()),
+        ),
+        handler => ok!(error!(
+            handler_type.span(),
+            format!("Unknown handler type: {handler}")
+        )),
+    };
+
+    quote! {
+        #(#attrs)*
+        #[derive(Debug)]
+        #vis struct #name;
+
+        impl fmt::Display for #name {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                write!(f, #uppercase_display_name)
+            }
+        }
+
+        impl Function for #name {
+            fn name(&self) -> &'static str {
+                #display_name
+            }
+
+            fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
+                Ok(ConcreteDataType::#ret())
+            }
+
+            fn signature(&self) -> Signature {
+                #sig_fn()
+            }
+
+            fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+                // Ensure under the `greptime` catalog for security
+                ensure_greptime!(func_ctx);
+
+                let columns_num = columns.len();
+                let rows_num = if columns.is_empty() {
+                    1
+                } else {
+                    columns[0].len()
+                };
+                let columns = Vec::from(columns);
+
+                // TODO(dennis): DataFusion doesn't support async UDF currently
+                std::thread::spawn(move || {
+                    let query_ctx = &func_ctx.query_ctx;
+                    let handler = func_ctx
+                        .state
+                        .#handler
+                        .as_ref()
+                        .context(#snafu_type)?;
+
+                    let mut builder = ConcreteDataType::#ret()
+                        .create_mutable_vector(rows_num);
+
+                    if columns_num == 0 {
+                        let result = common_runtime::block_on_read(async move {
+                            #fn_name(handler, query_ctx, &[]).await
+                        })?;
+
+                        builder.push_value_ref(result.as_value_ref());
+                    } else {
+                        for i in 0..rows_num {
+                            let args: Vec<_> = columns.iter()
+                                .map(|vector| vector.get_ref(i))
+                                .collect();
+
+                            let result = common_runtime::block_on_read(async move {
+                                #fn_name(handler, query_ctx, &args).await
+                            })?;
+
+                            builder.push_value_ref(result.as_value_ref());
+                        }
+                    }
+
+                    Ok(builder.to_vector())
+                })
+                .join()
+                .map_err(|e| {
+                    error!(e; "Join thread error");
+                    ThreadJoin {
+                        location: Location::default(),
+                    }
+                })?
+            }
+        }
+    }
+    .into()
+}
diff --git a/src/common/macro/src/lib.rs b/src/common/macro/src/lib.rs
index f33f308a86a9..89b7599e26cb 100644
--- a/src/common/macro/src/lib.rs
+++ b/src/common/macro/src/lib.rs
@@ -12,17 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod admin_fn;
 mod aggr_func;
 mod print_caller;
 mod range_fn;
 mod stack_trace_debug;
-
+mod utils;
 use aggr_func::{impl_aggr_func_type_store, impl_as_aggr_func_creator};
 use print_caller::process_print_caller;
 use proc_macro::TokenStream;
 use range_fn::process_range_fn;
 use syn::{parse_macro_input, DeriveInput};
 
+use crate::admin_fn::process_admin_fn;
+
 /// Make struct implemented trait [AggrFuncTypeStore], which is necessary when writing UDAF.
 /// This derive macro is expect to be used along with attribute macro [macro@as_aggr_func_creator].
 #[proc_macro_derive(AggrFuncTypeStore)]
@@ -68,6 +71,25 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
     process_range_fn(args, input)
 }
 
+/// Attribute macro to convert a normal function into an SQL administration function. The annotated function
+/// should accept:
+/// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` as the first argument,
+/// - `&QueryContextRef` as the second argument, and
+/// - `&[ValueRef<'_>]` as the third argument, which carries the SQL function's input values for each row.
+/// The return type must be `common_query::error::Result<Value>`.
+///
+/// # Example: see `common/function/src/system/procedure_state.rs`.
+///
+/// # Arguments
+/// - `name`: The name of the generated `Function` implementation.
+/// - `ret`: The return type of the generated SQL function; it is mapped to the `ConcreteDataType::{ret}_datatype()` result.
+/// - `display_name`: The display name of the generated SQL function.
+/// - `sig_fn`: The function that returns the `Signature` of the generated `Function`.
+#[proc_macro_attribute]
+pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
+    process_admin_fn(args, input)
+}
+
 /// Attribute macro to print the caller to the annotated function.
 /// The caller is printed as its filename and the call site line number.
 ///
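To make the macro's contract concrete, here is a hypothetical admin function written against the documented shape (the `build_info` name, the constant result, and the signature helper are invented for illustration; as at the real call sites, the expansion additionally expects items such as `fmt`, `Function`, `FunctionContext`, `ensure_greptime!`, and the snafu error types to be in scope):

```rust
use common_macro::admin_fn;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;

use crate::handlers::ProcedureServiceHandlerRef;

// Expands into a `BuildInfoFunction` struct whose generated `Function::eval`
// resolves `procedure_service_handler` from the function state, bridges the
// sync UDF call onto the async runtime, and collects per-row results into a
// string vector.
#[admin_fn(
    name = "BuildInfoFunction",
    display_name = "build_info",
    sig_fn = "signature",
    ret = "string"
)]
pub(crate) async fn build_info(
    _handler: &ProcedureServiceHandlerRef,
    _ctx: &QueryContextRef,
    _params: &[ValueRef<'_>],
) -> Result<Value> {
    Ok(Value::from("hello".to_string()))
}

fn signature() -> Signature {
    Signature::uniform(
        1,
        vec![ConcreteDataType::string_datatype()],
        Volatility::Immutable,
    )
}
```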
diff --git a/src/common/macro/src/range_fn.rs b/src/common/macro/src/range_fn.rs
index da0c997eff0a..622e21ef6c73 100644
--- a/src/common/macro/src/range_fn.rs
+++ b/src/common/macro/src/range_fn.rs
@@ -12,20 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
-
 use proc_macro::TokenStream;
-use proc_macro2::Span;
 use quote::quote;
-use syn::punctuated::Punctuated;
 use syn::spanned::Spanned;
-use syn::token::Comma;
 use syn::{
-    parse_macro_input, Attribute, AttributeArgs, FnArg, Ident, ItemFn, Meta, MetaNameValue,
-    NestedMeta, Signature, Type, TypeReference, Visibility,
+    parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypeReference,
+    Visibility,
 };
 
-/// Internal util macro to early return on error.
+use crate::utils::{extract_arg_map, extract_input_types, get_ident};
+
 macro_rules! ok {
     ($item:expr) => {
         match $item {
@@ -89,48 +85,6 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
     result
 }
 
-/// Extract a String <-> Ident map from the attribute args.
-fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
-    args.into_iter()
-        .map(|meta| {
-            if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
-                let name = path.get_ident().unwrap().to_string();
-                let ident = match lit {
-                    syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
-                    _ => Err(syn::Error::new(
-                        lit.span(),
-                        "Unexpected attribute format. Expected `name = \"value\"`",
-                    )),
-                }?;
-                Ok((name, ident))
-            } else {
-                Err(syn::Error::new(
-                    meta.span(),
-                    "Unexpected attribute format. Expected `name = \"value\"`",
-                ))
-            }
-        })
-        .collect::<Result<HashMap<String, Ident>, syn::Error>>()
-}
-
-/// Helper function to get an Ident from the previous arg map.
-fn get_ident(map: &HashMap<String, Ident>, key: &str, span: Span) -> Result<Ident, syn::Error> {
-    map.get(key)
-        .cloned()
-        .ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
-}
-
-/// Extract the argument list from the annotated function.
-fn extract_input_types(inputs: &Punctuated<FnArg, Comma>) -> Result<Vec<Type>, syn::Error> {
-    inputs
-        .iter()
-        .map(|arg| match arg {
-            FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
-            FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
-        })
-        .collect()
-}
-
 fn build_struct(
     attrs: Vec<Attribute>,
     vis: Visibility,
@@ -214,7 +168,7 @@ fn build_calc_fn(
             #(
                 let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.to_data().into())?;
             )*
-            // TODO(ruihang): add ensure!()
+            // TODO(ruihang): add ensure!()
 
             let mut result_array = Vec::new();
             for index in 0..#first_range_array_name.len(){
diff --git a/src/common/macro/src/utils.rs b/src/common/macro/src/utils.rs
new file mode 100644
index 000000000000..a4587092efd9
--- /dev/null
+++ b/src/common/macro/src/utils.rs
@@ -0,0 +1,69 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use proc_macro2::Span;
+use syn::punctuated::Punctuated;
+use syn::spanned::Spanned;
+use syn::token::Comma;
+use syn::{FnArg, Ident, Meta, MetaNameValue, NestedMeta, Type};
+
+/// Extract a String <-> Ident map from the attribute args.
+pub(crate) fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
+    args.into_iter()
+        .map(|meta| {
+            if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
+                let name = path.get_ident().unwrap().to_string();
+                let ident = match lit {
+                    syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
+                    _ => Err(syn::Error::new(
+                        lit.span(),
+                        "Unexpected attribute format. Expected `name = \"value\"`",
+                    )),
+                }?;
+                Ok((name, ident))
+            } else {
+                Err(syn::Error::new(
+                    meta.span(),
+                    "Unexpected attribute format. Expected `name = \"value\"`",
+                ))
+            }
+        })
+        .collect::<Result<HashMap<String, Ident>, syn::Error>>()
+}
+
+/// Helper function to get an Ident from the previous arg map.
+pub(crate) fn get_ident(
+    map: &HashMap<String, Ident>,
+    key: &str,
+    span: Span,
+) -> Result<Ident, syn::Error> {
+    map.get(key)
+        .cloned()
+        .ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
+}
+
+/// Extract the argument list from the annotated function.
+pub(crate) fn extract_input_types(
+    inputs: &Punctuated<FnArg, Comma>,
+) -> Result<Vec<Type>, syn::Error> {
+    inputs
+        .iter()
+        .map(|arg| match arg {
+            FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
+            FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
+        })
+        .collect()
+}
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 554b0d6d795d..fdab5eae0371 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -18,6 +18,7 @@ async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
+common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs
index 198ce6911c35..4795512f25ec 100644
--- a/src/common/meta/src/datanode_manager.rs
+++ b/src/common/meta/src/datanode_manager.rs
@@ -15,23 +15,25 @@
 use std::sync::Arc;
 
 use api::v1::region::{QueryRequest, RegionRequest};
+pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;
 
 use crate::error::Result;
 use crate::peer::Peer;
 
-pub type AffectedRows = u64;
-
+/// The trait for handling requests to datanode.
 #[async_trait::async_trait]
 pub trait Datanode: Send + Sync {
     /// Handles DML, and DDL requests.
     async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
 
+    /// Handles query requests
     async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
 }
 
 pub type DatanodeRef = Arc<dyn Datanode>;
 
+/// Datanode manager
 #[async_trait::async_trait]
 pub trait DatanodeManager: Send + Sync {
     /// Retrieves a target `datanode`.
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 20a006ebce49..605be9dfe320 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use api::v1::region::{QueryRequest, RegionRequest};
+pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;
 
 use crate::cache_invalidator::DummyCacheInvalidator;
@@ -29,8 +30,6 @@ use crate::region_keeper::MemoryRegionKeeper;
 use crate::sequence::SequenceBuilder;
 use crate::wal_options_allocator::WalOptionsAllocator;
 
-pub type AffectedRows = u64;
-
 #[async_trait::async_trait]
 pub trait MockDatanodeHandler: Sync + Send + Clone {
     async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;
diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index bf3445a922d9..031f3556552b 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -124,37 +124,45 @@ impl Display for Value {
     }
 }
 
-impl Value {
-    /// Returns data type of the value.
-    ///
-    /// # Panics
-    /// Panics if the data type is not supported.
-    pub fn data_type(&self) -> ConcreteDataType {
-        match self {
-            Value::Null => ConcreteDataType::null_datatype(),
-            Value::Boolean(_) => ConcreteDataType::boolean_datatype(),
-            Value::UInt8(_) => ConcreteDataType::uint8_datatype(),
-            Value::UInt16(_) => ConcreteDataType::uint16_datatype(),
-            Value::UInt32(_) => ConcreteDataType::uint32_datatype(),
-            Value::UInt64(_) => ConcreteDataType::uint64_datatype(),
-            Value::Int8(_) => ConcreteDataType::int8_datatype(),
-            Value::Int16(_) => ConcreteDataType::int16_datatype(),
-            Value::Int32(_) => ConcreteDataType::int32_datatype(),
-            Value::Int64(_) => ConcreteDataType::int64_datatype(),
-            Value::Float32(_) => ConcreteDataType::float32_datatype(),
-            Value::Float64(_) => ConcreteDataType::float64_datatype(),
-            Value::String(_) => ConcreteDataType::string_datatype(),
-            Value::Binary(_) => ConcreteDataType::binary_datatype(),
-            Value::Date(_) => ConcreteDataType::date_datatype(),
-            Value::DateTime(_) => ConcreteDataType::datetime_datatype(),
-            Value::Time(t) => ConcreteDataType::time_datatype(*t.unit()),
-            Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
-            Value::Interval(v) => ConcreteDataType::interval_datatype(v.unit()),
-            Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()),
-            Value::Duration(d) => ConcreteDataType::duration_datatype(d.unit()),
-            Value::Decimal128(d) => ConcreteDataType::decimal128_datatype(d.precision(), d.scale()),
+macro_rules! define_data_type_func {
+    ($struct: ident) => {
+        /// Returns data type of the value.
+        ///
+        /// # Panics
+        /// Panics if the data type is not supported.
+        pub fn data_type(&self) -> ConcreteDataType {
+            match self {
+                $struct::Null => ConcreteDataType::null_datatype(),
+                $struct::Boolean(_) => ConcreteDataType::boolean_datatype(),
+                $struct::UInt8(_) => ConcreteDataType::uint8_datatype(),
+                $struct::UInt16(_) => ConcreteDataType::uint16_datatype(),
+                $struct::UInt32(_) => ConcreteDataType::uint32_datatype(),
+                $struct::UInt64(_) => ConcreteDataType::uint64_datatype(),
+                $struct::Int8(_) => ConcreteDataType::int8_datatype(),
+                $struct::Int16(_) => ConcreteDataType::int16_datatype(),
+                $struct::Int32(_) => ConcreteDataType::int32_datatype(),
+                $struct::Int64(_) => ConcreteDataType::int64_datatype(),
+                $struct::Float32(_) => ConcreteDataType::float32_datatype(),
+                $struct::Float64(_) => ConcreteDataType::float64_datatype(),
+                $struct::String(_) => ConcreteDataType::string_datatype(),
+                $struct::Binary(_) => ConcreteDataType::binary_datatype(),
+                $struct::Date(_) => ConcreteDataType::date_datatype(),
+                $struct::DateTime(_) => ConcreteDataType::datetime_datatype(),
+                $struct::Time(t) => ConcreteDataType::time_datatype(*t.unit()),
+                $struct::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
+                $struct::Interval(v) => ConcreteDataType::interval_datatype(v.unit()),
+                $struct::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()),
+                $struct::Duration(d) => ConcreteDataType::duration_datatype(d.unit()),
+                $struct::Decimal128(d) => {
+                    ConcreteDataType::decimal128_datatype(d.precision(), d.scale())
+                }
+            }
         }
-    }
+    };
+}
+
+impl Value {
+    define_data_type_func!(Value);
 
     /// Returns true if this is a null value.
     pub fn is_null(&self) -> bool {
@@ -250,6 +258,17 @@ impl Value {
         }
     }
 
+    /// Cast `Value` to u64. Returns `None` if the value is not an unsigned integer.
+    pub fn as_u64(&self) -> Option<u64> {
+        match self {
+            Value::UInt8(v) => Some(*v as _),
+            Value::UInt16(v) => Some(*v as _),
+            Value::UInt32(v) => Some(*v as _),
+            Value::UInt64(v) => Some(*v),
+            _ => None,
+        }
+    }
+
     /// Returns the logical type of the value.
     pub fn logical_type_id(&self) -> LogicalTypeId {
         match self {
@@ -938,6 +957,8 @@ macro_rules! impl_as_for_value_ref {
 }
 
 impl<'a> ValueRef<'a> {
+    define_data_type_func!(ValueRef);
+
     /// Returns true if this is null.
     pub fn is_null(&self) -> bool {
         matches!(self, ValueRef::Null)
@@ -1143,6 +1164,14 @@ impl<'a> ListValueRef<'a> {
             ListValueRef::Ref { val } => Value::List(val.clone()),
         }
     }
+
+    /// Returns the inner element's data type.
+    fn datatype(&self) -> ConcreteDataType {
+        match self {
+            ListValueRef::Indexed { vector, .. } => vector.data_type(),
+            ListValueRef::Ref { val } => val.datatype().clone(),
+        }
+    }
 }
 
 impl<'a> PartialEq for ListValueRef<'a> {
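A quick illustration of the two `datatypes` additions above (a test-style snippet written for this review; the assertions follow directly from the new `as_u64` match arms and the shared `define_data_type_func!` expansion):

```rust
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};

#[test]
fn value_helpers() {
    // `as_u64` widens any unsigned integer variant to u64...
    assert_eq!(Value::UInt8(7).as_u64(), Some(7));
    assert_eq!(Value::UInt32(7).as_u64(), Some(7));
    // ...and rejects everything else, including signed integers.
    assert_eq!(Value::Int64(7).as_u64(), None);

    // `data_type` is now generated for `ValueRef` as well as `Value`.
    assert_eq!(
        ValueRef::Boolean(true).data_type(),
        ConcreteDataType::boolean_datatype()
    );
}
```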
b/src/operator/src/delete.rs @@ -98,7 +98,7 @@ impl Deleter { &self, request: TableDeleteRequest, ctx: QueryContextRef, - ) -> Result { + ) -> Result { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table = request.table_name.as_str(); @@ -143,8 +143,8 @@ impl Deleter { }); let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::>()?; - crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows); + let affected_rows = results.into_iter().sum::>()?; + crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64); Ok(affected_rows) } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 0a3eb5c71d90..f6826a328166 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -67,6 +67,15 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to send request to region"))] + RequestRegion { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Unsupported region request"))] + UnsupportedRegionRequest { location: Location }, + #[snafu(display("Failed to parse SQL"))] ParseSql { location: Location, @@ -538,6 +547,7 @@ impl ErrorExt for Error { | Error::PrepareFileTable { .. } | Error::InferFileTableSchema { .. } | Error::SchemaIncompatible { .. } + | Error::UnsupportedRegionRequest { .. } | Error::InvalidTableName { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, @@ -565,6 +575,7 @@ impl ErrorExt for Error { | Error::IntoVectors { source, .. } => source.status_code(), Error::RequestInserts { source, .. } => source.status_code(), + Error::RequestRegion { source, .. } => source.status_code(), Error::RequestDeletes { source, .. } => source.status_code(), Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. 
} => { diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 55072ddd3c51..9f205ad9ed7c 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -30,6 +30,7 @@ use query::sql::{ infer_file_table_schema, prepare_file_table_files, }; use session::context::QueryContextRef; +use session::table_name::table_idents_to_full_name; use snafu::{ensure, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; @@ -45,7 +46,6 @@ use crate::error::{ InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, }; -use crate::table::table_idents_to_full_name; #[derive(Debug, Copy, Clone)] pub struct CreateExprFactory; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 9256d76f74cf..7aae77df9593 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -156,7 +156,7 @@ impl Inserter { &self, request: TableInsertRequest, ctx: QueryContextRef, - ) -> Result { + ) -> Result { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table_name = request.table_name.as_str(); @@ -219,8 +219,8 @@ impl Inserter { }); let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::>()?; - crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows); + let affected_rows = results.into_iter().sum::>()?; + crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64); Ok(affected_rows) } diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index e672b488a9ac..6634bc530401 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -20,6 +20,7 @@ pub mod metrics; pub mod procedure; pub mod region_req_factory; pub mod req_convert; +pub mod request; pub mod statement; pub mod table; #[cfg(test)] diff --git a/src/operator/src/region_req_factory.rs b/src/operator/src/region_req_factory.rs index d033216bb17e..21cc90dbb67a 100644 --- a/src/operator/src/region_req_factory.rs +++ b/src/operator/src/region_req_factory.rs @@ -40,4 +40,11 @@ impl RegionRequestFactory { body: Some(Body::Deletes(requests)), } } + + pub fn build_request(&self, body: Body) -> RegionRequest { + RegionRequest { + header: Some(self.header.clone()), + body: Some(body), + } + } } diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs new file mode 100644 index 000000000000..a300da034c4d --- /dev/null +++ b/src/operator/src/request.rs @@ -0,0 +1,232 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs new file mode 100644 index 000000000000..a300da034c4d --- /dev/null +++ b/src/operator/src/request.rs @@ -0,0 +1,232 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +
+use std::sync::Arc; + +use api::v1::region::region_request::Body as RegionRequestBody; +use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader}; +use catalog::CatalogManagerRef; +use common_catalog::build_db_string; +use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::peer::Peer; +use common_telemetry::logging::{error, info}; +use common_telemetry::tracing_context::TracingContext; +use futures_util::future; +use partition::manager::{PartitionInfo, PartitionRuleManagerRef}; +use session::context::QueryContextRef; +use snafu::prelude::*; +use store_api::storage::RegionId; +use table::requests::{CompactTableRequest, FlushTableRequest}; + +use crate::error::{ + CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu, + RequestRegionSnafu, Result, TableNotFoundSnafu, UnsupportedRegionRequestSnafu, +}; +use crate::region_req_factory::RegionRequestFactory; + +/// Region requester that handles flush and compact requests. +pub struct Requester { + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, +} + +pub type RequesterRef = Arc<Requester>; + +impl Requester { + pub fn new( + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, + ) -> Self { + Self { + catalog_manager, + partition_manager, + datanode_manager, + } + } +
+ /// Handle the request to flush a table. + pub async fn handle_table_flush( + &self, + request: FlushTableRequest, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let partitions = self + .get_table_partitions( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await?; + + let requests = partitions + .into_iter() + .map(|partition| { + RegionRequestBody::Flush(FlushRequest { + region_id: partition.id.into(), + }) + }) + .collect(); + + info!("Handle table manual flush request: {:?}", request); + + self.do_request( + requests, + Some(build_db_string(&request.catalog_name, &request.schema_name)), + &ctx, + ) + .await + } +
+ /// Handle the request to compact a table. + pub async fn handle_table_compaction( + &self, + request: CompactTableRequest, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let partitions = self + .get_table_partitions( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await?; + + let requests = partitions + .into_iter() + .map(|partition| { + RegionRequestBody::Compact(CompactRequest { + region_id: partition.id.into(), + }) + }) + .collect(); + + info!("Handle table manual compaction request: {:?}", request); + + self.do_request( + requests, + Some(build_db_string(&request.catalog_name, &request.schema_name)), + &ctx, + ) + .await + } +
+ /// Handle the request to flush the region. + pub async fn handle_region_flush( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let request = RegionRequestBody::Flush(FlushRequest { + region_id: region_id.into(), + }); + + info!("Handle region manual flush request: {region_id}"); + self.do_request(vec![request], None, &ctx).await + } +
+ /// Handle the request to compact the region. + pub async fn handle_region_compaction( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let request = RegionRequestBody::Compact(CompactRequest { + region_id: region_id.into(), + }); + + info!("Handle region manual compaction request: {region_id}"); + self.do_request(vec![request], None, &ctx).await + } +} +
+impl Requester { + async fn do_request( + &self, + requests: Vec<RegionRequestBody>, + db_string: Option<String>, + ctx: &QueryContextRef, + ) -> Result<AffectedRows> { + let request_factory = RegionRequestFactory::new(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + dbname: db_string.unwrap_or_else(|| ctx.get_db_string()), + }); + + let tasks = requests.into_iter().map(|req_body| { + let request = request_factory.build_request(req_body.clone()); + let partition_manager = self.partition_manager.clone(); + let datanode_manager = self.datanode_manager.clone(); + common_runtime::spawn_write(async move { + let peer = + Self::find_region_leader_by_request(partition_manager, &req_body).await?; + datanode_manager + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestRegionSnafu) + }) + }); + let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?; + + Ok(affected_rows) + } +
+ async fn find_region_leader_by_request( + partition_manager: PartitionRuleManagerRef, + req: &RegionRequestBody, + ) -> Result<Peer> { + let region_id = match req { + RegionRequestBody::Flush(req) => req.region_id, + RegionRequestBody::Compact(req) => req.region_id, + _ => { + error!("Unsupported region request: {:?}", req); + return UnsupportedRegionRequestSnafu {}.fail(); + } + }; + + partition_manager + .find_region_leader(region_id.into()) + .await + .context(FindRegionLeaderSnafu) + } +
+ async fn get_table_partitions( + &self, + catalog: &str, + schema: &str, + table_name: &str, + ) -> Result<Vec<PartitionInfo>> { + let table = self + .catalog_manager + .table(catalog, schema, table_name) + .await + .context(CatalogSnafu)?; + + let table = table.with_context(|| TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name(catalog, schema, table_name), + })?; + let table_info = table.table_info(); + + self.partition_manager + .find_table_partitions(table_info.ident.table_id) + .await + .with_context(|_| FindTablePartitionRuleSnafu { + table_name: common_catalog::format_full_table_name(catalog, schema, table_name), + }) + } +}
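With request.rs in place end to end, here is a hedged sketch of driving it by hand, written as if it lived inside `src/operator` (the table name and the way `requester` is obtained are illustrative, and `QueryContext::arc()` is assumed to provide a default context as elsewhere in the codebase):

```rust
use common_meta::datanode_manager::AffectedRows;
use session::context::QueryContext;
use table::requests::FlushTableRequest;

use crate::error::Result;
use crate::request::Requester;

// One FlushRequest is fanned out to the leader datanode of every
// partition; the AffectedRows values are summed across regions.
async fn flush_whole_table(requester: &Requester) -> Result<AffectedRows> {
    let request = FlushTableRequest {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        table_name: "my_table".to_string(),
    };
    requester.handle_table_flush(request, QueryContext::arc()).await
}
```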
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index b0cd2c773b08..1361fbb12ce7 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -39,6 +39,7 @@ use query::parser::QueryStatement; use query::plan::LogicalPlan; use query::QueryEngineRef; use session::context::QueryContextRef; +use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt}; use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; @@ -55,7 +56,6 @@ use crate::error::{ }; use crate::insert::InserterRef; use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; -use crate::table::table_idents_to_full_name; #[derive(Clone)] pub struct StatementExecutor {
diff --git a/src/operator/src/statement/describe.rs b/src/operator/src/statement/describe.rs index a9162c352655..d40990e4dda7 100644 --- a/src/operator/src/statement/describe.rs +++ b/src/operator/src/statement/describe.rs @@ -16,6 +16,7 @@ use common_error::ext::BoxedError; use common_query::Output; use common_telemetry::tracing; use session::context::QueryContextRef; +use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt}; use sql::statements::describe::DescribeTable; use sql::util::format_raw_object_name; @@ -24,7 +25,6 @@ use crate::error::{ CatalogSnafu, DescribeStatementSnafu, ExternalSnafu, Result, TableNotFoundSnafu, }; use crate::statement::StatementExecutor; -use crate::table::table_idents_to_full_name; impl StatementExecutor { #[tracing::instrument(skip_all)]
diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index 38271abb87af..b516597e0ba6 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -13,58 +13,36 @@ // limitations under the License. use async_trait::async_trait; +use common_base::AffectedRows; use common_error::ext::BoxedError; -use common_function::handlers::{AffectedRows, TableMutationHandler}; +use common_function::handlers::TableMutationHandler; use common_query::error as query_error; use common_query::error::Result as QueryResult; use session::context::QueryContextRef; use snafu::ResultExt; -use sqlparser::ast::ObjectName; -use table::requests::{DeleteRequest as TableDeleteRequest, InsertRequest as TableInsertRequest}; +use store_api::storage::RegionId; +use table::requests::{ + CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest, + InsertRequest as TableInsertRequest, +}; use crate::delete::DeleterRef; -use crate::error::{InvalidSqlSnafu, Result}; use crate::insert::InserterRef; - -// TODO(LFC): Refactor consideration: move this function to some helper mod, -// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. -/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple. -pub fn table_idents_to_full_name( - obj_name: &ObjectName, - query_ctx: &QueryContextRef, -) -> Result<(String, String, String)> { - match &obj_name.0[..] { - [table] => Ok(( - query_ctx.current_catalog().to_string(), - query_ctx.current_schema().to_string(), - table.value.clone(), - )), - [schema, table] => Ok(( - query_ctx.current_catalog().to_string(), - schema.value.clone(), - table.value.clone(), - )), - [catalog, schema, table] => Ok(( - catalog.value.clone(), - schema.value.clone(), - table.value.clone(), - )), - _ => InvalidSqlSnafu { - err_msg: format!( - "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}", - ), - }.fail(), - } -} +use crate::request::RequesterRef; pub struct TableMutationOperator { inserter: InserterRef, deleter: DeleterRef, + requester: RequesterRef, } impl TableMutationOperator { - pub fn new(inserter: InserterRef, deleter: DeleterRef) -> Self { - Self { inserter, deleter } + pub fn new(inserter: InserterRef, deleter: DeleterRef, requester: RequesterRef) -> Self { + Self { + inserter, + deleter, + requester, + } + } } @@ -93,4 +71,52 @@ impl TableMutationHandler for TableMutationOperator { .map_err(BoxedError::new) .context(query_error::TableMutationSnafu) } + + async fn flush( + &self, + request: FlushTableRequest, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_table_flush(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn compact( + &self, + request: CompactTableRequest, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_table_compaction(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn flush_region( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_region_flush(region_id, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn compact_region( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_region_compaction(region_id, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } }
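On the operator side the four new trait methods are pure delegation: each forwards to the `Requester` and boxes the error into `TableMutationSnafu`, exactly like the existing insert/delete paths. The only change for existing call sites is the third constructor argument; a sketch (how the three refs are built is elided here):

```rust
use std::sync::Arc;

use crate::delete::DeleterRef;
use crate::insert::InserterRef;
use crate::request::RequesterRef;
use crate::table::TableMutationOperator;

// Callers that previously passed (inserter, deleter) now also hand over
// the requester that backs flush/compact.
fn build_mutation_handler(
    inserter: InserterRef,
    deleter: DeleterRef,
    requester: RequesterRef,
) -> Arc<TableMutationOperator> {
    Arc::new(TableMutationOperator::new(inserter, deleter, requester))
}
```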
diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index 4f044fdf7c72..0f393962c7ad 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -18,4 +18,5 @@ common-catalog.workspace = true common-telemetry.workspace = true common-time.workspace = true derive_builder.workspace = true +snafu.workspace = true sql.workspace = true
diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs index 0c17ec66e02b..b51511ce6dfd 100644 --- a/src/session/src/lib.rs +++ b/src/session/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod context; +pub mod table_name; use std::net::SocketAddr; use std::sync::Arc;
diff --git a/src/session/src/table_name.rs b/src/session/src/table_name.rs new file mode 100644 index 000000000000..7d56362bedfb --- /dev/null +++ b/src/session/src/table_name.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sql::ast::ObjectName; +use sql::error::{InvalidSqlSnafu, Result}; +use sql::parser::ParserContext; + +use crate::QueryContextRef; + +/// Parse table name into `(catalog, schema, table)` with query context. +pub fn table_name_to_full_name( + name: &str, + query_ctx: &QueryContextRef, +) -> Result<(String, String, String)> { + let obj_name = ParserContext::parse_table_name(name, query_ctx.sql_dialect())?; + + table_idents_to_full_name(&obj_name, query_ctx) +} + +/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple. +pub fn table_idents_to_full_name( + obj_name: &ObjectName, + query_ctx: &QueryContextRef, +) -> Result<(String, String, String)> { + match &obj_name.0[..] { + [table] => Ok(( + query_ctx.current_catalog().to_string(), + query_ctx.current_schema().to_string(), + table.value.clone(), + )), + [schema, table] => Ok(( + query_ctx.current_catalog().to_string(), + schema.value.clone(), + table.value.clone(), + )), + [catalog, schema, table] => Ok(( + catalog.value.clone(), + schema.value.clone(), + table.value.clone(), + )), + _ => InvalidSqlSnafu { + msg: format!( + "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}", + ), + }.fail(), + } +}
diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index e66b1f3243f1..52aaf0213bd4 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -74,6 +74,22 @@ impl<'a> ParserContext<'a> { Ok(stmts) } + pub fn parse_table_name(sql: &'a str, dialect: &dyn Dialect) -> Result<ObjectName> { + let mut parser = Parser::new(dialect) + .with_options(ParserOptions::new().with_trailing_commas(true)) + .try_with_sql(sql) + .context(SyntaxSnafu)?; + + let raw_table_name = parser.parse_object_name().context(error::UnexpectedSnafu { + sql, + expected: "a table name", + actual: parser.peek_token().to_string(), + })?; + let table_name = Self::canonicalize_object_name(raw_table_name); + + Ok(table_name) + } + pub fn parse_function(sql: &'a str, dialect: &dyn Dialect) -> Result<Expr> { let mut parser = Parser::new(dialect) .with_options(ParserOptions::new().with_trailing_commas(true)) @@ -267,4 +283,39 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), ); } + + #[test] + pub fn test_parse_table_name() { + let table_name = "a.b.c"; + + let object_name = + ParserContext::parse_table_name(table_name, &GreptimeDbDialect {}).unwrap(); + + assert_eq!(object_name.0.len(), 3); + assert_eq!(object_name.to_string(), table_name); + + let table_name = "a.b"; + + let object_name = + ParserContext::parse_table_name(table_name, &GreptimeDbDialect {}).unwrap(); + + assert_eq!(object_name.0.len(), 2); + assert_eq!(object_name.to_string(), table_name); + + let table_name = "Test.\"public-test\""; + + let object_name = + ParserContext::parse_table_name(table_name, &GreptimeDbDialect {}).unwrap(); + + assert_eq!(object_name.0.len(), 2); + assert_eq!(object_name.to_string(), table_name.to_ascii_lowercase()); + + let table_name = "HelloWorld"; + + let object_name = + ParserContext::parse_table_name(table_name, &GreptimeDbDialect {}).unwrap(); + + assert_eq!(object_name.0.len(), 1); + assert_eq!(object_name.to_string(), table_name.to_ascii_lowercase()); + } }
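How the moved helpers compose: `parse_table_name` canonicalizes the identifier (lowercasing unquoted parts, keeping quoted ones), then `table_idents_to_full_name` fills in whatever the name leaves unqualified from the session context. A sketch under the assumption of GreptimeDB's default context (catalog `greptime`, schema `public`); the exact defaults and error conversion are assumptions here:

```rust
use session::context::QueryContext;
use session::table_name::table_name_to_full_name;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = QueryContext::arc();

    // A bare name inherits catalog and schema from the context.
    let (catalog, schema, table) = table_name_to_full_name("monitor", &ctx)?;
    assert_eq!(
        (catalog.as_str(), schema.as_str(), table.as_str()),
        ("greptime", "public", "monitor")
    );

    // A quoted part keeps its case; the unquoted schema is lowercased.
    let (_, schema, table) = table_name_to_full_name("Test.\"Public-Test\"", &ctx)?;
    assert_eq!((schema.as_str(), table.as_str()), ("test", "Public-Test"));
    Ok(())
}
```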
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index d41b885f9465..ce22a29027c7 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -22,6 +22,7 @@ use api::v1::region::{ InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{self, Rows, SemanticType}; +pub use common_base::AffectedRows; use snafu::{ensure, OptionExt}; use strum::IntoStaticStr; @@ -33,8 +34,6 @@ use crate::metadata::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; -pub type AffectedRows = usize; - #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest),
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 7b0e8a625f1f..bdd40a7282c8 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -286,20 +286,14 @@ pub struct CopyTableRequest { pub struct FlushTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: Option<String>, - pub region_number: Option<RegionNumber>, - /// Wait until the flush is done. - pub wait: Option<bool>, + pub table_name: String, } #[derive(Debug, Clone, Default)] pub struct CompactTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: Option<String>, - pub region_number: Option<RegionNumber>, - /// Wait until the compaction is done. - pub wait: Option<bool>, + pub table_name: String, } /// Truncate table request
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index bcac8d80383f..18a5e1224146 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -868,6 +868,8 @@ async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionDistribution { let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + info!("SQL result:\n {}", recordbatches.pretty_print().unwrap()); + let mut distribution = RegionDistribution::new(); for batch in recordbatches.take() { @@ -911,6 +913,8 @@ async fn trigger_migration_by_sql( let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + info!("SQL result:\n {}", recordbatches.pretty_print().unwrap()); + let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else { unreachable!(); }; @@ -932,6 +936,8 @@ async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String { let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + info!("SQL result:\n {}", recordbatches.pretty_print().unwrap()); + let Value::String(state) = recordbatches.take()[0].column(0).get(0) else { unreachable!(); };
diff --git a/tests/cases/distributed/function/admin/flush_compact_region.result b/tests/cases/distributed/function/admin/flush_compact_region.result new file mode 100644 index 000000000000..322b99b76c75 --- /dev/null +++ b/tests/cases/distributed/function/admin/flush_compact_region.result @@ -0,0 +1,66 @@ +--- test flush_region and compact_region --- +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) (); + +Affected Rows: 0 + +INSERT INTO my_table VALUES + (1, 'a', 1), + (2, 'b', 2), + (11, 'c', 3), + (12, 'd', 4), + (21, 'e', 5), + (22, 'f', 5); + +Affected Rows: 6 + +SELECT * FROM my_table; + ++----+---+-------------------------+ +| a | b | ts | ++----+---+-------------------------+ +| 1 | a | 1970-01-01T00:00:00.001 | +| 2 | b | 1970-01-01T00:00:00.002 | +| 11 | c | 1970-01-01T00:00:00.003 | +| 12 | d | 1970-01-01T00:00:00.004 | +| 21 | e | 1970-01-01T00:00:00.005 | +| 22 | f | 1970-01-01T00:00:00.005 | ++----+---+-------------------------+ + +SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; + ++-------------------------------------------------------------------+ +| flush_region(information_schema.partitions.greptime_partition_id) | ++-------------------------------------------------------------------+ +| 0 | ++-------------------------------------------------------------------+ + +SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; + ++---------------------------------------------------------------------+ +| compact_region(information_schema.partitions.greptime_partition_id) | ++---------------------------------------------------------------------+ +| 0 | ++---------------------------------------------------------------------+ + +SELECT * FROM my_table; + ++----+---+-------------------------+ +| a | b | ts | ++----+---+-------------------------+ +| 1 | a | 1970-01-01T00:00:00.001 | +| 2 | b | 1970-01-01T00:00:00.002 | +| 11 | c | 1970-01-01T00:00:00.003 | +| 12 | d | 1970-01-01T00:00:00.004 | +| 21 | e | 1970-01-01T00:00:00.005 | +| 22 | f | 1970-01-01T00:00:00.005 | ++----+---+-------------------------+ + +DROP TABLE my_table; + +Affected Rows: 0 + diff --git
a/tests/cases/distributed/function/admin/flush_compact_region.sql b/tests/cases/distributed/function/admin/flush_compact_region.sql new file mode 100644 index 000000000000..f7146c6b20e6 --- /dev/null +++ b/tests/cases/distributed/function/admin/flush_compact_region.sql @@ -0,0 +1,26 @@ +--- test flush_region and compact_region --- + +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) (); + +INSERT INTO my_table VALUES + (1, 'a', 1), + (2, 'b', 2), + (11, 'c', 3), + (12, 'd', 4), + (21, 'e', 5), + (22, 'f', 5); + +SELECT * FROM my_table; + +SELECT flush_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; + +SELECT compact_region(greptime_partition_id) from information_schema.partitions WHERE table_name = 'my_table'; + +SELECT * FROM my_table; + +DROP TABLE my_table; diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.result b/tests/cases/standalone/common/function/admin/flush_compact_table.result new file mode 100644 index 000000000000..01bad5f3b1cc --- /dev/null +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.result @@ -0,0 +1,54 @@ +--- test flush_table and compact_table --- +CREATE TABLE test(ts timestamp time index); + +Affected Rows: 0 + +INSERT INTO test VALUES (1), (2), (3), (4), (5); + +Affected Rows: 5 + +SELECT * FROM test; + ++-------------------------+ +| ts | ++-------------------------+ +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.004 | +| 1970-01-01T00:00:00.005 | ++-------------------------+ + +SELECT FLUSH_TABLE('test'); + ++---------------------------+ +| flush_table(Utf8("test")) | ++---------------------------+ +| 0 | ++---------------------------+ + +SELECT COMPACT_TABLE('test'); + ++-----------------------------+ +| compact_table(Utf8("test")) | ++-----------------------------+ +| 0 | ++-----------------------------+ + +--- doesn't change anything --- +SELECT * FROM test; + ++-------------------------+ +| ts | ++-------------------------+ +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.004 | +| 1970-01-01T00:00:00.005 | ++-------------------------+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/function/admin/flush_compact_table.sql b/tests/cases/standalone/common/function/admin/flush_compact_table.sql new file mode 100644 index 000000000000..42e1ee5f3e6d --- /dev/null +++ b/tests/cases/standalone/common/function/admin/flush_compact_table.sql @@ -0,0 +1,16 @@ +--- test flush_table and compact_table --- + +CREATE TABLE test(ts timestamp time index); + +INSERT INTO test VALUES (1), (2), (3), (4), (5); + +SELECT * FROM test; + +SELECT FLUSH_TABLE('test'); + +SELECT COMPACT_TABLE('test'); + +--- doesn't change anything --- +SELECT * FROM test; + +DROP TABLE test;
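A closing note on why the region-level functions take a single number: `RegionId` packs `(table_id, region_number)` into one `u64`, which is exactly what `information_schema.partitions` exposes as `greptime_partition_id`, so the value round-trips straight into `flush_region`/`compact_region`. A small sketch of that round-trip (the concrete ids are illustrative; the `u64` conversions are the ones this diff already relies on):

```rust
use store_api::storage::RegionId;

fn main() {
    // table_id 1024, region_number 0: the same encoding the SQL layer
    // hands back as greptime_partition_id.
    let region_id = RegionId::new(1024, 0);
    let raw: u64 = region_id.into();
    assert_eq!(RegionId::from(raw), region_id);
}
```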