From 661296f1224f34148d0112c081f1647f4e293275 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 15 Nov 2022 19:42:16 -0800
Subject: [PATCH] colflow: track closers by the vectorized flow

This commit simplifies the way we track all of the `colexecop.Closer`s
we need to close. Previously, we would track them using
`OpWithMetaInfo`, and then many operators would be responsible for
closing the components in their input tree. This commit makes it so
that we close them during the flow cleanup instead. This is ok because
we know that all concurrent goroutines will have exited by the time
`Cleanup` is called, since we call it only after `Flow.Wait` returns
(which blocks).

The decision to put closers into `OpWithMetaInfo` was made in #62221
with the justification of "why not" that I provided. Now I have the
answer to why we should not do it. Unlike the stats collection and
metadata draining (which - at least currently - must happen in the
"owner" goroutines), for closers we have a convenient place to close
them at the flow level. The contract is such that the closure must
occur eventually, but it doesn't matter much in which goroutine it is
done (as long as there is no race) or whether it is a bit delayed. (The
only downside I see is that the tracing spans are finished with a delay
compared to when the relevant operations are actually done, but this
has already been pretty bad, and this commit makes things only slightly
worse. This "delayed" finish shows up as an "over-extended" span when
viewing traces via Jaeger.)

As a result of this refactor, the assertion that all closers are closed
becomes redundant - we would effectively be asserting only that a
single method is called - so the assertion is removed.

This commit also allows us to remove some of the complexity around the
`ExplainVec` implementation (we no longer need to tie the cleanup to
the closing of the corresponding planNode).

Release note: None
---
 pkg/sql/colexec/colbuilder/execplan.go        |  11 +-
 pkg/sql/colexec/colexecargs/op_creation.go    |  13 ++-
 pkg/sql/colexec/external_hash_joiner_test.go  |   7 --
 pkg/sql/colexec/external_sort_test.go         |   7 --
 pkg/sql/colexec/materializer.go               |  32 +-----
 pkg/sql/colexec/ordered_synchronizer.eg.go    |  12 +--
 pkg/sql/colexec/ordered_synchronizer_tmpl.go  |  12 +--
 .../parallel_unordered_synchronizer.go        |  27 +----
 .../parallel_unordered_synchronizer_test.go   |  44 --------
 .../colexec/serial_unordered_synchronizer.go  |  12 +--
 pkg/sql/colexecop/BUILD.bazel                 |   1 -
 pkg/sql/colexecop/operator.go                 |  17 ---
 pkg/sql/colflow/colrpc/outbox.go              |   7 +-
 pkg/sql/colflow/explain_vec.go                |  64 ++++-------
 pkg/sql/colflow/flow_coordinator.go           |  20 +---
 pkg/sql/colflow/routers.go                    |   2 -
 pkg/sql/colflow/vectorized_flow.go            | 100 +++++-------
 .../colflow/vectorized_flow_shutdown_test.go  |  36 -------
 pkg/sql/distsql/columnar_utils_test.go        |   1 +
 pkg/sql/distsql_physical_planner.go           |   3 +-
 pkg/sql/explain_vec.go                        |   7 +-
 21 files changed, 77 insertions(+), 358 deletions(-)

diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go
index b9c83caf240e..a37948e6e1d3 100644
--- a/pkg/sql/colexec/colbuilder/execplan.go
+++ b/pkg/sql/colexec/colbuilder/execplan.go
@@ -100,7 +100,6 @@ func wrapRowSources(
 // able to pool the underlying slices.
inputs[i].StatsCollectors = nil inputs[i].MetadataSources = nil - inputs[i].ToClose = nil toWrapInputs = append(toWrapInputs, toWrapInput) *releasables = append(*releasables, toWrapInput) } @@ -493,10 +492,8 @@ func takeOverMetaInfo(target *colexecargs.OpWithMetaInfo, inputs []colexecargs.O for i := range inputs { target.StatsCollectors = append(target.StatsCollectors, inputs[i].StatsCollectors...) target.MetadataSources = append(target.MetadataSources, inputs[i].MetadataSources...) - target.ToClose = append(target.ToClose, inputs[i].ToClose...) inputs[i].MetadataSources = nil inputs[i].StatsCollectors = nil - inputs[i].ToClose = nil } } @@ -599,10 +596,10 @@ func MaybeRemoveRootColumnarizer(r colexecargs.OpWithMetaInfo) execinfra.RowSour return nil } // We have the columnarizer as the root, and it must be included into the - // MetadataSources and ToClose slices, so if we don't see any other objects, - // then the responsibility over other meta components has been claimed by - // the children of the columnarizer. - if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 || len(r.ToClose) != 1 { + // MetadataSources slice, so if we don't see any other objects, then the + // responsibility over other meta components has been claimed by the + // children of the columnarizer. + if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 { return nil } c.MarkAsRemovedFromFlow() diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 2d50573db5f4..94c0e0244ae3 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -36,6 +36,13 @@ var TestNewColOperator func(ctx context.Context, flowCtx *execinfra.FlowCtx, arg // OpWithMetaInfo stores a colexecop.Operator together with miscellaneous meta // information about the tree rooted in that operator. +// +// Note that at some point colexecop.Closers were also included into this +// struct, but for ease of tracking we pulled them out to the flow-level. +// Closers are different from the objects tracked here since we have a +// convenient place to close them from the main goroutine whereas the stats +// collection as well as metadata draining must happen in the goroutine that +// "owns" these objects. // TODO(yuzefovich): figure out the story about pooling these objects. type OpWithMetaInfo struct { Root colexecop.Operator @@ -47,9 +54,6 @@ type OpWithMetaInfo struct { // tree rooted in Root for which the responsibility of draining hasn't been // claimed yet. MetadataSources colexecop.MetadataSources - // ToClose are all colexecop.Closers that are present in the tree rooted in - // Root for which the responsibility of closing hasn't been claimed yet. - ToClose colexecop.Closers } // NewColOperatorArgs is a helper struct that encompasses all of the input @@ -106,6 +110,7 @@ type NewColOperatorResult struct { // contract right now of whether or not a particular operator has to make a // copy of the type schema if it needs to use it later. 
ColumnTypes []*types.T + ToClose colexecop.Closers Releasables []execreleasable.Releasable } @@ -155,8 +160,8 @@ func (r *NewColOperatorResult) Release() { OpWithMetaInfo: OpWithMetaInfo{ StatsCollectors: r.StatsCollectors[:0], MetadataSources: r.MetadataSources[:0], - ToClose: r.ToClose[:0], }, + ToClose: r.ToClose[:0], Releasables: r.Releasables[:0], } newColOperatorResultPool.Put(r) diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index 88e434b9bcf3..603f07c0ed9d 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -81,11 +81,6 @@ func TestExternalHashJoiner(t *testing.T) { sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) semsToCheck = append(semsToCheck, sem) spec := createSpecForHashJoiner(tc) - // TODO(asubiotto): Pass in the testing.T of the caller to this - // function and do substring matching on the test name to - // conditionally explicitly call Close() on the hash joiner - // (through result.ToClose) in cases where it is known the sorter - // will not be drained. hjOp, closers, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, sources, func() {}, queueCfg, numForcedRepartitions, delegateFDAcquisitions, sem, @@ -93,8 +88,6 @@ func TestExternalHashJoiner(t *testing.T) { ) // Expect three closers. These are the external hash joiner, and // one external sorter for each input. - // TODO(asubiotto): Explicitly Close when testing.T is passed into - // this constructor and we do a substring match. require.Equal(t, 3, len(closers)) return hjOp, err }) diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index fc608c77f218..b78421e3427f 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -94,19 +94,12 @@ func TestExternalSort(t *testing.T) { if tc.k == 0 || tc.k >= uint64(len(tc.tuples)) { semsToCheck = append(semsToCheck, sem) } - // TODO(asubiotto): Pass in the testing.T of the caller to this - // function and do substring matching on the test name to - // conditionally explicitly call Close() on the sorter (through - // result.ToClose) in cases where it is know the sorter will not - // be drained. sorter, closers, err := createDiskBackedSorter( ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() {}, numForcedRepartitions, false /* delegateFDAcquisition */, queueCfg, sem, &monitorRegistry, ) // Check that the sort was added as a Closer. - // TODO(asubiotto): Explicitly Close when testing.T is passed into - // this constructor and we do a substring match. require.Equal(t, 1, len(closers)) return sorter, err }) diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 449da21e879c..62b92c416024 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -60,9 +60,6 @@ type Materializer struct { // outputRow stores the returned results of next() to be passed through an // adapter. outputRow rowenc.EncDatumRow - - // closers is a slice of Closers that should be Closed on termination. 
- closers colexecop.Closers } // drainHelper is a utility struct that wraps MetadataSources in a RowSource @@ -207,7 +204,6 @@ func newMaterializerInternal( typs: typs, converter: colconv.NewAllVecToDatumConverter(len(typs)), row: make(rowenc.EncDatumRow, len(typs)), - closers: input.ToClose, } m.drainHelper.allocator = allocator m.drainHelper.statsCollectors = input.StatsCollectors @@ -221,8 +217,9 @@ func newMaterializerInternal( nil, /* output */ execinfra.ProcStateOpts{ // We append drainHelper to inputs to drain below in order to reuse - // the same underlying slice from the pooled materializer. - TrailingMetaCallback: m.trailingMetaCallback, + // the same underlying slice from the pooled materializer. The + // drainHelper is responsible for draining the metadata from the + // input tree. }, ) m.AddInputToDrain(&m.drainHelper) @@ -324,29 +321,6 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, m.DrainHelper() } -func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata { - // Note that we delegate draining all of the metadata sources to drainHelper - // which is added as an input to drain. - m.close() - return nil -} - -func (m *Materializer) close() { - if m.Closed { - return - } - // Make sure to call InternalClose() only after closing the closers - this - // allows the closers to utilize the unfinished tracing span (if tracing is - // enabled). - m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") - m.InternalClose() -} - -// ConsumerClosed is part of the execinfra.RowSource interface. -func (m *Materializer) ConsumerClosed() { - m.close() -} - // Release implements the execinfra.Releasable interface. func (m *Materializer) Release() { m.ProcessorBaseNoHelper.Reset() diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index d71a8ea32639..98f582188fd2 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -315,22 +315,12 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { } func (o *OrderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. - ctx := o.EnsureCtx() o.accountingHelper.Release() - var lastErr error - for _, input := range o.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if o.span != nil { o.span.Finish() } *o = OrderedSynchronizer{} - return lastErr + return nil } func (o *OrderedSynchronizer) compareRow(batchIdx1 int, batchIdx2 int) int { diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 96c308334659..cf149d15889b 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -263,22 +263,12 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { } func (o *OrderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. 
- ctx := o.EnsureCtx() o.accountingHelper.Release() - var lastErr error - for _, input := range o.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if o.span != nil { o.span.Finish() } *o = OrderedSynchronizer{} - return lastErr + return nil } func (o *OrderedSynchronizer) compareRow(batchIdx1 int, batchIdx2 int) int { diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index e94071894849..dbade33393de 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -226,9 +226,6 @@ func (s *ParallelUnorderedSynchronizer) init() { if int(atomic.AddUint32(&s.numFinishedInputs, 1)) == len(s.inputs) { close(s.batchCh) } - // We need to close all of the closers of this input before we - // notify the wait groups. - input.ToClose.CloseAndLogOnErr(s.inputCtxs[inputIdx], "parallel unordered synchronizer input") s.internalWaitGroup.Done() s.externalWaitGroup.Done() }() @@ -485,33 +482,19 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada // Close is part of the colexecop.ClosableOperator interface. func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error { if state := s.getState(); state != parallelUnorderedSynchronizerStateUninitialized { - // Input goroutines have been started and will take care of closing the - // closers from the corresponding input trees, so we don't need to do - // anything. + // Input goroutines have been started and will take care of finishing + // the tracing spans. return nil } // If the synchronizer is in "uninitialized" state, it means that the // goroutines for each input haven't been started, so they won't be able to - // close the Closers from the corresponding trees. In such a scenario the - // synchronizer must close all of them from all input trees. Note that it is - // ok to close some input trees even if they haven't been initialized. - // - // Note that at this point we know that the input goroutines won't be - // spawned up (our consumer won't call Next/DrainMeta after calling Close), - // so it is safe to close all closers from this goroutine. - var lastErr error - for _, input := range s.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } - // Finish the spans after closing the Closers since Close() implementations - // might log some stuff. + // finish their tracing spans. In such a scenario the synchronizer must do + // that on its own. for i, span := range s.tracingSpans { if span != nil { span.Finish() s.tracingSpans[i] = nil } } - return lastErr + return nil } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index fdc9d98a7621..452739b57f55 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -248,50 +248,6 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) { require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs))) } -// TestParallelUnorderedSyncClosesInputs verifies that the parallel unordered -// synchronizer closes the input trees if it encounters a panic during the -// initialization. 
-func TestParallelUnorderedSyncClosesInputs(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - const injectedPanicMsg = "injected panic" - inputs := make([]colexecargs.OpWithMetaInfo, 2) - - // Create the first input that is responsible for tracking whether the - // closure occurred as expected. - closed := false - firstInput := &colexecop.CallbackOperator{ - CloseCb: func(context.Context) error { - closed = true - return nil - }, - } - inputs[0].Root = firstInput - inputs[0].ToClose = append(inputs[0].ToClose, firstInput) - - // Create the second input that injects a panic into Init. - inputs[1].Root = &colexecop.CallbackOperator{ - InitCb: func(context.Context) { - colexecerror.InternalError(errors.New(injectedPanicMsg)) - }, - } - - // Create and initialize (but don't run) the synchronizer. - var wg sync.WaitGroup - s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) - err := colexecerror.CatchVectorizedRuntimeError(func() { s.Init(ctx) }) - require.NotNil(t, err) - require.True(t, strings.Contains(err.Error(), injectedPanicMsg)) - - // In the production setting, the user of the synchronizer is still expected - // to close it, even if a panic is encountered in Init, so we do the same - // thing here and verify that the first input is properly closed. - require.NoError(t, s.Close(ctx)) - require.True(t, closed) -} - func BenchmarkParallelUnorderedSynchronizer(b *testing.B) { const numInputs = 6 diff --git a/pkg/sql/colexec/serial_unordered_synchronizer.go b/pkg/sql/colexec/serial_unordered_synchronizer.go index 322cdd966a52..18b0dd2cb213 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer.go @@ -109,19 +109,9 @@ func (s *SerialUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata // Close is part of the colexecop.ClosableOperator interface. func (s *SerialUnorderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. - ctx := s.EnsureCtx() - var lastErr error - for _, input := range s.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if s.span != nil { s.span.Finish() s.span = nil } - return lastErr + return nil } diff --git a/pkg/sql/colexecop/BUILD.bazel b/pkg/sql/colexecop/BUILD.bazel index df37bb1dcc3c..cbdb45f95633 100644 --- a/pkg/sql/colexecop/BUILD.bazel +++ b/pkg/sql/colexecop/BUILD.bazel @@ -20,7 +20,6 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/execstats", "//pkg/sql/types", - "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 14333fcde815..6071f7cc3d51 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -163,22 +162,6 @@ type Closer interface { // Closers is a slice of Closers. type Closers []Closer -// CloseAndLogOnErr closes all Closers and logs the error if the log verbosity -// is 1 or higher. The given prefix is prepended to the log message. -// Note: this method should *only* be used when returning an error doesn't make -// sense. 
-func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { - if err := colexecerror.CatchVectorizedRuntimeError(func() { - for _, closer := range c { - if err := closer.Close(ctx); err != nil && log.V(1) { - log.Infof(ctx, "%s: error closing Closer: %v", prefix, err) - } - } - }); err != nil && log.V(1) { - log.Infof(ctx, "%s: runtime error closing the closers: %v", prefix, err) - } -} - // Close closes all Closers and returns the last error (if any occurs). func (c Closers) Close(ctx context.Context) error { var lastErr error diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 9d57ae2dcc60..2e1be95d3a98 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -113,7 +113,7 @@ func NewOutbox( return o, nil } -func (o *Outbox) close(ctx context.Context) { +func (o *Outbox) close() { o.scratch.buf = nil o.scratch.msg = nil // Unset the input (which is a deselector operator) so that its output batch @@ -122,7 +122,6 @@ func (o *Outbox) close(ctx context.Context) { // the deselector). o.Input = nil o.unlimitedAllocator.ReleaseAll() - o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox") } // Run starts an outbox by connecting to the provided node and pushing @@ -217,7 +216,7 @@ func (o *Outbox) Run( return nil }(); err != nil { // error during stream set up. - o.close(ctx) + o.close() return } @@ -436,6 +435,6 @@ func (o *Outbox) runWithStream( } } - o.close(ctx) + o.close() <-waitCh } diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index 48a7c2e75a28..61db06e86548 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -12,7 +12,6 @@ package colflow import ( "context" - "math" "reflect" "sort" @@ -26,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/util/admission" - "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" "github.com/cockroachdb/errors" ) @@ -34,10 +32,10 @@ import ( // convertToVecTree converts the flow to a tree of vectorized operators // returning a list of the leap operators or an error if the flow vectorization // is not supported. Note that it does so by setting up the full flow without -// running the components asynchronously, so it is pretty expensive. -// It also returns a non-nil cleanup function that releases all -// execinfra.Releasable objects which can *only* be performed once opChains are -// no longer needed. +// running the components asynchronously, so it is pretty expensive. It also +// returns a non-nil cleanup function that closes all the closers as well as +// releases all execreleasable.Releasable objects which can *only* be performed +// once opChains are no longer needed. func convertToVecTree( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -61,24 +59,12 @@ func convertToVecTree( flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.Txn), admission.WorkInfo{}, ) - // We create an unlimited memory account because we're interested whether the - // flow is supported via the vectorized engine in general (without paying - // attention to the memory since it is node-dependent in the distributed - // case). 
- memoryMonitor := mon.NewMonitor( - "convert-to-vec-tree", - mon.MemoryResource, - nil, /* curCount */ - nil, /* maxHist */ - -1, /* increment */ - math.MaxInt64, /* noteworthy */ - flowCtx.Cfg.Settings, - ) - memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) - defer memoryMonitor.Stop(ctx) - defer creator.cleanup(ctx) opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, fuseOpt) - return opChains, creator.Release, err + cleanup = func() { + creator.cleanup(ctx) + creator.Release() + } + return opChains, cleanup, err } // fakeBatchReceiver exists for the sole purpose of convertToVecTree method. In @@ -106,11 +92,6 @@ type flowWithNode struct { // It also supports printing of already constructed operator chains which takes // priority if non-nil (flows are ignored). All operators in opChains are // assumed to be planned on the gateway. -// -// As the second return parameter it returns a non-nil cleanup function which -// can be called only **after** closing the planNode tree containing the -// explainVecNode (if ExplainVec is used by another caller, then it can be -// called at any time). func ExplainVec( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -120,21 +101,11 @@ func ExplainVec( gatewaySQLInstanceID base.SQLInstanceID, verbose bool, distributed bool, -) (_ []string, cleanup func(), _ error) { +) ([]string, error) { tp := treeprinter.NewWithStyle(treeprinter.CompactStyle) root := tp.Child("│") - var ( - cleanups []func() - err error - conversionErr error - ) - defer func() { - cleanup = func() { - for _, c := range cleanups { - c() - } - } - }() + var err error + var conversionErr error // It is possible that when iterating over execopnode.OpNodes we will hit a // panic (an input that doesn't implement OpNode interface), so we're // catching such errors. @@ -150,8 +121,11 @@ func ExplainVec( // last. sort.Slice(sortedFlows, func(i, j int) bool { return sortedFlows[i].sqlInstanceID < sortedFlows[j].sqlInstanceID }) for _, flow := range sortedFlows { + var cleanup func() opChains, cleanup, err = convertToVecTree(ctx, flowCtx, flow.flow, localProcessors, !distributed) - cleanups = append(cleanups, cleanup) + // We need to delay the cleanup until after the tree has been + // formatted. + defer cleanup() if err != nil { conversionErr = err return @@ -160,12 +134,12 @@ func ExplainVec( } } }); err != nil { - return nil, nil, err + return nil, err } if conversionErr != nil { - return nil, nil, conversionErr + return nil, conversionErr } - return tp.FormattedRows(), nil, nil + return tp.FormattedRows(), nil } func formatChains( diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 3dce8db4b33c..752d1fb95f36 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -278,13 +278,8 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // Make sure that we close the coordinator and notify the batch receiver in // all cases. defer func() { - if err := f.close(ctx); err != nil && status != execinfra.ConsumerClosed { - f.pushError(err) - } + f.cancelFlow() f.output.ProducerDone() - // Note that f.close is only safe to call before finishing the tracing - // span because some components might still use the span when they are - // being closed. span.Finish() }() @@ -342,19 +337,6 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { } } -// close cancels the flow and closes all colexecop.Closers the coordinator is -// responsible for. 
-func (f *BatchFlowCoordinator) close(ctx context.Context) error { - f.cancelFlow() - var lastErr error - for _, toClose := range f.input.ToClose { - if err := toClose.Close(ctx); err != nil { - lastErr = err - } - } - return lastErr -} - // Release implements the execinfra.Releasable interface. func (f *BatchFlowCoordinator) Release() { *f = BatchFlowCoordinator{} diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index e3d6932454ab..86252f75a5e3 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -646,8 +646,6 @@ func (r *HashRouter) Run(ctx context.Context) { // in DrainMeta. r.waitForMetadata <- bufferedMeta close(r.waitForMetadata) - - r.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "hash router") } // processNextBatch reads the next batch from its input, hashes it and adds diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 3f113e38666e..458491fb13ea 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -197,16 +197,6 @@ type vectorizedFlow struct { path string } - testingInfo struct { - // numClosers is the number of components in the flow that implement - // Close. This is used for testing assertions. - numClosers int32 - // numClosed is a pointer to an int32 that is updated atomically when a - // component's Close method is called. This is used for testing - // assertions. - numClosed *int32 - } - testingKnobs struct { // onSetupFlow is a testing knob that is called before calling // creator.setupFlow with the given creator. @@ -288,8 +278,6 @@ func (f *vectorizedFlow) Setup( return ctx, nil, err } f.batchFlowCoordinator = batchFlowCoordinator - f.testingInfo.numClosers = f.creator.numClosers - f.testingInfo.numClosed = &f.creator.numClosed f.SetStartedGoroutines(f.creator.operatorConcurrency) log.VEventf(ctx, 2, "vectorized flow setup succeeded") if !f.IsLocal() { @@ -381,21 +369,10 @@ func (f *vectorizedFlow) MemUsage() int64 { // Cleanup is part of the flowinfra.Flow interface. func (f *vectorizedFlow) Cleanup(ctx context.Context) { - // This cleans up all the memory and disk monitoring of the vectorized flow. + // This cleans up all the memory and disk monitoring of the vectorized flow + // as well as closes all the closers. f.creator.cleanup(ctx) - if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { - // Check that all closers have been closed. Note that we don't check - // this in case the flow was never started in the first place (it is ok - // to not check this since closers haven't allocated any resources in - // such a case). We also don't check when the panic injection is - // enabled since then Close() might be legitimately not called (if a - // panic is injected in Init() of the wrapped operator). - if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { - colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) - } - } - f.tempStorage.Lock() created := f.tempStorage.path != "" f.tempStorage.Unlock() @@ -627,6 +604,11 @@ type vectorizedFlowCreator struct { opChains execopnode.OpChains // operatorConcurrency is set if any operators are executed in parallel. operatorConcurrency bool + // closers will be closed during the flow cleanup. 
It is safe to do so in + // the main flow goroutine since all other goroutines that might have used + // these objects must have exited by the time Cleanup() is called - + // Flow.Wait() ensures that. + closers colexecop.Closers // releasables contains all components that should be released back to their // pools during the flow cleanup. releasables []execreleasable.Releasable @@ -634,12 +616,6 @@ type vectorizedFlowCreator struct { monitorRegistry colexecargs.MonitorRegistry diskQueueCfg colcontainer.DiskQueueCfg fdSemaphore semaphore.Semaphore - - // numClosers and numClosed are used to assert during testing that the - // expected number of components are closed. The assertion only happens if - // the panic injection is not enabled. - numClosers int32 - numClosed int32 } var _ execreleasable.Releasable = &vectorizedFlowCreator{} @@ -696,6 +672,15 @@ func newVectorizedFlowCreator( } func (s *vectorizedFlowCreator) cleanup(ctx context.Context) { + if err := colexecerror.CatchVectorizedRuntimeError(func() { + for _, closer := range s.closers { + if err := closer.Close(ctx); err != nil && log.V(1) { + log.Infof(ctx, "error closing Closer: %v", err) + } + } + }); err != nil && log.V(1) { + log.Infof(ctx, "runtime error closing the closers: %v", err) + } s.monitorRegistry.Close(ctx) } @@ -717,6 +702,9 @@ func (s *vectorizedFlowCreator) Release() { for i := range s.opChains { s.opChains[i] = nil } + for i := range s.closers { + s.closers[i] = nil + } for i := range s.releasables { s.releasables[i] = nil } @@ -732,6 +720,7 @@ func (s *vectorizedFlowCreator) Release() { // prime it for reuse. procIdxQueue: s.procIdxQueue[:0], opChains: s.opChains[:0], + closers: s.closers[:0], releasables: s.releasables[:0], monitorRegistry: s.monitorRegistry, } @@ -823,6 +812,7 @@ func (s *vectorizedFlowCreator) setupRouter( foundLocalOutput := false for i, op := range outputs { + s.closers = append(s.closers, op) if buildutil.CrdbTestBuild { op = colexec.NewInvariantsChecker(op) } @@ -837,7 +827,6 @@ func (s *vectorizedFlowCreator) setupRouter( ctx, flowCtx, colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - ToClose: colexecop.Closers{op}, }, outputTyps, stream, factory, nil, /* getStats */ ); err != nil { return err @@ -847,8 +836,6 @@ func (s *vectorizedFlowCreator) setupRouter( opWithMetaInfo := colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - // input.ToClose will be closed by the hash router. 
- ToClose: colexecop.Closers{op}, } if s.recordingStats { mons := []*mon.BytesMonitor{hashRouterMemMonitor, diskMon} @@ -967,15 +954,15 @@ func (s *vectorizedFlowCreator) setupInput( opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: os, MetadataSources: colexecop.MetadataSources{os}, - ToClose: colexecop.Closers{os}, } + s.closers = append(s.closers, os) } else if input.Type == execinfrapb.InputSyncSpec_SERIAL_UNORDERED || opt == flowinfra.FuseAggressively { sync := colexec.NewSerialUnorderedSynchronizer(inputStreamOps) opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: sync, MetadataSources: colexecop.MetadataSources{sync}, - ToClose: colexecop.Closers{sync}, } + s.closers = append(s.closers, sync) } else { // Note that if we have opt == flowinfra.FuseAggressively, then we // must use the serial unordered sync above in order to remove any @@ -986,8 +973,8 @@ func (s *vectorizedFlowCreator) setupInput( opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: sync, MetadataSources: colexecop.MetadataSources{sync}, - ToClose: colexecop.Closers{sync}, } + s.closers = append(s.closers, sync) s.operatorConcurrency = true // Don't use the unordered synchronizer's inputs for stats collection // given that they run concurrently. The stall time will be collected @@ -1079,14 +1066,8 @@ func (s *vectorizedFlowCreator) setupOutput( s.releasables = append(s.releasables, s.batchFlowCoordinator) } else { // We need to use the row receiving output. - if input != nil { - // We successfully removed the columnarizer. - if buildutil.CrdbTestBuild { - // That columnarizer was added as a closer, so we need to - // decrement the number of expected closers. - s.numClosers-- - } - } else { + if input == nil { + // We couldn't remove the columnarizer. input = colexec.NewMaterializerNoEvalCtxCopy( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), flowCtx, @@ -1118,19 +1099,6 @@ func (s *vectorizedFlowCreator) setupOutput( return nil } -// callbackCloser is a utility struct that implements the Closer interface by -// calling the provided callback. -type callbackCloser struct { - closeCb func(context.Context) error -} - -var _ colexecop.Closer = &callbackCloser{} - -// Close implements the Closer interface. -func (c *callbackCloser) Close(ctx context.Context) error { - return c.closeCb(ctx) -} - func (s *vectorizedFlowCreator) setupFlow( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1210,23 +1178,9 @@ func (s *vectorizedFlowCreator) setupFlow( err = errors.Wrapf(err, "unable to vectorize execution plan") return } + s.closers = append(s.closers, result.ToClose...) if flowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { result.Root = newPanicInjector(result.Root) - } else if buildutil.CrdbTestBuild { - toCloseCopy := append(colexecop.Closers{}, result.ToClose...) 
- for i := range toCloseCopy { - func(idx int) { - closed := false - result.ToClose[idx] = &callbackCloser{closeCb: func(ctx context.Context) error { - if !closed { - closed = true - atomic.AddInt32(&s.numClosed, 1) - } - return toCloseCopy[idx].Close(ctx) - }} - }(i) - } - s.numClosers += int32(len(result.ToClose)) } if s.recordingStats { diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 96e9a9e87252..5a38a4b4f63b 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -41,7 +41,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -58,16 +57,6 @@ var ( testScenarios = []testScenario{consumerDone, consumerClosed, useBatchReceiver} ) -type callbackCloser struct { - closeCb func(context.Context) error -} - -var _ colexecop.Closer = callbackCloser{} - -func (c callbackCloser) Close(ctx context.Context) error { - return c.closeCb(ctx) -} - // TestVectorizedFlowShutdown tests that closing the FlowCoordinator correctly // closes all the infrastructure corresponding to the flow ending in that // FlowCoordinator. Namely: @@ -245,12 +234,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { inputMetadataSource := colexecop.MetadataSource(synchronizer) flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()} - // idToClosed keeps track of whether Close was called for a given id. - idToClosed := struct { - syncutil.Mutex - mapping map[int]bool - }{} - idToClosed.mapping = make(map[int]bool) runOutboxInbox := func( outboxCtx context.Context, flowCtxCancel context.CancelFunc, @@ -260,20 +243,11 @@ func TestVectorizedFlowShutdown(t *testing.T) { id int, outboxMetadataSources []colexecop.MetadataSource, ) { - idToClosed.Lock() - idToClosed.mapping[id] = false - idToClosed.Unlock() outbox, err := colrpc.NewOutbox( colmem.NewAllocator(outboxCtx, outboxMemAcc, testColumnFactory), colexecargs.OpWithMetaInfo{ Root: outboxInput, MetadataSources: outboxMetadataSources, - ToClose: []colexecop.Closer{callbackCloser{closeCb: func(context.Context) error { - idToClosed.Lock() - idToClosed.mapping[id] = true - idToClosed.Unlock() - return nil - }}}, }, typs, nil, /* getStats */ @@ -365,14 +339,9 @@ func TestVectorizedFlowShutdown(t *testing.T) { input = synchronizer } - closeCalled := false inputInfo := colexecargs.OpWithMetaInfo{ Root: input, MetadataSources: colexecop.MetadataSources{inputMetadataSource}, - ToClose: colexecop.Closers{callbackCloser{closeCb: func(context.Context) error { - closeCalled = true - return nil - }}}, } // runFlowCoordinator creates a pair of a materializer and a @@ -468,11 +437,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { } } wg.Wait() - // Ensure all the outboxes called Close. 
- for id, closed := range idToClosed.mapping { - require.True(t, closed, "outbox with ID %d did not call Close on closers", id) - } - require.True(t, closeCalled) }) } } diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go index e978807f5f5b..4f373ab36036 100644 --- a/pkg/sql/distsql/columnar_utils_test.go +++ b/pkg/sql/distsql/columnar_utils_test.go @@ -161,6 +161,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { if err != nil { return err } + defer result.TestCleanupNoError(t) outColOp := colexec.NewMaterializer( nil, /* allocator */ diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index e7bbde9e3cb3..73a25cecae48 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -843,12 +843,11 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc( flowCtx, cleanup := newFlowCtxForExplainPurposes(ctx, p, planner) defer cleanup() getExplain := func(verbose bool) []string { - explain, cleanup, err := colflow.ExplainVec( + explain, err := colflow.ExplainVec( ctx, flowCtx, flows, p.infra.LocalProcessors, opChains, planner.extendedEvalCtx.DistSQLPlanner.gatewaySQLInstanceID, verbose, planner.curPlan.flags.IsDistributed(), ) - cleanup() if err != nil { // In some edge cases (like when subqueries are present or // when certain component doesn't implement execopnode.OpNode diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go index ff576cf17d7d..7528200e5115 100644 --- a/pkg/sql/explain_vec.go +++ b/pkg/sql/explain_vec.go @@ -35,8 +35,6 @@ type explainVecNode struct { lines []string // The current row returned by the node. values tree.Datums - // cleanup will be called after closing the input tree. - cleanup func() } } @@ -77,7 +75,7 @@ func (n *explainVecNode) startExec(params runParams) error { } verbose := n.options.Flags[tree.ExplainFlagVerbose] willDistribute := physPlan.Distribution.WillDistribute() - n.run.lines, n.run.cleanup, err = colflow.ExplainVec( + n.run.lines, err = colflow.ExplainVec( params.ctx, flowCtx, flows, physPlan.LocalProcessors, nil, /* opChains */ distSQLPlanner.gatewaySQLInstanceID, verbose, willDistribute, ) @@ -152,7 +150,4 @@ func (n *explainVecNode) Next(runParams) (bool, error) { func (n *explainVecNode) Values() tree.Datums { return n.run.values } func (n *explainVecNode) Close(ctx context.Context) { n.plan.close(ctx) - if n.run.cleanup != nil { - n.run.cleanup() - } }
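
To make the ownership model described in the commit message concrete, here is a
minimal, self-contained Go sketch of the pattern this patch adopts. It is not
CockroachDB code - names such as flowCreator, noopCloser, and flow are invented
for illustration only: operators register their Closers with the flow while the
plan is being set up, and the flow closes all of them once, from the main
goroutine, after Wait has guaranteed that every worker goroutine has exited.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Closer mirrors the colexecop.Closer contract: Close must be called
// eventually, but it does not matter from which goroutine, as long as no
// other goroutine is still using the component.
type Closer interface {
	Close(context.Context) error
}

// noopCloser is a stand-in for components like synchronizers or external
// sorters that need to release resources and finish tracing spans.
type noopCloser struct{ name string }

func (c *noopCloser) Close(context.Context) error {
	fmt.Printf("closed %s\n", c.name)
	return nil
}

// flowCreator accumulates every Closer created while the operator tree is
// being planned, instead of threading them through per-operator metadata.
type flowCreator struct {
	closers []Closer
}

func (c *flowCreator) addCloser(cl Closer) { c.closers = append(c.closers, cl) }

// cleanup closes everything from the main flow goroutine. Errors are only
// logged (printed here) because there is nobody left to return them to.
func (c *flowCreator) cleanup(ctx context.Context) {
	for _, cl := range c.closers {
		if err := cl.Close(ctx); err != nil {
			fmt.Printf("error closing: %v\n", err)
		}
	}
}

type flow struct {
	creator *flowCreator
	wg      sync.WaitGroup
}

// run spawns the worker goroutines; they may use the planned components but
// never close them.
func (f *flow) run() {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
	}()
}

// wait blocks until all goroutines spawned by the flow have exited, which is
// what makes the subsequent cleanup (and hence closing) race-free.
func (f *flow) wait() { f.wg.Wait() }

func (f *flow) cleanup(ctx context.Context) { f.creator.cleanup(ctx) }

func main() {
	ctx := context.Background()
	creator := &flowCreator{}
	creator.addCloser(&noopCloser{name: "ordered synchronizer"})
	creator.addCloser(&noopCloser{name: "external sorter"})

	f := &flow{creator: creator}
	f.run()
	f.wait()       // all goroutines have exited...
	f.cleanup(ctx) // ...so closing here cannot race with them.
}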