Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: wrap DrainMeta with panic-catcher and protect columnarizer #63108

Merged
merged 2 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func wrapRowSources(
c.MarkAsRemovedFromFlow()
toWrapInputs = append(toWrapInputs, c.Input())
} else {
var metadataSources execinfrapb.MetadataSources
var metadataSources colexecop.MetadataSources
if len(args.MetadataSources) > i {
// In some testing paths, MetadataSources might be left unset,
// so we check whether the slice has ith element. In the
Expand Down Expand Up @@ -616,7 +616,7 @@ func (r opResult) createAndWrapRowSource(
if args.TestingKnobs.PlanInvariantsCheckers {
r.Op = colexec.NewInvariantsChecker(r.Op)
}
r.MetadataSources = append(r.MetadataSources, r.Op.(execinfrapb.MetadataSource))
r.MetadataSources = append(r.MetadataSources, r.Op.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, c)
r.Releasables = append(r.Releasables, releasables...)
return nil
Expand Down Expand Up @@ -765,7 +765,7 @@ func NewColOperator(
result.Op = colexec.NewInvariantsChecker(result.Op)
}
result.KVReader = scanOp
result.MetadataSources = append(result.MetadataSources, result.Op.(execinfrapb.MetadataSource))
result.MetadataSources = append(result.MetadataSources, result.Op.(colexecop.MetadataSource))
result.Releasables = append(result.Releasables, scanOp)

// We want to check for cancellation once per input batch, and
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type NewColOperatorArgs struct {
// NewColOperator call creates a colexec.Materializer, then it will take
// over the responsibility of draining the sources; otherwise, they will be
// returned in NewColOperatorResult.
MetadataSources []execinfrapb.MetadataSources
MetadataSources []colexecop.MetadataSources
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Expand Down Expand Up @@ -97,7 +97,7 @@ type NewColOperatorResult struct {
// created during NewColOperator call, but it also might include all of the
// sources from NewColOperatorArgs.MetadataSources if no metadata draining
// component was created.
MetadataSources execinfrapb.MetadataSources
MetadataSources colexecop.MetadataSources
// ToClose is a slice of components that need to be Closed.
ToClose []colexecop.Closer
OpMonitors []*mon.BytesMonitor
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -1477,3 +1478,14 @@ func GenerateBatchSize() int {
}
return coldata.BatchSize()
}

// CallbackMetadataSource is a utility struct that implements the
// colexecop.MetadataSource interface by calling a provided callback.
type CallbackMetadataSource struct {
DrainMetaCb func(context.Context) []execinfrapb.ProducerMetadata
}

// DrainMeta is part of the colexecop.MetadataSource interface.
func (s CallbackMetadataSource) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
return s.DrainMetaCb(ctx)
}
14 changes: 10 additions & 4 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,22 @@ func (c *Columnarizer) Run(context.Context) {
}

var (
_ colexecop.Operator = &Columnarizer{}
_ execinfrapb.MetadataSource = &Columnarizer{}
_ colexecop.Closer = &Columnarizer{}
_ colexecop.DrainableOperator = &Columnarizer{}
_ colexecop.Closer = &Columnarizer{}
)

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (c *Columnarizer) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
if c.removedFromFlow {
return nil
}
if c.initStatus == colexecop.OperatorNotInitialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
// encountered in Init.
return c.accumulatedMeta
}
c.MoveToDraining(nil /* err */)
for {
meta := c.DrainHelper()
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/colexec/invariants_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ type InvariantsChecker struct {
colexecop.NonExplainable

initStatus colexecop.OperatorInitStatus
metadataSource execinfrapb.MetadataSource
metadataSource colexecop.MetadataSource
}

var _ colexecop.Operator = &InvariantsChecker{}
var _ execinfrapb.MetadataSource
var _ colexecop.DrainableOperator = &InvariantsChecker{}

// NewInvariantsChecker creates a new InvariantsChecker.
func NewInvariantsChecker(input colexecop.Operator) *InvariantsChecker {
c := &InvariantsChecker{
OneInputNode: colexecop.OneInputNode{Input: input},
}
if ms, ok := input.(execinfrapb.MetadataSource); ok {
if ms, ok := input.(colexecop.MetadataSource); ok {
c.metadataSource = ms
}
return c
Expand Down Expand Up @@ -99,7 +98,7 @@ func (i *InvariantsChecker) Next(ctx context.Context) coldata.Batch {
return b
}

// DrainMeta implements the execinfrapb.MetadataSource interface.
// DrainMeta implements the colexecop.MetadataSource interface.
func (i *InvariantsChecker) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
if shortCircuit := i.assertInitWasCalled(); shortCircuit {
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type drainHelper struct {
ctx context.Context

getStats func() []*execinfrapb.ComponentStats
sources execinfrapb.MetadataSources
sources colexecop.MetadataSources

bufferedMeta []execinfrapb.ProducerMetadata
}
Expand All @@ -92,7 +92,7 @@ var drainHelperPool = sync.Pool{
}

func newDrainHelper(
getStats func() []*execinfrapb.ComponentStats, sources execinfrapb.MetadataSources,
getStats func() []*execinfrapb.ComponentStats, sources colexecop.MetadataSources,
) *drainHelper {
d := drainHelperPool.Get().(*drainHelper)
d.getStats = getStats
Expand Down Expand Up @@ -199,7 +199,7 @@ func NewMaterializer(
typs []*types.T,
output execinfra.RowReceiver,
getStats func() []*execinfrapb.ComponentStats,
metadataSources []execinfrapb.MetadataSource,
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
Expand Down Expand Up @@ -274,7 +274,7 @@ func (m *Materializer) Start(ctx context.Context) {
m.MoveToDraining(err)
} else {
// Note that we intentionally only start the drain helper if
// initialization was successful - no starting the helper will tell it
// initialization was successful - not starting the helper will tell it
// to not drain the metadata sources (which have not been properly
// initialized).
m.drainHelper.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
defer leaktest.AfterTest(t)()

testError := errors.New("test-induced error")
metadataSource := &execinfrapb.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
metadataSource := &colexectestutils.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
colexecerror.InternalError(testError)
// Unreachable
return nil
Expand All @@ -209,7 +209,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
nil, /* typ */
nil, /* output */
nil, /* getStats */
[]execinfrapb.MetadataSource{metadataSource},
[]colexecop.MetadataSource{metadataSource},
nil, /* toClose */
nil, /* cancelFlow */
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execinfra.O

// SynchronizerInput is a wrapper over a colexecop.Operator that a
// synchronizer goroutine will be calling Next on. An accompanying
// []execinfrapb.MetadataSource may also be specified, in which case
// colexecop.MetadataSources may also be specified, in which case
// DrainMeta will be called from the same goroutine.
type SynchronizerInput struct {
// Op is the input Operator.
Expand All @@ -123,7 +123,7 @@ type SynchronizerInput struct {
StatsCollectors []VectorizedStatsCollector
// MetadataSources are metadata sources in the input tree that should be
// drained in the same goroutine as Op.
MetadataSources execinfrapb.MetadataSources
MetadataSources colexecop.MetadataSources
// ToClose are Closers in the input tree that should be closed in the same
// goroutine as Op.
ToClose colexecop.Closers
Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *ParallelUnorderedSynchronizer) notifyInputToReadNextBatch(inputIdx int)
}
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *ParallelUnorderedSynchronizer) DrainMeta(
ctx context.Context,
) []execinfrapb.ProducerMetadata {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
Expand Down Expand Up @@ -59,8 +60,8 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
source.ResetBatchesToReturn(numBatches)
inputs[i].Op = source
inputIdx := i
inputs[i].MetadataSources = []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
inputs[i].MetadataSources = []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
return []execinfrapb.ProducerMetadata{{Err: errors.Errorf("input %d test-induced metadata", inputIdx)}}
}},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/serial_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SerialUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
}
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *SerialUnorderedSynchronizer) DrainMeta(
ctx context.Context,
) []execinfrapb.ProducerMetadata {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -44,8 +45,8 @@ func TestSerialUnorderedSynchronizer(t *testing.T) {
inputIdx := i
inputs[i] = SynchronizerInput{
Op: source,
MetadataSources: []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
MetadataSources: []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{
DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
return []execinfrapb.ProducerMetadata{{Err: errors.Errorf("input %d test-induced metadata", inputIdx)}}
},
Expand Down
35 changes: 34 additions & 1 deletion pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Operator interface {
// DrainMeta may not be called concurrently.
type DrainableOperator interface {
Operator
execinfrapb.MetadataSource
MetadataSource
}

// KVReader is an operator that performs KV reads.
Expand Down Expand Up @@ -310,3 +310,36 @@ func (n *noopOperator) Reset(ctx context.Context) {
r.Reset(ctx)
}
}

// MetadataSource is an interface implemented by processors and columnar
// operators that can produce metadata.
type MetadataSource interface {
// DrainMeta returns all the metadata produced by the processor or operator.
// It will be called exactly once, usually, when the processor or operator
// has finished doing its computations. This is a signal that the output
// requires no more rows to be returned.
// Implementers can choose what to do on subsequent calls (if such occur).
// TODO(yuzefovich): modify the contract to require returning nil on all
// calls after the first one.
DrainMeta(context.Context) []execinfrapb.ProducerMetadata
}

// MetadataSources is a slice of MetadataSource.
type MetadataSources []MetadataSource

// DrainMeta calls DrainMeta on all MetadataSources and returns a single slice
// with all the accumulated metadata. Note that this method wraps the draining
// with the panic-catcher so that the callers don't have to.
func (s MetadataSources) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
var result []execinfrapb.ProducerMetadata
if err := colexecerror.CatchVectorizedRuntimeError(func() {
for _, src := range s {
result = append(result, src.DrainMeta(ctx)...)
}
}); err != nil {
meta := execinfrapb.GetProducerMeta()
meta.Err = err
result = append(result, *meta)
}
return result
}
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *ColBatchScan) Next(ctx context.Context) coldata.Batch {
return bat
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *ColBatchScan) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
s.mu.Lock()
initialized := s.mu.init
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
"//pkg/col/colserde",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/colexec/colexectestutils",
"//pkg/sql/colexec/colexecutils",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
Expand Down Expand Up @@ -504,8 +505,8 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
}
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, nil /* getStats */, []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
input, typs, nil /* getStats */, []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{
DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata {
return expectedMetadata
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Outbox struct {

// draining is an atomic that represents whether the Outbox is draining.
draining uint32
metadataSources []execinfrapb.MetadataSource
metadataSources colexecop.MetadataSources
// closers is a slice of Closers that need to be Closed on termination.
closers colexecop.Closers

Expand Down Expand Up @@ -81,7 +81,7 @@ func NewOutbox(
input colexecop.Operator,
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
metadataSources []execinfrapb.MetadataSource,
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
) (*Outbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
Expand Down Expand Up @@ -307,10 +307,8 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT
},
})
}
for _, src := range o.metadataSources {
for _, meta := range src.DrainMeta(ctx) {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
for _, meta := range o.metadataSources.DrainMeta(ctx) {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
if len(msg.Data.Metadata) == 0 {
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/colrpc/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
Expand Down Expand Up @@ -91,8 +92,8 @@ func TestOutboxDrainsMetadataSources(t *testing.T) {
// uint32 that is set atomically when the outbox drains a metadata source.
newOutboxWithMetaSources := func(allocator *colmem.Allocator) (*Outbox, *uint32, error) {
var sourceDrained uint32
outbox, err := NewOutbox(allocator, input, typs, nil /* getStats */, []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
outbox, err := NewOutbox(allocator, input, typs, nil /* getStats */, []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{
DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata {
atomic.StoreUint32(&sourceDrained, 1)
return nil
Expand Down
Loading