Skip to content

Commit

Permalink
colexec: improve handling of the metadata
Browse files Browse the repository at this point in the history
This commit audits all of the places where we're operating with the
producer metadata and improves things a bit. This was prompted by the
fact that some types of the metadata (in particular, the
LeafTxnFinalState) can be of non-trivial footprint, so the sooner we
lose the reference to the metadata objects, the more stable CRDB will
be.

This commit adds the memory accounting for the LeafTxnFinalState
metadata objects in most (all?) places that buffer metadata (inbox,
columnarizer, materializer, parallel unordered synchronizer). The
reservations are released whenever the buffered component is drained,
however, the drainer - who takes over the metadata - might not perform
the accounting. The difficulty there is that the lifecycle of the
metadata objects are not super clear: it's possible that we're at the
end of the execution, and the whole plan is being drained - in such a
scenario, the metadata is pushed into the DistSQLReceiver and then
imported into the root txn and is discarded (modulo the increased root
txn footprint); it's also possible that the metadata from the drained
component gets buffered somewhere up the tree. But this commit adds the
accounting in most such places.

Release note: None
  • Loading branch information
yuzefovich committed Jul 29, 2022
1 parent 50fde61 commit 0866ddc
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 52 deletions.
17 changes: 13 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ func checkNumIn(inputs []colexecargs.OpWithMetaInfo, numIn int) error {
// execution flow and returns toWrap's output as an Operator.
// - materializerSafeToRelease indicates whether the materializers created in
// order to row-sourcify the inputs are safe to be released on the flow cleanup.
// - streamingMemAccFactory must be non-nil, and in the production setting must
// return separate accounts on each invocation.
func wrapRowSources(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
inputs []colexecargs.OpWithMetaInfo,
inputTypes [][]*types.T,
streamingMemAccount *mon.BoundAccount,
streamingMemAccFactory func() *mon.BoundAccount,
processorID int32,
newToWrap func([]execinfra.RowSource) (execinfra.RowSource, error),
materializerSafeToRelease bool,
Expand All @@ -88,6 +90,7 @@ func wrapRowSources(
toWrapInputs = append(toWrapInputs, c.Input())
} else {
toWrapInput := colexec.NewMaterializer(
colmem.NewAllocator(ctx, streamingMemAccFactory(), factory),
flowCtx,
processorID,
inputs[i],
Expand Down Expand Up @@ -119,11 +122,11 @@ func wrapRowSources(
var c *colexec.Columnarizer
if proc.MustBeStreaming() {
c = colexec.NewStreamingColumnarizer(
colmem.NewAllocator(ctx, streamingMemAccount, factory), flowCtx, processorID, toWrap,
colmem.NewAllocator(ctx, streamingMemAccFactory(), factory), flowCtx, processorID, toWrap,
)
} else {
c = colexec.NewBufferingColumnarizer(
colmem.NewAllocator(ctx, streamingMemAccount, factory), flowCtx, processorID, toWrap,
colmem.NewAllocator(ctx, streamingMemAccFactory(), factory), flowCtx, processorID, toWrap,
)
}
return c, releasables, nil
Expand Down Expand Up @@ -538,12 +541,18 @@ func (r opResult) createAndWrapRowSource(
// LocalPlanNode cores which is the case when we have non-empty
// LocalProcessors.
materializerSafeToRelease := len(args.LocalProcessors) == 0
streamingMemAccFactory := args.StreamingMemAccFactory
if streamingMemAccFactory == nil {
streamingMemAccFactory = func() *mon.BoundAccount {
return args.StreamingMemAccount
}
}
c, releasables, err := wrapRowSources(
ctx,
flowCtx,
inputs,
inputTypes,
args.StreamingMemAccount,
streamingMemAccFactory,
processorID,
func(inputs []execinfra.RowSource) (execinfra.RowSource, error) {
// We provide a slice with a single nil as 'outputs' parameter
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
defer r.TestCleanupNoError(t)

m := colexec.NewMaterializer(
nil, /* allocator */
flowCtx,
0, /* processorID */
r.OpWithMetaInfo,
Expand Down
23 changes: 12 additions & 11 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ type OpWithMetaInfo struct {
// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
type NewColOperatorArgs struct {
Spec *execinfrapb.ProcessorSpec
Inputs []OpWithMetaInfo
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
MonitorRegistry *MonitorRegistry
TestingKnobs struct {
Spec *execinfrapb.ProcessorSpec
Inputs []OpWithMetaInfo
StreamingMemAccount *mon.BoundAccount
StreamingMemAccFactory func() *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
MonitorRegistry *MonitorRegistry
TestingKnobs struct {
// SpillingCallbackFn will be called when the spilling from an in-memory
// to disk-backed operator occurs. It should only be set in tests.
SpillingCallbackFn func()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/simple_project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
input = colexec.NewParallelUnorderedSynchronizer(parallelUnorderedSynchronizerInputs, &wg)
input = colexec.NewParallelUnorderedSynchronizer(testAllocator, parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1)
})
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/col/coldata",
"//pkg/roachpb",
"//pkg/sql/colcontainer",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package colexecutils

import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -318,3 +320,15 @@ func init() {
DefaultSelectionVector[i] = i
}
}

// AccountForMetadata registers the memory footprint of meta with the allocator.
func AccountForMetadata(allocator *colmem.Allocator, meta []execinfrapb.ProducerMetadata) {
for i := range meta {
// Perform the memory accounting for the LeafTxnFinalState metadata
// since it might be of non-trivial size.
if ltfs := meta[i].LeafTxnFinalState; ltfs != nil {
memUsage := roachpb.Spans(ltfs.RefreshSpans).MemUsage()
allocator.AdjustMemoryUsage(memUsage)
}
}
}
20 changes: 17 additions & 3 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
Expand Down Expand Up @@ -77,6 +78,7 @@ var _ colexecop.VectorizedStatsCollector = &Columnarizer{}

// NewBufferingColumnarizer returns a new Columnarizer that will be buffering up
// rows before emitting them as output batches.
// - allocator must use a memory account that is not shared with any other user.
func NewBufferingColumnarizer(
allocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
Expand All @@ -88,6 +90,7 @@ func NewBufferingColumnarizer(

// NewStreamingColumnarizer returns a new Columnarizer that emits every input
// row as a separate batch.
// - allocator must use a memory account that is not shared with any other user.
func NewStreamingColumnarizer(
allocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -233,6 +236,7 @@ func (c *Columnarizer) Next() coldata.Batch {
colexecerror.ExpectedError(meta.Err)
}
c.accumulatedMeta = append(c.accumulatedMeta, *meta)
colexecutils.AccountForMetadata(c.allocator, c.accumulatedMeta[len(c.accumulatedMeta)-1:])
continue
}
if row == nil {
Expand Down Expand Up @@ -266,22 +270,32 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata {
if c.removedFromFlow {
return nil
}
// We no longer need the batch.
c.batch = nil
bufferedMeta := c.accumulatedMeta
// Eagerly lose the reference to the metadata since it might be of
// non-trivial footprint.
c.accumulatedMeta = nil
// When this method returns, we no longer will have the reference to the
// metadata (nor to the batch), so we can release all memory from the
// allocator.
defer c.allocator.ReleaseAll()
if c.Ctx == nil {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
// encountered in Init.
return c.accumulatedMeta
return bufferedMeta
}
c.MoveToDraining(nil /* err */)
for {
meta := c.DrainHelper()
if meta == nil {
break
}
c.accumulatedMeta = append(c.accumulatedMeta, *meta)
bufferedMeta = append(bufferedMeta, *meta)
}
return c.accumulatedMeta
return bufferedMeta
}

// Close is part of the colexecop.ClosableOperator interface.
Expand Down
53 changes: 38 additions & 15 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colconv"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
Expand Down Expand Up @@ -73,10 +75,14 @@ type drainHelper struct {
// are noops.
ctx context.Context

// allocator can be nil in tests.
allocator *colmem.Allocator

statsCollectors []colexecop.VectorizedStatsCollector
sources colexecop.MetadataSources

bufferedMeta []execinfrapb.ProducerMetadata
drained bool
meta []execinfrapb.ProducerMetadata
}

var _ execinfra.RowSource = &drainHelper{}
Expand Down Expand Up @@ -113,18 +119,25 @@ func (d *drainHelper) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
}
d.statsCollectors = nil
}
if d.bufferedMeta == nil {
d.bufferedMeta = d.sources.DrainMeta()
if d.bufferedMeta == nil {
// Still nil, avoid more calls to DrainMeta.
d.bufferedMeta = []execinfrapb.ProducerMetadata{}
if !d.drained {
d.meta = d.sources.DrainMeta()
d.drained = true
if d.allocator != nil {
colexecutils.AccountForMetadata(d.allocator, d.meta)
}
}
if len(d.bufferedMeta) == 0 {
if len(d.meta) == 0 {
// Eagerly lose the reference to the slice.
d.meta = nil
if d.allocator != nil {
// At this point, the caller took over all of the metadata, so we
// can release all of the allocations.
d.allocator.ReleaseAll()
}
return nil, nil
}
meta := d.bufferedMeta[0]
d.bufferedMeta = d.bufferedMeta[1:]
meta := d.meta[0]
d.meta = d.meta[1:]
return nil, &meta
}

Expand All @@ -143,18 +156,22 @@ var materializerPool = sync.Pool{
// NewMaterializer creates a new Materializer processor which processes the
// columnar data coming from input to return it as rows.
// Arguments:
// - allocator must use a memory account that is not shared with any other user,
// can be nil in tests.
// - typs is the output types schema. Typs are assumed to have been hydrated.
// - getStats (when tracing is enabled) returns all of the execution statistics
// of operators which the materializer is responsible for.
// NOTE: the constructor does *not* take in an execinfrapb.PostProcessSpec
// because we expect input to handle that for us.
func NewMaterializer(
flowCtx *execinfra.FlowCtx, processorID int32, input colexecargs.OpWithMetaInfo, typs []*types.T,
allocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
processorID int32,
input colexecargs.OpWithMetaInfo,
typs []*types.T,
) *Materializer {
// When the materializer is created in the middle of the chain of operators,
// it will modify the eval context when it is done draining, so we have to
// give it a copy to preserve the "global" eval context from being mutated.
return newMaterializerInternal(flowCtx, flowCtx.NewEvalCtx(), processorID, input, typs)
return newMaterializerInternal(allocator, flowCtx, flowCtx.NewEvalCtx(), processorID, input, typs)
}

// NewMaterializerNoEvalCtxCopy is the same as NewMaterializer but doesn't make
Expand All @@ -166,12 +183,17 @@ func NewMaterializer(
// modifies the eval context) only when the whole flow is done, at which point
// the eval context won't be used anymore.
func NewMaterializerNoEvalCtxCopy(
flowCtx *execinfra.FlowCtx, processorID int32, input colexecargs.OpWithMetaInfo, typs []*types.T,
allocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
processorID int32,
input colexecargs.OpWithMetaInfo,
typs []*types.T,
) *Materializer {
return newMaterializerInternal(flowCtx, flowCtx.EvalCtx, processorID, input, typs)
return newMaterializerInternal(allocator, flowCtx, flowCtx.EvalCtx, processorID, input, typs)
}

func newMaterializerInternal(
allocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
evalCtx *eval.Context,
processorID int32,
Expand All @@ -192,6 +214,7 @@ func newMaterializerInternal(
// rowSourceToPlanNode wrappers.
closers: append(m.closers[:0], input.ToClose...),
}
m.drainHelper.allocator = allocator
m.drainHelper.statsCollectors = input.StatsCollectors
m.drainHelper.sources = input.MetadataSources

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestColumnarizeMaterialize(t *testing.T) {
c := NewBufferingColumnarizer(testAllocator, flowCtx, 0, input)

m := NewMaterializer(
nil, /* allocator */
flowCtx,
1, /* processorID */
colexecargs.OpWithMetaInfo{Root: c},
Expand Down Expand Up @@ -137,6 +138,7 @@ func BenchmarkMaterializer(b *testing.B) {
b.SetBytes(int64(nRows * nCols * int(memsize.Int64)))
for i := 0; i < b.N; i++ {
m := NewMaterializer(
nil, /* allocator */
flowCtx,
0, /* processorID */
colexecargs.OpWithMetaInfo{Root: input},
Expand Down Expand Up @@ -184,6 +186,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
}

m := NewMaterializer(
nil, /* allocator */
flowCtx,
0, /* processorID */
colexecargs.OpWithMetaInfo{
Expand Down Expand Up @@ -227,6 +230,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
b.SetBytes(int64(nRows * nCols * int(memsize.Int64)))
for i := 0; i < b.N; i++ {
m := NewMaterializer(
nil, /* allocator */
flowCtx,
1, /* processorID */
colexecargs.OpWithMetaInfo{Root: c},
Expand Down
Loading

0 comments on commit 0866ddc

Please sign in to comment.