Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: execinfrapb: break the dependency on sem/builtins #79077

Merged
merged 1 commit into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -446,16 +447,16 @@ func ProcessAggregations(
aggregations []execinfrapb.AggregatorSpec_Aggregation,
inputTypes []*types.T,
) (
constructors []execinfrapb.AggregateConstructor,
constructors []execinfra.AggregateConstructor,
constArguments []tree.Datums,
outputTypes []*types.T,
err error,
) {
constructors = make([]execinfrapb.AggregateConstructor, len(aggregations))
constructors = make([]execinfra.AggregateConstructor, len(aggregations))
constArguments = make([]tree.Datums, len(aggregations))
outputTypes = make([]*types.T, len(aggregations))
for i, aggFn := range aggregations {
constructors[i], constArguments[i], outputTypes[i], err = execinfrapb.GetAggregateConstructor(
constructors[i], constArguments[i], outputTypes[i], err = execinfra.GetAggregateConstructor(
evalCtx, semaCtx, &aggFn, inputTypes,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecagg/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package colexecagg
import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -30,7 +31,7 @@ type NewAggregatorArgs struct {
InputTypes []*types.T
Spec *execinfrapb.AggregatorSpec
EvalCtx *tree.EvalContext
Constructors []execinfrapb.AggregateConstructor
Constructors []execinfra.AggregateConstructor
ConstArguments []tree.Datums
OutputTypes []*types.T
}
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)
Expand Down Expand Up @@ -117,7 +117,7 @@ func (a *default_AGGKINDAgg) Reset() {

func newDefault_AGGKINDAggAlloc(
allocator *colmem.Allocator,
constructor execinfrapb.AggregateConstructor,
constructor execinfra.AggregateConstructor,
evalCtx *tree.EvalContext,
inputArgsConverter *colconv.VecToDatumConverter,
numArguments int,
Expand Down Expand Up @@ -147,7 +147,7 @@ type default_AGGKINDAggAlloc struct {
aggAllocBase
aggFuncs []default_AGGKINDAgg

constructor execinfrapb.AggregateConstructor
constructor execinfra.AggregateConstructor
evalCtx *tree.EvalContext
// inputArgsConverter is a converter from coldata.Vecs to tree.Datums that
// is shared among all aggregate functions and is managed by the aggregator
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/hash_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/sql/distsql/columnar_operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecwindow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestAggregatorAgainstProcessor(t *testing.T) {
for _, typ := range aggFnInputTypes {
hasJSONColumn = hasJSONColumn || typ.Family() == types.JsonFamily
}
if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil {
if _, outputType, err := execinfra.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil {
outputTypes[i] = outputType
break
}
Expand Down Expand Up @@ -1200,7 +1201,7 @@ func TestWindowFunctionsAgainstProcessor(t *testing.T) {
}
windowerSpec.WindowFns[0].Frame = generateWindowFrame(t, rng, &ordering, inputTypes)

_, outputType, err := execinfrapb.GetWindowFunctionInfo(fun, argTypes...)
_, outputType, err := execinfra.GetWindowFunctionInfo(fun, argTypes...)
require.NoError(t, err)
pspec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}},
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,7 @@ func (dsp *DistSQLPlanner) planAggregators(
for j, c := range e.ColIdx {
argTypes[j] = inputTypes[c]
}
_, outputType, err := execinfrapb.GetAggregateInfo(localFunc, argTypes...)
_, outputType, err := execinfra.GetAggregateInfo(localFunc, argTypes...)
if err != nil {
return err
}
Expand Down Expand Up @@ -1981,7 +1981,7 @@ func (dsp *DistSQLPlanner) planAggregators(
// the current aggregation e.
argTypes[i] = intermediateTypes[argIdxs[i]]
}
_, outputType, err := execinfrapb.GetAggregateInfo(finalInfo.Fn, argTypes...)
_, outputType, err := execinfra.GetAggregateInfo(finalInfo.Fn, argTypes...)
if err != nil {
return err
}
Expand Down Expand Up @@ -2136,7 +2136,7 @@ func (dsp *DistSQLPlanner) planAggregators(
}
copy(argTypes[len(agg.ColIdx):], info.argumentsColumnTypes[i])
var err error
_, returnTyp, err := execinfrapb.GetAggregateInfo(agg.Func, argTypes...)
_, returnTyp, err := execinfra.GetAggregateInfo(agg.Func, argTypes...)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_plan_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package sql

import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *windowPlanState) createWindowFnSpec(
for i, argIdx := range funcInProgress.argsIdxs {
argTypes[i] = s.plan.GetResultTypes()[argIdx]
}
_, outputType, err := execinfrapb.GetWindowFunctionInfo(funcSpec, argTypes...)
_, outputType, err := execinfra.GetWindowFunctionInfo(funcSpec, argTypes...)
if err != nil {
return execinfrapb.WindowerSpec_WindowFn{}, outputType, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("//build:STRINGER.bzl", "stringer")
go_library(
name = "execinfra",
srcs = [
"aggregatorbase.go",
"base.go",
"flow_context.go",
"metadata_test_receiver.go",
Expand Down Expand Up @@ -48,6 +49,7 @@ go_library(
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
Expand Down
161 changes: 161 additions & 0 deletions pkg/sql/execinfra/aggregatorbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfra

import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

// AggregateConstructor is a function that creates an aggregate function.
type AggregateConstructor func(*tree.EvalContext, tree.Datums) tree.AggregateFunc

// GetAggregateInfo returns the aggregate constructor and the return type for
// the given aggregate function when applied on the given type.
func GetAggregateInfo(
fn execinfrapb.AggregatorSpec_Func, inputTypes ...*types.T,
) (aggregateConstructor AggregateConstructor, returnType *types.T, err error) {
if fn == execinfrapb.AnyNotNull {
// The ANY_NOT_NULL builtin does not have a fixed return type;
// handle it separately.
if len(inputTypes) != 1 {
return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input")
}
return builtins.NewAnyNotNullAggregate, inputTypes[0], nil
}

props, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String()))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
continue
}
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
break
}
}
if match {
// Found!
constructAgg := func(evalCtx *tree.EvalContext, arguments tree.Datums) tree.AggregateFunc {
return b.AggregateFunc(inputTypes, evalCtx, arguments)
}
colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes)
return constructAgg, colTyp, nil
}
}
return nil, nil, errors.Errorf(
"no builtin aggregate for %s on %+v", fn, inputTypes,
)
}

// GetAggregateConstructor processes the specification of a single aggregate
// function.
//
// evalCtx will not be mutated.
func GetAggregateConstructor(
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
aggInfo *execinfrapb.AggregatorSpec_Aggregation,
inputTypes []*types.T,
) (constructor AggregateConstructor, arguments tree.Datums, outputType *types.T, err error) {
argTypes := make([]*types.T, len(aggInfo.ColIdx)+len(aggInfo.Arguments))
for j, c := range aggInfo.ColIdx {
if c >= uint32(len(inputTypes)) {
err = errors.Errorf("ColIdx out of range (%d)", aggInfo.ColIdx)
return
}
argTypes[j] = inputTypes[c]
}
arguments = make(tree.Datums, len(aggInfo.Arguments))
var d tree.Datum
for j, argument := range aggInfo.Arguments {
h := execinfrapb.ExprHelper{}
// Pass nil types and row - there are no variables in these expressions.
if err = h.Init(argument, nil /* types */, semaCtx, evalCtx); err != nil {
err = errors.Wrapf(err, "%s", argument)
return
}
d, err = h.Eval(nil /* row */)
if err != nil {
err = errors.Wrapf(err, "%s", argument)
return
}
argTypes[len(aggInfo.ColIdx)+j] = d.ResolvedType()
arguments[j] = d
}
constructor, outputType, err = GetAggregateInfo(aggInfo.Func, argTypes...)
return
}

// GetWindowFunctionInfo returns windowFunc constructor and the return type
// when given fn is applied to given inputTypes.
func GetWindowFunctionInfo(
fn execinfrapb.WindowerSpec_Func, inputTypes ...*types.T,
) (windowConstructor func(*tree.EvalContext) tree.WindowFunc, returnType *types.T, err error) {
if fn.AggregateFunc != nil && *fn.AggregateFunc == execinfrapb.AnyNotNull {
// The ANY_NOT_NULL builtin does not have a fixed return type;
// handle it separately.
if len(inputTypes) != 1 {
return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input")
}
return builtins.NewAggregateWindowFunc(builtins.NewAnyNotNullAggregate), inputTypes[0], nil
}

var funcStr string
if fn.AggregateFunc != nil {
funcStr = fn.AggregateFunc.String()
} else if fn.WindowFunc != nil {
funcStr = fn.WindowFunc.String()
} else {
return nil, nil, errors.Errorf(
"function is neither an aggregate nor a window function",
)
}
props, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
continue
}
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
break
}
}
if match {
// Found!
constructAgg := func(evalCtx *tree.EvalContext) tree.WindowFunc {
return b.WindowFunc(inputTypes, evalCtx)
}
colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes)
return constructAgg, colTyp, nil
}
}
return nil, nil, errors.Errorf(
"no builtin aggregate/window function for %s on %v", funcStr, inputTypes,
)
}
1 change: 0 additions & 1 deletion pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/transform",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/tree/treewindow",
Expand Down
Loading