Skip to content

Commit

Permalink
feat(planner): support settings set sql_dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Aug 18, 2022
1 parent d6647bd commit 11c6c0a
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ build_exceptions! {
// Variable error codes.
UnknownVariable(2801),
OnlySupportAsciiChars(2802),
WrongValueForVariable(2803),

// Tenant quota error codes.
IllegalTenantQuotaFormat(2901),
Expand Down
1 change: 1 addition & 0 deletions src/common/settings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ doctest = false
test = false

[dependencies]
common-ast = { path = "../../query/ast" }
common-config = { path = "../config" }
common-datavalues = { path = "../datavalues" }
common-exception = { path = "../exception" }
Expand Down
59 changes: 59 additions & 0 deletions src/common/settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::fmt::Formatter;
use std::str;
use std::sync::Arc;

use common_ast::Dialect;
use common_config::Config;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub struct SettingValue {
user_setting: UserSetting,
level: ScopeLevel,
desc: &'static str,
possible_values: Option<Vec<&'static str>>,
}

#[derive(Clone)]
Expand All @@ -83,20 +85,23 @@ impl Settings {
user_setting: UserSetting::create("max_block_size", DataValue::UInt64(10000)),
level: ScopeLevel::Session,
desc: "Maximum block size for reading",
possible_values: None,
},
// max_threads
SettingValue {
default_value: DataValue::UInt64(16),
user_setting: UserSetting::create("max_threads", DataValue::UInt64(16)),
level: ScopeLevel::Session,
desc: "The maximum number of threads to execute the request. By default, it is determined automatically.",
possible_values: None,
},
// flight_client_timeout
SettingValue {
default_value: DataValue::UInt64(60),
user_setting: UserSetting::create("flight_client_timeout", DataValue::UInt64(60)),
level: ScopeLevel::Session,
desc: "Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds",
possible_values: None,
},
// storage_read_buffer_size
SettingValue {
Expand All @@ -107,6 +112,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "The size of buffer in bytes for buffered reader of dal. By default, it is 1MB.",
possible_values: None,
},
// enable_new_processor_framework
SettingValue {
Expand All @@ -117,13 +123,15 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Enable new processor framework if value != 0, default value: 1",
possible_values: None,
},
// enable_planner_v2
SettingValue {
default_value: DataValue::UInt64(1),
user_setting: UserSetting::create("enable_planner_v2", DataValue::UInt64(1)),
level: ScopeLevel::Session,
desc: "Enable planner v2 by setting this variable to 1, default value: 1",
possible_values: None,
},
SettingValue {
default_value: DataValue::String("\n".as_bytes().to_vec()),
Expand All @@ -133,6 +141,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Format record_delimiter, default value: \"\\n\"",
possible_values: None,
},
SettingValue {
default_value: DataValue::String(",".as_bytes().to_vec()),
Expand All @@ -142,18 +151,21 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Format field delimiter, default value: ,",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(1),
user_setting: UserSetting::create("empty_as_default", DataValue::UInt64(1)),
level: ScopeLevel::Session,
desc: "Format empty_as_default, default value: 1",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(0),
user_setting: UserSetting::create("skip_header", DataValue::UInt64(0)),
level: ScopeLevel::Session,
desc: "Whether to skip the input header, default value: 0",
possible_values: None,
},
SettingValue {
default_value: DataValue::String("None".as_bytes().to_vec()),
Expand All @@ -163,6 +175,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Format compression, default value: None",
possible_values: None,
},
SettingValue {
default_value: DataValue::String("UTC".as_bytes().to_vec()),
Expand All @@ -172,6 +185,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Timezone, default value: UTC,",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(10000),
Expand All @@ -181,18 +195,21 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "The threshold of keys to open two-level aggregation, default value: 10000",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(0),
user_setting: UserSetting::create("enable_async_insert", DataValue::UInt64(0)),
level: ScopeLevel::Session,
desc: "Whether the client open async insert mode, default value: 0",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(1),
user_setting: UserSetting::create("wait_for_async_insert", DataValue::UInt64(1)),
level: ScopeLevel::Session,
desc: "Whether the client wait for the reply of async insert, default value: 1",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(100),
Expand All @@ -202,6 +219,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "The timeout in seconds for waiting for processing of async insert, default value: 100",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(0),
Expand All @@ -211,6 +229,7 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Case sensitivity of unquoted identifiers, default value: 0 (aka case-insensitive)",
possible_values: None,
},
SettingValue {
default_value: DataValue::UInt64(1),
Expand All @@ -220,6 +239,17 @@ impl Settings {
),
level: ScopeLevel::Session,
desc: "Case sensitivity of quoted identifiers, default value: 1 (aka case-sensitive)",
possible_values: None,
},
SettingValue {
default_value: DataValue::String("PostgreSQL".as_bytes().to_vec()),
user_setting: UserSetting::create(
"sql_dialect",
DataValue::String("PostgreSQL".as_bytes().to_vec()),
),
level: ScopeLevel::Session,
desc: "SQL dialect, support \"PostgreSQL\" and \"MySQL\", default value: \"PostgreSQL\"",
possible_values: Some(vec!["PostgreSQL", "MySQL"]),
},
];

