parse sql struct wrapped in cast
gstvg committed Mar 25, 2024
1 parent 47f4b5a commit 1863a69
Showing 2 changed files with 42 additions and 13 deletions.
41 changes: 35 additions & 6 deletions datafusion/sql/src/expr/mod.rs
@@ -26,8 +26,11 @@ mod substring;
 mod unary_op;
 mod value;
 
+use std::sync::Arc;
+
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use arrow_schema::DataType;
+use arrow_schema::Field;
 use arrow_schema::TimeUnit;
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
@@ -36,6 +39,7 @@ use datafusion_common::{
 use datafusion_expr::expr::AggregateFunctionDefinition;
 use datafusion_expr::expr::InList;
 use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::Literal;
 use datafusion_expr::{
     col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast,
     Expr, ExprSchemable, GetFieldAccess, GetIndexedField, Like, Operator, TryCast,
@@ -600,22 +604,47 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         if !fields.is_empty() {
             return not_impl_err!("Struct fields are not supported yet");
         }
-        let args = values
+        let (fields, args) = values
             .into_iter()
-            .map(|value| {
-                self.sql_expr_to_logical_expr(value, input_schema, planner_context)
+            .enumerate()
+            .map(|(i, value)| {
+
+                let (name, unnamed_expr) = match value {
+                    SQLExpr::Named { expr, name } => {
+                        (name.value, *expr)
+                    }
+                    other => {
+                        (
+                            format!("c{i}"),
+                            other
+                        )
+                    }
+                };
+
+                let logical_expr = self.sql_expr_to_logical_expr(unnamed_expr, input_schema, planner_context)?;
+                let field = Field::new(name, logical_expr.get_type(input_schema)?, true);
+
+                Ok((Arc::new(field), logical_expr))
             })
-            .collect::<Result<Vec<_>>>()?;
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .unzip::<_, _, Vec<_>, _>();
+
+        let data_type = DataType::Struct(fields.into());
+
         let struct_func = self
             .context_provider
             .get_function_meta("struct")
             .ok_or_else(|| {
                 internal_datafusion_err!("Unable to find expected 'struct' function")
             })?;
-        Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
+
+        let struct_call = Expr::ScalarFunction(ScalarFunction::new_udf(
             struct_func,
             args,
-        )))
+        ));
+
+        Ok(Expr::Cast(Cast::new(Box::new(struct_call), data_type)))
     }
 
     fn parse_array_agg(
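The net effect: each argument to struct() may now carry an `expr AS name` alias. Aliased arguments keep that name as the struct field name, unaliased ones fall back to the default c0, c1, ... names, and the struct() call is wrapped in a CAST to a Struct type built from those names; the struct scalar function itself still receives plain positional arguments, so the aliases live only in the cast's target type. A minimal usage sketch in the style of the test file below (this query and its output are illustrative, not part of the commit; the value formatting is assumed to match the existing struct.slt results):

query ?
select struct(1 as x, 'foo', 3.14 as y);
----
{x: 1, c1: foo, y: 3.14}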
14 changes: 7 additions & 7 deletions datafusion/sqllogictest/test_files/struct.slt
@@ -52,21 +52,21 @@ select struct(1, 3.14, 'e');

 # struct scalar function with columns #1
 query ?
-select struct(a, b, c) from values;
+select struct(a, b as b_name, c) from values;
 ----
-{c0: 1, c1: 1.1, c2: a}
-{c0: 2, c1: 2.2, c2: b}
-{c0: 3, c1: 3.3, c2: c}
+{c0: 1, b_name: 1.1, c2: a}
+{c0: 2, b_name: 2.2, c2: b}
+{c0: 3, b_name: 3.3, c2: c}
 
 # explain struct scalar function with columns #1
 query TT
-explain select struct(a, b, c) from values;
+explain select struct(a, b as b_name, c) from values;
 ----
 logical_plan
-Projection: struct(values.a, values.b, values.c)
+Projection: CAST(struct(values.a, values.b, values.c) AS Struct([Field { name: "c0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b_name", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]))
 --TableScan: values projection=[a, b, c]
 physical_plan
-ProjectionExec: expr=[struct(a@0, b@1, c@2) as struct(values.a,values.b,values.c)]
+ProjectionExec: expr=[CAST(struct(a@0, b@1, c@2) AS Struct([Field { name: "c0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b_name", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])) as struct(values.a,values.b,values.c)]
 --MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
