From d969a06d3b871ef050e5736287517e5b84343ea3 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 25 Mar 2022 02:32:07 +0000 Subject: [PATCH] execinfrapb: break the dependency on sem/builtins This commit moves several utility functions from `execinfrapb` into `execinfra` package in order to break the dependency of the former on `sem/builtins` (which eventually depends on the `c-deps`). Release note: None --- pkg/sql/colexec/colexecagg/BUILD.bazel | 1 + pkg/sql/colexec/colexecagg/aggregate_funcs.go | 7 +- .../colexec/colexecagg/aggregators_util.go | 3 +- .../colexec/colexecagg/default_agg_tmpl.go | 6 +- .../colexec/colexecagg/hash_default_agg.eg.go | 6 +- .../colexecagg/ordered_default_agg.eg.go | 6 +- pkg/sql/distsql/columnar_operators_test.go | 5 +- pkg/sql/distsql_physical_planner.go | 6 +- pkg/sql/distsql_plan_window.go | 3 +- pkg/sql/execinfra/BUILD.bazel | 2 + pkg/sql/execinfra/aggregatorbase.go | 161 ++++++++++++++++++ pkg/sql/execinfrapb/BUILD.bazel | 1 - pkg/sql/execinfrapb/processors.go | 142 --------------- pkg/sql/physicalplan/aggregator_funcs_test.go | 6 +- pkg/sql/rowexec/aggregator.go | 2 +- pkg/sql/rowexec/windower.go | 2 +- 16 files changed, 192 insertions(+), 167 deletions(-) create mode 100644 pkg/sql/execinfra/aggregatorbase.go diff --git a/pkg/sql/colexec/colexecagg/BUILD.bazel b/pkg/sql/colexec/colexecagg/BUILD.bazel index 2e2ce136e97d..bfa781c5da3c 100644 --- a/pkg/sql/colexec/colexecagg/BUILD.bazel +++ b/pkg/sql/colexec/colexecagg/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/sql/colexecerror", "//pkg/sql/colexecop", "//pkg/sql/colmem", + "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", "//pkg/sql/types", diff --git a/pkg/sql/colexec/colexecagg/aggregate_funcs.go b/pkg/sql/colexec/colexecagg/aggregate_funcs.go index 7943eae67564..c717de3399cb 100644 --- a/pkg/sql/colexec/colexecagg/aggregate_funcs.go +++ b/pkg/sql/colexec/colexecagg/aggregate_funcs.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -446,16 +447,16 @@ func ProcessAggregations( aggregations []execinfrapb.AggregatorSpec_Aggregation, inputTypes []*types.T, ) ( - constructors []execinfrapb.AggregateConstructor, + constructors []execinfra.AggregateConstructor, constArguments []tree.Datums, outputTypes []*types.T, err error, ) { - constructors = make([]execinfrapb.AggregateConstructor, len(aggregations)) + constructors = make([]execinfra.AggregateConstructor, len(aggregations)) constArguments = make([]tree.Datums, len(aggregations)) outputTypes = make([]*types.T, len(aggregations)) for i, aggFn := range aggregations { - constructors[i], constArguments[i], outputTypes[i], err = execinfrapb.GetAggregateConstructor( + constructors[i], constArguments[i], outputTypes[i], err = execinfra.GetAggregateConstructor( evalCtx, semaCtx, &aggFn, inputTypes, ) if err != nil { diff --git a/pkg/sql/colexec/colexecagg/aggregators_util.go b/pkg/sql/colexec/colexecagg/aggregators_util.go index c59b7cab74c9..a26e2a7115cd 100644 --- a/pkg/sql/colexec/colexecagg/aggregators_util.go +++ b/pkg/sql/colexec/colexecagg/aggregators_util.go @@ -13,6 +13,7 @@ package colexecagg import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -30,7 +31,7 @@ type NewAggregatorArgs struct { InputTypes []*types.T Spec *execinfrapb.AggregatorSpec EvalCtx *tree.EvalContext - Constructors []execinfrapb.AggregateConstructor + Constructors []execinfra.AggregateConstructor ConstArguments []tree.Datums OutputTypes []*types.T } diff --git a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go index 5242a093e9f2..7248b6ea56a6 100644 --- a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go @@ -30,7 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -117,7 +117,7 @@ func (a *default_AGGKINDAgg) Reset() { func newDefault_AGGKINDAggAlloc( allocator *colmem.Allocator, - constructor execinfrapb.AggregateConstructor, + constructor execinfra.AggregateConstructor, evalCtx *tree.EvalContext, inputArgsConverter *colconv.VecToDatumConverter, numArguments int, @@ -147,7 +147,7 @@ type default_AGGKINDAggAlloc struct { aggAllocBase aggFuncs []default_AGGKINDAgg - constructor execinfrapb.AggregateConstructor + constructor execinfra.AggregateConstructor evalCtx *tree.EvalContext // inputArgsConverter is a converter from coldata.Vecs to tree.Datums that // is shared among all aggregate functions and is managed by the aggregator diff --git a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go index 7dce22f8b257..469ae0e1a4c8 100644 --- a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go @@ -18,7 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -88,7 +88,7 @@ func (a *defaultHashAgg) Reset() { func newDefaultHashAggAlloc( allocator *colmem.Allocator, - constructor execinfrapb.AggregateConstructor, + constructor execinfra.AggregateConstructor, evalCtx *tree.EvalContext, inputArgsConverter *colconv.VecToDatumConverter, numArguments int, @@ -118,7 +118,7 @@ type defaultHashAggAlloc struct { aggAllocBase aggFuncs []defaultHashAgg - constructor execinfrapb.AggregateConstructor + constructor execinfra.AggregateConstructor evalCtx *tree.EvalContext // inputArgsConverter is a converter from coldata.Vecs to tree.Datums that // is shared among all aggregate functions and is managed by the aggregator diff --git a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go index 92cf0fed9f56..eee77538db76 100644 --- a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go @@ -18,7 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -153,7 +153,7 @@ func (a *defaultOrderedAgg) Reset() { func newDefaultOrderedAggAlloc( allocator *colmem.Allocator, - constructor execinfrapb.AggregateConstructor, + constructor execinfra.AggregateConstructor, evalCtx *tree.EvalContext, inputArgsConverter *colconv.VecToDatumConverter, numArguments int, @@ -183,7 +183,7 @@ type defaultOrderedAggAlloc struct { aggAllocBase aggFuncs []defaultOrderedAgg - constructor execinfrapb.AggregateConstructor + constructor execinfra.AggregateConstructor evalCtx *tree.EvalContext // inputArgsConverter is a converter from coldata.Vecs to tree.Datums that // is shared among all aggregate functions and is managed by the aggregator diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 3204b4a49384..c25ee39872af 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecwindow" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -273,7 +274,7 @@ func TestAggregatorAgainstProcessor(t *testing.T) { for _, typ := range aggFnInputTypes { hasJSONColumn = hasJSONColumn || typ.Family() == types.JsonFamily } - if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { + if _, outputType, err := execinfra.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil { outputTypes[i] = outputType break } @@ -1200,7 +1201,7 @@ func TestWindowFunctionsAgainstProcessor(t *testing.T) { } windowerSpec.WindowFns[0].Frame = generateWindowFrame(t, rng, &ordering, inputTypes) - _, outputType, err := execinfrapb.GetWindowFunctionInfo(fun, argTypes...) + _, outputType, err := execinfra.GetWindowFunctionInfo(fun, argTypes...) require.NoError(t, err) pspec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}}, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index e8096f5fd59d..677e23309918 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1929,7 +1929,7 @@ func (dsp *DistSQLPlanner) planAggregators( for j, c := range e.ColIdx { argTypes[j] = inputTypes[c] } - _, outputType, err := execinfrapb.GetAggregateInfo(localFunc, argTypes...) + _, outputType, err := execinfra.GetAggregateInfo(localFunc, argTypes...) if err != nil { return err } @@ -1981,7 +1981,7 @@ func (dsp *DistSQLPlanner) planAggregators( // the current aggregation e. argTypes[i] = intermediateTypes[argIdxs[i]] } - _, outputType, err := execinfrapb.GetAggregateInfo(finalInfo.Fn, argTypes...) + _, outputType, err := execinfra.GetAggregateInfo(finalInfo.Fn, argTypes...) if err != nil { return err } @@ -2136,7 +2136,7 @@ func (dsp *DistSQLPlanner) planAggregators( } copy(argTypes[len(agg.ColIdx):], info.argumentsColumnTypes[i]) var err error - _, returnTyp, err := execinfrapb.GetAggregateInfo(agg.Func, argTypes...) + _, returnTyp, err := execinfra.GetAggregateInfo(agg.Func, argTypes...) if err != nil { return err } diff --git a/pkg/sql/distsql_plan_window.go b/pkg/sql/distsql_plan_window.go index d625471a583d..5aa771b897c8 100644 --- a/pkg/sql/distsql_plan_window.go +++ b/pkg/sql/distsql_plan_window.go @@ -11,6 +11,7 @@ package sql import ( + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -109,7 +110,7 @@ func (s *windowPlanState) createWindowFnSpec( for i, argIdx := range funcInProgress.argsIdxs { argTypes[i] = s.plan.GetResultTypes()[argIdx] } - _, outputType, err := execinfrapb.GetWindowFunctionInfo(funcSpec, argTypes...) + _, outputType, err := execinfra.GetWindowFunctionInfo(funcSpec, argTypes...) if err != nil { return execinfrapb.WindowerSpec_WindowFn{}, outputType, err } diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 51d8a526cb32..b81635763f63 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -4,6 +4,7 @@ load("//build:STRINGER.bzl", "stringer") go_library( name = "execinfra", srcs = [ + "aggregatorbase.go", "base.go", "flow_context.go", "metadata_test_receiver.go", @@ -48,6 +49,7 @@ go_library( "//pkg/sql/row", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", + "//pkg/sql/sem/builtins", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlliveness", diff --git a/pkg/sql/execinfra/aggregatorbase.go b/pkg/sql/execinfra/aggregatorbase.go new file mode 100644 index 000000000000..e032a791c175 --- /dev/null +++ b/pkg/sql/execinfra/aggregatorbase.go @@ -0,0 +1,161 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package execinfra + +import ( + "strings" + + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// AggregateConstructor is a function that creates an aggregate function. +type AggregateConstructor func(*tree.EvalContext, tree.Datums) tree.AggregateFunc + +// GetAggregateInfo returns the aggregate constructor and the return type for +// the given aggregate function when applied on the given type. +func GetAggregateInfo( + fn execinfrapb.AggregatorSpec_Func, inputTypes ...*types.T, +) (aggregateConstructor AggregateConstructor, returnType *types.T, err error) { + if fn == execinfrapb.AnyNotNull { + // The ANY_NOT_NULL builtin does not have a fixed return type; + // handle it separately. + if len(inputTypes) != 1 { + return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input") + } + return builtins.NewAnyNotNullAggregate, inputTypes[0], nil + } + + props, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String())) + for _, b := range builtins { + typs := b.Types.Types() + if len(typs) != len(inputTypes) { + continue + } + match := true + for i, t := range typs { + if !inputTypes[i].Equivalent(t) { + if props.NullableArgs && inputTypes[i].IsAmbiguous() { + continue + } + match = false + break + } + } + if match { + // Found! + constructAgg := func(evalCtx *tree.EvalContext, arguments tree.Datums) tree.AggregateFunc { + return b.AggregateFunc(inputTypes, evalCtx, arguments) + } + colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes) + return constructAgg, colTyp, nil + } + } + return nil, nil, errors.Errorf( + "no builtin aggregate for %s on %+v", fn, inputTypes, + ) +} + +// GetAggregateConstructor processes the specification of a single aggregate +// function. +// +// evalCtx will not be mutated. +func GetAggregateConstructor( + evalCtx *tree.EvalContext, + semaCtx *tree.SemaContext, + aggInfo *execinfrapb.AggregatorSpec_Aggregation, + inputTypes []*types.T, +) (constructor AggregateConstructor, arguments tree.Datums, outputType *types.T, err error) { + argTypes := make([]*types.T, len(aggInfo.ColIdx)+len(aggInfo.Arguments)) + for j, c := range aggInfo.ColIdx { + if c >= uint32(len(inputTypes)) { + err = errors.Errorf("ColIdx out of range (%d)", aggInfo.ColIdx) + return + } + argTypes[j] = inputTypes[c] + } + arguments = make(tree.Datums, len(aggInfo.Arguments)) + var d tree.Datum + for j, argument := range aggInfo.Arguments { + h := execinfrapb.ExprHelper{} + // Pass nil types and row - there are no variables in these expressions. + if err = h.Init(argument, nil /* types */, semaCtx, evalCtx); err != nil { + err = errors.Wrapf(err, "%s", argument) + return + } + d, err = h.Eval(nil /* row */) + if err != nil { + err = errors.Wrapf(err, "%s", argument) + return + } + argTypes[len(aggInfo.ColIdx)+j] = d.ResolvedType() + arguments[j] = d + } + constructor, outputType, err = GetAggregateInfo(aggInfo.Func, argTypes...) + return +} + +// GetWindowFunctionInfo returns windowFunc constructor and the return type +// when given fn is applied to given inputTypes. +func GetWindowFunctionInfo( + fn execinfrapb.WindowerSpec_Func, inputTypes ...*types.T, +) (windowConstructor func(*tree.EvalContext) tree.WindowFunc, returnType *types.T, err error) { + if fn.AggregateFunc != nil && *fn.AggregateFunc == execinfrapb.AnyNotNull { + // The ANY_NOT_NULL builtin does not have a fixed return type; + // handle it separately. + if len(inputTypes) != 1 { + return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input") + } + return builtins.NewAggregateWindowFunc(builtins.NewAnyNotNullAggregate), inputTypes[0], nil + } + + var funcStr string + if fn.AggregateFunc != nil { + funcStr = fn.AggregateFunc.String() + } else if fn.WindowFunc != nil { + funcStr = fn.WindowFunc.String() + } else { + return nil, nil, errors.Errorf( + "function is neither an aggregate nor a window function", + ) + } + props, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr)) + for _, b := range builtins { + typs := b.Types.Types() + if len(typs) != len(inputTypes) { + continue + } + match := true + for i, t := range typs { + if !inputTypes[i].Equivalent(t) { + if props.NullableArgs && inputTypes[i].IsAmbiguous() { + continue + } + match = false + break + } + } + if match { + // Found! + constructAgg := func(evalCtx *tree.EvalContext) tree.WindowFunc { + return b.WindowFunc(inputTypes, evalCtx) + } + colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes) + return constructAgg, colTyp, nil + } + } + return nil, nil, errors.Errorf( + "no builtin aggregate/window function for %s on %v", funcStr, inputTypes, + ) +} diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index 8eb5433c4b23..157f28c94fa7 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", - "//pkg/sql/sem/builtins", "//pkg/sql/sem/transform", "//pkg/sql/sem/tree", "//pkg/sql/sem/tree/treewindow", diff --git a/pkg/sql/execinfrapb/processors.go b/pkg/sql/execinfrapb/processors.go index f46ffa34a98b..3fbfa4399a51 100644 --- a/pkg/sql/execinfrapb/processors.go +++ b/pkg/sql/execinfrapb/processors.go @@ -17,10 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treewindow" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/errors" ) @@ -36,92 +34,6 @@ func GetAggregateFuncIdx(funcName string) (int32, error) { return funcIdx, nil } -// AggregateConstructor is a function that creates an aggregate function. -type AggregateConstructor func(*tree.EvalContext, tree.Datums) tree.AggregateFunc - -// GetAggregateInfo returns the aggregate constructor and the return type for -// the given aggregate function when applied on the given type. -func GetAggregateInfo( - fn AggregatorSpec_Func, inputTypes ...*types.T, -) (aggregateConstructor AggregateConstructor, returnType *types.T, err error) { - if fn == AnyNotNull { - // The ANY_NOT_NULL builtin does not have a fixed return type; - // handle it separately. - if len(inputTypes) != 1 { - return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input") - } - return builtins.NewAnyNotNullAggregate, inputTypes[0], nil - } - - props, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String())) - for _, b := range builtins { - typs := b.Types.Types() - if len(typs) != len(inputTypes) { - continue - } - match := true - for i, t := range typs { - if !inputTypes[i].Equivalent(t) { - if props.NullableArgs && inputTypes[i].IsAmbiguous() { - continue - } - match = false - break - } - } - if match { - // Found! - constructAgg := func(evalCtx *tree.EvalContext, arguments tree.Datums) tree.AggregateFunc { - return b.AggregateFunc(inputTypes, evalCtx, arguments) - } - colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes) - return constructAgg, colTyp, nil - } - } - return nil, nil, errors.Errorf( - "no builtin aggregate for %s on %+v", fn, inputTypes, - ) -} - -// GetAggregateConstructor processes the specification of a single aggregate -// function. -// -// evalCtx will not be mutated. -func GetAggregateConstructor( - evalCtx *tree.EvalContext, - semaCtx *tree.SemaContext, - aggInfo *AggregatorSpec_Aggregation, - inputTypes []*types.T, -) (constructor AggregateConstructor, arguments tree.Datums, outputType *types.T, err error) { - argTypes := make([]*types.T, len(aggInfo.ColIdx)+len(aggInfo.Arguments)) - for j, c := range aggInfo.ColIdx { - if c >= uint32(len(inputTypes)) { - err = errors.Errorf("ColIdx out of range (%d)", aggInfo.ColIdx) - return - } - argTypes[j] = inputTypes[c] - } - arguments = make(tree.Datums, len(aggInfo.Arguments)) - var d tree.Datum - for j, argument := range aggInfo.Arguments { - h := ExprHelper{} - // Pass nil types and row - there are no variables in these expressions. - if err = h.Init(argument, nil /* types */, semaCtx, evalCtx); err != nil { - err = errors.Wrapf(err, "%s", argument) - return - } - d, err = h.Eval(nil /* row */) - if err != nil { - err = errors.Wrapf(err, "%s", argument) - return - } - argTypes[len(aggInfo.ColIdx)+j] = d.ResolvedType() - arguments[j] = d - } - constructor, outputType, err = GetAggregateInfo(aggInfo.Func, argTypes...) - return -} - // Equals returns true if two aggregation specifiers are identical (and thus // will always yield the same result). func (a AggregatorSpec_Aggregation) Equals(b AggregatorSpec_Aggregation) bool { @@ -182,60 +94,6 @@ func GetWindowFuncIdx(funcName string) (int32, error) { return funcIdx, nil } -// GetWindowFunctionInfo returns windowFunc constructor and the return type -// when given fn is applied to given inputTypes. -func GetWindowFunctionInfo( - fn WindowerSpec_Func, inputTypes ...*types.T, -) (windowConstructor func(*tree.EvalContext) tree.WindowFunc, returnType *types.T, err error) { - if fn.AggregateFunc != nil && *fn.AggregateFunc == AnyNotNull { - // The ANY_NOT_NULL builtin does not have a fixed return type; - // handle it separately. - if len(inputTypes) != 1 { - return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input") - } - return builtins.NewAggregateWindowFunc(builtins.NewAnyNotNullAggregate), inputTypes[0], nil - } - - var funcStr string - if fn.AggregateFunc != nil { - funcStr = fn.AggregateFunc.String() - } else if fn.WindowFunc != nil { - funcStr = fn.WindowFunc.String() - } else { - return nil, nil, errors.Errorf( - "function is neither an aggregate nor a window function", - ) - } - props, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr)) - for _, b := range builtins { - typs := b.Types.Types() - if len(typs) != len(inputTypes) { - continue - } - match := true - for i, t := range typs { - if !inputTypes[i].Equivalent(t) { - if props.NullableArgs && inputTypes[i].IsAmbiguous() { - continue - } - match = false - break - } - } - if match { - // Found! - constructAgg := func(evalCtx *tree.EvalContext) tree.WindowFunc { - return b.WindowFunc(inputTypes, evalCtx) - } - colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes) - return constructAgg, colTyp, nil - } - } - return nil, nil, errors.Errorf( - "no builtin aggregate/window function for %s on %v", funcStr, inputTypes, - ) -} - func (spec *WindowerSpec_Frame_Mode) initFromAST(w treewindow.WindowFrameMode) error { switch w { case treewindow.RANGE: diff --git a/pkg/sql/physicalplan/aggregator_funcs_test.go b/pkg/sql/physicalplan/aggregator_funcs_test.go index 77edd6d3a6dc..fd8ebc691a46 100644 --- a/pkg/sql/physicalplan/aggregator_funcs_test.go +++ b/pkg/sql/physicalplan/aggregator_funcs_test.go @@ -188,7 +188,7 @@ func checkDistAggregationInfo( intermediaryTypes := make([]*types.T, numIntermediary) for i, fn := range info.LocalStage { var err error - _, returnTyp, err := execinfrapb.GetAggregateInfo(fn, colTypes...) + _, returnTyp, err := execinfra.GetAggregateInfo(fn, colTypes...) if err != nil { t.Fatal(err) } @@ -207,7 +207,7 @@ func checkDistAggregationInfo( inputTypes[i] = intermediaryTypes[localIdx] } var err error - _, finalOutputTypes[i], err = execinfrapb.GetAggregateInfo(finalInfo.Fn, inputTypes...) + _, finalOutputTypes[i], err = execinfra.GetAggregateInfo(finalInfo.Fn, inputTypes...) if err != nil { t.Fatal(err) } @@ -509,7 +509,7 @@ func TestSingleArgumentDistAggregateFunctions(t *testing.T) { continue } // See if this column works with this function. - _, _, err := execinfrapb.GetAggregateInfo(fn, col.GetType()) + _, _, err := execinfra.GetAggregateInfo(fn, col.GetType()) if err != nil { continue } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 3531e05b1abe..c0ed8fc3aeca 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -130,7 +130,7 @@ func (ag *aggregatorBase) init( ) } } - constructor, arguments, outputType, err := execinfrapb.GetAggregateConstructor( + constructor, arguments, outputType, err := execinfra.GetAggregateConstructor( flowCtx.EvalCtx, semaCtx, &aggInfo, ag.inputTypes, ) if err != nil { diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index ff62832c984c..cbd0e63631b7 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -122,7 +122,7 @@ func newWindower( for i, argIdx := range windowFn.ArgsIdxs { argTypes[i] = w.inputTypes[argIdx] } - windowConstructor, outputType, err := execinfrapb.GetWindowFunctionInfo(windowFn.Func, argTypes...) + windowConstructor, outputType, err := execinfra.GetWindowFunctionInfo(windowFn.Func, argTypes...) if err != nil { return nil, err }