Skip to content

Commit

Permalink
execinfrapb: break the dependency on sem/builtins
Browse files Browse the repository at this point in the history
This commit moves several utility functions from `execinfrapb` into
`execinfra` package in order to break the dependency of the former on
`sem/builtins` (which eventually depends on the `c-deps`).

Release note: None
  • Loading branch information
yuzefovich committed Mar 25, 2022
1 parent 366469f commit d969a06
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 167 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -446,16 +447,16 @@ func ProcessAggregations(
aggregations []execinfrapb.AggregatorSpec_Aggregation,
inputTypes []*types.T,
) (
constructors []execinfrapb.AggregateConstructor,
constructors []execinfra.AggregateConstructor,
constArguments []tree.Datums,
outputTypes []*types.T,
err error,
) {
constructors = make([]execinfrapb.AggregateConstructor, len(aggregations))
constructors = make([]execinfra.AggregateConstructor, len(aggregations))
constArguments = make([]tree.Datums, len(aggregations))
outputTypes = make([]*types.T, len(aggregations))
for i, aggFn := range aggregations {
constructors[i], constArguments[i], outputTypes[i], err = execinfrapb.GetAggregateConstructor(
constructors[i], constArguments[i], outputTypes[i], err = execinfra.GetAggregateConstructor(
evalCtx, semaCtx, &aggFn, inputTypes,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecagg/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package colexecagg
import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -30,7 +31,7 @@ type NewAggregatorArgs struct {
InputTypes []*types.T
Spec *execinfrapb.AggregatorSpec
EvalCtx *tree.EvalContext
Constructors []execinfrapb.AggregateConstructor
Constructors []execinfra.AggregateConstructor
ConstArguments []tree.Datums
OutputTypes []*types.T
}
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)
Expand Down Expand Up @@ -117,7 +117,7 @@ func (a *default_AGGKINDAgg) Reset() {

func newDefault_AGGKINDAggAlloc(
allocator *colmem.Allocator,
constructor execinfrapb.AggregateConstructor,
constructor execinfra.AggregateConstructor,
evalCtx *tree.EvalContext,
inputArgsConverter *colconv.VecToDatumConverter,
numArguments int,
Expand Down Expand Up @@ -147,7 +147,7 @@ type default_AGGKINDAggAlloc struct {
aggAllocBase
aggFuncs []default_AGGKINDAgg

constructor execinfrapb.AggregateConstructor
constructor execinfra.AggregateConstructor
evalCtx *tree.EvalContext
// inputArgsConverter is a converter from coldata.Vecs to tree.Datums that
// is shared among all aggregate functions and is managed by the aggregator
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/hash_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/sql/distsql/columnar_operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecwindow"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestAggregatorAgainstProcessor(t *testing.T) {
for _, typ := range aggFnInputTypes {
hasJSONColumn = hasJSONColumn || typ.Family() == types.JsonFamily
}
if _, outputType, err := execinfrapb.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil {
if _, outputType, err := execinfra.GetAggregateInfo(aggFn, aggFnInputTypes...); err == nil {
outputTypes[i] = outputType
break
}
Expand Down Expand Up @@ -1200,7 +1201,7 @@ func TestWindowFunctionsAgainstProcessor(t *testing.T) {
}
windowerSpec.WindowFns[0].Frame = generateWindowFrame(t, rng, &ordering, inputTypes)

_, outputType, err := execinfrapb.GetWindowFunctionInfo(fun, argTypes...)
_, outputType, err := execinfra.GetWindowFunctionInfo(fun, argTypes...)
require.NoError(t, err)
pspec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: inputTypes}},
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,7 @@ func (dsp *DistSQLPlanner) planAggregators(
for j, c := range e.ColIdx {
argTypes[j] = inputTypes[c]
}
_, outputType, err := execinfrapb.GetAggregateInfo(localFunc, argTypes...)
_, outputType, err := execinfra.GetAggregateInfo(localFunc, argTypes...)
if err != nil {
return err
}
Expand Down Expand Up @@ -1981,7 +1981,7 @@ func (dsp *DistSQLPlanner) planAggregators(
// the current aggregation e.
argTypes[i] = intermediateTypes[argIdxs[i]]
}
_, outputType, err := execinfrapb.GetAggregateInfo(finalInfo.Fn, argTypes...)
_, outputType, err := execinfra.GetAggregateInfo(finalInfo.Fn, argTypes...)
if err != nil {
return err
}
Expand Down Expand Up @@ -2136,7 +2136,7 @@ func (dsp *DistSQLPlanner) planAggregators(
}
copy(argTypes[len(agg.ColIdx):], info.argumentsColumnTypes[i])
var err error
_, returnTyp, err := execinfrapb.GetAggregateInfo(agg.Func, argTypes...)
_, returnTyp, err := execinfra.GetAggregateInfo(agg.Func, argTypes...)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_plan_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package sql

import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *windowPlanState) createWindowFnSpec(
for i, argIdx := range funcInProgress.argsIdxs {
argTypes[i] = s.plan.GetResultTypes()[argIdx]
}
_, outputType, err := execinfrapb.GetWindowFunctionInfo(funcSpec, argTypes...)
_, outputType, err := execinfra.GetWindowFunctionInfo(funcSpec, argTypes...)
if err != nil {
return execinfrapb.WindowerSpec_WindowFn{}, outputType, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("//build:STRINGER.bzl", "stringer")
go_library(
name = "execinfra",
srcs = [
"aggregatorbase.go",
"base.go",
"flow_context.go",
"metadata_test_receiver.go",
Expand Down Expand Up @@ -48,6 +49,7 @@ go_library(
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
Expand Down
161 changes: 161 additions & 0 deletions pkg/sql/execinfra/aggregatorbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package execinfra

import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

// AggregateConstructor is a function that creates an aggregate function.
type AggregateConstructor func(*tree.EvalContext, tree.Datums) tree.AggregateFunc

// GetAggregateInfo returns the aggregate constructor and the return type for
// the given aggregate function when applied on the given type.
func GetAggregateInfo(
fn execinfrapb.AggregatorSpec_Func, inputTypes ...*types.T,
) (aggregateConstructor AggregateConstructor, returnType *types.T, err error) {
if fn == execinfrapb.AnyNotNull {
// The ANY_NOT_NULL builtin does not have a fixed return type;
// handle it separately.
if len(inputTypes) != 1 {
return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input")
}
return builtins.NewAnyNotNullAggregate, inputTypes[0], nil
}

props, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String()))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
continue
}
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
break
}
}
if match {
// Found!
constructAgg := func(evalCtx *tree.EvalContext, arguments tree.Datums) tree.AggregateFunc {
return b.AggregateFunc(inputTypes, evalCtx, arguments)
}
colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes)
return constructAgg, colTyp, nil
}
}
return nil, nil, errors.Errorf(
"no builtin aggregate for %s on %+v", fn, inputTypes,
)
}

// GetAggregateConstructor processes the specification of a single aggregate
// function.
//
// evalCtx will not be mutated.
func GetAggregateConstructor(
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
aggInfo *execinfrapb.AggregatorSpec_Aggregation,
inputTypes []*types.T,
) (constructor AggregateConstructor, arguments tree.Datums, outputType *types.T, err error) {
argTypes := make([]*types.T, len(aggInfo.ColIdx)+len(aggInfo.Arguments))
for j, c := range aggInfo.ColIdx {
if c >= uint32(len(inputTypes)) {
err = errors.Errorf("ColIdx out of range (%d)", aggInfo.ColIdx)
return
}
argTypes[j] = inputTypes[c]
}
arguments = make(tree.Datums, len(aggInfo.Arguments))
var d tree.Datum
for j, argument := range aggInfo.Arguments {
h := execinfrapb.ExprHelper{}
// Pass nil types and row - there are no variables in these expressions.
if err = h.Init(argument, nil /* types */, semaCtx, evalCtx); err != nil {
err = errors.Wrapf(err, "%s", argument)
return
}
d, err = h.Eval(nil /* row */)
if err != nil {
err = errors.Wrapf(err, "%s", argument)
return
}
argTypes[len(aggInfo.ColIdx)+j] = d.ResolvedType()
arguments[j] = d
}
constructor, outputType, err = GetAggregateInfo(aggInfo.Func, argTypes...)
return
}

// GetWindowFunctionInfo returns windowFunc constructor and the return type
// when given fn is applied to given inputTypes.
func GetWindowFunctionInfo(
fn execinfrapb.WindowerSpec_Func, inputTypes ...*types.T,
) (windowConstructor func(*tree.EvalContext) tree.WindowFunc, returnType *types.T, err error) {
if fn.AggregateFunc != nil && *fn.AggregateFunc == execinfrapb.AnyNotNull {
// The ANY_NOT_NULL builtin does not have a fixed return type;
// handle it separately.
if len(inputTypes) != 1 {
return nil, nil, errors.Errorf("any_not_null aggregate needs 1 input")
}
return builtins.NewAggregateWindowFunc(builtins.NewAnyNotNullAggregate), inputTypes[0], nil
}

var funcStr string
if fn.AggregateFunc != nil {
funcStr = fn.AggregateFunc.String()
} else if fn.WindowFunc != nil {
funcStr = fn.WindowFunc.String()
} else {
return nil, nil, errors.Errorf(
"function is neither an aggregate nor a window function",
)
}
props, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
continue
}
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
break
}
}
if match {
// Found!
constructAgg := func(evalCtx *tree.EvalContext) tree.WindowFunc {
return b.WindowFunc(inputTypes, evalCtx)
}
colTyp := b.InferReturnTypeFromInputArgTypes(inputTypes)
return constructAgg, colTyp, nil
}
}
return nil, nil, errors.Errorf(
"no builtin aggregate/window function for %s on %v", funcStr, inputTypes,
)
}
1 change: 0 additions & 1 deletion pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/transform",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/tree/treewindow",
Expand Down
Loading

0 comments on commit d969a06

Please sign in to comment.