colflow: track closers by the vectorized flow
This commit simplifies the way we track all of the `colexecop.Closer`s
we need to close. Previously, we would track them using `OpWithMetaInfo`,
and many operators would then be responsible for closing the components
in their input tree. This commit makes it so that we close them during
the flow cleanup instead. This is safe because we know that all
concurrent goroutines will have exited by the time `Cleanup` is called:
we call it only after `Flow.Wait` (which blocks) returns.
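
As an illustration, here is a minimal sketch of the flow-level cleanup
path (the receiver and field names - `vectorizedFlow`, `f.toClose` - are
hypothetical shorthand, not necessarily the actual ones):

func (f *vectorizedFlow) Cleanup(ctx context.Context) {
    // By the contract, Cleanup runs only after Flow.Wait has returned, so
    // every goroutine spawned by this flow has exited and nothing can race
    // with us on the closers.
    if err := colexecerror.CatchVectorizedRuntimeError(func() {
        for _, closer := range f.toClose {
            if err := closer.Close(ctx); err != nil && log.V(1) {
                log.Infof(ctx, "error closing Closer: %v", err)
            }
        }
    }); err != nil && log.V(1) {
        log.Infof(ctx, "runtime error closing the closers: %v", err)
    }
    // ... the rest of the flow cleanup (monitors, tracing spans, etc.).
}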

The decision to put closers into `OpWithMetaInfo` was made in #62221
with a "why not" justification that I provided. Now I have the answer to
why we should not do it. Unlike the stats collection and metadata
draining (which - at least currently - must happen in the "owner"
goroutines), for closers we have a convenient place to close them at the
flow level. The contract only requires that the closure occur eventually;
it doesn't matter much in which goroutine it's done (as long as there is
no race) or whether it is a bit delayed. (The only downside I see is that
the tracing spans are finished with a delay compared to when the relevant
operations actually complete, but this has already been pretty bad, and
this commit makes things only slightly worse. This "delayed" finish shows
up as an "over-extended" span when viewing traces in Jaeger.)

As a result of this refactor, the assertion that all closers are closed
became redundant - we'd effectively be asserting only that a single
method is called - so the assertion is removed. This commit also allowed
us to remove some of the complexity around the `ExplainVec`
implementation (we no longer need to tie the cleanup to the closing of
the corresponding planNode).

Release note: None
yuzefovich committed Nov 22, 2022
1 parent 7ffaece commit 661296f
Showing 21 changed files with 77 additions and 358 deletions.
11 changes: 4 additions & 7 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -100,7 +100,6 @@ func wrapRowSources(
 // able to pool the underlying slices.
 inputs[i].StatsCollectors = nil
 inputs[i].MetadataSources = nil
-inputs[i].ToClose = nil
 toWrapInputs = append(toWrapInputs, toWrapInput)
 *releasables = append(*releasables, toWrapInput)
 }
@@ -493,10 +492,8 @@ func takeOverMetaInfo(target *colexecargs.OpWithMetaInfo, inputs []colexecargs.O
 for i := range inputs {
 target.StatsCollectors = append(target.StatsCollectors, inputs[i].StatsCollectors...)
 target.MetadataSources = append(target.MetadataSources, inputs[i].MetadataSources...)
-target.ToClose = append(target.ToClose, inputs[i].ToClose...)
 inputs[i].MetadataSources = nil
 inputs[i].StatsCollectors = nil
-inputs[i].ToClose = nil
 }
 }

@@ -599,10 +596,10 @@ func MaybeRemoveRootColumnarizer(r colexecargs.OpWithMetaInfo) execinfra.RowSour
 return nil
 }
 // We have the columnarizer as the root, and it must be included into the
-// MetadataSources and ToClose slices, so if we don't see any other objects,
-// then the responsibility over other meta components has been claimed by
-// the children of the columnarizer.
-if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 || len(r.ToClose) != 1 {
+// MetadataSources slice, so if we don't see any other objects, then the
+// responsibility over other meta components has been claimed by the
+// children of the columnarizer.
+if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 {
 return nil
 }
 c.MarkAsRemovedFromFlow()
13 changes: 9 additions & 4 deletions pkg/sql/colexec/colexecargs/op_creation.go
@@ -36,6 +36,13 @@ var TestNewColOperator func(ctx context.Context, flowCtx *execinfra.FlowCtx, arg
 
 // OpWithMetaInfo stores a colexecop.Operator together with miscellaneous meta
 // information about the tree rooted in that operator.
+//
+// Note that at some point colexecop.Closers were also included into this
+// struct, but for ease of tracking we pulled them out to the flow-level.
+// Closers are different from the objects tracked here since we have a
+// convenient place to close them from the main goroutine whereas the stats
+// collection as well as metadata draining must happen in the goroutine that
+// "owns" these objects.
 // TODO(yuzefovich): figure out the story about pooling these objects.
 type OpWithMetaInfo struct {
 Root colexecop.Operator
@@ -47,9 +54,6 @@ type OpWithMetaInfo struct {
 // tree rooted in Root for which the responsibility of draining hasn't been
 // claimed yet.
 MetadataSources colexecop.MetadataSources
-// ToClose are all colexecop.Closers that are present in the tree rooted in
-// Root for which the responsibility of closing hasn't been claimed yet.
-ToClose colexecop.Closers
 }
 
 // NewColOperatorArgs is a helper struct that encompasses all of the input
@@ -106,6 +110,7 @@ type NewColOperatorResult struct {
 // contract right now of whether or not a particular operator has to make a
 // copy of the type schema if it needs to use it later.
 ColumnTypes []*types.T
+ToClose colexecop.Closers
 Releasables []execreleasable.Releasable
 }

@@ -155,8 +160,8 @@ func (r *NewColOperatorResult) Release() {
 OpWithMetaInfo: OpWithMetaInfo{
 StatsCollectors: r.StatsCollectors[:0],
 MetadataSources: r.MetadataSources[:0],
-ToClose: r.ToClose[:0],
 },
+ToClose: r.ToClose[:0],
 Releasables: r.Releasables[:0],
 }
 newColOperatorResultPool.Put(r)
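
With the field hoisted out of `OpWithMetaInfo`, the flow setup becomes the
natural consumer of `NewColOperatorResult.ToClose`. A rough sketch of the
hand-off (`f.toClose` is a hypothetical accumulator on the flow, not the
actual name):

result, err := colbuilder.NewColOperator(ctx, flowCtx, args)
if err != nil {
    return err
}
// The flow - not the individual operators - now owns the closers and
// will close them during Cleanup.
f.toClose = append(f.toClose, result.ToClose...)
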
7 changes: 0 additions & 7 deletions pkg/sql/colexec/external_hash_joiner_test.go
@@ -81,20 +81,13 @@ func TestExternalHashJoiner(t *testing.T) {
 sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions)
 semsToCheck = append(semsToCheck, sem)
 spec := createSpecForHashJoiner(tc)
-// TODO(asubiotto): Pass in the testing.T of the caller to this
-// function and do substring matching on the test name to
-// conditionally explicitly call Close() on the hash joiner
-// (through result.ToClose) in cases where it is known the sorter
-// will not be drained.
 hjOp, closers, err := createDiskBackedHashJoiner(
 ctx, flowCtx, spec, sources, func() {}, queueCfg,
 numForcedRepartitions, delegateFDAcquisitions, sem,
 &monitorRegistry,
 )
 // Expect three closers. These are the external hash joiner, and
 // one external sorter for each input.
-// TODO(asubiotto): Explicitly Close when testing.T is passed into
-// this constructor and we do a substring match.
 require.Equal(t, 3, len(closers))
 return hjOp, err
 })
7 changes: 0 additions & 7 deletions pkg/sql/colexec/external_sort_test.go
@@ -94,19 +94,12 @@ func TestExternalSort(t *testing.T) {
 if tc.k == 0 || tc.k >= uint64(len(tc.tuples)) {
 semsToCheck = append(semsToCheck, sem)
 }
-// TODO(asubiotto): Pass in the testing.T of the caller to this
-// function and do substring matching on the test name to
-// conditionally explicitly call Close() on the sorter (through
-// result.ToClose) in cases where it is know the sorter will not
-// be drained.
 sorter, closers, err := createDiskBackedSorter(
 ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() {},
 numForcedRepartitions, false /* delegateFDAcquisition */, queueCfg, sem,
 &monitorRegistry,
 )
 // Check that the sort was added as a Closer.
-// TODO(asubiotto): Explicitly Close when testing.T is passed into
-// this constructor and we do a substring match.
 require.Equal(t, 1, len(closers))
 return sorter, err
 })
32 changes: 3 additions & 29 deletions pkg/sql/colexec/materializer.go
@@ -60,9 +60,6 @@ type Materializer struct {
 // outputRow stores the returned results of next() to be passed through an
 // adapter.
 outputRow rowenc.EncDatumRow
-
-// closers is a slice of Closers that should be Closed on termination.
-closers colexecop.Closers
 }
 
 // drainHelper is a utility struct that wraps MetadataSources in a RowSource
@@ -207,7 +204,6 @@ func newMaterializerInternal(
 typs: typs,
 converter: colconv.NewAllVecToDatumConverter(len(typs)),
 row: make(rowenc.EncDatumRow, len(typs)),
-closers: input.ToClose,
 }
 m.drainHelper.allocator = allocator
 m.drainHelper.statsCollectors = input.StatsCollectors
@@ -221,8 +217,9 @@
 nil, /* output */
 execinfra.ProcStateOpts{
 // We append drainHelper to inputs to drain below in order to reuse
-// the same underlying slice from the pooled materializer.
-TrailingMetaCallback: m.trailingMetaCallback,
+// the same underlying slice from the pooled materializer. The
+// drainHelper is responsible for draining the metadata from the
+// input tree.
 },
 )
 m.AddInputToDrain(&m.drainHelper)
@@ -324,29 +321,6 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 return nil, m.DrainHelper()
 }
 
-func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata {
-// Note that we delegate draining all of the metadata sources to drainHelper
-// which is added as an input to drain.
-m.close()
-return nil
-}
-
-func (m *Materializer) close() {
-if m.Closed {
-return
-}
-// Make sure to call InternalClose() only after closing the closers - this
-// allows the closers to utilize the unfinished tracing span (if tracing is
-// enabled).
-m.closers.CloseAndLogOnErr(m.Ctx(), "materializer")
-m.InternalClose()
-}
-
-// ConsumerClosed is part of the execinfra.RowSource interface.
-func (m *Materializer) ConsumerClosed() {
-m.close()
-}
-
 // Release implements the execinfra.Releasable interface.
 func (m *Materializer) Release() {
 m.ProcessorBaseNoHelper.Reset()
12 changes: 1 addition & 11 deletions pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default.

12 changes: 1 addition & 11 deletions pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -263,22 +263,12 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata {
 }
 
 func (o *OrderedSynchronizer) Close(context.Context) error {
-// Note that we're using the context of the synchronizer rather than the
-// argument of Close() because the synchronizer derives its own tracing
-// span.
-ctx := o.EnsureCtx()
 o.accountingHelper.Release()
-var lastErr error
-for _, input := range o.inputs {
-if err := input.ToClose.Close(ctx); err != nil {
-lastErr = err
-}
-}
 if o.span != nil {
 o.span.Finish()
 }
 *o = OrderedSynchronizer{}
-return lastErr
+return nil
 }
 
 func (o *OrderedSynchronizer) compareRow(batchIdx1 int, batchIdx2 int) int {
27 changes: 5 additions & 22 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
@@ -226,9 +226,6 @@ func (s *ParallelUnorderedSynchronizer) init() {
 if int(atomic.AddUint32(&s.numFinishedInputs, 1)) == len(s.inputs) {
 close(s.batchCh)
 }
-// We need to close all of the closers of this input before we
-// notify the wait groups.
-input.ToClose.CloseAndLogOnErr(s.inputCtxs[inputIdx], "parallel unordered synchronizer input")
 s.internalWaitGroup.Done()
 s.externalWaitGroup.Done()
 }()
@@ -485,33 +482,19 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
 // Close is part of the colexecop.ClosableOperator interface.
 func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error {
 if state := s.getState(); state != parallelUnorderedSynchronizerStateUninitialized {
-// Input goroutines have been started and will take care of closing the
-// closers from the corresponding input trees, so we don't need to do
-// anything.
+// Input goroutines have been started and will take care of finishing
+// the tracing spans.
 return nil
 }
 // If the synchronizer is in "uninitialized" state, it means that the
 // goroutines for each input haven't been started, so they won't be able to
-// close the Closers from the corresponding trees. In such a scenario the
-// synchronizer must close all of them from all input trees. Note that it is
-// ok to close some input trees even if they haven't been initialized.
-//
-// Note that at this point we know that the input goroutines won't be
-// spawned up (our consumer won't call Next/DrainMeta after calling Close),
-// so it is safe to close all closers from this goroutine.
-var lastErr error
-for _, input := range s.inputs {
-if err := input.ToClose.Close(ctx); err != nil {
-lastErr = err
-}
-}
-// Finish the spans after closing the Closers since Close() implementations
-// might log some stuff.
+// finish their tracing spans. In such a scenario the synchronizer must do
+// that on its own.
 for i, span := range s.tracingSpans {
 if span != nil {
 span.Finish()
 s.tracingSpans[i] = nil
 }
 }
-return lastErr
+return nil
 }
44 changes: 0 additions & 44 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
@@ -248,50 +248,6 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
 require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
 }
 
-// TestParallelUnorderedSyncClosesInputs verifies that the parallel unordered
-// synchronizer closes the input trees if it encounters a panic during the
-// initialization.
-func TestParallelUnorderedSyncClosesInputs(t *testing.T) {
-defer leaktest.AfterTest(t)()
-defer log.Scope(t).Close(t)
-
-ctx := context.Background()
-const injectedPanicMsg = "injected panic"
-inputs := make([]colexecargs.OpWithMetaInfo, 2)
-
-// Create the first input that is responsible for tracking whether the
-// closure occurred as expected.
-closed := false
-firstInput := &colexecop.CallbackOperator{
-CloseCb: func(context.Context) error {
-closed = true
-return nil
-},
-}
-inputs[0].Root = firstInput
-inputs[0].ToClose = append(inputs[0].ToClose, firstInput)
-
-// Create the second input that injects a panic into Init.
-inputs[1].Root = &colexecop.CallbackOperator{
-InitCb: func(context.Context) {
-colexecerror.InternalError(errors.New(injectedPanicMsg))
-},
-}
-
-// Create and initialize (but don't run) the synchronizer.
-var wg sync.WaitGroup
-s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg)
-err := colexecerror.CatchVectorizedRuntimeError(func() { s.Init(ctx) })
-require.NotNil(t, err)
-require.True(t, strings.Contains(err.Error(), injectedPanicMsg))
-
-// In the production setting, the user of the synchronizer is still expected
-// to close it, even if a panic is encountered in Init, so we do the same
-// thing here and verify that the first input is properly closed.
-require.NoError(t, s.Close(ctx))
-require.True(t, closed)
-}
-
 func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
 const numInputs = 6
 
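
Even with this test removed, the consumer-side contract it exercised still
holds: the synchronizer must be closed even when `Init` panics. A sketch of
that contract under the new scheme (adapted from the deleted test above,
not actual test code):

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg)
if err := colexecerror.CatchVectorizedRuntimeError(func() { s.Init(ctx) }); err != nil {
    // Close is still required after the failed Init, but it now only
    // finishes the synchronizer's tracing spans - the inputs' Closers are
    // closed by the flow during its cleanup.
    _ = s.Close(ctx)
}
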
12 changes: 1 addition & 11 deletions pkg/sql/colexec/serial_unordered_synchronizer.go
@@ -109,19 +109,9 @@ func (s *SerialUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata
 
 // Close is part of the colexecop.ClosableOperator interface.
 func (s *SerialUnorderedSynchronizer) Close(context.Context) error {
-// Note that we're using the context of the synchronizer rather than the
-// argument of Close() because the synchronizer derives its own tracing
-// span.
-ctx := s.EnsureCtx()
-var lastErr error
-for _, input := range s.inputs {
-if err := input.ToClose.Close(ctx); err != nil {
-lastErr = err
-}
-}
 if s.span != nil {
 s.span.Finish()
 s.span = nil
 }
-return lastErr
+return nil
 }
1 change: 0 additions & 1 deletion pkg/sql/colexecop/BUILD.bazel
@@ -20,7 +20,6 @@ go_library(
 "//pkg/sql/execinfrapb",
 "//pkg/sql/execstats",
 "//pkg/sql/types",
-"//pkg/util/log",
 "@com_github_cockroachdb_errors//:errors",
 ],
 )
17 changes: 0 additions & 17 deletions pkg/sql/colexecop/operator.go
@@ -20,7 +20,6 @@ import (
 "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
 "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 "github.com/cockroachdb/cockroach/pkg/sql/execstats"
-"github.com/cockroachdb/cockroach/pkg/util/log"
 "github.com/cockroachdb/errors"
 )

@@ -163,22 +162,6 @@ type Closer interface {
 // Closers is a slice of Closers.
 type Closers []Closer
 
-// CloseAndLogOnErr closes all Closers and logs the error if the log verbosity
-// is 1 or higher. The given prefix is prepended to the log message.
-// Note: this method should *only* be used when returning an error doesn't make
-// sense.
-func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) {
-if err := colexecerror.CatchVectorizedRuntimeError(func() {
-for _, closer := range c {
-if err := closer.Close(ctx); err != nil && log.V(1) {
-log.Infof(ctx, "%s: error closing Closer: %v", prefix, err)
-}
-}
-}); err != nil && log.V(1) {
-log.Infof(ctx, "%s: runtime error closing the closers: %v", prefix, err)
-}
-}
-
 // Close closes all Closers and returns the last error (if any occurs).
 func (c Closers) Close(ctx context.Context) error {
 var lastErr error
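
For reference, the surviving `Closers.Close` helper (its body is truncated
above) aggregates errors rather than logging them; a sketch consistent
with the visible lines:

func (c Closers) Close(ctx context.Context) error {
    var lastErr error
    for _, closer := range c {
        if err := closer.Close(ctx); err != nil {
            lastErr = err
        }
    }
    return lastErr
}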
