sql: audit all processors to avoid redundant copies of eval context
This commit audits all processors to ensure that we make copies of the
eval context only when necessary. Up until recently every processor
mutated the eval context by modifying its `context.Context` field, but
now that field has been removed, so only a handful of processors mutate
the context (namely, those that modify it explicitly and those that use
the rendering / filtering capabilities of the ExprHelper). This commit
makes it explicit whenever a copy is needed and uses the flow's eval
context when no mutation occurs. In order to clarify the different
usages it removes the `ProcessorBaseNoHelper.EvalCtx` field and moves it
into each processor that needs it. Rendering capabilities are used
implicitly by many processors via `ProcOutputHelper`, so if a processor
itself doesn't need a mutable eval context, we can create a copy for the
helper without exposing it to the processor.

Release note: None
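
For illustration, a minimal, self-contained sketch of the copy-on-mutation
rule described above. The evalContext and flowCtx types below are simplified
stand-ins invented for this example, not the real eval.Context /
execinfra.FlowCtx APIs; only the ownership pattern mirrors the commit.

package main

import "fmt"

// evalContext is a simplified stand-in for eval.Context: flow-level state
// that is cheap to share for reads but must be copied before any mutation.
type evalContext struct {
	ChangefeedState string
}

// flowCtx is a simplified stand-in for execinfra.FlowCtx.
type flowCtx struct {
	EvalCtx *evalContext
}

// NewEvalCtx mirrors FlowCtx.NewEvalCtx: it returns a modifiable copy of
// the flow's eval context.
func (f *flowCtx) NewEvalCtx() *evalContext {
	cp := *f.EvalCtx
	return &cp
}

// readOnlyProcessor never mutates the eval context, so it can keep the
// flow's shared pointer (the default Init path after this commit).
type readOnlyProcessor struct {
	evalCtx *evalContext
}

// mutatingProcessor writes to the eval context, so it must take a copy up
// front (the pattern the changeFrontier and the backfillers follow).
type mutatingProcessor struct {
	evalCtx *evalContext
}

func main() {
	flow := &flowCtx{EvalCtx: &evalContext{}}

	ro := readOnlyProcessor{evalCtx: flow.EvalCtx}      // no copy needed
	mu := mutatingProcessor{evalCtx: flow.NewEvalCtx()} // copy before mutating

	mu.evalCtx.ChangefeedState = "initialized"

	// The mutation is visible only through the processor's private copy.
	fmt.Println(flow.EvalCtx.ChangefeedState) // ""
	fmt.Println(ro.evalCtx.ChangefeedState)   // ""
	fmt.Println(mu.evalCtx.ChangefeedState)   // "initialized"
}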
yuzefovich committed Jun 24, 2024
1 parent 996d175 commit 40addd2
Showing 30 changed files with 163 additions and 139 deletions.
18 changes: 12 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -929,9 +930,10 @@ const (
type changeFrontier struct {
execinfra.ProcessorBase

spec execinfrapb.ChangeFrontierSpec
memAcc mon.BoundAccount
a tree.DatumAlloc
evalCtx *eval.Context
spec execinfrapb.ChangeFrontierSpec
memAcc mon.BoundAccount
a tree.DatumAlloc

// input returns rows from one or more changeAggregator processors
input execinfra.RowSource
@@ -1135,6 +1137,9 @@ func newChangeFrontierProcessor(
}

cf := &changeFrontier{
// We might modify the ChangefeedState field in the eval.Context, so we
// need to make a copy.
evalCtx: flowCtx.NewEvalCtx(),
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
@@ -1146,12 +1151,13 @@
cf.knobs = *cfKnobs
}

if err := cf.Init(
if err := cf.InitWithEvalCtx(
ctx,
cf,
post,
input.OutputTypes(),
flowCtx,
cf.evalCtx,
processorID,
memMonitor,
execinfra.ProcStateOpts{
@@ -1248,12 +1254,12 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.EvalCtx.ChangefeedState == nil {
if cf.evalCtx.ChangefeedState == nil {
cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
return
}

cf.localState = cf.EvalCtx.ChangefeedState.(*cachedState)
cf.localState = cf.evalCtx.ChangefeedState.(*cachedState)
cf.js = newJobState(nil, cf.FlowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})

if cf.spec.JobID != 0 {
4 changes: 4 additions & 0 deletions pkg/sql/backfill/backfill.go
@@ -223,6 +223,8 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
mon *mon.BytesMonitor,
) error {
cb.initCols(desc)
// We'll be modifying the eval.Context in RunColumnBackfillChunk, so we need
// to make a copy.
evalCtx := flowCtx.NewEvalCtx()
var defaultExprs, computedExprs []tree.TypedExpr
// Install type metadata in the target descriptors, as well as resolve any
@@ -654,6 +656,8 @@ func (ib *IndexBackfiller) InitForDistributedUse(
return err
}

// We'll be modifying the eval.Context in BuildIndexEntriesChunk, so we need
// to make a copy.
evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
var colExprs map[descpb.ColumnID]tree.TypedExpr
4 changes: 0 additions & 4 deletions pkg/sql/colexec/columnarizer.go
@@ -155,10 +155,6 @@ func newColumnarizer(
c.ProcessorBaseNoHelper.Init(
nil, /* self */
flowCtx,
// The columnarizer will update the eval context when closed, so we give
// it a copy of the eval context to preserve the "global" eval context
// from being mutated.
flowCtx.NewEvalCtx(),
processorID,
execinfra.ProcStateOpts{
// We append input to inputs to drain below in order to reuse the same
4 changes: 0 additions & 4 deletions pkg/sql/colexec/materializer.go
@@ -186,10 +186,6 @@ func NewMaterializer(
m.Init(
m,
flowCtx,
// The materializer will update the eval context when closed, so we give
// it a copy of the eval context to preserve the "global" eval context
// from being mutated.
flowCtx.NewEvalCtx(),
processorID,
execinfra.ProcStateOpts{
// We append drainHelper to inputs to drain below in order to reuse
4 changes: 0 additions & 4 deletions pkg/sql/colflow/flow_coordinator.go
@@ -66,10 +66,6 @@ func NewFlowCoordinator(
f.Init(
f,
flowCtx,
// The FlowCoordinator will update the eval context when closed, so we
// give it a copy of the eval context to preserve the "global" eval
// context from being mutated.
flowCtx.NewEvalCtx(),
processorID,
execinfra.ProcStateOpts{
// We append input to inputs to drain below in order to reuse
27 changes: 8 additions & 19 deletions pkg/sql/execinfra/flow_context.go
@@ -42,13 +42,10 @@ type FlowCtx struct {
// ID is a unique identifier for a flow.
ID execinfrapb.FlowID

// EvalCtx is used by all the processors in the flow to evaluate expressions.
// Processors that intend to evaluate expressions with this EvalCtx should
// get a copy with NewEvalCtx instead of storing a pointer to this one
// directly (since some processor mutate the EvalContext they use).
//
// TODO(andrei): Get rid of this field and pass a non-shared EvalContext to
// cores of the processors that need it.
// EvalCtx is used by all the processors in the flow to access immutable
// state of the execution context. Processors that intend to evaluate
// expressions with this EvalCtx should get a copy with NewEvalCtx instead
// of using this field.
EvalCtx *eval.Context

Mon *mon.BytesMonitor
@@ -107,19 +104,11 @@
TenantCPUMonitor multitenantcpu.CPUUsageHelper
}

// NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext.
// Processors should use this method any time they need to store a pointer to
// the EvalContext, since processors may mutate the EvalContext. Specifically,
// every processor that runs ProcOutputHelper.Init must pass in a modifiable
// EvalContext, since it stores that EvalContext in its exprHelpers and mutates
// them at runtime to ensure expressions are evaluated with the correct indexed
// var context.
// TODO(yuzefovich): once we remove eval.Context.deprecatedContext, re-evaluate
// this since many processors don't modify the eval context except for that
// field.
// NewEvalCtx returns a modifiable copy of the FlowCtx's eval.Context.
// Processors should use this method whenever they explicitly modify the
// eval.Context.
func (flowCtx *FlowCtx) NewEvalCtx() *eval.Context {
evalCopy := flowCtx.EvalCtx.Copy()
return evalCopy
return flowCtx.EvalCtx.Copy()
}

// TestingKnobs returns the distsql testing knobs for this flow context.
26 changes: 17 additions & 9 deletions pkg/sql/execinfra/processorsbase.go
@@ -123,12 +123,16 @@ func (h *ProcOutputHelper) Reset() {
// omitted if there is no filtering expression.
// Note that the types slice may be stored directly; the caller should not
// modify it.
//
// If the provided evalCtx is the same as flowCtx.EvalCtx, then a copy will be
// made internally when there are render expressions.
func (h *ProcOutputHelper) Init(
ctx context.Context,
post *execinfrapb.PostProcessSpec,
coreOutputTypes []*types.T,
semaCtx *tree.SemaContext,
evalCtx *eval.Context,
flowCtx *FlowCtx,
) error {
if !post.Projection && len(post.OutputColumns) > 0 {
return errors.Errorf("post-processing has projection unset but output columns set: %s", post)
@@ -157,6 +161,12 @@
h.OutputTypes[i] = coreOutputTypes[c]
}
} else if nRenders := len(post.RenderExprs); nRenders > 0 {
if evalCtx == flowCtx.EvalCtx {
// We haven't created a copy of the eval context yet, and we have some
// renders, so we'll need to create a copy ourselves since we're
// going to use the ExprHelper, which might mutate the eval context.
evalCtx = flowCtx.NewEvalCtx()
}
if cap(h.OutputTypes) >= nRenders {
h.OutputTypes = h.OutputTypes[:nRenders]
} else {
@@ -346,9 +356,6 @@ type ProcessorBaseNoHelper struct {

FlowCtx *FlowCtx

// EvalCtx is used for expression evaluation. It overrides the one in flowCtx.
EvalCtx *eval.Context

// Closed is set by InternalClose(). Once set, the processor's tracing span
// has been closed.
Closed bool
@@ -745,6 +752,8 @@ type ProcStateOpts struct {
// - coreOutputTypes are the type schema of the rows output by the processor
// core (i.e. the "internal schema" of the processor, see
// execinfrapb.ProcessorSpec for more details).
//
// NB: it is assumed that the caller will not modify the eval context.
func (pb *ProcessorBase) Init(
ctx context.Context,
self RowSource,
@@ -756,7 +765,7 @@
opts ProcStateOpts,
) error {
return pb.InitWithEvalCtx(
ctx, self, post, coreOutputTypes, flowCtx, flowCtx.NewEvalCtx(), processorID, memMonitor, opts,
ctx, self, post, coreOutputTypes, flowCtx, flowCtx.EvalCtx, processorID, memMonitor, opts,
)
}

@@ -775,26 +784,25 @@ func (pb *ProcessorBase) InitWithEvalCtx(
memMonitor *mon.BytesMonitor,
opts ProcStateOpts,
) error {
pb.ProcessorBaseNoHelper.Init(self, flowCtx, evalCtx, processorID, opts)
pb.ProcessorBaseNoHelper.Init(self, flowCtx, processorID, opts)
pb.MemMonitor = memMonitor

// Hydrate all types used in the processor.
resolver := flowCtx.NewTypeResolver(flowCtx.Txn)
if err := resolver.HydrateTypeSlice(ctx, coreOutputTypes); err != nil {
return err
}
pb.SemaCtx = tree.MakeSemaContext(&resolver)

return pb.OutputHelper.Init(ctx, post, coreOutputTypes, &pb.SemaCtx, pb.EvalCtx)
pb.SemaCtx = tree.MakeSemaContext(&resolver)
return pb.OutputHelper.Init(ctx, post, coreOutputTypes, &pb.SemaCtx, evalCtx, flowCtx)
}

// Init initializes the ProcessorBaseNoHelper.
func (pb *ProcessorBaseNoHelper) Init(
self RowSource, flowCtx *FlowCtx, evalCtx *eval.Context, processorID int32, opts ProcStateOpts,
self RowSource, flowCtx *FlowCtx, processorID int32, opts ProcStateOpts,
) {
pb.self = self
pb.FlowCtx = flowCtx
pb.EvalCtx = evalCtx
pb.ProcessorID = processorID
pb.trailingMetaCallback = opts.TrailingMetaCallback
if opts.InputsToDrain != nil {
2 changes: 1 addition & 1 deletion pkg/sql/execinfrapb/expr.go
@@ -182,7 +182,7 @@ func (eh *MultiExprHelper) Reset() {
eh.exprs = eh.exprs[:0]
}

// exprHelper is a base implementation of an expressoin helper used by both
// exprHelper is a base implementation of an expression helper used by both
// ExprHelper and MultiExprHelper.
type exprHelper struct {
evalCtx *eval.Context
2 changes: 1 addition & 1 deletion pkg/sql/importer/exportcsv.go
@@ -145,7 +145,7 @@ func newCSVWriterProcessor(
input: input,
}
semaCtx := tree.MakeSemaContext(nil /* resolver */)
if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.NewEvalCtx()); err != nil {
if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.EvalCtx, flowCtx); err != nil {
return nil, err
}
return c, nil
2 changes: 1 addition & 1 deletion pkg/sql/importer/exportparquet.go
@@ -68,7 +68,7 @@ func newParquetWriterProcessor(
input: input,
}
semaCtx := tree.MakeSemaContext(nil /* resolver */)
if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.NewEvalCtx()); err != nil {
if err := c.out.Init(ctx, post, colinfo.ExportColumnTypes, &semaCtx, flowCtx.EvalCtx, flowCtx); err != nil {
return nil, err
}
return c, nil
6 changes: 1 addition & 5 deletions pkg/sql/plan_node_to_row_source.go
@@ -119,11 +119,7 @@ func (p *planNodeToRowSource) Init(
post,
p.outputTypes,
flowCtx,
// Note that we have already created a copy of the extendedEvalContext
// (which made a copy of the EvalContext) right before calling
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
flowCtx.EvalCtx,
processorID,
nil, /* memMonitor */
execinfra.ProcStateOpts{
2 changes: 2 additions & 0 deletions pkg/sql/rowcontainer/hash_row_container.go
@@ -838,6 +838,8 @@ type HashDiskBackedRowContainer struct {
var _ HashRowContainer = &HashDiskBackedRowContainer{}

// NewHashDiskBackedRowContainer makes a HashDiskBackedRowContainer.
//
// Note that eval context will **not** be mutated.
func NewHashDiskBackedRowContainer(
evalCtx *eval.Context,
memoryMonitor *mon.BytesMonitor,
2 changes: 1 addition & 1 deletion pkg/sql/rowcontainer/numbered_row_container.go
@@ -54,7 +54,7 @@ type DiskBackedNumberedRowContainer struct {
// Arguments:
// - deDup is true if it should de-duplicate.
// - types is the schema of rows that will be added to this container.
// - evalCtx defines the context.
// - evalCtx defines the context. It will **not** be mutated.
// - engine is the underlying store that rows are stored on when the container
// spills to disk.
// - memoryMonitor is used to monitor this container's memory usage.
6 changes: 5 additions & 1 deletion pkg/sql/rowcontainer/row_container.go
@@ -179,6 +179,8 @@ var _ IndexedRowContainer = &MemRowContainer{}

// Init initializes the MemRowContainer. The MemRowContainer uses the planner's
// monitor to track memory usage.
//
// Note that eval context will **not** be mutated.
func (mc *MemRowContainer) Init(
ordering colinfo.ColumnOrdering, types []*types.T, evalCtx *eval.Context,
) {
@@ -187,6 +189,8 @@

// InitWithMon initializes the MemRowContainer with an explicit monitor. Only
// use this if the default MemRowContainer.Init() function is insufficient.
//
// Note that eval context will **not** be mutated.
func (mc *MemRowContainer) InitWithMon(
ordering colinfo.ColumnOrdering, types []*types.T, evalCtx *eval.Context, mon *mon.BytesMonitor,
) {
@@ -431,7 +435,7 @@ var _ DeDupingRowContainer = &DiskBackedRowContainer{}
// - ordering is the output ordering; the order in which rows should be sorted.
// - types is the schema of rows that will be added to this container.
// - evalCtx defines the context in which to evaluate comparisons, only used
// when storing rows in memory.
// when storing rows in memory. It will **not** be mutated.
// - engine is the store used for rows when spilling to disk.
// - memoryMonitor is used to monitor the DiskBackedRowContainer's memory usage.
// If this monitor denies an allocation, the DiskBackedRowContainer will