Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: clean up meta components handling #62221

Merged
merged 5 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"sort_chunks.go",
"sort_utils.go",
"sorttopk.go",
"stats.go",
"tuple_proj_op.go",
"unordered_distinct.go",
"utils.go",
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colexec/colbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ go_test(
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/colexec",
"//pkg/sql/colexec/colexecargs",
"//pkg/sql/colexecop",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/randgen",
Expand Down
178 changes: 93 additions & 85 deletions pkg/sql/colexec/colbuilder/execplan.go

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
Post: execinfrapb.PostProcessSpec{RenderExprs: []execinfrapb.Expression{{Expr: "@1 - 1"}}},
ResultTypes: []*types.T{types.Int},
},
Inputs: []colexecop.Operator{r.Op},
Inputs: []colexecargs.OpWithMetaInfo{{Root: r.Root}},
StreamingMemAccount: &streamingMemAcc,
}
r, err = NewColOperator(ctx, flowCtx, args)
Expand All @@ -123,12 +122,9 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
m, err := colexec.NewMaterializer(
flowCtx,
0, /* processorID */
r.Op,
r.OpWithMetaInfo,
[]*types.T{types.Int},
nil, /* output */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down
75 changes: 42 additions & 33 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,37 @@ import (
var TestNewColOperator func(ctx context.Context, flowCtx *execinfra.FlowCtx, args *NewColOperatorArgs,
) (r *NewColOperatorResult, err error)

// OpWithMetaInfo stores a colexecop.Operator together with miscellaneous meta
// information about the tree rooted in that operator.
// TODO(yuzefovich): figure out the story about pooling these objects.
type OpWithMetaInfo struct {
Root colexecop.Operator
// StatsCollectors are all stats collectors that are present in the tree
// rooted in Root for which the responsibility of retrieving stats hasn't
// been claimed yet.
StatsCollectors []colexecop.VectorizedStatsCollector
// MetadataSources are all sources of the metadata that are present in the
// tree rooted in Root for which the responsibility of draining hasn't been
// claimed yet.
MetadataSources colexecop.MetadataSources
// ToClose are all colexecop.Closers that are present in the tree rooted in
// Root for which the responsibility of closing hasn't been claimed yet.
ToClose colexecop.Closers
}

// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
type NewColOperatorArgs struct {
Spec *execinfrapb.ProcessorSpec
Inputs []colexecop.Operator
Inputs []OpWithMetaInfo
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
// MetadataSources are all sources of the metadata that are present in the
// trees rooted in Inputs. The slice has the same length as Inputs. If
// NewColOperator call creates a colexec.Materializer, then it will take
// over the responsibility of draining the sources; otherwise, they will be
// returned in NewColOperatorResult.
MetadataSources []colexecop.MetadataSources
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
TestingKnobs struct {
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
TestingKnobs struct {
// SpillingCallbackFn will be called when the spilling from an in-memory
// to disk-backed operator occurs. It should only be set in tests.
SpillingCallbackFn func()
Expand Down Expand Up @@ -90,21 +102,16 @@ type NewColOperatorArgs struct {
// NewColOperatorResult is a helper struct that encompasses all of the return
// values of NewColOperator call.
type NewColOperatorResult struct {
Op colexecop.Operator
KVReader colexecop.KVReader
ColumnTypes []*types.T
// MetadataSources are all sources of the metadata that are present in the
// tree rooted in Op that the caller must drain. These can be
// colfetcher.ColBatchScan or colexec.Columnarizer operators that were
// created during NewColOperator call, but it also might include all of the
// sources from NewColOperatorArgs.MetadataSources if no metadata draining
// component was created.
MetadataSources colexecop.MetadataSources
// ToClose is a slice of components that need to be Closed.
ToClose []colexecop.Closer
OpMonitors []*mon.BytesMonitor
OpAccounts []*mon.BoundAccount
Releasables []execinfra.Releasable
OpWithMetaInfo
KVReader colexecop.KVReader
// Columnarizer is the root colexec.Columnarizer, if needed, that is hidden
// behind the stats collector interface. We need to track it separately from
// all other stats collectors since it requires special handling.
Columnarizer colexecop.VectorizedStatsCollector
ColumnTypes []*types.T
OpMonitors []*mon.BytesMonitor
OpAccounts []*mon.BoundAccount
Releasables []execinfra.Releasable
}

var _ execinfra.Releasable = &NewColOperatorResult{}
Expand Down Expand Up @@ -141,12 +148,14 @@ func (r *NewColOperatorResult) Release() {
releasable.Release()
}
*r = NewColOperatorResult{
ColumnTypes: r.ColumnTypes[:0],
MetadataSources: r.MetadataSources[:0],
ToClose: r.ToClose[:0],
OpMonitors: r.OpMonitors[:0],
OpAccounts: r.OpAccounts[:0],
Releasables: r.Releasables[:0],
OpWithMetaInfo: OpWithMetaInfo{
MetadataSources: r.OpWithMetaInfo.MetadataSources[:0],
ToClose: r.OpWithMetaInfo.ToClose[:0],
},
ColumnTypes: r.ColumnTypes[:0],
OpMonitors: r.OpMonitors[:0],
OpAccounts: r.OpAccounts[:0],
Releasables: r.Releasables[:0],
}
newColOperatorResultPool.Put(r)
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecbase/ordinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ func createTestOrdinalityOperator(
}
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Inputs: []colexecop.Operator{input},
Inputs: []colexecargs.OpWithMetaInfo{{Root: input}},
StreamingMemAccount: testMemAcc,
}
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
}
return result.Op, nil
return result.Root, nil
}
5 changes: 3 additions & 2 deletions pkg/sql/colexec/colexecbase/simple_project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
Expand Down Expand Up @@ -116,9 +117,9 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
colexectestutils.RunTestsWithoutAllNullsInjection(t, testAllocator, inputTuples, [][]*types.T{inputTypes, inputTypes}, expected, colexectestutils.UnorderedVerifier,
func(inputs []colexecop.Operator) (colexecop.Operator, error) {
var input colexecop.Operator
parallelUnorderedSynchronizerInputs := make([]colexec.SynchronizerInput, len(inputs))
parallelUnorderedSynchronizerInputs := make([]colexecargs.OpWithMetaInfo, len(inputs))
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Op = inputs[i]
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
input = colexec.NewParallelUnorderedSynchronizer(parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexectestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "colexectestutils",
srcs = [
"args.go",
"proj_utils.go",
"utils.go",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec
package colexectestutils

import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// VectorizedStatsCollector is the common interface implemented by collectors.
type VectorizedStatsCollector interface {
colexecop.Operator
GetStats() *execinfrapb.ComponentStats
// MakeInputs is a utility function that populates a slice of
// colexecargs.OpWithMetaInfo objects based on sources.
func MakeInputs(sources []colexecop.Operator) []colexecargs.OpWithMetaInfo {
inputs := make([]colexecargs.OpWithMetaInfo, len(sources))
for i := range sources {
inputs[i].Root = sources[i]
}
return inputs
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexectestutils/proj_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func CreateTestProjectingOperator(
}
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Inputs: []colexecop.Operator{input},
Inputs: []colexecargs.OpWithMetaInfo{{Root: input}},
StreamingMemAccount: testMemAcc,
}
if canFallbackToRowexec {
Expand All @@ -108,5 +108,5 @@ func CreateTestProjectingOperator(
if err != nil {
return nil, err
}
return result.Op, nil
return result.Root, nil
}
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecwindow/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestWindowFunctions(t *testing.T) {
} {
log.Infof(ctx, "spillForced=%t/%s", spillForced, tc.windowerSpec.WindowFns[0].Func.String())
var semsToCheck []semaphore.Semaphore
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) {
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, error) {
tc.init()
ct := make([]*types.T, len(tc.tuples[0]))
for i := range ct {
Expand All @@ -297,7 +297,7 @@ func TestWindowFunctions(t *testing.T) {
sem := colexecop.NewTestingSemaphore(relativeRankNumRequiredFDs)
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Inputs: inputs,
Inputs: colexectestutils.MakeInputs(sources),
StreamingMemAccount: testMemAcc,
DiskQueueCfg: queueCfg,
FDSemaphore: sem,
Expand All @@ -307,7 +307,7 @@ func TestWindowFunctions(t *testing.T) {
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
accounts = append(accounts, result.OpAccounts...)
monitors = append(monitors, result.OpMonitors...)
return result.Op, err
return result.Root, err
})
for i, sem := range semsToCheck {
require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i)
Expand Down
33 changes: 33 additions & 0 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
// reading the input in chunks of size coldata.BatchSize() and converting each
// chunk into a coldata.Batch column by column.
type Columnarizer struct {
// TODO(yuzefovich): consider whether embedding ProcessorBase into the
// columnarizers makes sense.
execinfra.ProcessorBase
colexecop.NonExplainable

Expand All @@ -68,6 +70,7 @@ type Columnarizer struct {
}

var _ colexecop.Operator = &Columnarizer{}
var _ colexecop.VectorizedStatsCollector = &Columnarizer{}

// NewBufferingColumnarizer returns a new Columnarizer that will be buffering up
// rows before emitting them as output batches.
Expand Down Expand Up @@ -155,7 +158,37 @@ func (c *Columnarizer) Init() {
c.ctx = c.StartInternalNoSpan(c.ctx)
c.input.Start(c.ctx)
c.initStatus = colexecop.OperatorInitialized
if execStatsHijacker, ok := c.input.(execinfra.ExecStatsForTraceHijacker); ok {
// The columnarizer is now responsible for propagating the execution
// stats of the wrapped processor.
//
// Note that this columnarizer cannot be removed from the flow
// because it will have a vectorized stats collector planned on top,
// so the optimization of wrapRowSources() in execplan.go will never
// trigger. We check this assumption with an assertion below in the
// test setting.
//
// Still, just to be safe, we delay the hijacking until Init so that
// in case the assumption is wrong, we still get the stats from the
// wrapped processor.
c.ExecStatsForTrace = execStatsHijacker.HijackExecStatsForTrace()
}
}
}

// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats {
if c.removedFromFlow && util.CrdbTestBuild {
colexecerror.InternalError(errors.AssertionFailedf(
"unexpectedly the columnarizer was removed from the flow when stats are being collected",
))
}
if !c.removedFromFlow && c.ExecStatsForTrace != nil {
s := c.ExecStatsForTrace()
s.Component = c.FlowCtx.ProcessorComponentID(c.ProcessorID)
return s
}
return nil
}

// Next is part of the Operator interface.
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func TestCrossJoiner(t *testing.T) {
spec := createSpecForHashJoiner(tc)
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Inputs: sources,
Inputs: colexectestutils.MakeInputs(sources),
StreamingMemAccount: testMemAcc,
DiskQueueCfg: queueCfg,
FDSemaphore: colexecop.NewTestingSemaphore(externalHJMinPartitions),
Expand All @@ -384,7 +384,7 @@ func TestCrossJoiner(t *testing.T) {
}
accounts = append(accounts, result.OpAccounts...)
monitors = append(monitors, result.OpMonitors...)
return result.Op, nil
return result.Root, nil
})
}
}
Expand Down Expand Up @@ -444,7 +444,7 @@ func BenchmarkCrossJoiner(b *testing.B) {
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
// Inputs will be set below.
Inputs: []colexecop.Operator{nil, nil},
Inputs: []colexecargs.OpWithMetaInfo{{}, {}},
StreamingMemAccount: testMemAcc,
DiskQueueCfg: queueCfg,
FDSemaphore: colexecop.NewTestingSemaphore(VecMaxOpenFDsLimit),
Expand All @@ -459,14 +459,14 @@ func BenchmarkCrossJoiner(b *testing.B) {
b.SetBytes(int64(8 * nOutputRows * (len(tc.leftOutCols) + len(tc.rightOutCols))))
b.ResetTimer()
for i := 0; i < b.N; i++ {
args.Inputs[0] = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows)
args.Inputs[1] = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows)
args.Inputs[0].Root = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows)
args.Inputs[1].Root = colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows)
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
require.NoError(b, err)
accounts = append(accounts, result.OpAccounts...)
monitors = append(monitors, result.OpMonitors...)
require.NoError(b, err)
cj := result.Op
cj := result.Root
cj.Init()
for b := cj.Next(ctx); b.Length() > 0; b = cj.Next(ctx) {
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/external_distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func BenchmarkExternalDistinct(b *testing.B) {
func createExternalDistinct(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
input []colexecop.Operator,
sources []colexecop.Operator,
typs []*types.T,
distinctCols []uint32,
outputOrdering execinfrapb.Ordering,
Expand All @@ -375,13 +375,13 @@ func createExternalDistinct(
}
args := &colexecargs.NewColOperatorArgs{
Spec: spec,
Inputs: input,
Inputs: colexectestutils.MakeInputs(sources),
StreamingMemAccount: testMemAcc,
DiskQueueCfg: diskQueueCfg,
FDSemaphore: testingSemaphore,
}
args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn
args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
return result.Op, result.OpAccounts, result.OpMonitors, result.ToClose, err
return result.Root, result.OpAccounts, result.OpMonitors, result.ToClose, err
}
Loading