diff --git a/Cargo.lock b/Cargo.lock index 0b4c1555cf..4328e358a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1141,6 +1141,28 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "chrono-tz" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa48fa079165080f11d7753fd0bc175b7d391f276b965fe4b55bfad67856e463" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9998fb9f7e9b2111641485bf8beb32f92945f97f92a3d061f744cfef335f751" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "clang-sys" version = "1.3.3" @@ -2948,6 +2970,18 @@ dependencies = [ "str_stack", ] +[[package]] +name = "influxdb_influxql_parser" +version = "0.1.0" +source = "git+https://github.com/Rachelint/influxdb_iox.git?branch=influxql-parser#77e24e992a90dd08a6d71366de53fac721e491fb" +dependencies = [ + "chrono", + "chrono-tz", + "nom 7.1.1", + "num-traits", + "once_cell", +] + [[package]] name = "instant" version = "0.1.12" @@ -4318,6 +4352,15 @@ dependencies = [ "thrift 0.13.0", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41" +dependencies = [ + "regex", +] + [[package]] name = "paste" version = "0.1.18" @@ -4383,6 +4426,44 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56ac890c5e3ca598bbdeaa99964edb5b0258a583a9eb6ef4e89fc85d9224770" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" +dependencies = [ + "phf_shared", + "rand 0.8.5", +] + +[[package]] +name = "phf_shared" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.0.11" @@ -5942,9 +6023,12 @@ dependencies = [ "datafusion-proto", "df_operator", "hashbrown 0.12.3", + "influxdb_influxql_parser", + "itertools", "log", "paste 1.0.8", "regex", + "regex-syntax", "snafu 0.6.10", "sqlparser", "table_engine", diff --git a/server/src/grpc/storage_service/prom_query.rs b/server/src/grpc/storage_service/prom_query.rs index 843e7a96d1..ab0fb7b441 100644 --- a/server/src/grpc/storage_service/prom_query.rs +++ b/server/src/grpc/storage_service/prom_query.rs @@ -35,9 +35,12 @@ use crate::grpc::storage_service::{ }; fn is_table_not_found_error(e: &FrontendError) -> bool { - matches!(&e, FrontendError::CreatePlan { source } - if matches!(source, sql::planner::Error::BuildPromPlanError { source } - if matches!(source, sql::promql::Error::TableNotFound { .. }))) + matches!(&e, + FrontendError::CreatePlan { + source, + .. } + if matches!(source, sql::planner::Error::BuildPromPlanError { source } + if matches!(source, sql::promql::Error::TableNotFound { .. 
}))) } pub async fn handle_query( diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index 900a50ca28..5229ea8225 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -7,6 +7,7 @@ use std::{sync::Arc, time::Instant}; use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest}; use cluster::config::SchemaConfig; use common_types::request_id::RequestId; +use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table; @@ -37,6 +38,13 @@ pub enum Error { #[snafu(display("Expr is not found in prom request.\nBacktrace:\n{}", backtrace))] ExprNotFoundInPromRequest { backtrace: Backtrace }, + + // invalid sql is quite common, so we don't provide a backtrace now. + #[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))] + InvalidInfluxql { + influxql: String, + parse_err: influxdb_influxql_parser::common::ParseError, + }, } define_result!(Error); @@ -90,6 +98,21 @@ impl
<P> Frontend<P>
{ let expr = expr.context(ExprNotFoundInPromRequest)?; Expr::try_from(expr).context(InvalidPromRequest) } + + /// Parse the sql and returns the statements + pub fn parse_influxql( + &self, + _ctx: &mut Context, + influxql: &str, + ) -> Result> { + match influxdb_influxql_parser::parse_statements(influxql) { + Ok(stmts) => Ok(stmts), + Err(e) => Err(Error::InvalidInfluxql { + influxql: influxql.to_string(), + parse_err: e, + }), + } + } } impl Frontend
<P>
{ @@ -110,6 +133,16 @@ impl Frontend
<P>
{ planner.promql_expr_to_plan(expr).context(CreatePlan) } + pub fn influxql_stmt_to_plan( + &self, + ctx: &mut Context, + stmt: InfluxqlStatement, + ) -> Result { + let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism); + + planner.influxql_stmt_to_plan(stmt).context(CreatePlan) + } + pub fn write_req_to_plan( &self, ctx: &mut Context, diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs new file mode 100644 index 0000000000..fbe9dc51cf --- /dev/null +++ b/sql/src/influxql/mod.rs @@ -0,0 +1,38 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Influxql processing + +pub mod planner; +pub(crate) mod stmt_rewriter; +pub(crate) mod util; +pub mod error { + use common_util::error::GenericError; + use snafu::{Backtrace, Snafu}; + + #[derive(Debug, Snafu)] + #[snafu(visibility = "pub")] + pub enum Error { + #[snafu(display( + "Unimplemented influxql statement, msg: {}.\nBacktrace:{}", + msg, + backtrace + ))] + Unimplemented { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to rewrite influxql from statement with cause, msg:{}, source:{}", + msg, + source + ))] + RewriteWithCause { msg: String, source: GenericError }, + + #[snafu(display( + "Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}", + msg, + backtrace + ))] + RewriteNoCause { msg: String, backtrace: Backtrace }, + } + + define_result!(Error); +} diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs new file mode 100644 index 0000000000..9913ed78c6 --- /dev/null +++ b/sql/src/influxql/planner.rs @@ -0,0 +1,56 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Influxql planner. + +use common_util::error::BoxError; +use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use snafu::ResultExt; +use table_engine::table::TableRef; + +use crate::{influxql::error::*, plan::Plan, provider::MetaProvider}; + +#[allow(dead_code)] +pub(crate) struct Planner<'a, P: MetaProvider> { + sql_planner: crate::planner::PlannerDelegate<'a, P>, +} + +impl<'a, P: MetaProvider> Planner<'a, P> { + pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self { + Self { sql_planner } + } + + pub fn statement_to_plan(&self, stmt: InfluxqlStatement) -> Result { + match stmt { + InfluxqlStatement::Select(_) => todo!(), + InfluxqlStatement::CreateDatabase(_) => todo!(), + InfluxqlStatement::ShowDatabases(_) => todo!(), + InfluxqlStatement::ShowRetentionPolicies(_) => todo!(), + InfluxqlStatement::ShowTagKeys(_) => todo!(), + InfluxqlStatement::ShowTagValues(_) => todo!(), + InfluxqlStatement::ShowFieldKeys(_) => todo!(), + InfluxqlStatement::ShowMeasurements(_) => todo!(), + InfluxqlStatement::Delete(_) => todo!(), + InfluxqlStatement::DropMeasurement(_) => todo!(), + InfluxqlStatement::Explain(_) => todo!(), + } + } +} + +pub trait MeasurementProvider { + fn measurement(&self, measurement_name: &str) -> Result>; +} + +pub(crate) struct MeasurementProviderImpl<'a, P: MetaProvider>( + crate::planner::PlannerDelegate<'a, P>, +); + +impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> { + fn measurement(&self, measurement_name: &str) -> Result> { + self.0 + .find_table(measurement_name) + .box_err() + .context(RewriteWithCause { + msg: format!("failed to find measurement, measurement:{measurement_name}"), + }) + } +} diff --git a/sql/src/influxql/stmt_rewriter.rs b/sql/src/influxql/stmt_rewriter.rs new file mode 100644 index 0000000000..9c8251f278 --- 
/dev/null +++ b/sql/src/influxql/stmt_rewriter.rs @@ -0,0 +1,459 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Influxql statement rewriter + +use std::{collections::BTreeSet, ops::ControlFlow}; + +use common_util::error::BoxError; +use influxdb_influxql_parser::{ + common::{MeasurementName, QualifiedMeasurementName, ZeroOrMore}, + expression::{walk, Expr, WildcardType}, + identifier::Identifier, + literal::Literal, + select::{ + Dimension, Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement, + }, +}; +use itertools::{Either, Itertools}; +use snafu::{ensure, OptionExt, ResultExt}; + +use super::{planner::MeasurementProvider, util}; +use crate::influxql::error::*; + +/// Rewriter for the influxql statement +/// +/// It will rewrite statement before converting it to sql statement. +// Partial copy from influxdb_iox. +pub(crate) struct StmtRewriter<'a> { + measurement_provider: &'a dyn MeasurementProvider, +} + +impl<'a> StmtRewriter<'a> { + #[allow(dead_code)] + pub fn new(measurement_provider: &'a dyn MeasurementProvider) -> Self { + Self { + measurement_provider, + } + } + + #[allow(dead_code)] + pub fn rewrite(&self, stmt: &mut SelectStatement) -> Result<()> { + self.rewrite_from(stmt)?; + self.rewrite_field_list(stmt) + } + + fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> { + let mut new_from = Vec::new(); + for ms in stmt.from.iter() { + match ms { + MeasurementSelection::Name(qmn) => match qmn { + QualifiedMeasurementName { + name: MeasurementName::Name(name), + .. + } => { + let _ = self.measurement_provider.measurement(name)?.context( + RewriteNoCause { + msg: format!("measurement not found, measurement:{name}"), + }, + )?; + new_from.push(ms.clone()); + } + QualifiedMeasurementName { + name: MeasurementName::Regex(_), + .. + } => { + // TODO: need to support get all tables first. + return Unimplemented { + msg: "rewrite from regex", + } + .fail(); + } + }, + MeasurementSelection::Subquery(_) => { + return Unimplemented { + msg: "rewrite from subquery", + } + .fail(); + } + } + } + + // TODO: support from multiple tables. + ensure!( + new_from.len() == 1, + Unimplemented { + msg: "rewrite from multiple measurements" + } + ); + + stmt.from = FromMeasurementClause::new(new_from); + + Ok(()) + } + + /// Rewrite the projection list and GROUP BY of the specified `SELECT`. + // TODO: should support from multiple measurements. + // TODO: support rewrite fields in subquery. + fn rewrite_field_list(&self, stmt: &mut SelectStatement) -> Result<()> { + ensure!( + stmt.from.len() == 1, + Unimplemented { + msg: "rewrite field list from multiple measurements" + } + ); + + match &stmt.from[0] { + MeasurementSelection::Name(qualified_name) => { + let QualifiedMeasurementName { name, .. } = qualified_name; + + match name { + MeasurementName::Name(name) => { + // Get schema, and split columns to tags and fields. 
+ let (tags, fields) = self + .tags_and_fields_in_measurement(name.as_str()) + .box_err() + .context(RewriteWithCause { + msg: "rewrite field list fail to find measurement", + })?; + let mut group_by_tags = BTreeSet::new(); + maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?; + maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?; + + Ok(()) + } + + MeasurementName::Regex(_) => RewriteNoCause { + msg: "rewrite field list should not encounter regex in from clause", + } + .fail(), + } + } + + MeasurementSelection::Subquery(_) => Unimplemented { + msg: "rewrite field list from subquery", + } + .fail(), + } + } + + fn tags_and_fields_in_measurement( + &self, + measurement_name: &str, + ) -> Result<(Vec, Vec)> { + let measurement = self + .measurement_provider + .measurement(measurement_name) + .box_err() + .context(RewriteWithCause { + msg: format!("failed to find measurement, measurement:{measurement_name}"), + })? + .context(RewriteNoCause { + msg: format!("measurement not found, measurement:{measurement_name}"), + })?; + + // Get schema and split to tags and fields. + let schema = measurement.schema(); + let tsid_idx_opt = schema.index_of_tsid(); + let timestamp_key_idx = schema.timestamp_index(); + let tags_and_fields: (Vec, Vec) = schema + .columns() + .iter() + .enumerate() + .filter_map(|(col_idx, col)| { + let is_tsid_col = match tsid_idx_opt { + Some(idx) => col_idx == idx, + None => false, + }; + let is_timestamp_key_col = col_idx == timestamp_key_idx; + + if !is_tsid_col && !is_timestamp_key_col { + Some(col) + } else { + None + } + }) + .partition_map(|col| { + if col.is_tag { + Either::Left(col.name.clone()) + } else { + Either::Right(col.name.clone()) + } + }); + Ok(tags_and_fields) + } +} + +fn maybe_rewrite_group_by( + tags: &[String], + group_by_tags: &mut BTreeSet, + stmt: &mut SelectStatement, +) -> Result<()> { + if let Some(group_by) = &stmt.group_by { + for dimension in group_by.iter() { + match dimension { + Dimension::Time { .. 
} => { + return Unimplemented { + msg: "group by time interval", + } + .fail(); + } + + Dimension::Tag(tag) => { + if !tags.contains(&tag.to_string()) { + return RewriteNoCause { + msg: format!("rewrite group by encounter tag not exist, tag:{tag}, exist tags:{tags:?}"), + } + .fail(); + } + let _ = group_by_tags.insert(tag.to_string()); + } + + Dimension::Regex(re) => { + let re = util::parse_regex(re).box_err().context(RewriteWithCause { + msg: format!("rewrite group by encounter invalid regex, regex:{re}"), + })?; + let match_tags = tags.iter().filter_map(|tag| { + if re.is_match(tag.as_str()) { + Some(tag.clone()) + } else { + None + } + }); + group_by_tags.extend(match_tags); + } + + Dimension::Wildcard => group_by_tags.extend(tags.iter().cloned()), + } + } + + stmt.group_by = Some(ZeroOrMore::new( + group_by_tags + .iter() + .map(|tag| Dimension::Tag(Identifier::new(tag.clone()))) + .collect::>(), + )); + } + + Ok(()) +} + +fn maybe_rewrite_projection( + tags: &[String], + fields: &[String], + groub_by_tags: &BTreeSet, + stmt: &mut SelectStatement, +) -> Result<()> { + let mut new_fields = Vec::new(); + + enum AddFieldType { + Tag, + Field, + Both, + } + + let add_fields = |filter: &dyn Fn(&String) -> bool, + add_field_type: AddFieldType, + new_fields: &mut Vec| { + if matches!(&add_field_type, AddFieldType::Tag | AddFieldType::Both) { + let tag_fields = tags.iter().filter_map(|tag| { + if !groub_by_tags.contains(tag.as_str()) && filter(tag) { + Some(Field { + expr: Expr::VarRef { + name: tag.clone().into(), + data_type: None, + }, + alias: None, + }) + } else { + None + } + }); + new_fields.extend(tag_fields); + } + + if matches!(&add_field_type, AddFieldType::Field | AddFieldType::Both) { + let normal_fields = fields.iter().filter_map(|field| { + if filter(field) { + Some(Field { + expr: Expr::VarRef { + name: field.clone().into(), + data_type: None, + }, + alias: None, + }) + } else { + None + } + }); + new_fields.extend(normal_fields); + } + }; + + for f in stmt.fields.iter() { + match &f.expr { + Expr::Wildcard(wct) => { + let filter = |_: &String| -> bool { true }; + + match wct { + Some(WildcardType::Tag) => { + add_fields(&filter, AddFieldType::Tag, &mut new_fields); + } + Some(WildcardType::Field) => { + add_fields(&filter, AddFieldType::Field, &mut new_fields); + } + None => { + add_fields(&filter, AddFieldType::Both, &mut new_fields); + } + } + } + + Expr::Literal(Literal::Regex(re)) => { + let re = util::parse_regex(re).box_err().context(RewriteWithCause { + msg: format!("rewrite projection encounter invalid regex, regex:{re}"), + })?; + + let filter = |v: &String| -> bool { re.is_match(v.as_str()) }; + + add_fields(&filter, AddFieldType::Both, &mut new_fields); + } + + Expr::Call { args, .. } => { + let mut args = args; + + // Search for the call with a wildcard by continuously descending until + // we no longer have a call. + while let Some(Expr::Call { + args: inner_args, .. + }) = args.first() + { + args = inner_args; + } + + match args.first() { + Some(Expr::Wildcard(Some(WildcardType::Tag))) => { + return RewriteNoCause { + msg: "rewrite projection found tags placed in a call", + } + .fail(); + } + Some(Expr::Wildcard(_)) | Some(Expr::Literal(Literal::Regex(_))) => { + return Unimplemented { + msg: "wildcard or regex in call", + } + .fail(); + } + _ => { + new_fields.push(f.clone()); + continue; + } + } + } + + Expr::Binary { .. 
} => { + let has_wildcard = walk::walk_expr(&f.expr, &mut |e| { + match e { + Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => { + return ControlFlow::Break(()) + } + _ => {} + } + ControlFlow::Continue(()) + }) + .is_break(); + + if has_wildcard { + return RewriteNoCause { + msg: "rewrite projection encounter wildcard or regex in binary expression", + } + .fail(); + } + + new_fields.push(f.clone()); + } + + _ => new_fields.push(f.clone()), + } + } + + stmt.fields = FieldList::new(new_fields); + + Ok(()) +} + +#[cfg(test)] +mod test { + use datafusion::sql::TableReference; + use influxdb_influxql_parser::{ + parse_statements, select::SelectStatement, statement::Statement, + }; + + use super::StmtRewriter; + use crate::{ + influxql::planner::MeasurementProvider, provider::MetaProvider, tests::MockMetaProvider, + }; + + impl MeasurementProvider for MockMetaProvider { + fn measurement( + &self, + measurement_name: &str, + ) -> crate::influxql::error::Result> { + let table_ref = TableReference::Bare { + table: std::borrow::Cow::Borrowed(measurement_name), + }; + Ok(self.table(table_ref).unwrap()) + } + } + + #[test] + fn test_wildcard_and_regex_in_projection() { + let namespace = MockMetaProvider::default(); + + let mut stmt = parse_select("SELECT * FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col1, col2, col3 FROM influxql_test", + stmt.to_string() + ); + + let mut stmt = parse_select("SELECT *::tag FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!("SELECT col1, col2 FROM influxql_test", stmt.to_string()); + + let mut stmt = parse_select("SELECT *::field FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!("SELECT col3 FROM influxql_test", stmt.to_string()); + } + + #[test] + fn test_wildcard_and_regex_in_group_by() { + let namespace = MockMetaProvider::default(); + + let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY *"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col3 FROM influxql_test GROUP BY col1, col2", + stmt.to_string() + ); + + let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY col1"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col2, col3 FROM influxql_test GROUP BY col1", + stmt.to_string() + ); + } + + pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) { + let rewriter = StmtRewriter::new(provider); + rewriter.rewrite(stmt).unwrap(); + } + + /// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`. + pub fn parse_select(s: &str) -> SelectStatement { + let statements = parse_statements(s).unwrap(); + match statements.first() { + Some(Statement::Select(sel)) => *sel.clone(), + _ => panic!("expected SELECT statement"), + } + } +} diff --git a/sql/src/influxql/util.rs b/sql/src/influxql/util.rs new file mode 100644 index 0000000000..b143691332 --- /dev/null +++ b/sql/src/influxql/util.rs @@ -0,0 +1,113 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Some utils used process influxql + +use influxdb_influxql_parser::string::Regex; + +// Copy from influxdb_iox: +// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L147 +pub fn clean_non_meta_escapes(pattern: &str) -> String { + if pattern.is_empty() { + return pattern.to_string(); + } + + #[derive(Debug, Copy, Clone)] + enum SlashState { + No, + Single, + Double, + } + + let mut next_state = SlashState::No; + + let next_chars = pattern + .chars() + .map(Some) + .skip(1) + .chain(std::iter::once(None)); + + // emit char based on previous + let new_pattern: String = pattern + .chars() + .zip(next_chars) + .filter_map(|(c, next_char)| { + let cur_state = next_state; + next_state = match (c, cur_state) { + ('\\', SlashState::No) => SlashState::Single, + ('\\', SlashState::Single) => SlashState::Double, + ('\\', SlashState::Double) => SlashState::Single, + _ => SlashState::No, + }; + + // Decide to emit `c` or not + match (cur_state, c, next_char) { + (SlashState::No, '\\', Some(next_char)) + | (SlashState::Double, '\\', Some(next_char)) + if !is_valid_character_after_escape(next_char) => + { + None + } + _ => Some(c), + } + }) + .collect(); + + new_pattern +} + +// Copy from influxdb_iox: +// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L123 +fn is_valid_character_after_escape(c: char) -> bool { + // same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538 + match c { + '0'..='7' => true, + '8'..='9' => true, + 'x' | 'u' | 'U' => true, + 'p' | 'P' => true, + 'd' | 's' | 'w' | 'D' | 'S' | 'W' => true, + _ => regex_syntax::is_meta_character(c), + } +} + +// Copy from influxdb_iox: +// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/iox_query/src/plan/influxql/util.rs#L48 +pub fn parse_regex(re: &Regex) -> std::result::Result { + let pattern = clean_non_meta_escapes(re.as_str()); + regex::Regex::new(&pattern) +} + +mod test { + // Copy from influxdb_iox: + // https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L357 + #[test] + fn test_clean_non_meta_escapes() { + let cases = vec![ + ("", ""), + (r#"\"#, r#"\"#), + (r#"\\"#, r#"\\"#), + // : is not a special meta character + (r#"\:"#, r#":"#), + // . 
is a special meta character + (r#"\."#, r#"\."#), + (r#"foo\"#, r#"foo\"#), + (r#"foo\\"#, r#"foo\\"#), + (r#"foo\:"#, r#"foo:"#), + (r#"foo\xff"#, r#"foo\xff"#), + (r#"fo\\o"#, r#"fo\\o"#), + (r#"fo\:o"#, r#"fo:o"#), + (r#"fo\:o\x123"#, r#"fo:o\x123"#), + (r#"fo\:o\x123\:"#, r#"fo:o\x123:"#), + (r#"foo\\\:bar"#, r#"foo\\:bar"#), + (r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#), + ("foo", "foo"), + ]; + + for (pattern, expected) in cases { + let cleaned_pattern = crate::influxql::util::clean_non_meta_escapes(pattern); + assert_eq!( + cleaned_pattern, expected, + "Expected '{pattern}' to be cleaned to '{expected}', got '{cleaned_pattern}'" + ); + } + } +} diff --git a/sql/src/lib.rs b/sql/src/lib.rs index 1b811fab51..e6a8707fcb 100644 --- a/sql/src/lib.rs +++ b/sql/src/lib.rs @@ -10,6 +10,7 @@ extern crate common_util; pub mod ast; pub mod container; pub mod frontend; +pub mod influxql; pub mod parser; pub(crate) mod partition; pub mod plan; diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 06ee750b03..faeacbff39 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -36,6 +36,7 @@ use datafusion::{ ResolvedTableReference, }, }; +use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; use log::{debug, trace}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use sqlparser::ast::{ @@ -59,7 +60,6 @@ use crate::{ promql::{ColumnNames, Expr as PromExpr}, provider::{ContextProviderAdapter, MetaProvider}, }; - // We do not carry backtrace in sql error because it is mainly used in server // handler and the error is usually caused by invalid/unsupported sql, which // should be easy to find out the reason. @@ -252,6 +252,10 @@ pub enum Error { #[snafu(display("Failed to build plan, msg:{}", msg))] InvalidWriteEntry { msg: String }, + #[snafu(display("Failed to build influxql plan, err:{}", source))] + BuildInfluxqlPlan { + source: crate::influxql::error::Error, + }, } define_result!(Error); @@ -324,6 +328,16 @@ impl<'a, P: MetaProvider> Planner<'a, P> { .context(BuildPromPlanError) } + pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result { + let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism); + let planner = PlannerDelegate::new(adapter); + + let influxql_planner = crate::influxql::planner::Planner::new(planner); + influxql_planner + .statement_to_plan(statement) + .context(BuildInfluxqlPlan) + } + pub fn write_req_to_plan( &self, schema_config: &SchemaConfig, @@ -534,16 +548,16 @@ fn try_get_data_type_from_value(value: &PbValue) -> Result { } /// A planner wraps the datafusion's logical planner, and delegate sql like /// select/explain to datafusion's planner. -struct PlannerDelegate<'a, P: MetaProvider> { +pub(crate) struct PlannerDelegate<'a, P: MetaProvider> { meta_provider: ContextProviderAdapter<'a, P>, } impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { - fn new(meta_provider: ContextProviderAdapter<'a, P>) -> Self { + pub(crate) fn new(meta_provider: ContextProviderAdapter<'a, P>) -> Self { Self { meta_provider } } - fn sql_statement_to_plan(self, sql_stmt: SqlStatement) -> Result { + pub(crate) fn sql_statement_to_plan(self, sql_stmt: SqlStatement) -> Result { match sql_stmt { // Query statement use datafusion planner SqlStatement::Explain { .. 
} | SqlStatement::Query(_) => { @@ -986,7 +1000,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { Ok(Plan::Show(ShowPlan::ShowDatabase)) } - fn find_table(&self, table_name: &str) -> Result> { + pub(crate) fn find_table(&self, table_name: &str) -> Result> { let table_ref = get_table_ref(table_name); self.meta_provider .table(table_ref) diff --git a/sql/src/tests.rs b/sql/src/tests.rs index ecaf19d67c..9304807492 100644 --- a/sql/src/tests.rs +++ b/sql/src/tests.rs @@ -3,7 +3,12 @@ use std::sync::Arc; use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; -use common_types::tests::{build_default_value_schema, build_schema}; +use common_types::{ + column_schema, + datum::DatumKind, + schema::{Builder, Schema, TSID_COLUMN}, + tests::{build_default_value_schema, build_schema}, +}; use datafusion::catalog::TableReference; use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf}; use table_engine::{ @@ -46,6 +51,12 @@ impl Default for MockMetaProvider { build_schema(), ANALYTIC_ENGINE_TYPE.to_string(), )), + Arc::new(MemoryTable::new( + "influxql_test".to_string(), + TableId::from(144), + build_influxql_test_schema(), + ANALYTIC_ENGINE_TYPE.to_string(), + )), ], } } @@ -79,3 +90,42 @@ impl MetaProvider for MockMetaProvider { todo!() } } + +fn build_influxql_test_schema() -> Schema { + Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col1".to_string(), DatumKind::String) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col2".to_string(), DatumKind::String) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col3".to_string(), DatumKind::Int64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .expect("should succeed to build schema") +}
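The two new `Frontend` entry points are meant to be chained: `parse_influxql` turns raw text into `InfluxqlStatement`s, and `influxql_stmt_to_plan` lowers each statement through the new `influxql::planner::Planner`. A minimal caller sketch against the signatures in this diff — the helper function itself is hypothetical and not part of the patch; `Frontend`, `Context`, `Plan`, `MetaProvider`, and `Result` are the existing `sql` crate types, assumed in scope:

fn influxql_to_plans<P: MetaProvider>(
    frontend: &Frontend<P>,
    ctx: &mut Context,
    influxql: &str,
) -> Result<Vec<Plan>> {
    // Parse the raw InfluxQL text into a list of statements.
    let stmts = frontend.parse_influxql(ctx, influxql)?;
    // Lower each statement to a logical plan via the InfluxQL planner.
    stmts
        .into_iter()
        .map(|stmt| frontend.influxql_stmt_to_plan(ctx, stmt))
        .collect()
}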
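On the regex path in `sql/src/influxql/util.rs`: InfluxQL regex literals may carry escapes such as `\:` that the Rust `regex` crate rejects (they are legal in Go's regexp, which upstream InfluxDB uses), so `parse_regex` strips them via `clean_non_meta_escapes` before compiling. A small sketch of that behavior, reusing a case from the test table above:

// `\:` is not a valid escape for the Rust regex crate, so the backslash is dropped.
let pattern = clean_non_meta_escapes(r#"fo\:o"#);
assert_eq!(pattern, "fo:o");
let re = regex::Regex::new(&pattern).expect("cleaned pattern should compile");
assert!(re.is_match("fo:o"));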