Skip to content

Commit

Permalink
colbuilder: don't create a copy of the eval context in most cases
Browse files Browse the repository at this point in the history
Previously, on every `NewColOperator` call we would create a copy of the
eval context. This was copied over from the row engine where making
a copy was needed since the row-by-row processors use `ProcOutputHelper`
which might modify the eval context when evaluating the expresssions. In
the vectorized engine we almost never modify the eval context, so there
is no reason to make a copy.

Release note: None
  • Loading branch information
yuzefovich committed Nov 18, 2021
1 parent 7a07241 commit 3dad24e
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 48 deletions.
47 changes: 23 additions & 24 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,17 +659,17 @@ func NewColOperator(
r := result.NewColOperatorResult
spec := args.Spec
inputs := args.Inputs
evalCtx := flowCtx.NewEvalCtx()
factory := args.Factory
if factory == nil {
factory = coldataext.NewExtendedColumnFactory(evalCtx)
// This code path is only used in tests.
factory = coldataext.NewExtendedColumnFactory(flowCtx.EvalCtx)
}
streamingMemAccount := args.StreamingMemAccount
streamingAllocator := colmem.NewAllocator(ctx, streamingMemAccount, factory)
useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering
if args.ExprHelper == nil {
args.ExprHelper = colexecargs.NewExprHelper()
args.ExprHelper.SemaCtx = flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
args.ExprHelper.SemaCtx = flowCtx.TypeResolverFactory.NewSemaContext(flowCtx.Txn)
}
if args.MonitorRegistry == nil {
args.MonitorRegistry = &colexecargs.MonitorRegistry{}
Expand Down Expand Up @@ -734,7 +734,7 @@ func NewColOperator(
estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc,
flowCtx, evalCtx, args.ExprHelper, core.TableReader, post, estimatedRowCount,
flowCtx, args.ExprHelper, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
Expand Down Expand Up @@ -762,7 +762,7 @@ func NewColOperator(
copy(inputTypes, spec.Input[0].ColumnTypes)
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, streamingAllocator, colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc,
flowCtx, evalCtx, args.ExprHelper, inputs[0].Root, core.JoinReader, post, inputTypes,
flowCtx, args.ExprHelper, inputs[0].Root, core.JoinReader, post, inputTypes,
)
if err != nil {
return r, err
Expand All @@ -778,7 +778,7 @@ func NewColOperator(
copy(result.ColumnTypes, spec.Input[0].ColumnTypes)
result.Root = inputs[0].Root
if err := result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, spec.ProcessorID, core.Filterer.Filter, factory,
ctx, flowCtx, args, spec.ProcessorID, core.Filterer.Filter, factory,
); err != nil {
return r, err
}
Expand Down Expand Up @@ -818,11 +818,13 @@ func NewColOperator(
}
inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
// Make a copy of the evalCtx since we're modifying it below.
evalCtx := flowCtx.NewEvalCtx()
newAggArgs := &colexecagg.NewAggregatorArgs{
Input: inputs[0].Root,
InputTypes: inputTypes,
Spec: aggSpec,
EvalCtx: evalCtx,
EvalCtx: flowCtx.EvalCtx,
}
newAggArgs.Constructors, newAggArgs.ConstArguments, newAggArgs.OutputTypes, err = colexecagg.ProcessAggregations(
evalCtx, args.ExprHelper.SemaCtx, aggSpec.Aggregations, inputTypes,
Expand Down Expand Up @@ -1101,7 +1103,7 @@ func NewColOperator(

if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin {
if err = result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, spec.ProcessorID, core.HashJoiner.OnExpr, factory,
ctx, flowCtx, args, spec.ProcessorID, core.HashJoiner.OnExpr, factory,
); err != nil {
return r, err
}
Expand Down Expand Up @@ -1143,7 +1145,7 @@ func NewColOperator(
args.DiskQueueCfg, args.FDSemaphore,
joinType, inputs[0].Root, inputs[1].Root, leftTypes, rightTypes,
core.MergeJoiner.LeftOrdering.Columns, core.MergeJoiner.RightOrdering.Columns,
diskAccount, evalCtx,
diskAccount, flowCtx.EvalCtx,
)

result.Root = mj
Expand All @@ -1152,7 +1154,7 @@ func NewColOperator(

if onExpr != nil {
if err = result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, spec.ProcessorID, *onExpr, factory,
ctx, flowCtx, args, spec.ProcessorID, *onExpr, factory,
); err != nil {
return r, err
}
Expand Down Expand Up @@ -1201,7 +1203,7 @@ func NewColOperator(
// We must cast to the expected argument type.
castIdx := len(typs)
input, err = colexecbase.GetCastOperator(
streamingAllocator, input, int(idx), castIdx, typs[idx], expectedType, evalCtx,
streamingAllocator, input, int(idx), castIdx, typs[idx], expectedType, flowCtx.EvalCtx,
)
if err != nil {
colexecerror.InternalError(errors.AssertionFailedf(
Expand Down Expand Up @@ -1261,7 +1263,7 @@ func NewColOperator(
outputIdx := int(wf.OutputColIdx + tempColOffset)

windowArgs := &colexecwindow.WindowArgs{
EvalCtx: evalCtx,
EvalCtx: flowCtx.EvalCtx,
MemoryLimit: execinfra.GetWorkMemLimit(flowCtx),
QueueCfg: args.DiskQueueCfg,
FdSemaphore: args.FDSemaphore,
Expand Down Expand Up @@ -1360,7 +1362,7 @@ func NewColOperator(
aggArgs := colexecagg.NewAggregatorArgs{
Allocator: windowArgs.MainAllocator,
InputTypes: argTypes,
EvalCtx: evalCtx,
EvalCtx: flowCtx.EvalCtx,
}
// The aggregate function will be presented with a ColVec slice
// containing only the argument columns.
Expand All @@ -1373,7 +1375,7 @@ func NewColOperator(
ColIdx: colIdx,
}}
aggArgs.Constructors, aggArgs.ConstArguments, aggArgs.OutputTypes, err =
colexecagg.ProcessAggregations(evalCtx, args.ExprHelper.SemaCtx, aggregations, argTypes)
colexecagg.ProcessAggregations(flowCtx.EvalCtx, args.ExprHelper.SemaCtx, aggregations, argTypes)
var toClose colexecop.Closers
var aggFnsAlloc *colexecagg.AggregateFuncsAlloc
if (aggType != execinfrapb.Min && aggType != execinfrapb.Max) ||
Expand Down Expand Up @@ -1435,7 +1437,7 @@ func NewColOperator(
Op: result.Root,
ColumnTypes: result.ColumnTypes,
}
err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory, &r.Releasables)
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables)
if err != nil {
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
} else {
Expand Down Expand Up @@ -1500,7 +1502,7 @@ func NewColOperator(
if !actual.Identical(expected) {
castedIdx := len(typesWithCasts)
r.Root, err = colexecbase.GetCastOperator(
streamingAllocator, r.Root, i, castedIdx, actual, expected, evalCtx,
streamingAllocator, r.Root, i, castedIdx, actual, expected, flowCtx.EvalCtx,
)
if err != nil {
return r, errors.NewAssertionErrorWithWrappedErrf(err, "unexpectedly couldn't plan a cast although IsCastSupported returned true")
Expand Down Expand Up @@ -1532,14 +1534,13 @@ func NewColOperator(
func (r opResult) planAndMaybeWrapFilter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
args *colexecargs.NewColOperatorArgs,
processorID int32,
filter execinfrapb.Expression,
factory coldata.ColumnFactory,
) error {
op, err := planFilterExpr(
ctx, flowCtx, evalCtx, r.Root, r.ColumnTypes, filter, args.StreamingMemAccount, factory, args.ExprHelper, &r.Releasables,
ctx, flowCtx, r.Root, r.ColumnTypes, filter, args.StreamingMemAccount, factory, args.ExprHelper, &r.Releasables,
)
if err != nil {
// Filter expression planning failed. Fall back to planning the filter
Expand Down Expand Up @@ -1599,7 +1600,6 @@ func (r opResult) wrapPostProcessSpec(
func (r *postProcessResult) planPostProcessSpec(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
args *colexecargs.NewColOperatorArgs,
post *execinfrapb.PostProcessSpec,
factory coldata.ColumnFactory,
Expand All @@ -1610,13 +1610,13 @@ func (r *postProcessResult) planPostProcessSpec(
} else if post.RenderExprs != nil {
var renderedCols []uint32
for _, renderExpr := range post.RenderExprs {
expr, err := args.ExprHelper.ProcessExpr(renderExpr, evalCtx, r.ColumnTypes)
expr, err := args.ExprHelper.ProcessExpr(renderExpr, flowCtx.EvalCtx, r.ColumnTypes)
if err != nil {
return err
}
var outputIdx int
r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators(
ctx, evalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, releasables,
ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Op, args.StreamingMemAccount, factory, releasables,
)
if err != nil {
return errors.Wrapf(err, "unable to columnarize render expression %q", expr)
Expand Down Expand Up @@ -1696,7 +1696,6 @@ func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*
func planFilterExpr(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
input colexecop.Operator,
columnTypes []*types.T,
filter execinfrapb.Expression,
Expand All @@ -1705,7 +1704,7 @@ func planFilterExpr(
helper *colexecargs.ExprHelper,
releasables *[]execinfra.Releasable,
) (colexecop.Operator, error) {
expr, err := helper.ProcessExpr(filter, evalCtx, columnTypes)
expr, err := helper.ProcessExpr(filter, flowCtx.EvalCtx, columnTypes)
if err != nil {
return nil, err
}
Expand All @@ -1715,7 +1714,7 @@ func planFilterExpr(
return colexecutils.NewZeroOp(input), nil
}
op, _, filterColumnTypes, err := planSelectionOperators(
ctx, evalCtx, expr, columnTypes, input, acc, factory, releasables,
ctx, flowCtx.EvalCtx, expr, columnTypes, input, acc, factory, releasables,
)
if err != nil {
return nil, errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ type aggAllocBase struct {

// ProcessAggregations processes all aggregate functions specified in
// aggregations.
//
// evalCtx will not be mutated.
func ProcessAggregations(
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecargs/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type ExprHelper struct {

// ProcessExpr processes the given expression and returns a well-typed
// expression. Note that SemaCtx must be already set on h.
//
// evalCtx will not be mutated.
func (h *ExprHelper) ProcessExpr(
expr execinfrapb.Expression, evalCtx *tree.EvalContext, typs []*types.T,
) (tree.TypedExpr, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ type mergeJoinInput struct {
// implements sort-merge join. It performs a merge on the left and right input
// sources, based on the equality columns, assuming both inputs are in sorted
// order.
//
// evalCtx will not be mutated.
func NewMergeJoinOp(
unlimitedAllocator *colmem.Allocator,
memoryLimit int64,
Expand Down
26 changes: 12 additions & 14 deletions pkg/sql/colfetcher/cfetcher_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
)
Expand Down Expand Up @@ -95,7 +94,6 @@ func (a *cFetcherTableArgs) populateTypes(cols []catalog.Column) {
func populateTableArgs(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
table catalog.TableDescriptor,
index catalog.Index,
invertedCol catalog.Column,
Expand Down Expand Up @@ -152,7 +150,7 @@ func populateTableArgs(
}
cols = cols[:colIdx]
if err := remapPostProcessSpec(
post, idxMap, helper, evalCtx, args.typs, flowCtx.PreserveFlowSpecs,
flowCtx, post, idxMap, helper, args.typs,
); err != nil {
return nil, nil, err
}
Expand All @@ -172,7 +170,7 @@ func populateTableArgs(
// make sure they are hydrated. In row execution engine it is done during
// the processor initialization, but neither ColBatchScan nor cFetcher are
// processors, so we need to do the hydration ourselves.
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.Txn)
return args, idxMap, resolver.HydrateTypeSlice(ctx, args.typs)
}

Expand All @@ -186,16 +184,14 @@ func populateTableArgs(
// If traceKV is true, then all columns are considered as needed, and
// neededColumns is ignored.
func keepOnlyNeededColumns(
evalCtx *tree.EvalContext,
flowCtx *execinfra.FlowCtx,
tableArgs *cFetcherTableArgs,
idxMap []int,
neededColumns []uint32,
post *execinfrapb.PostProcessSpec,
helper *colexecargs.ExprHelper,
traceKV bool,
preserveFlowSpecs bool,
) error {
if !traceKV && len(neededColumns) < len(tableArgs.cols) {
if !flowCtx.TraceKV && len(neededColumns) < len(tableArgs.cols) {
// If the tracing is not enabled and we don't need all of the available
// columns, we will prune all of the not needed columns away.

Expand Down Expand Up @@ -245,7 +241,7 @@ func keepOnlyNeededColumns(
neededColIdx++
}
if err := remapPostProcessSpec(
post, idxMap, helper, evalCtx, tableArgs.typs, preserveFlowSpecs,
flowCtx, post, idxMap, helper, tableArgs.typs,
); err != nil {
return err
}
Expand Down Expand Up @@ -291,16 +287,15 @@ func keepOnlyNeededColumns(
// words, every node must have gotten the unmodified version of the spec and is
// now free to modify it as it pleases.
func remapPostProcessSpec(
flowCtx *execinfra.FlowCtx,
post *execinfrapb.PostProcessSpec,
idxMap []int,
helper *colexecargs.ExprHelper,
evalCtx *tree.EvalContext,
typsBeforeRemapping []*types.T,
preserveFlowSpecs bool,
) error {
if post.Projection {
outputColumns := post.OutputColumns
if preserveFlowSpecs && post.OriginalOutputColumns == nil {
if flowCtx.PreserveFlowSpecs && post.OriginalOutputColumns == nil {
// This is the first time we're modifying this PostProcessSpec, but
// we've been asked to preserve the specs, so we have to set the
// original output columns. We are also careful to allocate a new
Expand All @@ -313,7 +308,7 @@ func remapPostProcessSpec(
}
} else if post.RenderExprs != nil {
renderExprs := post.RenderExprs
if preserveFlowSpecs && post.OriginalRenderExprs == nil {
if flowCtx.PreserveFlowSpecs && post.OriginalRenderExprs == nil {
// This is the first time we're modifying this PostProcessSpec, but
// we've been asked to preserve the specs, so we have to set the
// original render expressions. We are also careful to allocate a
Expand All @@ -325,7 +320,10 @@ func remapPostProcessSpec(
for i := range renderExprs {
// Make sure that the render expression is deserialized if we
// are on the remote node.
post.RenderExprs[i].LocalExpr, err = helper.ProcessExpr(renderExprs[i], evalCtx, typsBeforeRemapping)
//
// It is ok to use the evalCtx of the flowCtx since it won't be
// mutated (we are not evaluating the expressions).
post.RenderExprs[i].LocalExpr, err = helper.ProcessExpr(renderExprs[i], flowCtx.EvalCtx, typsBeforeRemapping)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -186,7 +185,6 @@ func NewColBatchScan(
allocator *colmem.Allocator,
kvFetcherMemAcc *mon.BoundAccount,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
helper *colexecargs.ExprHelper,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
Expand All @@ -209,15 +207,15 @@ func NewColBatchScan(
table := spec.BuildTableDescriptor()
invertedColumn := tabledesc.FindInvertedColumn(table, spec.InvertedColumn)
tableArgs, idxMap, err := populateTableArgs(
ctx, flowCtx, evalCtx, table, table.ActiveIndexes()[spec.IndexIdx],
ctx, flowCtx, table, table.ActiveIndexes()[spec.IndexIdx],
invertedColumn, spec.Visibility, spec.HasSystemColumns, post, helper,
)
if err != nil {
return nil, err
}

if err = keepOnlyNeededColumns(
evalCtx, tableArgs, idxMap, spec.NeededColumns, post, helper, flowCtx.TraceKV, flowCtx.PreserveFlowSpecs,
flowCtx, tableArgs, idxMap, spec.NeededColumns, post, helper,
); err != nil {
return nil, err
}
Expand All @@ -239,7 +237,7 @@ func NewColBatchScan(
}

var bsHeader *roachpb.BoundedStalenessHeader
if aost := evalCtx.AsOfSystemTime; aost != nil && aost.BoundedStaleness {
if aost := flowCtx.EvalCtx.AsOfSystemTime; aost != nil && aost.BoundedStaleness {
ts := aost.Timestamp
// If the descriptor's modification time is after the bounded staleness min bound,
// we have to increase the min bound.
Expand All @@ -251,7 +249,7 @@ func NewColBatchScan(
bsHeader = &roachpb.BoundedStalenessHeader{
MinTimestampBound: ts,
MinTimestampBoundStrict: aost.NearestOnly,
MaxTimestampBound: evalCtx.AsOfSystemTime.MaxTimestampBound, // may be empty
MaxTimestampBound: flowCtx.EvalCtx.AsOfSystemTime.MaxTimestampBound, // may be empty
}
}

Expand Down
Loading

0 comments on commit 3dad24e

Please sign in to comment.