Expand Down Expand Up @@ -408,6 +438,19 @@ impl Settings {
self.try_set_u64(KEY, v, false)
}

pub fn get_sql_dialect(&self) -> Result<Dialect> {
let key = "sql_dialect";
self.check_and_get_setting_value(key)
.and_then(|v| v.user_setting.value.as_string())
.map(|v| {
if v == b"MySQL" {
Dialect::MySQL
} else {
Dialect::PostgreSQL
}
})
}

pub fn has_setting(&self, key: &str) -> bool {
let settings = self.settings.read();
settings.get(key).is_some()
Expand All @@ -421,6 +464,21 @@ impl Settings {
Ok(setting.clone())
}

fn check_possible_values(&self, setting: &SettingValue, val: String) -> Result<String> {
if let Some(possible_values) = &setting.possible_values {
for possible_value in possible_values {
if possible_value.to_lowercase() == val.to_lowercase() {
return Ok(possible_value.to_string());
}
}
return Err(ErrorCode::WrongValueForVariable(format!(
"Variable {:?} can't be set to the value of {:?}",
setting.user_setting.name, val
)));
}
Ok(val)
}

// Get u64 value, we don't get from the metasrv.
fn try_get_u64(&self, key: &str) -> Result<u64> {
let setting = self.check_and_get_setting_value(key)?;
Expand Down Expand Up @@ -539,6 +597,7 @@ impl Settings {

pub fn set_settings(&self, key: String, val: String, is_global: bool) -> Result<()> {
let setting = self.check_and_get_setting_value(&key)?;
let val = self.check_possible_values(&setting, val)?;

match setting.user_setting.value.max_data_type().data_type_id() {
TypeID::UInt64 => {
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/sql/planner/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::token::Token;
use common_ast::parser::tokenize_sql;
use common_ast::Backtrace;
use common_ast::Dialect;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_datavalues::DataSchemaRef;
Expand Down Expand Up @@ -345,11 +344,13 @@ impl<'a> ValueSourceV2<'a> {
let buf = reader.get_checkpoint_buffer();

let sql = std::str::from_utf8(buf).unwrap();
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let tokens = tokenize_sql(sql)?;
let backtrace = Backtrace::new();
let exprs = parse_comma_separated_exprs(
&tokens[1..tokens.len() as usize],
Dialect::PostgreSQL,
sql_dialect,
&backtrace,
)?;

Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/sql/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use common_ast::ast::TimeTravelPoint;
use common_ast::parser::parse_sql;
use common_ast::parser::tokenize_sql;
use common_ast::Backtrace;
use common_ast::Dialect;
use common_ast::DisplayError;
use common_catalog::catalog::CATALOG_DEFAULT;
use common_datavalues::prelude::*;
Expand Down Expand Up @@ -136,9 +135,11 @@ impl<'a> Binder {
.options()
.get(QUERY)
.ok_or_else(|| ErrorCode::LogicalError("Invalid VIEW object"))?;
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let tokens = tokenize_sql(query.as_str())?;
let backtrace = Backtrace::new();
let (stmt, _) = parse_sql(&tokens, Dialect::PostgreSQL, &backtrace)?;
let (stmt, _) = parse_sql(&tokens, sql_dialect, &backtrace)?;
if let Statement::Query(query) = &stmt {
self.bind_query(bind_context, query).await
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/sql/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use common_ast::parser::parse_sql;
use common_ast::parser::tokenize_sql;
use common_ast::Backtrace;
use common_ast::Dialect;
use common_exception::Result;
use parking_lot::RwLock;
pub use plans::ScalarExpr;
Expand Down Expand Up @@ -61,11 +60,12 @@ impl Planner {

pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option<String>)> {
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;

// Step 1: parse SQL text into AST
let tokens = tokenize_sql(sql)?;
let backtrace = Backtrace::new();
let (stmt, format) = parse_sql(&tokens, Dialect::PostgreSQL, &backtrace)?;
let (stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;

// Step 2: bind AST with catalog, and generate a pure logical SExpr
let metadata = Arc::new(RwLock::new(Metadata::create()));
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/sql/planner/semantic/type_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use common_ast::parser::parse_expr;
use common_ast::parser::token::Token;
use common_ast::parser::tokenize_sql;
use common_ast::Backtrace;
use common_ast::Dialect;
use common_ast::DisplayError;
use common_datavalues::type_coercion::merge_types;
use common_datavalues::ArrayType;
Expand Down Expand Up @@ -1623,9 +1622,11 @@ impl<'a> TypeChecker<'a> {
arguments.len()
))));
}
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;
let backtrace = Backtrace::new();
let sql_tokens = tokenize_sql(udf.definition.as_str())?;
let expr = parse_expr(&sql_tokens, Dialect::PostgreSQL, &backtrace)?;
let expr = parse_expr(&sql_tokens, sql_dialect, &backtrace)?;
let mut args_map = HashMap::new();
arguments.iter().enumerate().for_each(|(idx, argument)| {
if let Some(parameter) = parameters.get(idx) {
Expand Down
Loading

0 comments on commit 11c6c0a

Please sign in to comment.