diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index b9c83caf240e..a37948e6e1d3 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -100,7 +100,6 @@ func wrapRowSources( // able to pool the underlying slices. inputs[i].StatsCollectors = nil inputs[i].MetadataSources = nil - inputs[i].ToClose = nil toWrapInputs = append(toWrapInputs, toWrapInput) *releasables = append(*releasables, toWrapInput) } @@ -493,10 +492,8 @@ func takeOverMetaInfo(target *colexecargs.OpWithMetaInfo, inputs []colexecargs.O for i := range inputs { target.StatsCollectors = append(target.StatsCollectors, inputs[i].StatsCollectors...) target.MetadataSources = append(target.MetadataSources, inputs[i].MetadataSources...) - target.ToClose = append(target.ToClose, inputs[i].ToClose...) inputs[i].MetadataSources = nil inputs[i].StatsCollectors = nil - inputs[i].ToClose = nil } } @@ -599,10 +596,10 @@ func MaybeRemoveRootColumnarizer(r colexecargs.OpWithMetaInfo) execinfra.RowSour return nil } // We have the columnarizer as the root, and it must be included into the - // MetadataSources and ToClose slices, so if we don't see any other objects, - // then the responsibility over other meta components has been claimed by - // the children of the columnarizer. - if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 || len(r.ToClose) != 1 { + // MetadataSources slice, so if we don't see any other objects, then the + // responsibility over other meta components has been claimed by the + // children of the columnarizer. + if len(r.StatsCollectors) != 0 || len(r.MetadataSources) != 1 { return nil } c.MarkAsRemovedFromFlow() diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 2d50573db5f4..94c0e0244ae3 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -36,6 +36,13 @@ var TestNewColOperator func(ctx context.Context, flowCtx *execinfra.FlowCtx, arg // OpWithMetaInfo stores a colexecop.Operator together with miscellaneous meta // information about the tree rooted in that operator. +// +// Note that at some point colexecop.Closers were also included into this +// struct, but for ease of tracking we pulled them out to the flow-level. +// Closers are different from the objects tracked here since we have a +// convenient place to close them from the main goroutine whereas the stats +// collection as well as metadata draining must happen in the goroutine that +// "owns" these objects. // TODO(yuzefovich): figure out the story about pooling these objects. type OpWithMetaInfo struct { Root colexecop.Operator @@ -47,9 +54,6 @@ type OpWithMetaInfo struct { // tree rooted in Root for which the responsibility of draining hasn't been // claimed yet. MetadataSources colexecop.MetadataSources - // ToClose are all colexecop.Closers that are present in the tree rooted in - // Root for which the responsibility of closing hasn't been claimed yet. - ToClose colexecop.Closers } // NewColOperatorArgs is a helper struct that encompasses all of the input @@ -106,6 +110,7 @@ type NewColOperatorResult struct { // contract right now of whether or not a particular operator has to make a // copy of the type schema if it needs to use it later. 
ColumnTypes []*types.T + ToClose colexecop.Closers Releasables []execreleasable.Releasable } @@ -155,8 +160,8 @@ func (r *NewColOperatorResult) Release() { OpWithMetaInfo: OpWithMetaInfo{ StatsCollectors: r.StatsCollectors[:0], MetadataSources: r.MetadataSources[:0], - ToClose: r.ToClose[:0], }, + ToClose: r.ToClose[:0], Releasables: r.Releasables[:0], } newColOperatorResultPool.Put(r) diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index 88e434b9bcf3..603f07c0ed9d 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -81,11 +81,6 @@ func TestExternalHashJoiner(t *testing.T) { sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) semsToCheck = append(semsToCheck, sem) spec := createSpecForHashJoiner(tc) - // TODO(asubiotto): Pass in the testing.T of the caller to this - // function and do substring matching on the test name to - // conditionally explicitly call Close() on the hash joiner - // (through result.ToClose) in cases where it is known the sorter - // will not be drained. hjOp, closers, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, sources, func() {}, queueCfg, numForcedRepartitions, delegateFDAcquisitions, sem, @@ -93,8 +88,6 @@ func TestExternalHashJoiner(t *testing.T) { ) // Expect three closers. These are the external hash joiner, and // one external sorter for each input. - // TODO(asubiotto): Explicitly Close when testing.T is passed into - // this constructor and we do a substring match. require.Equal(t, 3, len(closers)) return hjOp, err }) diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index fc608c77f218..b78421e3427f 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -94,19 +94,12 @@ func TestExternalSort(t *testing.T) { if tc.k == 0 || tc.k >= uint64(len(tc.tuples)) { semsToCheck = append(semsToCheck, sem) } - // TODO(asubiotto): Pass in the testing.T of the caller to this - // function and do substring matching on the test name to - // conditionally explicitly call Close() on the sorter (through - // result.ToClose) in cases where it is know the sorter will not - // be drained. sorter, closers, err := createDiskBackedSorter( ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() {}, numForcedRepartitions, false /* delegateFDAcquisition */, queueCfg, sem, &monitorRegistry, ) // Check that the sort was added as a Closer. - // TODO(asubiotto): Explicitly Close when testing.T is passed into - // this constructor and we do a substring match. require.Equal(t, 1, len(closers)) return sorter, err }) diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 449da21e879c..62b92c416024 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -60,9 +60,6 @@ type Materializer struct { // outputRow stores the returned results of next() to be passed through an // adapter. outputRow rowenc.EncDatumRow - - // closers is a slice of Closers that should be Closed on termination. 
- closers colexecop.Closers } // drainHelper is a utility struct that wraps MetadataSources in a RowSource @@ -207,7 +204,6 @@ func newMaterializerInternal( typs: typs, converter: colconv.NewAllVecToDatumConverter(len(typs)), row: make(rowenc.EncDatumRow, len(typs)), - closers: input.ToClose, } m.drainHelper.allocator = allocator m.drainHelper.statsCollectors = input.StatsCollectors @@ -221,8 +217,9 @@ func newMaterializerInternal( nil, /* output */ execinfra.ProcStateOpts{ // We append drainHelper to inputs to drain below in order to reuse - // the same underlying slice from the pooled materializer. - TrailingMetaCallback: m.trailingMetaCallback, + // the same underlying slice from the pooled materializer. The + // drainHelper is responsible for draining the metadata from the + // input tree. }, ) m.AddInputToDrain(&m.drainHelper) @@ -324,29 +321,6 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, m.DrainHelper() } -func (m *Materializer) trailingMetaCallback() []execinfrapb.ProducerMetadata { - // Note that we delegate draining all of the metadata sources to drainHelper - // which is added as an input to drain. - m.close() - return nil -} - -func (m *Materializer) close() { - if m.Closed { - return - } - // Make sure to call InternalClose() only after closing the closers - this - // allows the closers to utilize the unfinished tracing span (if tracing is - // enabled). - m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") - m.InternalClose() -} - -// ConsumerClosed is part of the execinfra.RowSource interface. -func (m *Materializer) ConsumerClosed() { - m.close() -} - // Release implements the execinfra.Releasable interface. func (m *Materializer) Release() { m.ProcessorBaseNoHelper.Reset() diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index d71a8ea32639..98f582188fd2 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -315,22 +315,12 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { } func (o *OrderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. - ctx := o.EnsureCtx() o.accountingHelper.Release() - var lastErr error - for _, input := range o.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if o.span != nil { o.span.Finish() } *o = OrderedSynchronizer{} - return lastErr + return nil } func (o *OrderedSynchronizer) compareRow(batchIdx1 int, batchIdx2 int) int { diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 96c308334659..cf149d15889b 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -263,22 +263,12 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { } func (o *OrderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. 
- ctx := o.EnsureCtx() o.accountingHelper.Release() - var lastErr error - for _, input := range o.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if o.span != nil { o.span.Finish() } *o = OrderedSynchronizer{} - return lastErr + return nil } func (o *OrderedSynchronizer) compareRow(batchIdx1 int, batchIdx2 int) int { diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index e94071894849..dbade33393de 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -226,9 +226,6 @@ func (s *ParallelUnorderedSynchronizer) init() { if int(atomic.AddUint32(&s.numFinishedInputs, 1)) == len(s.inputs) { close(s.batchCh) } - // We need to close all of the closers of this input before we - // notify the wait groups. - input.ToClose.CloseAndLogOnErr(s.inputCtxs[inputIdx], "parallel unordered synchronizer input") s.internalWaitGroup.Done() s.externalWaitGroup.Done() }() @@ -485,33 +482,19 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada // Close is part of the colexecop.ClosableOperator interface. func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error { if state := s.getState(); state != parallelUnorderedSynchronizerStateUninitialized { - // Input goroutines have been started and will take care of closing the - // closers from the corresponding input trees, so we don't need to do - // anything. + // Input goroutines have been started and will take care of finishing + // the tracing spans. return nil } // If the synchronizer is in "uninitialized" state, it means that the // goroutines for each input haven't been started, so they won't be able to - // close the Closers from the corresponding trees. In such a scenario the - // synchronizer must close all of them from all input trees. Note that it is - // ok to close some input trees even if they haven't been initialized. - // - // Note that at this point we know that the input goroutines won't be - // spawned up (our consumer won't call Next/DrainMeta after calling Close), - // so it is safe to close all closers from this goroutine. - var lastErr error - for _, input := range s.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } - // Finish the spans after closing the Closers since Close() implementations - // might log some stuff. + // finish their tracing spans. In such a scenario the synchronizer must do + // that on its own. for i, span := range s.tracingSpans { if span != nil { span.Finish() s.tracingSpans[i] = nil } } - return lastErr + return nil } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index fdc9d98a7621..452739b57f55 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -248,50 +248,6 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) { require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs))) } -// TestParallelUnorderedSyncClosesInputs verifies that the parallel unordered -// synchronizer closes the input trees if it encounters a panic during the -// initialization. 
-func TestParallelUnorderedSyncClosesInputs(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - const injectedPanicMsg = "injected panic" - inputs := make([]colexecargs.OpWithMetaInfo, 2) - - // Create the first input that is responsible for tracking whether the - // closure occurred as expected. - closed := false - firstInput := &colexecop.CallbackOperator{ - CloseCb: func(context.Context) error { - closed = true - return nil - }, - } - inputs[0].Root = firstInput - inputs[0].ToClose = append(inputs[0].ToClose, firstInput) - - // Create the second input that injects a panic into Init. - inputs[1].Root = &colexecop.CallbackOperator{ - InitCb: func(context.Context) { - colexecerror.InternalError(errors.New(injectedPanicMsg)) - }, - } - - // Create and initialize (but don't run) the synchronizer. - var wg sync.WaitGroup - s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) - err := colexecerror.CatchVectorizedRuntimeError(func() { s.Init(ctx) }) - require.NotNil(t, err) - require.True(t, strings.Contains(err.Error(), injectedPanicMsg)) - - // In the production setting, the user of the synchronizer is still expected - // to close it, even if a panic is encountered in Init, so we do the same - // thing here and verify that the first input is properly closed. - require.NoError(t, s.Close(ctx)) - require.True(t, closed) -} - func BenchmarkParallelUnorderedSynchronizer(b *testing.B) { const numInputs = 6 diff --git a/pkg/sql/colexec/serial_unordered_synchronizer.go b/pkg/sql/colexec/serial_unordered_synchronizer.go index 322cdd966a52..18b0dd2cb213 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer.go @@ -109,19 +109,9 @@ func (s *SerialUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata // Close is part of the colexecop.ClosableOperator interface. func (s *SerialUnorderedSynchronizer) Close(context.Context) error { - // Note that we're using the context of the synchronizer rather than the - // argument of Close() because the synchronizer derives its own tracing - // span. - ctx := s.EnsureCtx() - var lastErr error - for _, input := range s.inputs { - if err := input.ToClose.Close(ctx); err != nil { - lastErr = err - } - } if s.span != nil { s.span.Finish() s.span = nil } - return lastErr + return nil } diff --git a/pkg/sql/colexecop/BUILD.bazel b/pkg/sql/colexecop/BUILD.bazel index df37bb1dcc3c..cbdb45f95633 100644 --- a/pkg/sql/colexecop/BUILD.bazel +++ b/pkg/sql/colexecop/BUILD.bazel @@ -20,7 +20,6 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/execstats", "//pkg/sql/types", - "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 14333fcde815..6071f7cc3d51 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -163,22 +162,6 @@ type Closer interface { // Closers is a slice of Closers. type Closers []Closer -// CloseAndLogOnErr closes all Closers and logs the error if the log verbosity -// is 1 or higher. The given prefix is prepended to the log message. -// Note: this method should *only* be used when returning an error doesn't make -// sense. 
-func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { - if err := colexecerror.CatchVectorizedRuntimeError(func() { - for _, closer := range c { - if err := closer.Close(ctx); err != nil && log.V(1) { - log.Infof(ctx, "%s: error closing Closer: %v", prefix, err) - } - } - }); err != nil && log.V(1) { - log.Infof(ctx, "%s: runtime error closing the closers: %v", prefix, err) - } -} - // Close closes all Closers and returns the last error (if any occurs). func (c Closers) Close(ctx context.Context) error { var lastErr error diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 9d57ae2dcc60..2e1be95d3a98 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -113,7 +113,7 @@ func NewOutbox( return o, nil } -func (o *Outbox) close(ctx context.Context) { +func (o *Outbox) close() { o.scratch.buf = nil o.scratch.msg = nil // Unset the input (which is a deselector operator) so that its output batch @@ -122,7 +122,6 @@ func (o *Outbox) close(ctx context.Context) { // the deselector). o.Input = nil o.unlimitedAllocator.ReleaseAll() - o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox") } // Run starts an outbox by connecting to the provided node and pushing @@ -217,7 +216,7 @@ func (o *Outbox) Run( return nil }(); err != nil { // error during stream set up. - o.close(ctx) + o.close() return } @@ -436,6 +435,6 @@ func (o *Outbox) runWithStream( } } - o.close(ctx) + o.close() <-waitCh } diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index 48a7c2e75a28..61db06e86548 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -12,7 +12,6 @@ package colflow import ( "context" - "math" "reflect" "sort" @@ -26,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/util/admission" - "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" "github.com/cockroachdb/errors" ) @@ -34,10 +32,10 @@ import ( // convertToVecTree converts the flow to a tree of vectorized operators // returning a list of the leap operators or an error if the flow vectorization // is not supported. Note that it does so by setting up the full flow without -// running the components asynchronously, so it is pretty expensive. -// It also returns a non-nil cleanup function that releases all -// execinfra.Releasable objects which can *only* be performed once opChains are -// no longer needed. +// running the components asynchronously, so it is pretty expensive. It also +// returns a non-nil cleanup function that closes all the closers as well as +// releases all execreleasable.Releasable objects which can *only* be performed +// once opChains are no longer needed. func convertToVecTree( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -61,24 +59,12 @@ func convertToVecTree( flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.Txn), admission.WorkInfo{}, ) - // We create an unlimited memory account because we're interested whether the - // flow is supported via the vectorized engine in general (without paying - // attention to the memory since it is node-dependent in the distributed - // case). 
- memoryMonitor := mon.NewMonitor( - "convert-to-vec-tree", - mon.MemoryResource, - nil, /* curCount */ - nil, /* maxHist */ - -1, /* increment */ - math.MaxInt64, /* noteworthy */ - flowCtx.Cfg.Settings, - ) - memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) - defer memoryMonitor.Stop(ctx) - defer creator.cleanup(ctx) opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, fuseOpt) - return opChains, creator.Release, err + cleanup = func() { + creator.cleanup(ctx) + creator.Release() + } + return opChains, cleanup, err } // fakeBatchReceiver exists for the sole purpose of convertToVecTree method. In @@ -106,11 +92,6 @@ type flowWithNode struct { // It also supports printing of already constructed operator chains which takes // priority if non-nil (flows are ignored). All operators in opChains are // assumed to be planned on the gateway. -// -// As the second return parameter it returns a non-nil cleanup function which -// can be called only **after** closing the planNode tree containing the -// explainVecNode (if ExplainVec is used by another caller, then it can be -// called at any time). func ExplainVec( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -120,21 +101,11 @@ func ExplainVec( gatewaySQLInstanceID base.SQLInstanceID, verbose bool, distributed bool, -) (_ []string, cleanup func(), _ error) { +) ([]string, error) { tp := treeprinter.NewWithStyle(treeprinter.CompactStyle) root := tp.Child("│") - var ( - cleanups []func() - err error - conversionErr error - ) - defer func() { - cleanup = func() { - for _, c := range cleanups { - c() - } - } - }() + var err error + var conversionErr error // It is possible that when iterating over execopnode.OpNodes we will hit a // panic (an input that doesn't implement OpNode interface), so we're // catching such errors. @@ -150,8 +121,11 @@ func ExplainVec( // last. sort.Slice(sortedFlows, func(i, j int) bool { return sortedFlows[i].sqlInstanceID < sortedFlows[j].sqlInstanceID }) for _, flow := range sortedFlows { + var cleanup func() opChains, cleanup, err = convertToVecTree(ctx, flowCtx, flow.flow, localProcessors, !distributed) - cleanups = append(cleanups, cleanup) + // We need to delay the cleanup until after the tree has been + // formatted. + defer cleanup() if err != nil { conversionErr = err return @@ -160,12 +134,12 @@ func ExplainVec( } } }); err != nil { - return nil, nil, err + return nil, err } if conversionErr != nil { - return nil, nil, conversionErr + return nil, conversionErr } - return tp.FormattedRows(), nil, nil + return tp.FormattedRows(), nil } func formatChains( diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 3dce8db4b33c..752d1fb95f36 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -278,13 +278,8 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // Make sure that we close the coordinator and notify the batch receiver in // all cases. defer func() { - if err := f.close(ctx); err != nil && status != execinfra.ConsumerClosed { - f.pushError(err) - } + f.cancelFlow() f.output.ProducerDone() - // Note that f.close is only safe to call before finishing the tracing - // span because some components might still use the span when they are - // being closed. span.Finish() }() @@ -342,19 +337,6 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { } } -// close cancels the flow and closes all colexecop.Closers the coordinator is -// responsible for. 
-func (f *BatchFlowCoordinator) close(ctx context.Context) error { - f.cancelFlow() - var lastErr error - for _, toClose := range f.input.ToClose { - if err := toClose.Close(ctx); err != nil { - lastErr = err - } - } - return lastErr -} - // Release implements the execinfra.Releasable interface. func (f *BatchFlowCoordinator) Release() { *f = BatchFlowCoordinator{} diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index e3d6932454ab..86252f75a5e3 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -646,8 +646,6 @@ func (r *HashRouter) Run(ctx context.Context) { // in DrainMeta. r.waitForMetadata <- bufferedMeta close(r.waitForMetadata) - - r.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "hash router") } // processNextBatch reads the next batch from its input, hashes it and adds diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 3f113e38666e..458491fb13ea 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -197,16 +197,6 @@ type vectorizedFlow struct { path string } - testingInfo struct { - // numClosers is the number of components in the flow that implement - // Close. This is used for testing assertions. - numClosers int32 - // numClosed is a pointer to an int32 that is updated atomically when a - // component's Close method is called. This is used for testing - // assertions. - numClosed *int32 - } - testingKnobs struct { // onSetupFlow is a testing knob that is called before calling // creator.setupFlow with the given creator. @@ -288,8 +278,6 @@ func (f *vectorizedFlow) Setup( return ctx, nil, err } f.batchFlowCoordinator = batchFlowCoordinator - f.testingInfo.numClosers = f.creator.numClosers - f.testingInfo.numClosed = &f.creator.numClosed f.SetStartedGoroutines(f.creator.operatorConcurrency) log.VEventf(ctx, 2, "vectorized flow setup succeeded") if !f.IsLocal() { @@ -381,21 +369,10 @@ func (f *vectorizedFlow) MemUsage() int64 { // Cleanup is part of the flowinfra.Flow interface. func (f *vectorizedFlow) Cleanup(ctx context.Context) { - // This cleans up all the memory and disk monitoring of the vectorized flow. + // This cleans up all the memory and disk monitoring of the vectorized flow + // as well as closes all the closers. f.creator.cleanup(ctx) - if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { - // Check that all closers have been closed. Note that we don't check - // this in case the flow was never started in the first place (it is ok - // to not check this since closers haven't allocated any resources in - // such a case). We also don't check when the panic injection is - // enabled since then Close() might be legitimately not called (if a - // panic is injected in Init() of the wrapped operator). - if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { - colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) - } - } - f.tempStorage.Lock() created := f.tempStorage.path != "" f.tempStorage.Unlock() @@ -627,6 +604,11 @@ type vectorizedFlowCreator struct { opChains execopnode.OpChains // operatorConcurrency is set if any operators are executed in parallel. operatorConcurrency bool + // closers will be closed during the flow cleanup. 
It is safe to do so in + // the main flow goroutine since all other goroutines that might have used + // these objects must have exited by the time Cleanup() is called - + // Flow.Wait() ensures that. + closers colexecop.Closers // releasables contains all components that should be released back to their // pools during the flow cleanup. releasables []execreleasable.Releasable @@ -634,12 +616,6 @@ type vectorizedFlowCreator struct { monitorRegistry colexecargs.MonitorRegistry diskQueueCfg colcontainer.DiskQueueCfg fdSemaphore semaphore.Semaphore - - // numClosers and numClosed are used to assert during testing that the - // expected number of components are closed. The assertion only happens if - // the panic injection is not enabled. - numClosers int32 - numClosed int32 } var _ execreleasable.Releasable = &vectorizedFlowCreator{} @@ -696,6 +672,15 @@ func newVectorizedFlowCreator( } func (s *vectorizedFlowCreator) cleanup(ctx context.Context) { + if err := colexecerror.CatchVectorizedRuntimeError(func() { + for _, closer := range s.closers { + if err := closer.Close(ctx); err != nil && log.V(1) { + log.Infof(ctx, "error closing Closer: %v", err) + } + } + }); err != nil && log.V(1) { + log.Infof(ctx, "runtime error closing the closers: %v", err) + } s.monitorRegistry.Close(ctx) } @@ -717,6 +702,9 @@ func (s *vectorizedFlowCreator) Release() { for i := range s.opChains { s.opChains[i] = nil } + for i := range s.closers { + s.closers[i] = nil + } for i := range s.releasables { s.releasables[i] = nil } @@ -732,6 +720,7 @@ func (s *vectorizedFlowCreator) Release() { // prime it for reuse. procIdxQueue: s.procIdxQueue[:0], opChains: s.opChains[:0], + closers: s.closers[:0], releasables: s.releasables[:0], monitorRegistry: s.monitorRegistry, } @@ -823,6 +812,7 @@ func (s *vectorizedFlowCreator) setupRouter( foundLocalOutput := false for i, op := range outputs { + s.closers = append(s.closers, op) if buildutil.CrdbTestBuild { op = colexec.NewInvariantsChecker(op) } @@ -837,7 +827,6 @@ func (s *vectorizedFlowCreator) setupRouter( ctx, flowCtx, colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - ToClose: colexecop.Closers{op}, }, outputTyps, stream, factory, nil, /* getStats */ ); err != nil { return err @@ -847,8 +836,6 @@ func (s *vectorizedFlowCreator) setupRouter( opWithMetaInfo := colexecargs.OpWithMetaInfo{ Root: op, MetadataSources: colexecop.MetadataSources{op}, - // input.ToClose will be closed by the hash router. 
- ToClose: colexecop.Closers{op}, } if s.recordingStats { mons := []*mon.BytesMonitor{hashRouterMemMonitor, diskMon} @@ -967,15 +954,15 @@ func (s *vectorizedFlowCreator) setupInput( opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: os, MetadataSources: colexecop.MetadataSources{os}, - ToClose: colexecop.Closers{os}, } + s.closers = append(s.closers, os) } else if input.Type == execinfrapb.InputSyncSpec_SERIAL_UNORDERED || opt == flowinfra.FuseAggressively { sync := colexec.NewSerialUnorderedSynchronizer(inputStreamOps) opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: sync, MetadataSources: colexecop.MetadataSources{sync}, - ToClose: colexecop.Closers{sync}, } + s.closers = append(s.closers, sync) } else { // Note that if we have opt == flowinfra.FuseAggressively, then we // must use the serial unordered sync above in order to remove any @@ -986,8 +973,8 @@ func (s *vectorizedFlowCreator) setupInput( opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: sync, MetadataSources: colexecop.MetadataSources{sync}, - ToClose: colexecop.Closers{sync}, } + s.closers = append(s.closers, sync) s.operatorConcurrency = true // Don't use the unordered synchronizer's inputs for stats collection // given that they run concurrently. The stall time will be collected @@ -1079,14 +1066,8 @@ func (s *vectorizedFlowCreator) setupOutput( s.releasables = append(s.releasables, s.batchFlowCoordinator) } else { // We need to use the row receiving output. - if input != nil { - // We successfully removed the columnarizer. - if buildutil.CrdbTestBuild { - // That columnarizer was added as a closer, so we need to - // decrement the number of expected closers. - s.numClosers-- - } - } else { + if input == nil { + // We couldn't remove the columnarizer. input = colexec.NewMaterializerNoEvalCtxCopy( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), flowCtx, @@ -1118,19 +1099,6 @@ func (s *vectorizedFlowCreator) setupOutput( return nil } -// callbackCloser is a utility struct that implements the Closer interface by -// calling the provided callback. -type callbackCloser struct { - closeCb func(context.Context) error -} - -var _ colexecop.Closer = &callbackCloser{} - -// Close implements the Closer interface. -func (c *callbackCloser) Close(ctx context.Context) error { - return c.closeCb(ctx) -} - func (s *vectorizedFlowCreator) setupFlow( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1210,23 +1178,9 @@ func (s *vectorizedFlowCreator) setupFlow( err = errors.Wrapf(err, "unable to vectorize execution plan") return } + s.closers = append(s.closers, result.ToClose...) if flowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { result.Root = newPanicInjector(result.Root) - } else if buildutil.CrdbTestBuild { - toCloseCopy := append(colexecop.Closers{}, result.ToClose...) 
- for i := range toCloseCopy { - func(idx int) { - closed := false - result.ToClose[idx] = &callbackCloser{closeCb: func(ctx context.Context) error { - if !closed { - closed = true - atomic.AddInt32(&s.numClosed, 1) - } - return toCloseCopy[idx].Close(ctx) - }} - }(i) - } - s.numClosers += int32(len(result.ToClose)) } if s.recordingStats { diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 96e9a9e87252..5a38a4b4f63b 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -41,7 +41,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -58,16 +57,6 @@ var ( testScenarios = []testScenario{consumerDone, consumerClosed, useBatchReceiver} ) -type callbackCloser struct { - closeCb func(context.Context) error -} - -var _ colexecop.Closer = callbackCloser{} - -func (c callbackCloser) Close(ctx context.Context) error { - return c.closeCb(ctx) -} - // TestVectorizedFlowShutdown tests that closing the FlowCoordinator correctly // closes all the infrastructure corresponding to the flow ending in that // FlowCoordinator. Namely: @@ -245,12 +234,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { inputMetadataSource := colexecop.MetadataSource(synchronizer) flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()} - // idToClosed keeps track of whether Close was called for a given id. - idToClosed := struct { - syncutil.Mutex - mapping map[int]bool - }{} - idToClosed.mapping = make(map[int]bool) runOutboxInbox := func( outboxCtx context.Context, flowCtxCancel context.CancelFunc, @@ -260,20 +243,11 @@ func TestVectorizedFlowShutdown(t *testing.T) { id int, outboxMetadataSources []colexecop.MetadataSource, ) { - idToClosed.Lock() - idToClosed.mapping[id] = false - idToClosed.Unlock() outbox, err := colrpc.NewOutbox( colmem.NewAllocator(outboxCtx, outboxMemAcc, testColumnFactory), colexecargs.OpWithMetaInfo{ Root: outboxInput, MetadataSources: outboxMetadataSources, - ToClose: []colexecop.Closer{callbackCloser{closeCb: func(context.Context) error { - idToClosed.Lock() - idToClosed.mapping[id] = true - idToClosed.Unlock() - return nil - }}}, }, typs, nil, /* getStats */ @@ -365,14 +339,9 @@ func TestVectorizedFlowShutdown(t *testing.T) { input = synchronizer } - closeCalled := false inputInfo := colexecargs.OpWithMetaInfo{ Root: input, MetadataSources: colexecop.MetadataSources{inputMetadataSource}, - ToClose: colexecop.Closers{callbackCloser{closeCb: func(context.Context) error { - closeCalled = true - return nil - }}}, } // runFlowCoordinator creates a pair of a materializer and a @@ -468,11 +437,6 @@ func TestVectorizedFlowShutdown(t *testing.T) { } } wg.Wait() - // Ensure all the outboxes called Close. 
- for id, closed := range idToClosed.mapping { - require.True(t, closed, "outbox with ID %d did not call Close on closers", id) - } - require.True(t, closeCalled) }) } } diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go index e978807f5f5b..4f373ab36036 100644 --- a/pkg/sql/distsql/columnar_utils_test.go +++ b/pkg/sql/distsql/columnar_utils_test.go @@ -161,6 +161,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { if err != nil { return err } + defer result.TestCleanupNoError(t) outColOp := colexec.NewMaterializer( nil, /* allocator */ diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index e7bbde9e3cb3..73a25cecae48 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -843,12 +843,11 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc( flowCtx, cleanup := newFlowCtxForExplainPurposes(ctx, p, planner) defer cleanup() getExplain := func(verbose bool) []string { - explain, cleanup, err := colflow.ExplainVec( + explain, err := colflow.ExplainVec( ctx, flowCtx, flows, p.infra.LocalProcessors, opChains, planner.extendedEvalCtx.DistSQLPlanner.gatewaySQLInstanceID, verbose, planner.curPlan.flags.IsDistributed(), ) - cleanup() if err != nil { // In some edge cases (like when subqueries are present or // when certain component doesn't implement execopnode.OpNode diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go index ff576cf17d7d..7528200e5115 100644 --- a/pkg/sql/explain_vec.go +++ b/pkg/sql/explain_vec.go @@ -35,8 +35,6 @@ type explainVecNode struct { lines []string // The current row returned by the node. values tree.Datums - // cleanup will be called after closing the input tree. - cleanup func() } } @@ -77,7 +75,7 @@ func (n *explainVecNode) startExec(params runParams) error { } verbose := n.options.Flags[tree.ExplainFlagVerbose] willDistribute := physPlan.Distribution.WillDistribute() - n.run.lines, n.run.cleanup, err = colflow.ExplainVec( + n.run.lines, err = colflow.ExplainVec( params.ctx, flowCtx, flows, physPlan.LocalProcessors, nil, /* opChains */ distSQLPlanner.gatewaySQLInstanceID, verbose, willDistribute, ) @@ -152,7 +150,4 @@ func (n *explainVecNode) Next(runParams) (bool, error) { func (n *explainVecNode) Values() tree.Datums { return n.run.values } func (n *explainVecNode) Close(ctx context.Context) { n.plan.close(ctx) - if n.run.cleanup != nil { - n.run.cleanup() - } }
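
The net effect of this patch is that colexecop.Closers are no longer threaded through colexecargs.OpWithMetaInfo and closed piecemeal by materializers, synchronizers, routers, and outboxes; instead they are accumulated on the flow creator and closed exactly once during flow cleanup, on the main flow goroutine, after Flow.Wait() has guaranteed that all other goroutines using them have exited. Below is a minimal, self-contained Go sketch of that ownership model. The names closer, flowCreator, registerCloser, and spanCloser are illustrative stand-ins, not the real CockroachDB types; in the actual code the panic handling goes through colexecerror.CatchVectorizedRuntimeError and the logging is gated on log.V(1).

package main

import (
	"context"
	"log"
)

// closer mirrors the colexecop.Closer contract: release resources exactly once.
type closer interface {
	Close(context.Context) error
}

// flowCreator stands in for the vectorized flow creator: every component that
// needs closing is registered here while the flow is being set up.
type flowCreator struct {
	closers []closer
}

func (c *flowCreator) registerCloser(cl closer) {
	c.closers = append(c.closers, cl)
}

// cleanup is called once from the flow's main goroutine after all other
// goroutines have exited. Errors (and, in the real code, panics caught via
// CatchVectorizedRuntimeError) are only logged because there is no consumer
// left to return them to.
func (c *flowCreator) cleanup(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("runtime error closing the closers: %v", r)
		}
	}()
	for _, cl := range c.closers {
		if err := cl.Close(ctx); err != nil {
			log.Printf("error closing Closer: %v", err)
		}
	}
}

// spanCloser is a toy component whose Close finishes some resource.
type spanCloser struct{ name string }

func (s spanCloser) Close(context.Context) error {
	log.Printf("closing %s", s.name)
	return nil
}

func main() {
	c := &flowCreator{}
	c.registerCloser(spanCloser{name: "external sorter"})
	c.registerCloser(spanCloser{name: "hash router output"})
	// ... the flow runs; Flow.Wait() returns ...
	c.cleanup(context.Background())
}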