From 7fde76e33dcc26b0816fc8513c396becd431c1ad Mon Sep 17 00:00:00 2001 From: Jacob Ogle <123908271+JacobOgle@users.noreply.github.com> Date: Fri, 10 Nov 2023 16:06:32 -0500 Subject: [PATCH] Support remaining functions in protobuf serialization, add `expr_fn` for `StructFunction` (#8100) * working fix for #8098 * Added enum match for StringToArray * Added enum match for StructFun as well as mapping to a supporting scalar function * cargo fmt --------- Co-authored-by: Jacob Ogle --- datafusion/expr/src/expr_fn.rs | 7 +++++ .../proto/src/logical_plan/from_proto.rs | 29 ++++++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 5a60c2470c95..5b1050020755 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -871,6 +871,13 @@ scalar_expr!( scalar_expr!(ArrowTypeof, arrow_typeof, val, "data type"); +scalar_expr!( + Struct, + struct_fun, + val, + "returns a vector of fields from the struct" +); + /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. pub fn case(expr: Expr) -> CaseBuilder { CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None) diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index cdb0fe9bda7f..a3dcbc3fc80a 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -43,19 +43,20 @@ use datafusion_expr::{ array_has, array_has_all, array_has_any, array_length, array_ndims, array_position, array_positions, array_prepend, array_remove, array_remove_all, array_remove_n, array_repeat, array_replace, array_replace_all, array_replace_n, array_slice, - array_to_string, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, - cardinality, cbrt, ceil, character_length, chr, coalesce, concat_expr, + array_to_string, arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, + btrim, cardinality, cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date, current_time, date_bin, date_part, date_trunc, decode, degrees, digest, encode, exp, expr::{self, InList, Sort, WindowFunction}, - factorial, floor, from_unixtime, gcd, isnan, iszero, lcm, left, ln, log, log10, log2, + factorial, flatten, floor, from_unixtime, gcd, isnan, iszero, lcm, left, ln, log, + log10, log2, logical_plan::{PlanType, StringifiedPlan}, lower, lpad, ltrim, md5, nanvl, now, nullif, octet_length, pi, power, radians, random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, sinh, split_part, sqrt, - starts_with, strpos, substr, substring, tan, tanh, to_hex, to_timestamp_micros, - to_timestamp_millis, to_timestamp_nanos, to_timestamp_seconds, translate, trim, - trunc, upper, uuid, + starts_with, string_to_array, strpos, struct_fun, substr, substring, tan, tanh, + to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_nanos, + to_timestamp_seconds, translate, trim, trunc, upper, uuid, window_frame::regularize, AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet, @@ -1645,9 +1646,21 @@ pub fn parse_expr( )), ScalarFunction::Isnan => Ok(isnan(parse_expr(&args[0], registry)?)), ScalarFunction::Iszero => Ok(iszero(parse_expr(&args[0], registry)?)), - _ => Err(proto_error( - "Protobuf deserialization error: Unsupported scalar function", + ScalarFunction::ArrowTypeof => { + Ok(arrow_typeof(parse_expr(&args[0], registry)?)) + } + ScalarFunction::ToTimestamp => { + Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?)) + } + ScalarFunction::Flatten => Ok(flatten(parse_expr(&args[0], registry)?)), + ScalarFunction::StringToArray => Ok(string_to_array( + parse_expr(&args[0], registry)?, + parse_expr(&args[1], registry)?, + parse_expr(&args[2], registry)?, )), + ScalarFunction::StructFun => { + Ok(struct_fun(parse_expr(&args[0], registry)?)) + } } } ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => {