Change SQL dialect to PostgreSQL #214

Status: Closed · wants to merge 2 commits
ballista/rust/core/src/serde/logical_plan/to_proto.rs (1 change: 0 additions & 1 deletion)

@@ -1016,7 +1016,6 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
         expr_type: Some(ExprType::AggregateExpr(aggregate_expr)),
     })
 }
-Expr::ScalarVariable(_) => unimplemented!(),
 Expr::ScalarFunction { ref fun, ref args } => {
     let fun: protobuf::ScalarFunction = fun.try_into()?;
     let expr: Vec<protobuf::LogicalExprNode> = args
ballista/rust/core/src/serde/physical_plan/from_proto.rs (2 changes: 0 additions & 2 deletions)

@@ -223,7 +223,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
 let ctx_state = ExecutionContextState {
     catalog_list,
     scalar_functions: Default::default(),
-    var_provider: Default::default(),
     aggregate_functions: Default::default(),
     config: ExecutionConfig::new(),
 };
@@ -387,7 +386,6 @@ fn compile_expr(
 let state = ExecutionContextState {
     catalog_list,
     scalar_functions: HashMap::new(),
-    var_provider: HashMap::new(),
     aggregate_functions: HashMap::new(),
     config: ExecutionConfig::new(),
 };
datafusion/src/execution/context.rs (47 changes: 0 additions & 47 deletions)

@@ -70,7 +70,6 @@ use crate::sql::{
     parser::{DFParser, FileType},
     planner::{ContextProvider, SqlToRel},
 };
-use crate::variable::{VarProvider, VarType};
 use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
@@ -154,7 +153,6 @@ impl ExecutionContext {
 state: Arc::new(Mutex::new(ExecutionContextState {
     catalog_list,
     scalar_functions: HashMap::new(),
-    var_provider: HashMap::new(),
     aggregate_functions: HashMap::new(),
     config,
 })),
@@ -219,19 +217,6 @@ impl ExecutionContext {
 query_planner.statement_to_plan(&statements[0])
 }

-/// Registers a variable provider within this context.
-pub fn register_variable(
-    &mut self,
-    variable_type: VarType,
-    provider: Arc<dyn VarProvider + Send + Sync>,
-) {
-    self.state
-        .lock()
-        .unwrap()
-        .var_provider
-        .insert(variable_type, provider);
-}

 /// Registers a scalar UDF within this context.
 ///
 /// Note in SQL queries, function names are looked up using
@@ -744,8 +729,6 @@ pub struct ExecutionContextState {
 pub catalog_list: Arc<dyn CatalogList>,
 /// Scalar functions that are registered with the context
 pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
-/// Variable provider that are registered with the context
-pub var_provider: HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>,
 /// Aggregate functions registered in the context
 pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
 /// Context configuration
@@ -837,7 +820,6 @@ mod tests {
 use crate::physical_plan::functions::make_scalar_function;
 use crate::physical_plan::{collect, collect_partitioned};
 use crate::test;
-use crate::variable::VarType;
 use crate::{
     assert_batches_eq, assert_batches_sorted_eq,
     logical_plan::{col, create_udf, sum},
@@ -916,35 +898,6 @@
 Ok(())
 }

-#[tokio::test]
-async fn create_variable_expr() -> Result<()> {
-    let tmp_dir = TempDir::new()?;
-    let partition_count = 4;
-    let mut ctx = create_ctx(&tmp_dir, partition_count)?;
-
-    let variable_provider = test::variable::SystemVar::new();
-    ctx.register_variable(VarType::System, Arc::new(variable_provider));
-    let variable_provider = test::variable::UserDefinedVar::new();
-    ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider));
-
-    let provider = test::create_table_dual();
-    ctx.register_table("dual", provider)?;
-
-    let results =
-        plan_and_collect(&mut ctx, "SELECT @@version, @name FROM dual").await?;
-
-    let expected = vec![
-        "+----------------------+------------------------+",
-        "| @@version            | @name                  |",
-        "+----------------------+------------------------+",
-        "| system-var-@@version | user-defined-var-@name |",
-        "+----------------------+------------------------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}

Contributor: I think these were added in apache/arrow#8135. Perhaps @wqc200 has some comment about how / if this feature is used?

Member: I don't think we should remove support for scalar variables in the logical plan. This seems unrelated to changing the default SQL dialect. Users can use ScalarVariable without using SQL.
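To ground the Member's point: a minimal, hypothetical snippet (not from this PR) that builds Expr::ScalarVariable directly against the logical-plan API, with no SQL frontend involved:

```rust
// Expr::ScalarVariable can be constructed programmatically, so removing the
// SQL-side parsing does not require removing the logical-plan variant.
use datafusion::logical_plan::Expr;

fn main() {
    let version = Expr::ScalarVariable(vec!["@@version".to_string()]);
    // The Debug impl joins the name segments with '.', so this prints: @@version
    println!("{:?}", version);
}
```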

 #[tokio::test]
 async fn register_deregister() -> Result<()> {
     let tmp_dir = TempDir::new()?;
datafusion/src/lib.rs (1 change: 0 additions & 1 deletion)

@@ -197,7 +197,6 @@ pub mod physical_plan;
 pub mod prelude;
 pub mod scalar;
 pub mod sql;
-pub mod variable;

 // re-export dependencies from arrow-rs to minimise version maintenance for crate users
 pub use arrow;
datafusion/src/logical_plan/expr.rs (8 changes: 0 additions & 8 deletions)

@@ -85,8 +85,6 @@ pub enum Expr {
 Alias(Box<Expr>, String),
 /// A named reference to a field in a schema.
 Column(String),
-/// A named reference to a variable in a registry.
-ScalarVariable(Vec<String>),
 /// A constant value.
 Literal(ScalarValue),
 /// A binary expression such as "age > 21"
@@ -225,7 +223,6 @@ impl Expr {
     .field_with_unqualified_name(name)?
     .data_type()
     .clone()),
-Expr::ScalarVariable(_) => Ok(DataType::Utf8),
 Expr::Literal(l) => Ok(l.get_datatype()),
 Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
 Expr::Cast { data_type, .. } => Ok(data_type.clone()),
@@ -293,7 +290,6 @@ impl Expr {
     .field_with_unqualified_name(name)?
     .is_nullable()),
 Expr::Literal(value) => Ok(value.is_null()),
-Expr::ScalarVariable(_) => Ok(true),
 Expr::Case {
     when_then_expr,
     else_expr,
@@ -522,7 +518,6 @@ impl Expr {
 let visitor = match self {
     Expr::Alias(expr, _) => expr.accept(visitor),
     Expr::Column(..) => Ok(visitor),
-    Expr::ScalarVariable(..) => Ok(visitor),
     Expr::Literal(..) => Ok(visitor),
     Expr::BinaryExpr { left, right, .. } => {
         let visitor = left.accept(visitor)?;
@@ -633,7 +628,6 @@ impl Expr {
 let expr = match self {
     Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
     Expr::Column(name) => Expr::Column(name),
-    Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
     Expr::Literal(value) => Expr::Literal(value),
     Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
         left: rewrite_boxed(left, rewriter)?,
@@ -1190,7 +1184,6 @@ impl fmt::Debug for Expr {
 match self {
     Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
     Expr::Column(name) => write!(f, "#{}", name),
-    Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")),
     Expr::Literal(v) => write!(f, "{:?}", v),
     Expr::Case {
         expr,
@@ -1305,7 +1298,6 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
 match e {
     Expr::Alias(_, name) => Ok(name.clone()),
     Expr::Column(name) => Ok(name.clone()),
-    Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
     Expr::Literal(value) => Ok(format!("{:?}", value)),
     Expr::BinaryExpr { left, op, right } => {
         let left = create_name(left, input_schema)?;
datafusion/src/optimizer/utils.rs (5 changes: 0 additions & 5 deletions)

@@ -60,9 +60,6 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
 Expr::Column(name) => {
     self.accum.insert(name.clone());
 }
-Expr::ScalarVariable(var_names) => {
-    self.accum.insert(var_names.join("."));
-}
 Expr::Alias(_, _) => {}
 Expr::Literal(_) => {}
 Expr::BinaryExpr { .. } => {}
@@ -271,7 +268,6 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
 Expr::Column(_) => Ok(vec![]),
 Expr::Alias(expr, ..) => Ok(vec![expr.as_ref().to_owned()]),
 Expr::Literal(_) => Ok(vec![]),
-Expr::ScalarVariable(_) => Ok(vec![]),
 Expr::Not(expr) => Ok(vec![expr.as_ref().to_owned()]),
 Expr::Negative(expr) => Ok(vec![expr.as_ref().to_owned()]),
 Expr::Sort { expr, .. } => Ok(vec![expr.as_ref().to_owned()]),
@@ -375,7 +371,6 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
 Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
 Expr::Column(_) => Ok(expr.clone()),
 Expr::Literal(_) => Ok(expr.clone()),
-Expr::ScalarVariable(_) => Ok(expr.clone()),
 Expr::Sort {
     asc, nulls_first, ..
 } => Ok(Expr::Sort {
datafusion/src/physical_plan/parquet.rs (1 change: 0 additions & 1 deletion)

@@ -397,7 +397,6 @@ impl RowGroupPredicateBuilder {
 let execution_context_state = ExecutionContextState {
     catalog_list: Arc::new(MemoryCatalogList::new()),
     scalar_functions: HashMap::new(),
-    var_provider: HashMap::new(),
     aggregate_functions: HashMap::new(),
     config: ExecutionConfig::new(),
 };
datafusion/src/physical_plan/planner.rs (27 changes: 0 additions & 27 deletions)

@@ -44,7 +44,6 @@ use crate::physical_plan::{hash_utils, Partitioning};
 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
-use crate::variable::VarType;
 use arrow::compute::can_cast_types;

 use arrow::compute::SortOptions;
@@ -444,31 +443,6 @@ impl DefaultPhysicalPlanner {
     Ok(Arc::new(Column::new(name)))
 }
 Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
-Expr::ScalarVariable(variable_names) => {
-    if &variable_names[0][0..2] == "@@" {
-        match ctx_state.var_provider.get(&VarType::System) {
-            Some(provider) => {
-                let scalar_value =
-                    provider.get_value(variable_names.clone())?;
-                Ok(Arc::new(Literal::new(scalar_value)))
-            }
-            _ => Err(DataFusionError::Plan(
-                "No system variable provider found".to_string(),
-            )),
-        }
-    } else {
-        match ctx_state.var_provider.get(&VarType::UserDefined) {
-            Some(provider) => {
-                let scalar_value =
-                    provider.get_value(variable_names.clone())?;
-                Ok(Arc::new(Literal::new(scalar_value)))
-            }
-            _ => Err(DataFusionError::Plan(
-                "No user defined variable provider found".to_string(),
-            )),
-        }
-    }
-}
 Expr::BinaryExpr { left, op, right } => {
     let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
     let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
@@ -755,7 +729,6 @@ mod tests {
 ExecutionContextState {
     catalog_list: Arc::new(MemoryCatalogList::new()),
     scalar_functions: HashMap::new(),
-    var_provider: HashMap::new(),
     aggregate_functions: HashMap::new(),
     config: ExecutionConfig::new(),
 }
datafusion/src/sql/parser.rs (6 changes: 3 additions & 3 deletions)

@@ -21,7 +21,7 @@

 use sqlparser::{
     ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint},
-    dialect::{keywords::Keyword, Dialect, GenericDialect},
+    dialect::{keywords::Keyword, Dialect, PostgreSqlDialect},
     parser::{Parser, ParserError},
     tokenizer::{Token, Tokenizer},
 };
Contributor: Another approach might be to make the SQL dialect configurable on ExecutionConfig so that users could choose which dialect they want to mimic: https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/execution/context.rs#L606

I think @andygrove has said in the past that using DataFusion to mimic engines such as MySQL is a good use case too.
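For illustration, a minimal sketch of what this suggestion might look like; the dialect field and with_dialect method are hypothetical and exist nowhere in this PR or in DataFusion at this point:

```rust
// Hypothetical sketch of a configurable-dialect ExecutionConfig; field and
// method names are invented for illustration, not taken from DataFusion.
use sqlparser::dialect::{Dialect, PostgreSqlDialect};
use std::sync::Arc;

pub struct ExecutionConfig {
    /// SQL dialect handed to DFParser (all other existing config fields omitted)
    pub dialect: Arc<dyn Dialect + Send + Sync>,
}

impl ExecutionConfig {
    pub fn new() -> Self {
        Self {
            // Postgres as the default, matching the direction of this PR
            dialect: Arc::new(PostgreSqlDialect {}),
        }
    }

    /// Builder-style override, mirroring the existing with_* methods
    pub fn with_dialect(mut self, dialect: Arc<dyn Dialect + Send + Sync>) -> Self {
        self.dialect = dialect;
        self
    }
}
```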

Author (@returnString), Apr 28, 2021: That's an interesting approach - we'd have to be careful about our sqlparser => expr conversions, but that could be quite useful as a feature 🤔

Author (@returnString): Thinking about it more over lunch: if we enable a bring-your-own-dialect setup, we'd need a decent testing strategy to support it. In this example, @someval will be parsed as UnaryOp { op: PGAbs, expr: Ident("someval") } for Postgres, so we'd need to decide how to implement DataFusion-specific parsing overrides (e.g. the ones used to support this var provider system) on a per-dialect basis.
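The divergence described here is easy to reproduce directly against sqlparser. A small standalone snippet (exact AST shapes depend on the sqlparser version, so treat the printed output as illustrative):

```rust
// Parse the same input under both dialects and compare the resulting ASTs.
use sqlparser::dialect::{GenericDialect, PostgreSqlDialect};
use sqlparser::parser::Parser;

fn main() {
    let sql = "SELECT @someval";

    // GenericDialect allows '@' to start an identifier, yielding
    // Ident("@someval"), which DataFusion previously mapped to
    // Expr::ScalarVariable.
    println!("{:?}", Parser::parse_sql(&GenericDialect {}, sql));

    // PostgreSqlDialect treats '@' as a prefix operator, yielding something
    // like UnaryOp { op: PGAbs, expr: Ident("someval") } instead.
    println!("{:?}", Parser::parse_sql(&PostgreSqlDialect {}, sql));
}
```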

Contributor: That does sound fairly complicated. Personally / project-wise I don't have a need for a MySQL-specific mimic (Postgres is good enough for us in IOx), but I think perhaps we should let others weigh in here.

Member: This is tricky for sure. I would be fine with having Postgres as the default and officially supported (well-tested) dialect, while also allowing users to provide a dialect at their own risk (and having this be well documented), but even that might create an undue burden on maintainers. It would definitely be good to find out more about our users' requirements here.

Author (@returnString), Apr 28, 2021: Just to be totally clear, I'm definitely not married to the contents of this PR as they stand; if the consensus is that we do want to support multiple dialects, I'm happy to rework it towards that goal 🙂

If that's a route we want to explore in depth, we could perhaps build some sort of dialect-config setup that allows for controlling parser overrides in DataFusion, but that feels a bit too heavyweight if we don't have tonnes of use cases right now (and hence no one to drive or own that work).

Here's my immediate idea:

  • revert the deletion of all the logical plan stuff here, and just retain the parser override deletion
  • make the dialect default to Postgres, but be configurable, with a doc warning mentioning the potential risks and edge cases
  • mention nullary functions as a replacement for scalar variables in SQL in the next set of release notes
  • random bonus thought: maybe expose a unary function to retrieve scalar variables by name from SQL?

Does that sound vaguely sensible?

Full disclosure: my goal here is to get a DataFusion-powered service queryable via BI tooling, and these tools often use the more esoteric features of the Postgres dialect to bootstrap their UIs (think table listings etc.). I still have a fair bit of work to do on this, but I can at least sort of see a path to having it working now. I suspect this will be beneficial for lots of other use cases, but admittedly I don't have any evidence for that claim beyond intuition 😅

Contributor:

> make the dialect default to Postgres, but be configurable, with a doc warning mentioning the potential risks and edge cases

I think this makes sense.

> mention nullary functions as a replacement for scalar variables in SQL in the next set of release notes

👍

> random bonus thought: maybe expose a unary function to retrieve scalar variables by name from SQL?

This also sounds good. If you wrote it up as a ticket, I bet it would be a good "first issue" type ticket for someone else in the community to pick up if they were looking for a way to learn the DataFusion code a bit.
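A hedged sketch of the nullary-function replacement endorsed above. The function name and return value are invented, the signatures follow the create_udf / make_scalar_function APIs of this era (no volatility argument yet), and whether a zero-argument UDF works end to end depends on the DataFusion version:

```rust
// Sketch: expose SELECT version() as a zero-argument UDF in place of the
// removed SELECT @@version scalar-variable syntax.
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::DataType;
use datafusion::logical_plan::create_udf;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::ExecutionContext;

fn register_version_udf(ctx: &mut ExecutionContext) {
    let fun = make_scalar_function(|_args: &[ArrayRef]| {
        // With no input columns the batch row count is not derivable from
        // `_args`; returning a single-row constant is a common workaround.
        Ok(Arc::new(StringArray::from(vec!["datafusion-x.y.z"])) as ArrayRef)
    });
    ctx.register_udf(create_udf(
        "version",                // callable from SQL as: SELECT version()
        vec![],                   // nullary: no input types
        Arc::new(DataType::Utf8), // returns a UTF-8 string
        fun,
    ));
}
```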

Member: Thanks for the continued discussion and progress on this. I think the outcome is good. Just for context, I have built database gateways in the past that mimic MySQL and Hive protocols and dialects (different projects), and although I don't work on any projects these days that need either, I am just trying to keep options open for future users of DataFusion.

Author (@returnString): Yeah, I definitely wouldn't want to impose unnecessary constraints on people who do want to use other dialects :) If people are happy with the ideas in my last comment, I'll close this, get some new issues logged, and start tackling those.

@@ -78,7 +78,7 @@ pub struct DFParser<'a> {
 impl<'a> DFParser<'a> {
     /// Parse the specified tokens
     pub fn new(sql: &str) -> Result<Self, ParserError> {
-        let dialect = &GenericDialect {};
+        let dialect = &PostgreSqlDialect {};
         DFParser::new_with_dialect(sql, dialect)
     }

@@ -97,7 +97,7 @@ impl<'a> DFParser<'a> {

     /// Parse a SQL statement and produce a set of statements with dialect
     pub fn parse_sql(sql: &str) -> Result<Vec<Statement>, ParserError> {
-        let dialect = &GenericDialect {};
+        let dialect = &PostgreSqlDialect {};
         DFParser::parse_sql_with_dialect(sql, dialect)
     }

datafusion/src/sql/planner.rs (21 changes: 5 additions & 16 deletions)

@@ -866,28 +866,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     fractional_seconds_precision,
 ),

-SQLExpr::Identifier(ref id) => {
-    if &id.value[0..1] == "@" {
-        let var_names = vec![id.value.clone()];
-        Ok(Expr::ScalarVariable(var_names))
-    } else {
-        Ok(Expr::Column(id.value.to_string()))
-    }
-}
+SQLExpr::Identifier(ref id) => Ok(Expr::Column(id.value.to_string())),

 SQLExpr::CompoundIdentifier(ids) => {
     let mut var_names = vec![];
     for id in ids {
         var_names.push(id.value.clone());
     }
-    if &var_names[0][0..1] == "@" {
-        Ok(Expr::ScalarVariable(var_names))
-    } else {
-        Err(DataFusionError::NotImplemented(format!(
-            "Unsupported compound identifier '{:?}'",
-            var_names,
-        )))
-    }
+    Err(DataFusionError::NotImplemented(format!(
+        "Unsupported compound identifier '{:?}'",
+        var_names,
+    )))
 }

 SQLExpr::Wildcard => Ok(Expr::Wildcard),
datafusion/src/sql/utils.rs (4 changes: 1 addition & 3 deletions)

@@ -335,9 +335,7 @@ where
     asc: *asc,
     nulls_first: *nulls_first,
 }),
-Expr::Column(_) | Expr::Literal(_) | Expr::ScalarVariable(_) => {
-    Ok(expr.clone())
-}
+Expr::Column(_) | Expr::Literal(_) => Ok(expr.clone()),
 Expr::Wildcard => Ok(Expr::Wildcard),
 },
 }
datafusion/src/test/mod.rs (1 change: 0 additions & 1 deletion)

@@ -274,7 +274,6 @@ pub fn make_timestamps() -> RecordBatch {

 pub mod exec;
 pub mod user_defined;
-pub mod variable;

 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
datafusion/src/test/variable.rs (58 changes: 0 additions & 58 deletions)
This file was deleted.
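The deleted contents are not shown in this view, but based on the create_variable_expr test above (which expects "system-var-@@version" and "user-defined-var-@name"), the file plausibly looked something like this reconstruction:

```rust
// Plausible reconstruction of the deleted test providers, inferred from the
// expected test output; not the verbatim 58-line original.
use crate::error::Result;
use crate::scalar::ScalarValue;
use crate::variable::VarProvider;

/// System variable provider for tests
pub struct SystemVar {}

impl SystemVar {
    pub fn new() -> Self {
        Self {}
    }
}

impl VarProvider for SystemVar {
    /// Prefixes the requested variable name with "system-var-"
    fn get_value(&self, var_names: Vec<String>) -> Result<ScalarValue> {
        Ok(ScalarValue::Utf8(Some(format!(
            "system-var-{}",
            var_names.concat()
        ))))
    }
}

/// User-defined variable provider for tests
pub struct UserDefinedVar {}

impl UserDefinedVar {
    pub fn new() -> Self {
        Self {}
    }
}

impl VarProvider for UserDefinedVar {
    /// Prefixes the requested variable name with "user-defined-var-"
    fn get_value(&self, var_names: Vec<String>) -> Result<ScalarValue> {
        Ok(ScalarValue::Utf8(Some(format!(
            "user-defined-var-{}",
            var_names.concat()
        ))))
    }
}
```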
