From 40addd2f951ee3791d85e2f4a489e2337663fab8 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 14 Jun 2024 16:54:16 -0700 Subject: [PATCH] sql: audit all processors to avoid redundant copies of eval context This commit audits all processors to ensure that we make copies of the eval context only when necessary. Up until recently every processor mutated the context by modifying the `context.Context` field, but now that field has been removed, so only a handful of processors mutate the context (namely, whenever a processor explicitly mutates the context or when it uses the rendering / filtering capabilities of the ExprHelper). This commit makes it explicit whenever a copy is needed and uses the flow's eval context when no mutation occurs. In order to clarify different usages it removes `ProcessorBaseNoHelper.EvalCtx` field and moves it into each processor if it needs it. Rendering capabilities are used implicitly by many processors via `ProcOutputHelper`, and if a processor itself doesn't need a mutable eval context, we can create a copy for the helper without exposing it to the processor itself. Release note: None --- .../changefeedccl/changefeed_processors.go | 18 ++++++--- pkg/sql/backfill/backfill.go | 4 ++ pkg/sql/colexec/columnarizer.go | 4 -- pkg/sql/colexec/materializer.go | 4 -- pkg/sql/colflow/flow_coordinator.go | 4 -- pkg/sql/execinfra/flow_context.go | 27 ++++--------- pkg/sql/execinfra/processorsbase.go | 26 +++++++----- pkg/sql/execinfrapb/expr.go | 2 +- pkg/sql/importer/exportcsv.go | 2 +- pkg/sql/importer/exportparquet.go | 2 +- pkg/sql/plan_node_to_row_source.go | 6 +-- pkg/sql/rowcontainer/hash_row_container.go | 2 + .../rowcontainer/numbered_row_container.go | 2 +- pkg/sql/rowcontainer/row_container.go | 6 ++- pkg/sql/rowexec/aggregator.go | 40 ++++++------------- pkg/sql/rowexec/filterer.go | 18 ++++++--- pkg/sql/rowexec/hashjoiner.go | 4 +- pkg/sql/rowexec/inverted_filterer.go | 14 +++++-- pkg/sql/rowexec/inverted_joiner.go | 14 ++++--- pkg/sql/rowexec/joinerbase.go | 18 ++++++--- pkg/sql/rowexec/joinreader.go | 17 +++++--- pkg/sql/rowexec/mergejoiner.go | 2 +- pkg/sql/rowexec/processors_test.go | 8 ++-- pkg/sql/rowexec/project_set.go | 18 +++++---- pkg/sql/rowexec/sample_aggregator.go | 10 +++-- pkg/sql/rowexec/sampler.go | 2 +- pkg/sql/rowexec/sorter.go | 2 +- pkg/sql/rowexec/windower.go | 20 +++++----- pkg/sql/rowexec/zigzagjoiner.go | 2 +- pkg/sql/rowflow/routers.go | 4 +- 30 files changed, 163 insertions(+), 139 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 73da97c4b9c3..9f58f2842b1c 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -929,9 +930,10 @@ const ( type changeFrontier struct { execinfra.ProcessorBase - spec execinfrapb.ChangeFrontierSpec - memAcc mon.BoundAccount - a tree.DatumAlloc + evalCtx *eval.Context + spec execinfrapb.ChangeFrontierSpec + memAcc mon.BoundAccount + a tree.DatumAlloc // input returns rows from one or more changeAggregator processors input execinfra.RowSource @@ -1135,6 +1137,9 @@ func newChangeFrontierProcessor( } cf := &changeFrontier{ + // We might modify the ChangefeedState field in the eval.Context, so we + // need to make a copy. + evalCtx: flowCtx.NewEvalCtx(), spec: spec, memAcc: memMonitor.MakeBoundAccount(), input: input, @@ -1146,12 +1151,13 @@ func newChangeFrontierProcessor( cf.knobs = *cfKnobs } - if err := cf.Init( + if err := cf.InitWithEvalCtx( ctx, cf, post, input.OutputTypes(), flowCtx, + cf.evalCtx, processorID, memMonitor, execinfra.ProcStateOpts{ @@ -1248,12 +1254,12 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.sink = &errorWrapperSink{wrapped: cf.sink} cf.highWaterAtStart = cf.spec.Feed.StatementTime - if cf.EvalCtx.ChangefeedState == nil { + if cf.evalCtx.ChangefeedState == nil { cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state")) return } - cf.localState = cf.EvalCtx.ChangefeedState.(*cachedState) + cf.localState = cf.evalCtx.ChangefeedState.(*cachedState) cf.js = newJobState(nil, cf.FlowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{}) if cf.spec.JobID != 0 { diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 2d5a020a4ec4..3c9528bc4a21 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -223,6 +223,8 @@ func (cb *ColumnBackfiller) InitForDistributedUse( mon *mon.BytesMonitor, ) error { cb.initCols(desc) + // We'll be modifying the eval.Context in RunColumnBackfillChunk, so we need + // to make a copy. evalCtx := flowCtx.NewEvalCtx() var defaultExprs, computedExprs []tree.TypedExpr // Install type metadata in the target descriptors, as well as resolve any @@ -654,6 +656,8 @@ func (ib *IndexBackfiller) InitForDistributedUse( return err } + // We'll be modifying the eval.Context in BuildIndexEntriesChunk, so we need + // to make a copy. evalCtx := flowCtx.NewEvalCtx() var predicates map[descpb.IndexID]tree.TypedExpr var colExprs map[descpb.ColumnID]tree.TypedExpr diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 8760ee562ca0..c5588e15b795 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -155,10 +155,6 @@ func newColumnarizer( c.ProcessorBaseNoHelper.Init( nil, /* self */ flowCtx, - // The columnarizer will update the eval context when closed, so we give - // it a copy of the eval context to preserve the "global" eval context - // from being mutated. - flowCtx.NewEvalCtx(), processorID, execinfra.ProcStateOpts{ // We append input to inputs to drain below in order to reuse the same diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index be7971c040bc..dd989e928ee0 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -186,10 +186,6 @@ func NewMaterializer( m.Init( m, flowCtx, - // The materializer will update the eval context when closed, so we give - // it a copy of the eval context to preserve the "global" eval context - // from being mutated. - flowCtx.NewEvalCtx(), processorID, execinfra.ProcStateOpts{ // We append drainHelper to inputs to drain below in order to reuse diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 60f5519481b4..c3de92df3369 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -66,10 +66,6 @@ func NewFlowCoordinator( f.Init( f, flowCtx, - // The FlowCoordinator will update the eval context when closed, so we - // give it a copy of the eval context to preserve the "global" eval - // context from being mutated. - flowCtx.NewEvalCtx(), processorID, execinfra.ProcStateOpts{ // We append input to inputs to drain below in order to reuse diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index 8bd0a5c75e39..87f292d64b51 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -42,13 +42,10 @@ type FlowCtx struct { // ID is a unique identifier for a flow. ID execinfrapb.FlowID - // EvalCtx is used by all the processors in the flow to evaluate expressions. - // Processors that intend to evaluate expressions with this EvalCtx should - // get a copy with NewEvalCtx instead of storing a pointer to this one - // directly (since some processor mutate the EvalContext they use). - // - // TODO(andrei): Get rid of this field and pass a non-shared EvalContext to - // cores of the processors that need it. + // EvalCtx is used by all the processors in the flow to access immutable + // state of the execution context. Processors that intend to evaluate + // expressions with this EvalCtx should get a copy with NewEvalCtx instead + // of using this field. EvalCtx *eval.Context Mon *mon.BytesMonitor @@ -107,19 +104,11 @@ type FlowCtx struct { TenantCPUMonitor multitenantcpu.CPUUsageHelper } -// NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext. -// Processors should use this method any time they need to store a pointer to -// the EvalContext, since processors may mutate the EvalContext. Specifically, -// every processor that runs ProcOutputHelper.Init must pass in a modifiable -// EvalContext, since it stores that EvalContext in its exprHelpers and mutates -// them at runtime to ensure expressions are evaluated with the correct indexed -// var context. -// TODO(yuzefovich): once we remove eval.Context.deprecatedContext, re-evaluate -// this since many processors don't modify the eval context except for that -// field. +// NewEvalCtx returns a modifiable copy of the FlowCtx's eval.Context. +// Processors should use this method whenever they explicitly modify the +// eval.Context. func (flowCtx *FlowCtx) NewEvalCtx() *eval.Context { - evalCopy := flowCtx.EvalCtx.Copy() - return evalCopy + return flowCtx.EvalCtx.Copy() } // TestingKnobs returns the distsql testing knobs for this flow context. diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 179319ada9eb..0cf3332e2f9f 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -123,12 +123,16 @@ func (h *ProcOutputHelper) Reset() { // omitted if there is no filtering expression. // Note that the types slice may be stored directly; the caller should not // modify it. +// +// If the provided evalCtx is the same as flowCtx.EvalCtx, then a copy will be +// made internally when there are render expressions. func (h *ProcOutputHelper) Init( ctx context.Context, post *execinfrapb.PostProcessSpec, coreOutputTypes []*types.T, semaCtx *tree.SemaContext, evalCtx *eval.Context, + flowCtx *FlowCtx, ) error { if !post.Projection && len(post.OutputColumns) > 0 { return errors.Errorf("post-processing has projection unset but output columns set: %s", post) @@ -157,6 +161,12 @@ func (h *ProcOutputHelper) Init( h.OutputTypes[i] = coreOutputTypes[c] } } else if nRenders := len(post.RenderExprs); nRenders > 0 { + if evalCtx == flowCtx.EvalCtx { + // We haven't created a copy of the eval context, and we have some + // renders, then we'll need to create a copy ourselves since we're + // going to use the ExprHelper which might mutate the eval context. + evalCtx = flowCtx.NewEvalCtx() + } if cap(h.OutputTypes) >= nRenders { h.OutputTypes = h.OutputTypes[:nRenders] } else { @@ -346,9 +356,6 @@ type ProcessorBaseNoHelper struct { FlowCtx *FlowCtx - // EvalCtx is used for expression evaluation. It overrides the one in flowCtx. - EvalCtx *eval.Context - // Closed is set by InternalClose(). Once set, the processor's tracing span // has been closed. Closed bool @@ -745,6 +752,8 @@ type ProcStateOpts struct { // - coreOutputTypes are the type schema of the rows output by the processor // core (i.e. the "internal schema" of the processor, see // execinfrapb.ProcessorSpec for more details). +// +// NB: it is assumed that the caller will not modify the eval context. func (pb *ProcessorBase) Init( ctx context.Context, self RowSource, @@ -756,7 +765,7 @@ func (pb *ProcessorBase) Init( opts ProcStateOpts, ) error { return pb.InitWithEvalCtx( - ctx, self, post, coreOutputTypes, flowCtx, flowCtx.NewEvalCtx(), processorID, memMonitor, opts, + ctx, self, post, coreOutputTypes, flowCtx, flowCtx.EvalCtx, processorID, memMonitor, opts, ) } @@ -775,7 +784,7 @@ func (pb *ProcessorBase) InitWithEvalCtx( memMonitor *mon.BytesMonitor, opts ProcStateOpts, ) error { - pb.ProcessorBaseNoHelper.Init(self, flowCtx, evalCtx, processorID, opts) + pb.ProcessorBaseNoHelper.Init(self, flowCtx, processorID, opts) pb.MemMonitor = memMonitor // Hydrate all types used in the processor. @@ -783,18 +792,17 @@ func (pb *ProcessorBase) InitWithEvalCtx( if err := resolver.HydrateTypeSlice(ctx, coreOutputTypes); err != nil { return err } - pb.SemaCtx = tree.MakeSemaContext(&resolver) - return pb.OutputHelper.Init(ctx, post, coreOutputTypes, &pb.SemaCtx, pb.EvalCtx) + pb.SemaCtx = tree.MakeSemaContext(&resolver) + return pb.OutputHelper.Init(ctx, post, coreOutputTypes, &pb.SemaCtx, evalCtx, flowCtx) } // Init initializes the ProcessorBaseNoHelper. func (pb *ProcessorBaseNoHelper) Init( - self RowSource, flowCtx *FlowCtx, evalCtx *eval.Context, processorID int32, opts ProcStateOpts, + self RowSource, flowCtx *FlowCtx, processorID int32, opts ProcStateOpts, ) { pb.self = self pb.FlowCtx = flowCtx - pb.EvalCtx = evalCtx pb.ProcessorID = processorID pb.trailingMetaCallback = opts.TrailingMetaCallback if opts.InputsToDrain != nil { diff --git a/pkg/sql/execinfrapb/expr.go b/pkg/sql/execinfrapb/expr.go index 02dccc5bd2ec..b69b95b101e0 100644 --- a/pkg/sql/execinfrapb/expr.go +++ b/pkg/sql/execinfrapb/expr.go @@ -182,7 +182,7 @@ func (eh *MultiExprHelper) Reset() { eh.exprs = eh.exprs[:0] } -// exprHelper is a base implementation of an expressoin helper used by both +// exprHelper is a base implementation of an expression helper used by both // ExprHelper and MultiExprHelper. type exprHelper struct { evalCtx *eval.Context diff --git a/pkg/sql/importer/exportcsv.go b/pkg/sql/importer/exportcsv.go index efa78a6bb6ba..d71e1f450689 100644 --- a/pkg/sql/importer/exportcsv.go +++ b/pkg/sql/importer/exportcsv.go @@ -145,7 +145,7 @@ func newCSVWriterProcessor( input: input, } semaCtx := tree.MakeSemaContext(nil /* resolver */) - if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.NewEvalCtx()); err != nil { + if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.EvalCtx, flowCtx); err != nil { return nil, err } return c, nil diff --git a/pkg/sql/importer/exportparquet.go b/pkg/sql/importer/exportparquet.go index 73effa3fe248..fac67aad1528 100644 --- a/pkg/sql/importer/exportparquet.go +++ b/pkg/sql/importer/exportparquet.go @@ -68,7 +68,7 @@ func newParquetWriterProcessor( input: input, } semaCtx := tree.MakeSemaContext(nil /* resolver */) - if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.NewEvalCtx()); err != nil { + if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.EvalCtx, flowCtx); err != nil { return nil, err } return c, nil diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index b0d8afbe199f..1f9a6c57a8b8 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -119,11 +119,7 @@ func (p *planNodeToRowSource) Init( post, p.outputTypes, flowCtx, - // Note that we have already created a copy of the extendedEvalContext - // (which made a copy of the EvalContext) right before calling - // newPlanNodeToRowSource, so we can just use the eval context from the - // params. - p.params.EvalContext(), + flowCtx.EvalCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ diff --git a/pkg/sql/rowcontainer/hash_row_container.go b/pkg/sql/rowcontainer/hash_row_container.go index 2b156b71b38c..da76d08ffc59 100644 --- a/pkg/sql/rowcontainer/hash_row_container.go +++ b/pkg/sql/rowcontainer/hash_row_container.go @@ -838,6 +838,8 @@ type HashDiskBackedRowContainer struct { var _ HashRowContainer = &HashDiskBackedRowContainer{} // NewHashDiskBackedRowContainer makes a HashDiskBackedRowContainer. +// +// Note that eval context will **not** be mutated. func NewHashDiskBackedRowContainer( evalCtx *eval.Context, memoryMonitor *mon.BytesMonitor, diff --git a/pkg/sql/rowcontainer/numbered_row_container.go b/pkg/sql/rowcontainer/numbered_row_container.go index 3ddfb76b9d03..ffb26fd49263 100644 --- a/pkg/sql/rowcontainer/numbered_row_container.go +++ b/pkg/sql/rowcontainer/numbered_row_container.go @@ -54,7 +54,7 @@ type DiskBackedNumberedRowContainer struct { // Arguments: // - deDup is true if it should de-duplicate. // - types is the schema of rows that will be added to this container. -// - evalCtx defines the context. +// - evalCtx defines the context. It will **not** be mutated. // - engine is the underlying store that rows are stored on when the container // spills to disk. // - memoryMonitor is used to monitor this container's memory usage. diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go index b197557e8d15..74420eb84fff 100644 --- a/pkg/sql/rowcontainer/row_container.go +++ b/pkg/sql/rowcontainer/row_container.go @@ -179,6 +179,8 @@ var _ IndexedRowContainer = &MemRowContainer{} // Init initializes the MemRowContainer. The MemRowContainer uses the planner's // monitor to track memory usage. +// +// Note that eval context will **not** be mutated. func (mc *MemRowContainer) Init( ordering colinfo.ColumnOrdering, types []*types.T, evalCtx *eval.Context, ) { @@ -187,6 +189,8 @@ func (mc *MemRowContainer) Init( // InitWithMon initializes the MemRowContainer with an explicit monitor. Only // use this if the default MemRowContainer.Init() function is insufficient. +// +// Note that eval context will **not** be mutated. func (mc *MemRowContainer) InitWithMon( ordering colinfo.ColumnOrdering, types []*types.T, evalCtx *eval.Context, mon *mon.BytesMonitor, ) { @@ -431,7 +435,7 @@ var _ DeDupingRowContainer = &DiskBackedRowContainer{} // - ordering is the output ordering; the order in which rows should be sorted. // - types is the schema of rows that will be added to this container. // - evalCtx defines the context in which to evaluate comparisons, only used -// when storing rows in memory. +// when storing rows in memory. It will **not** be mutated. // - engine is the store used for rows when spilling to disk. // - memoryMonitor is used to monitor the DiskBackedRowContainer's memory usage. // If this monitor denies an allocation, the DiskBackedRowContainer will diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 7d2d9803e013..86e45e4534db 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -52,6 +52,8 @@ func (af aggregateFuncs) close(ctx context.Context) { type aggregatorBase struct { execinfra.ProcessorBase + evalCtx *eval.Context + // runningState represents the state of the aggregator. This is in addition to // ProcessorBase.State - the runningState is only relevant when // ProcessorBase.State == StateRunning. @@ -113,6 +115,8 @@ func (ag *aggregatorBase) init( ag.bucketsAcc = memMonitor.MakeBoundAccount() ag.arena = stringarena.Make(&ag.bucketsAcc) ag.aggFuncsAcc = memMonitor.MakeBoundAccount() + ag.evalCtx = flowCtx.NewEvalCtx() + ag.evalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc // Loop over the select expressions and extract any aggregate functions -- // non-aggregation functions are replaced with parser.NewIdentAggregate, @@ -135,7 +139,7 @@ func (ag *aggregatorBase) init( } } constructor, arguments, outputType, err := execagg.GetAggregateConstructor( - ctx, flowCtx.EvalCtx, semaCtx, &aggInfo, ag.inputTypes, + ctx, ag.evalCtx, semaCtx, &aggInfo, ag.inputTypes, ) if err != nil { return err @@ -147,8 +151,8 @@ func (ag *aggregatorBase) init( ag.outputTypes[i] = outputType } - return ag.ProcessorBase.Init( - ctx, self, post, ag.outputTypes, flowCtx, processorID, memMonitor, + return ag.ProcessorBase.InitWithEvalCtx( + ctx, self, post, ag.outputTypes, flowCtx, ag.evalCtx, processorID, memMonitor, execinfra.ProcStateOpts{ InputsToDrain: []execinfra.RowSource{ag.input}, TrailingMetaCallback: trailingMetaCallback, @@ -283,8 +287,7 @@ func newHashAggregator( buckets: make(map[string]aggregateFuncs), bucketsLenGrowThreshold: hashAggregatorBucketsInitialLen, } - - if err := ag.init( + return ag, ag.init( ctx, ag, flowCtx, @@ -296,15 +299,7 @@ func newHashAggregator( ag.close() return nil }, - ); err != nil { - return nil, err - } - - // A new tree.EvalCtx was created during initializing aggregatorBase above - // and will be used only by this aggregator, so it is ok to update EvalCtx - // directly. - ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc - return ag, nil + ) } func newOrderedAggregator( @@ -316,8 +311,7 @@ func newOrderedAggregator( post *execinfrapb.PostProcessSpec, ) (*orderedAggregator, error) { ag := &orderedAggregator{} - - if err := ag.init( + return ag, ag.init( ctx, ag, flowCtx, @@ -329,15 +323,7 @@ func newOrderedAggregator( ag.close() return nil }, - ); err != nil { - return nil, err - } - - // A new tree.EvalCtx was created during initializing aggregatorBase above - // and will be used only by this aggregator, so it is ok to update EvalCtx - // directly. - ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc - return ag, nil + ) } // Start is part of the RowSource interface. @@ -405,7 +391,7 @@ func (ag *orderedAggregator) close() { // columns, and false otherwise. func (ag *aggregatorBase) matchLastOrdGroupCols(row rowenc.EncDatumRow) (bool, error) { for _, colIdx := range ag.orderedGroupCols { - res, err := ag.lastOrdGroupCols[colIdx].Compare(ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, ag.EvalCtx, &row[colIdx]) + res, err := ag.lastOrdGroupCols[colIdx].Compare(ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, ag.evalCtx, &row[colIdx]) if res != 0 || err != nil { return false, err } @@ -965,7 +951,7 @@ func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { - agg := f.create(ag.EvalCtx, f.arguments) + agg := f.create(ag.evalCtx, f.arguments) if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } diff --git a/pkg/sql/rowexec/filterer.go b/pkg/sql/rowexec/filterer.go index fba60f758a6d..5cfd7aa0f1a6 100644 --- a/pkg/sql/rowexec/filterer.go +++ b/pkg/sql/rowexec/filterer.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/errors" ) @@ -25,8 +26,9 @@ import ( // boolean expression. type filtererProcessor struct { execinfra.ProcessorBase - input execinfra.RowSource - filter execinfrapb.ExprHelper + evalCtx *eval.Context + input execinfra.RowSource + filter execinfrapb.ExprHelper } var _ execinfra.Processor = &filtererProcessor{} @@ -43,14 +45,20 @@ func newFiltererProcessor( input execinfra.RowSource, post *execinfrapb.PostProcessSpec, ) (*filtererProcessor, error) { - f := &filtererProcessor{input: input} + f := &filtererProcessor{ + // Make a copy of the eval context since we're going to pass it to the + // ExprHelper later (which might modify it). + evalCtx: flowCtx.NewEvalCtx(), + input: input, + } types := input.OutputTypes() - if err := f.Init( + if err := f.InitWithEvalCtx( ctx, f, post, types, flowCtx, + f.evalCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{InputsToDrain: []execinfra.RowSource{f.input}}, @@ -58,7 +66,7 @@ func newFiltererProcessor( return nil, err } - if err := f.filter.Init(ctx, spec.Filter, types, &f.SemaCtx, f.EvalCtx); err != nil { + if err := f.filter.Init(ctx, spec.Filter, types, &f.SemaCtx, f.evalCtx); err != nil { return nil, err } diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index df7d4f0c3cab..492d378df0a5 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -127,7 +127,7 @@ func newHashJoiner( h.leftEqColTypes[i] = leftTypes[eqCol] } - if err := h.joinerBase.init( + if _, err := h.joinerBase.init( ctx, h, flowCtx, @@ -154,7 +154,7 @@ func newHashJoiner( h.MemMonitor = execinfra.NewLimitedMonitor(ctx, flowCtx.Mon, flowCtx, "hashjoiner-limited") h.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "hashjoiner-disk") h.hashTable = rowcontainer.NewHashDiskBackedRowContainer( - h.EvalCtx, h.MemMonitor, h.diskMonitor, h.FlowCtx.Cfg.TempStorage, + h.FlowCtx.EvalCtx, h.MemMonitor, h.diskMonitor, h.FlowCtx.Cfg.TempStorage, ) // If the trace is recording, instrument the hashJoiner to collect stats. diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 23662376303d..4b3c75242535 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -95,8 +95,14 @@ func newInvertedFilterer( ifr.outputRow[ifr.invertedColIdx].Datum = tree.DNull // Initialize ProcessorBase. - if err := ifr.ProcessorBase.Init( - ctx, ifr, post, outputColTypes, flowCtx, processorID, nil, /* memMonitor */ + evalCtx := flowCtx.EvalCtx + if spec.PreFiltererSpec != nil { + // Only make a copy of the eval context if we're going to pass it to the + // ExprHelper later. + evalCtx = flowCtx.NewEvalCtx() + } + if err := ifr.ProcessorBase.InitWithEvalCtx( + ctx, ifr, post, outputColTypes, flowCtx, evalCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ InputsToDrain: []execinfra.RowSource{ifr.input}, TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { @@ -114,7 +120,7 @@ func newInvertedFilterer( ifr.rc = rowcontainer.NewDiskBackedNumberedRowContainer( true, /* deDup */ rcColTypes, - ifr.EvalCtx, + ifr.FlowCtx.EvalCtx, ifr.FlowCtx.Cfg.TempStorage, ifr.MemMonitor, ifr.diskMonitor, @@ -129,7 +135,7 @@ func newInvertedFilterer( semaCtx := flowCtx.NewSemaContext(flowCtx.Txn) var exprHelper execinfrapb.ExprHelper colTypes := []*types.T{spec.PreFiltererSpec.Type} - if err := exprHelper.Init(ctx, spec.PreFiltererSpec.Expression, colTypes, semaCtx, ifr.EvalCtx); err != nil { + if err := exprHelper.Init(ctx, spec.PreFiltererSpec.Expression, colTypes, semaCtx, evalCtx); err != nil { return nil, err } preFilterer, preFiltererState, err := invertedidx.NewBoundPreFilterer( diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 737156b11f19..d7376ad5e709 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -234,8 +234,10 @@ func newInvertedJoiner( } } - if err := ij.ProcessorBase.Init( - ctx, ij, post, outputColTypes, flowCtx, processorID, nil, /* memMonitor */ + // Always make a copy of the eval context since it might be mutated later. + evalCtx := flowCtx.NewEvalCtx() + if err := ij.ProcessorBase.InitWithEvalCtx( + ctx, ij, post, outputColTypes, flowCtx, evalCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ InputsToDrain: []execinfra.RowSource{ij.input}, TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { @@ -263,7 +265,7 @@ func newInvertedJoiner( // execbuilder.Builder.buildInvertedJoin. onExprColTypes[len(ij.inputTypes)+ij.invertedFetchedColOrdinal] = spec.InvertedColumnOriginalType - if err := ij.onExprHelper.Init(ctx, spec.OnExpr, onExprColTypes, semaCtx, ij.EvalCtx); err != nil { + if err := ij.onExprHelper.Init(ctx, spec.OnExpr, onExprColTypes, semaCtx, evalCtx); err != nil { return nil, err } combinedRowLen := len(onExprColTypes) @@ -274,11 +276,11 @@ func newInvertedJoiner( if ij.datumsToInvertedExpr == nil { var invertedExprHelper execinfrapb.ExprHelper - if err := invertedExprHelper.Init(ctx, spec.InvertedExpr, onExprColTypes, semaCtx, ij.EvalCtx); err != nil { + if err := invertedExprHelper.Init(ctx, spec.InvertedExpr, onExprColTypes, semaCtx, evalCtx); err != nil { return nil, err } ij.datumsToInvertedExpr, err = invertedidx.NewDatumsToInvertedExpr( - ctx, ij.EvalCtx, onExprColTypes, invertedExprHelper.Expr(), ij.fetchSpec.GeoConfig, + ctx, evalCtx, onExprColTypes, invertedExprHelper.Expr(), ij.fetchSpec.GeoConfig, ) if err != nil { return nil, err @@ -324,7 +326,7 @@ func newInvertedJoiner( ij.indexRows = rowcontainer.NewDiskBackedNumberedRowContainer( true, /* deDup */ rightColTypes, - ij.EvalCtx, + ij.FlowCtx.EvalCtx, ij.FlowCtx.Cfg.TempStorage, ij.MemMonitor, ij.diskMonitor, diff --git a/pkg/sql/rowexec/joinerbase.go b/pkg/sql/rowexec/joinerbase.go index eded74080b9a..ed5aaacf2962 100644 --- a/pkg/sql/rowexec/joinerbase.go +++ b/pkg/sql/rowexec/joinerbase.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -48,12 +49,12 @@ func (jb *joinerBase) init( outputContinuationColumn bool, post *execinfrapb.PostProcessSpec, opts execinfra.ProcStateOpts, -) error { +) (*eval.Context, error) { jb.joinType = jType if jb.joinType.IsSetOpJoin() { if !onExpr.Empty() { - return errors.Errorf("expected empty onExpr, got %v", onExpr) + return nil, errors.Errorf("expected empty onExpr, got %v", onExpr) } } @@ -85,13 +86,18 @@ func (jb *joinerBase) init( outputTypes = jType.MakeOutputTypes(leftTypes, rightTypes) } - if err := jb.ProcessorBase.Init( - ctx, self, post, outputTypes, flowCtx, processorID, nil /* memMonitor */, opts, + evalCtx := flowCtx.EvalCtx + if !onExpr.Empty() { + // Only make a copy if we need to evaluate ON expression. + evalCtx = flowCtx.NewEvalCtx() + } + if err := jb.ProcessorBase.InitWithEvalCtx( + ctx, self, post, outputTypes, flowCtx, evalCtx, processorID, nil /* memMonitor */, opts, ); err != nil { - return err + return nil, err } semaCtx := flowCtx.NewSemaContext(flowCtx.Txn) - return jb.onCond.Init(ctx, onExpr, onCondTypes, semaCtx, jb.EvalCtx) + return evalCtx, jb.onCond.Init(ctx, onExpr, onCondTypes, semaCtx, evalCtx) } // joinSide is the utility type to distinguish between two sides of the join. diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index e8d171b473b9..068e4937c9b9 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -403,7 +403,7 @@ func newJoinReader( } rightTypes := spec.FetchSpec.FetchedColumnTypes() - if err := jr.joinerBase.init( + evalCtx, err := jr.joinerBase.init( ctx, jr, flowCtx, @@ -425,22 +425,29 @@ func newJoinReader( return trailingMeta }, }, - ); err != nil { + ) + if err != nil { return nil, err } if !spec.LookupExpr.Empty() { + if evalCtx == flowCtx.EvalCtx { + // We haven't created a copy of the eval context yet (because it is + // only done in init if we have a non-empty ON expression), but we + // actually need a copy. + evalCtx = flowCtx.NewEvalCtx() + } lookupExprTypes := make([]*types.T, 0, len(leftTypes)+len(rightTypes)) lookupExprTypes = append(lookupExprTypes, leftTypes...) lookupExprTypes = append(lookupExprTypes, rightTypes...) semaCtx := flowCtx.NewSemaContext(jr.txn) - if err := jr.lookupExpr.Init(ctx, spec.LookupExpr, lookupExprTypes, semaCtx, jr.EvalCtx); err != nil { + if err := jr.lookupExpr.Init(ctx, spec.LookupExpr, lookupExprTypes, semaCtx, evalCtx); err != nil { return nil, err } if !spec.RemoteLookupExpr.Empty() { if err := jr.remoteLookupExpr.Init( - ctx, spec.RemoteLookupExpr, lookupExprTypes, semaCtx, jr.EvalCtx, + ctx, spec.RemoteLookupExpr, lookupExprTypes, semaCtx, evalCtx, ); err != nil { return nil, err } @@ -725,7 +732,7 @@ func (jr *joinReader) initJoinReaderStrategy( drc := rowcontainer.NewDiskBackedNumberedRowContainer( false, /* deDup */ typs, - jr.EvalCtx, + jr.FlowCtx.EvalCtx, jr.FlowCtx.Cfg.TempStorage, jr.limitedMemMonitor, jr.diskMonitor, diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index bb8131d1a13b..fafb85d21fd1 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -73,7 +73,7 @@ func newMergeJoiner( m.ExecStatsForTrace = m.execStatsForTrace } - if err := m.joinerBase.init( + if _, err := m.joinerBase.init( ctx, m /* self */, flowCtx, processorID, leftSource.OutputTypes(), rightSource.OutputTypes(), spec.Type, spec.OnExpr, false /* outputContinuationColumn */, post, execinfra.ProcStateOpts{ diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index 1993627feb1a..5de642227581 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -184,6 +184,8 @@ func TestPostProcess(t *testing.T) { }, } + ctx := context.Background() + flowCtx := &execinfra.FlowCtx{} for tcIdx, tc := range testCases { t.Run(strconv.Itoa(tcIdx), func(t *testing.T) { inBuf := distsqlutils.NewRowBuffer(types.ThreeIntCols, input, distsqlutils.RowBufferArgs{}) @@ -192,14 +194,14 @@ func TestPostProcess(t *testing.T) { var out execinfra.ProcOutputHelper semaCtx := tree.MakeSemaContext(nil /* resolver */) evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) - defer evalCtx.Stop(context.Background()) - if err := out.Init(context.Background(), &tc.post, inBuf.OutputTypes(), &semaCtx, evalCtx); err != nil { + defer evalCtx.Stop(ctx) + if err := out.Init(ctx, &tc.post, inBuf.OutputTypes(), &semaCtx, evalCtx, flowCtx); err != nil { t.Fatal(err) } // Run the rows through the helper. for i := range input { - status, err := out.EmitRow(context.Background(), input[i], outBuf) + status, err := out.EmitRow(ctx, input[i], outBuf) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 46275c3b46d9..22c459be20ff 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -30,8 +30,9 @@ import ( type projectSetProcessor struct { execinfra.ProcessorBase - input execinfra.RowSource - spec *execinfrapb.ProjectSetSpec + evalCtx *eval.Context + input execinfra.RowSource + spec *execinfrapb.ProjectSetSpec // eh contains the type-checked expression specified in the ROWS FROM // syntax. It can contain many kinds of expressions (anything that is @@ -82,6 +83,8 @@ func newProjectSetProcessor( ) (*projectSetProcessor, error) { outputTypes := append(input.OutputTypes(), spec.GeneratedColumns...) ps := &projectSetProcessor{ + // We'll be mutating the eval context, so we always need a copy. + evalCtx: flowCtx.NewEvalCtx(), input: input, spec: spec, funcs: make([]tree.TypedExpr, len(spec.Exprs)), @@ -89,12 +92,13 @@ func newProjectSetProcessor( gens: make([]eval.ValueGenerator, len(spec.Exprs)), done: make([]bool, len(spec.Exprs)), } - if err := ps.Init( + if err := ps.InitWithEvalCtx( ctx, ps, post, outputTypes, flowCtx, + ps.evalCtx, processorID, nil, /* memMonitor */ execinfra.ProcStateOpts{ @@ -110,7 +114,7 @@ func newProjectSetProcessor( // Initialize exprHelper. semaCtx := ps.FlowCtx.NewSemaContext(ps.FlowCtx.Txn) - err := ps.eh.Init(ctx, len(ps.spec.Exprs), ps.input.OutputTypes(), semaCtx, ps.EvalCtx) + err := ps.eh.Init(ctx, len(ps.spec.Exprs), ps.input.OutputTypes(), semaCtx, ps.evalCtx) if err != nil { return nil, err } @@ -159,7 +163,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // Set expression helper row so that we can use it as an // IndexedVarContainer. ps.eh.SetRow(row) - ps.EvalCtx.IVarContainer = ps.eh.IVarContainer() + ps.evalCtx.IVarContainer = ps.eh.IVarContainer() // Initialize a round of SRF generators or scalar values. for i, n := 0, ps.eh.ExprCount(); i < n; i++ { @@ -177,9 +181,9 @@ func (ps *projectSetProcessor) nextInputRow() ( var err error switch t := fn.(type) { case *tree.FuncExpr: - gen, err = eval.GetFuncGenerator(ps.Ctx(), ps.EvalCtx, t) + gen, err = eval.GetFuncGenerator(ps.Ctx(), ps.evalCtx, t) case *tree.RoutineExpr: - gen, err = eval.GetRoutineGenerator(ps.Ctx(), ps.EvalCtx, t) + gen, err = eval.GetRoutineGenerator(ps.Ctx(), ps.evalCtx, t) if err == nil && gen == nil && t.MultiColOutput && !t.Generator { // If the routine will return multiple output columns, we expect the // routine to return nulls for each column type instead of no rows, so diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index e3be72061b80..1e6e059b69b8 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -413,7 +413,7 @@ func (s *sampleAggregator) sampleRow( ctx context.Context, sr *stats.SampleReservoir, sampleRow rowenc.EncDatumRow, rank uint64, ) error { prevCapacity := sr.Cap() - if err := sr.SampleRow(ctx, s.EvalCtx, sampleRow, rank); err != nil { + if err := sr.SampleRow(ctx, s.FlowCtx.EvalCtx, sampleRow, rank); err != nil { if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory { return err } @@ -462,7 +462,9 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error { if err != nil { return err } - lowerBound, err = eval.Expr(ctx, s.EvalCtx, lbTypedExpr) + // Lower bounds are serialized datums, so evaluating the + // expression shouldn't modify the eval context. + lowerBound, err = eval.Expr(ctx, s.FlowCtx.EvalCtx, lbTypedExpr) if err != nil { return err } @@ -470,7 +472,7 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error { h, err := s.generateHistogram( ctx, - s.EvalCtx, + s.FlowCtx.EvalCtx, &s.sr, colIdx, typ, @@ -501,7 +503,7 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error { // inverted keys. h, err := s.generateHistogram( ctx, - s.EvalCtx, + s.FlowCtx.EvalCtx, invSr, 0, /* colIdx */ types.Bytes, diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 465c7f9815f3..f5c4480f10cc 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -461,7 +461,7 @@ func (s *samplerProcessor) sampleRow( // Use Int63 so we don't have headaches converting to DInt. rank := uint64(rng.Int63()) prevCapacity := sr.Cap() - if err := sr.SampleRow(ctx, s.EvalCtx, row, rank); err != nil { + if err := sr.SampleRow(ctx, s.FlowCtx.EvalCtx, row, rank); err != nil { if !sqlerrors.IsOutOfMemoryError(err) { return false, err } diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index bed0955185a3..f93df29d8c48 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -73,7 +73,7 @@ func (s *sorterBase) init( rc.Init( ordering, input.OutputTypes(), - s.EvalCtx, + s.FlowCtx.EvalCtx, flowCtx.Cfg.TempStorage, memMonitor, s.diskMonitor, diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index 6fb58ad1d2ec..1966cf704818 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -61,6 +61,8 @@ const memRequiredByWindower = 100 * 1024 type windower struct { execinfra.ProcessorBase + evalCtx *eval.Context + // runningState represents the state of the windower. This is in addition to // ProcessorBase.State - the runningState is only relevant when // ProcessorBase.State == StateRunning. @@ -107,9 +109,9 @@ func newWindower( post *execinfrapb.PostProcessSpec, ) (*windower, error) { w := &windower{ - input: input, + evalCtx: flowCtx.NewEvalCtx(), + input: input, } - evalCtx := flowCtx.NewEvalCtx() w.inputTypes = input.OutputTypes() // Limit the memory use by creating a child monitor with a hard limit. @@ -122,7 +124,7 @@ func newWindower( // them to reuse the same shared memory account with the windower. Notably, // we need to update the eval context before constructing the window // builtins. - evalCtx.SingleDatumAggMemAccount = &w.acc + w.evalCtx.SingleDatumAggMemAccount = &w.acc w.partitionBy = spec.PartitionBy windowFns := spec.WindowFns @@ -144,7 +146,7 @@ func newWindower( } w.outputTypes[windowFn.OutputColIdx] = outputType - w.builtins = append(w.builtins, windowConstructor(evalCtx)) + w.builtins = append(w.builtins, windowConstructor(w.evalCtx)) wf := &windowFunc{ ordering: windowFn.Ordering, argsIdxs: windowFn.ArgsIdxs, @@ -163,7 +165,7 @@ func newWindower( post, w.outputTypes, flowCtx, - evalCtx, + w.evalCtx, processorID, limitedMon, execinfra.ProcStateOpts{InputsToDrain: []execinfra.RowSource{w.input}, @@ -177,7 +179,7 @@ func newWindower( w.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "windower-disk") w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer( - evalCtx, w.MemMonitor, w.diskMonitor, flowCtx.Cfg.TempStorage, + w.evalCtx, w.MemMonitor, w.diskMonitor, flowCtx.Cfg.TempStorage, ) if err := w.allRowsPartitioned.Init( ctx, @@ -243,7 +245,7 @@ func (w *windower) close() { w.partition.Close(w.Ctx()) } for _, builtin := range w.builtins { - builtin.Close(w.Ctx(), w.EvalCtx) + builtin.Close(w.Ctx(), w.evalCtx) } w.acc.Close(w.Ctx()) w.MemMonitor.Stop(w.Ctx()) @@ -304,7 +306,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr return windowerStateUnknown, nil, w.DrainHelper() } - if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil { + if err := w.computeWindowFunctions(w.Ctx(), w.evalCtx); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -615,7 +617,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con w.partition = rowcontainer.NewDiskBackedIndexedRowContainer( ordering, w.inputTypes, - w.EvalCtx, + w.FlowCtx.EvalCtx, w.FlowCtx.Cfg.TempStorage, w.MemMonitor, w.diskMonitor, diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 1d59ed963693..8319b1dea0c0 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -305,7 +305,7 @@ func newZigzagJoiner( leftColumnTypes := spec.Sides[0].FetchSpec.FetchedColumnTypes() rightColumnTypes := spec.Sides[1].FetchSpec.FetchedColumnTypes() - err := z.joinerBase.init( + _, err := z.joinerBase.init( ctx, z, /* self */ flowCtx, diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 50d4eac6f243..07c00dc6b17a 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -293,7 +293,9 @@ func (rb *routerBase) init( rb.outputs[i].mu.rowContainer.Init( nil, /* ordering */ types, - flowCtx.NewEvalCtx(), + // Eval context will not be mutated, so it's ok to use the shared + // one. + flowCtx.EvalCtx, flowCtx.Cfg.TempStorage, rb.outputs[i].memoryMonitor, rb.outputs[i].diskMonitor,