From dc5f8a8719413d91a858e8785fdc6afafb080643 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Tue, 1 Feb 2022 16:39:48 -0500 Subject: [PATCH 1/5] sql: fix connExecutor extraTxnState autoRetryCounter race condition Previously, a race condition could occur between serialize(), which reads extraTxnState.autoRetryCounter, and the code that increments that counter. One case where this can happen is when autoRetryCounter is incremented while crdb_internal.node_sessions is being queried. Release note: None --- pkg/sql/conn_executor.go | 12 +++++++----- pkg/sql/conn_executor_exec.go | 9 +++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 6a6d99ccddd8..2983ae144961 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -863,6 +863,8 @@ func (s *Server) newConnExecutor( ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} + ex.extraTxnState.atomicAutoRetryCounter = new(int32) + ex.initPlanner(ctx, &ex.planner) return ex @@ -1164,10 +1166,10 @@ type connExecutor struct { // transaction and it is cleared after the transaction is committed. schemaChangeJobRecords map[descpb.ID]*jobs.Record - // autoRetryCounter keeps track of the which iteration of a transaction + // atomicAutoRetryCounter keeps track of which iteration of a transaction // auto-retry we're currently in. It's 0 whenever the transaction state is not // stateOpen. - autoRetryCounter int + atomicAutoRetryCounter *int32 // autoRetryReason records the error causing an auto-retryable error event if // the current transaction is being automatically retried. This is used in @@ -2660,7 +2662,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( advInfo := ex.state.consumeAdvanceInfo() if advInfo.code == rewind { - ex.extraTxnState.autoRetryCounter++ + atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1) } // If we had an error from DDL statement execution due to the presence of @@ -2689,7 +2691,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } } case txnStart: - ex.extraTxnState.autoRetryCounter = 0 + atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0) ex.extraTxnState.autoRetryReason = nil ex.recordTransactionStart() // Bump the txn counter for logging. 
@@ -2859,7 +2861,7 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.state.mu.txnStart, NumStatementsExecuted: int32(ex.state.mu.stmtCount), NumRetries: int32(txn.Epoch()), - NumAutoRetries: int32(ex.extraTxnState.autoRetryCounter), + NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter), TxnDescription: txn.String(), Implicit: ex.implicitTxn(), AllocBytes: ex.state.mon.AllocBytes(), diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 3a923c1f3d6d..fa04e8f034d5 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -16,6 +16,7 @@ import ( "fmt" "runtime/pprof" "strings" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1004,7 +1005,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.maybeLogStatement( ctx, ex.executorType, - ex.extraTxnState.autoRetryCounter, + int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.txnCounter, res.RowsAffected(), res.Err(), @@ -1080,7 +1081,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.curPlan.flags.Set(planFlagNotDistributed) } - ex.sessionTracing.TraceRetryInformation(ctx, ex.extraTxnState.autoRetryCounter, ex.extraTxnState.autoRetryReason) + ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason) if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil { ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext()) } @@ -1113,7 +1114,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( // plan has not been closed earlier. ex.recordStatementSummary( ctx, planner, - ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(), stats, + int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats, ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) @@ -2014,7 +2015,7 @@ func (ex *connExecutor) recordTransaction( TransactionTimeSec: txnTime.Seconds(), Committed: ev == txnCommit, ImplicitTxn: implicit, - RetryCount: int64(ex.extraTxnState.autoRetryCounter), + RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs, ServiceLatency: txnServiceLat, RetryLatency: txnRetryLat, From cc24310113a46510ff5230e8f9074ad2538a0e30 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 2 Feb 2022 18:06:10 -0500 Subject: [PATCH 2/5] sql: remove type width enforcement during execution Assignment casts are now responsible for ensuring that a value written to a column has a type and width that match the column type. This commit removes the logic that performed this validation before assignment casts existed. Release note: None --- pkg/sql/insert.go | 2 +- pkg/sql/row/row_converter.go | 15 +++------------ pkg/sql/tablewriter_upsert_opt.go | 6 +----- pkg/sql/update.go | 17 ++--------------- pkg/sql/upsert.go | 6 +----- 5 files changed, 8 insertions(+), 38 deletions(-) diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 9e2b1bdf0095..89ff72028c29 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -123,7 +123,7 @@ func (r *insertRun) initRowContainer(params runParams, columns colinfo.ResultCol // processSourceRow processes one row from the source for insertion and, if // result rows are needed, saves it in the result row container. 
func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) error { - if err := enforceLocalColumnConstraints(rowVals, r.insertCols, false /* isUpdate */); err != nil { + if err := enforceLocalColumnConstraints(rowVals, r.insertCols); err != nil { return err } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index d5ab46f502e7..50a2d15f4e8f 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -148,18 +148,9 @@ func GenerateInsertRow( // Verify the column constraints. // - // We would really like to use enforceLocalColumnConstraints() here, - // but this is not possible because of some brain damage in the - // Insert() constructor, which causes insertCols to contain - // duplicate columns descriptors: computed columns are listed twice, - // one will receive a NULL value and one will receive a comptued - // value during execution. It "works out in the end" because the - // latter (non-NULL) value overwrites the earlier, but - // enforceLocalColumnConstraints() does not know how to reason about - // this. - // - // In the end it does not matter much, this code is going away in - // favor of the (simpler, correct) code in the CBO. + // During mutations (INSERT, UPDATE, UPSERT), this is checked by + // sql.enforceLocalColumnConstraints. These checks are required for IMPORT + // statements. // Check to see if NULL is being inserted into any non-nullable column. for _, col := range tableDesc.WritableColumns() { diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go index c8a5048cd94d..e48eb9fcf0e2 100644 --- a/pkg/sql/tablewriter_upsert_opt.go +++ b/pkg/sql/tablewriter_upsert_opt.go @@ -268,11 +268,7 @@ func (tu *optTableUpserter) updateConflictingRow( // via GenerateInsertRow(). // - for the fetched part, we assume that the data in the table is // correct already. - if err := enforceLocalColumnConstraints( - updateValues, - tu.updateCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(updateValues, tu.updateCols); err != nil { return err } diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 44a728e48c09..a7f4bb3b0e68 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -278,11 +278,7 @@ func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums) // Verify the schema constraints. For consistency with INSERT/UPSERT // and compatibility with PostgreSQL, we must do this before // processing the CHECK constraints. - if err := enforceLocalColumnConstraints( - u.run.updateValues, - u.run.tu.ru.UpdateCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(u.run.updateValues, u.run.tu.ru.UpdateCols); err != nil { return err } @@ -416,20 +412,11 @@ func (ss scalarSlot) checkColumnTypes(row []tree.TypedExpr) error { // enforceLocalColumnConstraints asserts the column constraints that do not // require data validation from other sources than the row data itself. This // currently only includes checking for null values in non-nullable columns. -func enforceLocalColumnConstraints(row tree.Datums, cols []catalog.Column, isUpdate bool) error { +func enforceLocalColumnConstraints(row tree.Datums, cols []catalog.Column) error { for i, col := range cols { if !col.IsNullable() && row[i] == tree.DNull { return sqlerrors.NewNonNullViolationError(col.GetName()) } - if isUpdate { - // TODO(mgartner): Remove this once assignment casts are supported - // for UPSERTs and UPDATEs. 
- outVal, err := tree.AdjustValueToType(col.GetType(), row[i]) - if err != nil { - return err - } - row[i] = outVal - } } return nil } diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 5d57ff37bb73..24d76fd8491c 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -137,11 +137,7 @@ func (n *upsertNode) BatchedNext(params runParams) (bool, error) { // processSourceRow processes one row from the source for upsertion. // The table writer is in charge of accumulating the result rows. func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) error { - if err := enforceLocalColumnConstraints( - rowVals, - n.run.insertCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(rowVals, n.run.insertCols); err != nil { return err } From 36f35973ee9cabec87909017127b8fb8d127e3b7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 7 Feb 2022 09:08:30 -0800 Subject: [PATCH 3/5] colexec: change Close to take in a context Previously, all `Closer`s would use their own context (either captured in `Init` or derived from the one in `Init`) in the implementation of `Close` (for example, when they wanted to log something). However, due to the way the draining of the wrapped row-by-row processors and the closing of `Closer`s is structured (the draining happens first), it was possible for the captured context to have a tracing span that was already `Finish`ed. This is because the row-by-row processors derive separate tracing spans and finish them automatically during draining, whereas the closing of `Closer`s happens later. This commit fixes this issue by passing a context as an argument to the `Close` function, and most of the implementations now use it. Only components that derive their own tracing span are allowed to use their own context, since they control when the span is finished. 
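To illustrate the new contract, here is a minimal, self-contained Go sketch (the `Closer` and `op` types below are simplified, hypothetical stand-ins for the colexecop interfaces, not the actual implementation):

    package main

    import (
        "context"
        "fmt"
    )

    // Closer mirrors the shape of the updated interface: Close receives the
    // caller's context, whose tracing span is still live, instead of relying
    // on a context captured in Init.
    type Closer interface {
        Close(ctx context.Context) error
    }

    type op struct {
        // initCtx is captured in Init; by the time Close runs, the tracing
        // span (modeled here by cancellation) attached to it may already be
        // finished.
        initCtx context.Context
    }

    func (o *op) Init(ctx context.Context) { o.initCtx = ctx }

    func (o *op) Close(ctx context.Context) error {
        // Log using the argument context, never o.initCtx.
        fmt.Println("caller ctx err:", ctx.Err(), "| init ctx err:", o.initCtx.Err())
        return nil
    }

    func main() {
        o := &op{}
        initCtx, cancel := context.WithCancel(context.Background())
        o.Init(initCtx)
        cancel() // stands in for the Init-time span being finished during draining
        var c Closer = o
        _ = c.Close(context.Background()) // the caller supplies a still-valid context
    }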
Release note: None --- pkg/sql/colexec/aggregators_test.go | 2 +- .../colexec/colexecagg/default_agg_tmpl.go | 4 +- .../colexec/colexecagg/hash_default_agg.eg.go | 4 +- .../colexecagg/ordered_default_agg.eg.go | 4 +- pkg/sql/colexec/colexecargs/op_creation.go | 2 +- pkg/sql/colexec/colexecjoin/crossjoiner.go | 5 +- pkg/sql/colexec/colexecjoin/mergejoiner.go | 6 +- pkg/sql/colexec/colexectestutils/utils.go | 2 +- .../colexec/colexecwindow/buffered_window.go | 10 +-- .../colexecwindow/count_rows_aggregator.go | 4 +- .../first_last_nth_value_tmpl.go | 4 +- .../colexec/colexecwindow/first_value.eg.go | 4 +- pkg/sql/colexec/colexecwindow/lag.eg.go | 4 +- .../colexec/colexecwindow/last_value.eg.go | 4 +- pkg/sql/colexec/colexecwindow/lead.eg.go | 4 +- .../colexec/colexecwindow/lead_lag_tmpl.go | 4 +- .../colexecwindow/min_max_removable_agg.eg.go | 88 +++++++++---------- .../min_max_removable_agg_tmpl.go | 4 +- pkg/sql/colexec/colexecwindow/nth_value.eg.go | 4 +- pkg/sql/colexec/colexecwindow/ntile.eg.go | 2 +- pkg/sql/colexec/colexecwindow/ntile_tmpl.go | 2 +- .../colexec/colexecwindow/relative_rank.eg.go | 32 +++---- .../colexecwindow/relative_rank_tmpl.go | 10 +-- .../colexecwindow/window_aggregator.eg.go | 14 +-- .../colexecwindow/window_aggregator_tmpl.go | 14 +-- .../colexecwindow/window_functions_test.go | 2 +- pkg/sql/colexec/columnarizer.go | 4 +- pkg/sql/colexec/columnarizer_test.go | 2 +- pkg/sql/colexec/disk_spiller.go | 6 +- pkg/sql/colexec/external_sort.go | 11 ++- pkg/sql/colexec/external_sort_test.go | 2 +- pkg/sql/colexec/hash_aggregator.go | 6 +- pkg/sql/colexec/hash_based_partitioner.go | 9 +- pkg/sql/colexec/invariants_checker.go | 4 +- pkg/sql/colexec/ordered_aggregator.go | 4 +- pkg/sql/colexec/ordered_synchronizer.eg.go | 8 +- pkg/sql/colexec/ordered_synchronizer_tmpl.go | 8 +- .../parallel_unordered_synchronizer.go | 17 ++-- .../parallel_unordered_synchronizer_test.go | 4 +- .../colexec/serial_unordered_synchronizer.go | 8 +- pkg/sql/colexecop/operator.go | 18 ++-- pkg/sql/colexecop/testutils.go | 6 +- pkg/sql/colfetcher/colbatch_scan.go | 8 +- pkg/sql/colfetcher/index_join.go | 8 +- pkg/sql/colflow/flow_coordinator.go | 6 +- pkg/sql/colflow/vectorized_flow.go | 10 +-- .../colflow/vectorized_flow_shutdown_test.go | 10 +-- 47 files changed, 211 insertions(+), 187 deletions(-) diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index 56294607353f..1aba308e1d9e 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -1152,7 +1152,7 @@ func benchmarkAggregateFunction( break } } - if err = a.(colexecop.Closer).Close(); err != nil { + if err = a.(colexecop.Closer).Close(ctx); err != nil { b.Fatal(err) } source.Reset(ctx) diff --git a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go index 769b63864581..5242a093e9f2 100644 --- a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go @@ -201,9 +201,9 @@ func (a *default_AGGKINDAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *default_AGGKINDAggAlloc) Close() error { +func (a *default_AGGKINDAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go index 5a91a13be42d..7dce22f8b257 100644 --- a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go +++ 
b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go @@ -172,9 +172,9 @@ func (a *defaultHashAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *defaultHashAggAlloc) Close() error { +func (a *defaultHashAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go index e3dde9f454b8..92cf0fed9f56 100644 --- a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go @@ -237,9 +237,9 @@ func (a *defaultOrderedAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *defaultOrderedAggAlloc) Close() error { +func (a *defaultOrderedAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 0abcda55253d..1949aa8232f4 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -106,7 +106,7 @@ var _ execinfra.Releasable = &NewColOperatorResult{} // TestCleanupNoError releases the resources associated with this result and // asserts that no error is returned. It should only be used in tests. func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) { - require.NoError(t, r.ToClose.Close()) + require.NoError(t, r.ToClose.Close(context.Background())) } var newColOperatorResultPool = sync.Pool{ diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index 7029d9560378..6cad4d43bc88 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -99,7 +99,7 @@ func (c *crossJoiner) Next() coldata.Batch { } willEmit := c.willEmit() if willEmit == 0 { - if err := c.Close(); err != nil { + if err := c.Close(c.Ctx); err != nil { colexecerror.InternalError(err) } c.done = true @@ -462,8 +462,7 @@ func (b *crossJoinerBase) Reset(ctx context.Context) { b.builderState.numEmittedTotal = 0 } -func (b *crossJoinerBase) Close() error { - ctx := b.initHelper.EnsureCtx() +func (b *crossJoinerBase) Close(ctx context.Context) error { if b.rightTuples != nil { return b.rightTuples.Close(ctx) } diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner.go b/pkg/sql/colexec/colexecjoin/mergejoiner.go index ceaf285c54f6..4ab842e34698 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner.go @@ -889,20 +889,20 @@ func (o *mergeJoinBase) completeRightBufferedGroup() { o.finishRightBufferedGroup() } -func (o *mergeJoinBase) Close() error { +func (o *mergeJoinBase) Close(ctx context.Context) error { if !o.CloserHelper.Close() { return nil } var lastErr error for _, op := range []colexecop.Operator{o.left.source, o.right.source} { if c, ok := op.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { lastErr = err } } } if h := o.bufferedGroup.helper; h != nil { - if err := h.Close(); err != nil { + if err := h.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/colexectestutils/utils.go b/pkg/sql/colexec/colexectestutils/utils.go index b0db336ca2ec..f34d5ac624cb 100644 --- a/pkg/sql/colexec/colexectestutils/utils.go +++ b/pkg/sql/colexec/colexectestutils/utils.go @@ -440,7 +440,7 @@ func RunTestsWithOrderedCols( 
// setting, the closing happens at the end of the query execution. func closeIfCloser(t *testing.T, op colexecop.Operator) { if c, ok := op.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(context.Background()); err != nil { t.Fatal(err) } } diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index a2cfce874a43..2de7b230911c 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -113,7 +113,7 @@ const ( // buffer all tuples from each partition. type bufferedWindower interface { Init(ctx context.Context) - Close() + Close(context.Context) // seekNextPartition is called during the windowSeeking state on the current // batch. It gives windowers a chance to perform any necessary pre-processing, @@ -357,7 +357,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch { colexecerror.InternalError( errors.AssertionFailedf("window operator in processing state without buffered rows")) case windowFinished: - if err = b.Close(); err != nil { + if err = b.Close(b.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -369,16 +369,16 @@ func (b *bufferedWindowOp) Next() coldata.Batch { } } -func (b *bufferedWindowOp) Close() error { +func (b *bufferedWindowOp) Close(ctx context.Context) error { if !b.CloserHelper.Close() || b.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } - if err := b.bufferQueue.Close(b.EnsureCtx()); err != nil { + if err := b.bufferQueue.Close(ctx); err != nil { return err } - b.windower.Close() + b.windower.Close(ctx) return nil } diff --git a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go index 8ecc40a15137..20af8be13cab 100644 --- a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go +++ b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go @@ -76,12 +76,12 @@ func (a *countRowsWindowAggregator) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (a *countRowsWindowAggregator) Close() { +func (a *countRowsWindowAggregator) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } // processBatch implements the bufferedWindower interface. diff --git a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go index 419ead7e8cdc..2fa11d0fba04 100644 --- a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go @@ -201,9 +201,9 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *_OP_NAMEBase) Close() { +func (b *_OP_NAMEBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/first_value.eg.go b/pkg/sql/colexec/colexecwindow/first_value.eg.go index 7182dc35b648..80bd6de1172b 100644 --- a/pkg/sql/colexec/colexecwindow/first_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/first_value.eg.go @@ -567,9 +567,9 @@ func (b *firstValueBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. 
-func (b *firstValueBase) Close() { +func (b *firstValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lag.eg.go b/pkg/sql/colexec/colexecwindow/lag.eg.go index 49c474259aa3..97f3122abdff 100644 --- a/pkg/sql/colexec/colexecwindow/lag.eg.go +++ b/pkg/sql/colexec/colexecwindow/lag.eg.go @@ -1595,9 +1595,9 @@ func (b *lagBase) Init(ctx context.Context) { } } -func (b *lagBase) Close() { +func (b *lagBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/last_value.eg.go b/pkg/sql/colexec/colexecwindow/last_value.eg.go index 594c16d99059..424ca01cc9e2 100644 --- a/pkg/sql/colexec/colexecwindow/last_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/last_value.eg.go @@ -567,9 +567,9 @@ func (b *lastValueBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *lastValueBase) Close() { +func (b *lastValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lead.eg.go b/pkg/sql/colexec/colexecwindow/lead.eg.go index bf3fbcff723a..f5f904a139df 100644 --- a/pkg/sql/colexec/colexecwindow/lead.eg.go +++ b/pkg/sql/colexec/colexecwindow/lead.eg.go @@ -1595,9 +1595,9 @@ func (b *leadBase) Init(ctx context.Context) { } } -func (b *leadBase) Close() { +func (b *leadBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go index 3a046f5fc60a..33ea4700a190 100644 --- a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go @@ -174,11 +174,11 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) { } } -func (b *_OP_NAMEBase) Close() { +func (b *_OP_NAMEBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } // {{/* diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go index 59a9a155e4f1..76306130845a 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go @@ -341,10 +341,10 @@ func (a *minBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *minBoolAggregator) Close() { +func (a *minBoolAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minBoolAggregator{} } @@ -494,10 +494,10 @@ func (a *minBytesAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minBytesAggregator) Close() { +func (a *minBytesAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minBytesAggregator{} } @@ -649,10 +649,10 @@ func (a *minDecimalAggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *minDecimalAggregator) Close() { +func (a *minDecimalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minDecimalAggregator{} } @@ -826,10 +826,10 @@ func (a *minInt16Aggregator) 
aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt16Aggregator) Close() { +func (a *minInt16Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt16Aggregator{} } @@ -1003,10 +1003,10 @@ func (a *minInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt32Aggregator) Close() { +func (a *minInt32Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt32Aggregator{} } @@ -1180,10 +1180,10 @@ func (a *minInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt64Aggregator) Close() { +func (a *minInt64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt64Aggregator{} } @@ -1373,10 +1373,10 @@ func (a *minFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *minFloat64Aggregator) Close() { +func (a *minFloat64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minFloat64Aggregator{} } @@ -1542,10 +1542,10 @@ func (a *minTimestampAggregator) aggregateOverIntervals(intervals []windowInterv } } -func (a *minTimestampAggregator) Close() { +func (a *minTimestampAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minTimestampAggregator{} } @@ -1697,10 +1697,10 @@ func (a *minIntervalAggregator) aggregateOverIntervals(intervals []windowInterva } } -func (a *minIntervalAggregator) Close() { +func (a *minIntervalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minIntervalAggregator{} } @@ -1895,10 +1895,10 @@ func (a *minJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *minJSONAggregator) Close() { +func (a *minJSONAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minJSONAggregator{} } @@ -2054,10 +2054,10 @@ func (a *minDatumAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minDatumAggregator) Close() { +func (a *minDatumAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minDatumAggregator{} } @@ -2308,10 +2308,10 @@ func (a *maxBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *maxBoolAggregator) Close() { +func (a *maxBoolAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxBoolAggregator{} } @@ -2461,10 +2461,10 @@ func (a *maxBytesAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxBytesAggregator) Close() { +func (a *maxBytesAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxBytesAggregator{} } @@ -2616,10 +2616,10 @@ func (a *maxDecimalAggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *maxDecimalAggregator) Close() { +func (a *maxDecimalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxDecimalAggregator{} } @@ -2793,10 +2793,10 @@ func (a 
*maxInt16Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt16Aggregator) Close() { +func (a *maxInt16Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt16Aggregator{} } @@ -2970,10 +2970,10 @@ func (a *maxInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt32Aggregator) Close() { +func (a *maxInt32Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt32Aggregator{} } @@ -3147,10 +3147,10 @@ func (a *maxInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt64Aggregator) Close() { +func (a *maxInt64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt64Aggregator{} } @@ -3340,10 +3340,10 @@ func (a *maxFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *maxFloat64Aggregator) Close() { +func (a *maxFloat64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxFloat64Aggregator{} } @@ -3509,10 +3509,10 @@ func (a *maxTimestampAggregator) aggregateOverIntervals(intervals []windowInterv } } -func (a *maxTimestampAggregator) Close() { +func (a *maxTimestampAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxTimestampAggregator{} } @@ -3664,10 +3664,10 @@ func (a *maxIntervalAggregator) aggregateOverIntervals(intervals []windowInterva } } -func (a *maxIntervalAggregator) Close() { +func (a *maxIntervalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxIntervalAggregator{} } @@ -3862,10 +3862,10 @@ func (a *maxJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *maxJSONAggregator) Close() { +func (a *maxJSONAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxJSONAggregator{} } @@ -4021,10 +4021,10 @@ func (a *maxDatumAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxDatumAggregator) Close() { +func (a *maxDatumAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxDatumAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go index 36dd5e2442d9..73a8159dff6d 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go @@ -296,10 +296,10 @@ func (a *_AGG_TYPEAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *_AGG_TYPEAggregator) Close() { +func (a *_AGG_TYPEAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = _AGG_TYPEAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/nth_value.eg.go b/pkg/sql/colexec/colexecwindow/nth_value.eg.go index ed3fccf5d57f..48fa6ca801f1 100644 --- a/pkg/sql/colexec/colexecwindow/nth_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/nth_value.eg.go @@ -767,9 +767,9 @@ func (b *nthValueBase) Init(ctx context.Context) { } // Close implements the 
bufferedWindower interface. -func (b *nthValueBase) Close() { +func (b *nthValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/ntile.eg.go b/pkg/sql/colexec/colexecwindow/ntile.eg.go index deed9c367fb5..cb46dc0e9a43 100644 --- a/pkg/sql/colexec/colexecwindow/ntile.eg.go +++ b/pkg/sql/colexec/colexecwindow/ntile.eg.go @@ -276,4 +276,4 @@ func (b *nTileBase) startNewPartition() { func (b *nTileBase) Init(ctx context.Context) {} -func (b *nTileBase) Close() {} +func (b *nTileBase) Close(context.Context) {} diff --git a/pkg/sql/colexec/colexecwindow/ntile_tmpl.go b/pkg/sql/colexec/colexecwindow/ntile_tmpl.go index 6bd9078e2adb..5db90f71eac1 100644 --- a/pkg/sql/colexec/colexecwindow/ntile_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/ntile_tmpl.go @@ -256,4 +256,4 @@ func (b *nTileBase) startNewPartition() { func (b *nTileBase) Init(ctx context.Context) {} -func (b *nTileBase) Close() {} +func (b *nTileBase) Close(context.Context) {} diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go index 9d2e2098be49..8c5abaaca414 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go @@ -310,7 +310,7 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -323,14 +323,14 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { } } -func (r *percentRankNoPartitionOp) Close() error { +func (r *percentRankNoPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } return lastErr @@ -589,7 +589,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -602,17 +602,17 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { } } -func (r *percentRankWithPartitionOp) Close() error { +func (r *percentRankWithPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. 
return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } return lastErr @@ -856,7 +856,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -869,17 +869,17 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { } } -func (r *cumeDistNoPartitionOp) Close() error { +func (r *cumeDistNoPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } return lastErr @@ -1217,7 +1217,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -1230,20 +1230,20 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { } } -func (r *cumeDistWithPartitionOp) Close() error { +func (r *cumeDistWithPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } return lastErr diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go index 9f606141704e..ce4439b0b975 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go @@ -576,7 +576,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -589,23 +589,23 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { } } -func (r *_RELATIVE_RANK_STRINGOp) Close() error { +func (r *_RELATIVE_RANK_STRINGOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. 
return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } // {{if .HasPartition}} - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } // {{end}} // {{if .IsCumeDist}} - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } // {{end}} diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go index 60effe81c329..b20cf3d4b0b2 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go @@ -174,15 +174,15 @@ func (a *windowAggregatorBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (a *windowAggregatorBase) Close() { +func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(); err != nil { + if err := a.closers.Close(ctx); err != nil { colexecerror.InternalError(err) } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } func (a *windowAggregator) startNewPartition() { @@ -190,8 +190,8 @@ func (a *windowAggregator) startNewPartition() { a.agg.Reset() } -func (a *windowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *windowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = windowAggregator{} } @@ -236,8 +236,8 @@ func (a *slidingWindowAggregator) startNewPartition() { a.agg.Reset() } -func (a *slidingWindowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *slidingWindowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = slidingWindowAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go index 4c5607fa94e5..6c71db6bfcd0 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go @@ -179,15 +179,15 @@ func (a *windowAggregatorBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. 
-func (a *windowAggregatorBase) Close() { +func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(); err != nil { + if err := a.closers.Close(ctx); err != nil { colexecerror.InternalError(err) } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } func (a *windowAggregator) startNewPartition() { @@ -195,8 +195,8 @@ func (a *windowAggregator) startNewPartition() { a.agg.Reset() } -func (a *windowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *windowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = windowAggregator{} } @@ -220,8 +220,8 @@ func (a *slidingWindowAggregator) startNewPartition() { a.agg.Reset() } -func (a *slidingWindowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *slidingWindowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = slidingWindowAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 68905ca5c9a7..cdf637647aa8 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -1083,7 +1083,7 @@ func TestWindowFunctions(t *testing.T) { // Close all closers manually (in production this is done on the // flow cleanup). for _, c := range toClose { - require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) } for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 1faa9a145158..f94f5c742226 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -133,7 +133,7 @@ func newColumnarizer( // Close will call InternalClose(). Note that we don't return // any trailing metadata here because the columnarizers // propagate it in DrainMeta. - if err := c.Close(); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } @@ -289,7 +289,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { } // Close is part of the colexecop.ClosableOperator interface. -func (c *Columnarizer) Close() error { +func (c *Columnarizer) Close(context.Context) error { if c.removedFromFlow { return nil } diff --git a/pkg/sql/colexec/columnarizer_test.go b/pkg/sql/colexec/columnarizer_test.go index f38a9400d818..091341f96389 100644 --- a/pkg/sql/colexec/columnarizer_test.go +++ b/pkg/sql/colexec/columnarizer_test.go @@ -109,7 +109,7 @@ func TestColumnarizerDrainsAndClosesInput(t *testing.T) { if tc.consumerClosed { // Closing the Columnarizer should call ConsumerClosed on the processor. 
- require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) require.Equal(t, execinfra.ConsumerClosed, rb.ConsumerStatus, "unexpected consumer status %d", rb.ConsumerStatus) } else { // Calling DrainMeta from the vectorized execution engine should propagate to diff --git a/pkg/sql/colexec/disk_spiller.go b/pkg/sql/colexec/disk_spiller.go index bb3388c25384..2e3ad36f3dd8 100644 --- a/pkg/sql/colexec/disk_spiller.go +++ b/pkg/sql/colexec/disk_spiller.go @@ -234,16 +234,16 @@ func (d *diskSpillerBase) Reset(ctx context.Context) { } // Close implements the Closer interface. -func (d *diskSpillerBase) Close() error { +func (d *diskSpillerBase) Close(ctx context.Context) error { if !d.CloserHelper.Close() { return nil } var retErr error if c, ok := d.inMemoryOp.(colexecop.Closer); ok { - retErr = c.Close() + retErr = c.Close(ctx) } if c, ok := d.diskBackedOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 26c741e4a0b9..606b796f37e0 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -406,7 +406,7 @@ func (s *externalSorter) Next() coldata.Batch { for b := merger.Next(); ; b = merger.Next() { partitionDone := s.enqueue(b) if b.Length() == 0 || partitionDone { - if err := merger.Close(); err != nil { + if err := merger.Close(s.Ctx); err != nil { colexecerror.InternalError(err) } break @@ -469,7 +469,7 @@ func (s *externalSorter) Next() coldata.Batch { return b case externalSorterFinished: - if err := s.Close(); err != nil { + if err := s.Close(s.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -584,7 +584,7 @@ func (s *externalSorter) Reset(ctx context.Context) { r.Reset(ctx) } s.state = externalSorterNewPartition - if err := s.Close(); err != nil { + if err := s.Close(ctx); err != nil { colexecerror.InternalError(err) } // Reset the CloserHelper so that the sorter may be closed again. 
@@ -597,11 +597,10 @@ func (s *externalSorter) Reset(ctx context.Context) { s.emitted = 0 } -func (s *externalSorter) Close() error { +func (s *externalSorter) Close(ctx context.Context) error { if !s.CloserHelper.Close() { return nil } - ctx := s.EnsureCtx() log.VEvent(ctx, 1, "external sorter is closed") var lastErr error if s.partitioner != nil { @@ -609,7 +608,7 @@ func (s *externalSorter) Close() error { s.partitioner = nil } if c, ok := s.emitter.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index f98c24288922..38ad233f6208 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -298,7 +298,7 @@ func TestExternalSortMemoryAccounting(t *testing.T) { for b := sorter.Next(); b.Length() > 0; b = sorter.Next() { } for _, c := range closers { - require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) } require.True(t, spilled) diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 3b38e1f8e168..6ac8e6c1dc6c 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -469,16 +469,16 @@ func (op *hashAggregator) resetBucketsAndTrackingState(ctx context.Context) { op.curOutputBucketIdx = 0 } -func (op *hashAggregator) Close() error { +func (op *hashAggregator) Close(ctx context.Context) error { if !op.CloserHelper.Close() { return nil } op.accountingHelper.Release() var retErr error if op.inputTrackingState.tuples != nil { - retErr = op.inputTrackingState.tuples.Close(op.EnsureCtx()) + retErr = op.inputTrackingState.tuples.Close(ctx) } - if err := op.toClose.Close(); err != nil { + if err := op.toClose.Close(ctx); err != nil { retErr = err } return retErr diff --git a/pkg/sql/colexec/hash_based_partitioner.go b/pkg/sql/colexec/hash_based_partitioner.go index d3396cb30062..adfbca768d03 100644 --- a/pkg/sql/colexec/hash_based_partitioner.go +++ b/pkg/sql/colexec/hash_based_partitioner.go @@ -618,7 +618,7 @@ StateChanged: return b case hbpFinished: - if err := op.Close(); err != nil { + if err := op.Close(op.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -629,11 +629,10 @@ StateChanged: } } -func (op *hashBasedPartitioner) Close() error { +func (op *hashBasedPartitioner) Close(ctx context.Context) error { if !op.CloserHelper.Close() { return nil } - ctx := op.EnsureCtx() log.VEventf(ctx, 1, "%s is closed", op.name) var retErr error for i := range op.inputs { @@ -644,7 +643,7 @@ func (op *hashBasedPartitioner) Close() error { // The in-memory main operator might be a Closer (e.g. the in-memory hash // aggregator), and we need to close it if so. if c, ok := op.inMemMainOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } @@ -652,7 +651,7 @@ func (op *hashBasedPartitioner) Close() error { // it will still be closed appropriately because we accumulate all closers // in NewColOperatorResult. 
if c, ok := op.diskBackedFallbackOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } diff --git a/pkg/sql/colexec/invariants_checker.go b/pkg/sql/colexec/invariants_checker.go index 230e13c89d14..9a88fbadcb84 100644 --- a/pkg/sql/colexec/invariants_checker.go +++ b/pkg/sql/colexec/invariants_checker.go @@ -127,10 +127,10 @@ func (i *invariantsChecker) DrainMeta() []execinfrapb.ProducerMetadata { } // Close is part of the colexecop.ClosableOperator interface. -func (i *invariantsChecker) Close() error { +func (i *invariantsChecker) Close(ctx context.Context) error { c, ok := i.Input.(colexecop.Closer) if !ok { return nil } - return c.Close() + return c.Close(ctx) } diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index d5e59fc8b401..c99d9a986060 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -399,6 +399,6 @@ func (a *orderedAggregator) Reset(ctx context.Context) { } } -func (a *orderedAggregator) Close() error { - return a.toClose.Close() +func (a *orderedAggregator) Close(ctx context.Context) error { + return a.toClose.Close(ctx) } diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 1b363e78f4d2..3a8f0f0ab65b 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -299,11 +299,15 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { return bufferedMeta } -func (o *OrderedSynchronizer) Close() error { +func (o *OrderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := o.EnsureCtx() o.accountingHelper.Release() var lastErr error for _, input := range o.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 2b0c9b25120b..e1baf9bb4e3c 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -243,11 +243,15 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { return bufferedMeta } -func (o *OrderedSynchronizer) Close() error { +func (o *OrderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := o.EnsureCtx() o.accountingHelper.Release() var lastErr error for _, input := range o.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 59c375b10754..1ca07dd46f0a 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -63,7 +63,8 @@ const ( type ParallelUnorderedSynchronizer struct { colexecop.InitHelper - inputs []colexecargs.OpWithMetaInfo + inputs []colexecargs.OpWithMetaInfo + inputCtxs []context.Context // cancelLocalInput stores context cancellation functions for each of the // inputs. The functions are populated only if LocalPlan is true. 
cancelLocalInput []context.CancelFunc @@ -139,6 +140,7 @@ func NewParallelUnorderedSynchronizer( } return &ParallelUnorderedSynchronizer{ inputs: inputs, + inputCtxs: make([]context.Context, len(inputs)), cancelLocalInput: make([]context.CancelFunc, len(inputs)), tracingSpans: make([]*tracing.Span, len(inputs)), readNextBatch: readNextBatch, @@ -165,8 +167,7 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) { return } for i, input := range s.inputs { - var inputCtx context.Context - inputCtx, s.tracingSpans[i] = execinfra.ProcessorSpan(s.Ctx, fmt.Sprintf("parallel unordered sync input %d", i)) + s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(s.Ctx, fmt.Sprintf("parallel unordered sync input %d", i)) if s.LocalPlan { // If there plan is local, there are no colrpc.Inboxes in this input // tree, and the synchronizer can cancel the current work eagerly @@ -177,9 +178,9 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) { // because canceling the context would break the gRPC stream and // make it impossible to fetch the remote metadata. Furthermore, it // will result in the remote flow cancellation. - inputCtx, s.cancelLocalInput[i] = context.WithCancel(inputCtx) + s.inputCtxs[i], s.cancelLocalInput[i] = context.WithCancel(s.inputCtxs[i]) } - input.Root.Init(inputCtx) + input.Root.Init(s.inputCtxs[i]) s.nextBatch[i] = func(inputOp colexecop.Operator, inputIdx int) func() { return func() { s.batches[inputIdx] = inputOp.Next() @@ -222,7 +223,7 @@ func (s *ParallelUnorderedSynchronizer) init() { } // We need to close all of the closers of this input before we // notify the wait groups. - input.ToClose.CloseAndLogOnErr(s.Ctx, "parallel unordered synchronizer input") + input.ToClose.CloseAndLogOnErr(s.inputCtxs[inputIdx], "parallel unordered synchronizer input") s.internalWaitGroup.Done() s.externalWaitGroup.Done() }() @@ -460,7 +461,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada } // Close is part of the colexecop.ClosableOperator interface. -func (s *ParallelUnorderedSynchronizer) Close() error { +func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error { if state := s.getState(); state != parallelUnorderedSynchronizerStateUninitialized { // Input goroutines have been started and will take care of closing the // closers from the corresponding input trees, so we don't need to do @@ -483,7 +484,7 @@ func (s *ParallelUnorderedSynchronizer) Close() error { } var lastErr error for _, input := range s.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index 88dc93d21715..7a54de5b930a 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -248,7 +248,7 @@ func TestParallelUnorderedSyncClosesInputs(t *testing.T) { // closure occurred as expected. closed := false firstInput := &colexecop.CallbackOperator{ - CloseCb: func() error { + CloseCb: func(context.Context) error { closed = true return nil }, @@ -273,7 +273,7 @@ func TestParallelUnorderedSyncClosesInputs(t *testing.T) { // In the production setting, the user of the synchronizer is still expected // to close it, even if a panic is encountered in Init, so we do the same // thing here and verify that the first input is properly closed. 
- require.NoError(t, s.Close()) + require.NoError(t, s.Close(ctx)) require.True(t, closed) } diff --git a/pkg/sql/colexec/serial_unordered_synchronizer.go b/pkg/sql/colexec/serial_unordered_synchronizer.go index ad47f09f196c..02288277f4c6 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer.go @@ -107,10 +107,14 @@ func (s *SerialUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata } // Close is part of the colexecop.ClosableOperator interface. -func (s *SerialUnorderedSynchronizer) Close() error { +func (s *SerialUnorderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := s.EnsureCtx() var lastErr error for _, input := range s.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index dcad468d7051..86e75d750093 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -145,11 +145,17 @@ type Closer interface { // is an Operator, the implementation of Close must be safe to execute even // if Operator.Init wasn't called. // + // Unless the Closer derives its own context with a separate tracing span, + // the argument context rather than the one from Init() must be used + // (wherever necessary) by the implementation. This is because the span in + // the context from Init() might already be finished when Close() is called, + // whereas the argument context will contain an unfinished span. + // // If this Closer is an execinfra.Releasable, the implementation must be // safe to execute even after Release() was called. // TODO(yuzefovich): refactor this because the Release()'d objects should // not be used anymore. - Close() error + Close(context.Context) error } // Closers is a slice of Closers. @@ -162,7 +168,7 @@ type Closers []Closer func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { if err := colexecerror.CatchVectorizedRuntimeError(func() { for _, closer := range c { - if err := closer.Close(); err != nil && log.V(1) { + if err := closer.Close(ctx); err != nil && log.V(1) { log.Infof(ctx, "%s: error closing Closer: %v", prefix, err) } } @@ -172,10 +178,10 @@ func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { } // Close closes all Closers and returns the last error (if any occurs). -func (c Closers) Close() error { +func (c Closers) Close(ctx context.Context) error { var lastErr error for _, closer := range c { - if err := closer.Close(); err != nil { + if err := closer.Close(ctx); err != nil { lastErr = err } } return lastErr } @@ -331,12 +337,12 @@ type OneInputCloserHelper struct { var _ Closer = &OneInputCloserHelper{} // Close implements the Closer interface. 
-func (c *OneInputCloserHelper) Close() error { +func (c *OneInputCloserHelper) Close(ctx context.Context) error { if !c.CloserHelper.Close() { return nil } if closer, ok := c.Input.(Closer); ok { - return closer.Close() + return closer.Close(ctx) } return nil } diff --git a/pkg/sql/colexecop/testutils.go b/pkg/sql/colexecop/testutils.go index 8dc2823d28f0..886f3b8df646 100644 --- a/pkg/sql/colexecop/testutils.go +++ b/pkg/sql/colexecop/testutils.go @@ -141,7 +141,7 @@ type CallbackOperator struct { ZeroInputNode InitCb func(context.Context) NextCb func() coldata.Batch - CloseCb func() error + CloseCb func(ctx context.Context) error } var _ ClosableOperator = &CallbackOperator{} @@ -163,11 +163,11 @@ func (o *CallbackOperator) Next() coldata.Batch { } // Close is part of the ClosableOperator interface. -func (o *CallbackOperator) Close() error { +func (o *CallbackOperator) Close(ctx context.Context) error { if o.CloseCb == nil { return nil } - return o.CloseCb() + return o.CloseCb(ctx) } // TestingSemaphore is a semaphore.Semaphore that never blocks and is always diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 557eb97b616f..b9f8cf54e6ca 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -281,8 +281,12 @@ func (s *ColBatchScan) Release() { } // Close implements the colexecop.Closer interface. -func (s *ColBatchScan) Close() error { - s.cf.Close(s.EnsureCtx()) +func (s *ColBatchScan) Close(context.Context) error { + // Note that we're using the context of the ColBatchScan rather than the + // argument of Close() because the ColBatchScan derives its own tracing + // span. + ctx := s.EnsureCtx() + s.cf.Close(ctx) if s.tracingSpan != nil { s.tracingSpan.Finish() s.tracingSpan = nil diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 146b3bc9a568..a523bc0f8930 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -589,7 +589,7 @@ func (s *ColIndexJoin) Release() { } // Close implements the colexecop.Closer interface. -func (s *ColIndexJoin) Close() error { +func (s *ColIndexJoin) Close(context.Context) error { s.closeInternal() if s.tracingSpan != nil { s.tracingSpan.Finish() @@ -601,7 +601,11 @@ func (s *ColIndexJoin) Close() error { // closeInternal is a subset of Close() which doesn't finish the operator's // span. func (s *ColIndexJoin) closeInternal() { - s.cf.Close(s.EnsureCtx()) + // Note that we're using the context of the ColIndexJoin rather than the + // argument of Close() because the ColIndexJoin derives its own tracing + // span. + ctx := s.EnsureCtx() + s.cf.Close(ctx) if s.spanAssembler != nil { // spanAssembler can be nil if Release() has already been called. s.spanAssembler.Close() diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 37dde04bc17e..894ad4589886 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -266,7 +266,7 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // Make sure that we close the coordinator and notify the batch receiver in // all cases. 
defer func() { - if err := f.close(); err != nil && status != execinfra.ConsumerClosed { + if err := f.close(ctx); err != nil && status != execinfra.ConsumerClosed { f.pushError(err) } f.output.ProducerDone() @@ -332,11 +332,11 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // close cancels the flow and closes all colexecop.Closers the coordinator is // responsible for. -func (f *BatchFlowCoordinator) close() error { +func (f *BatchFlowCoordinator) close(ctx context.Context) error { f.cancelFlow() var lastErr error for _, toClose := range f.input.ToClose { - if err := toClose.Close(); err != nil { + if err := toClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index f67d0bc82507..98eef7bc7844 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -1037,14 +1037,14 @@ func (s *vectorizedFlowCreator) setupOutput( // callbackCloser is a utility struct that implements the Closer interface by // calling the provided callback. type callbackCloser struct { - closeCb func() error + closeCb func(context.Context) error } var _ colexecop.Closer = &callbackCloser{} // Close implements the Closer interface. -func (c *callbackCloser) Close() error { - return c.closeCb() +func (c *callbackCloser) Close(ctx context.Context) error { + return c.closeCb(ctx) } func (s *vectorizedFlowCreator) setupFlow( @@ -1131,12 +1131,12 @@ func (s *vectorizedFlowCreator) setupFlow( for i := range toCloseCopy { func(idx int) { closed := false - result.ToClose[idx] = &callbackCloser{closeCb: func() error { + result.ToClose[idx] = &callbackCloser{closeCb: func(ctx context.Context) error { if !closed { closed = true atomic.AddInt32(&s.numClosed, 1) } - return toCloseCopy[idx].Close() + return toCloseCopy[idx].Close(ctx) }} }(i) } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index a01f66ec9075..0536f038f1a4 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -59,13 +59,13 @@ var ( ) type callbackCloser struct { - closeCb func() error + closeCb func(context.Context) error } var _ colexecop.Closer = callbackCloser{} -func (c callbackCloser) Close() error { - return c.closeCb() +func (c callbackCloser) Close(ctx context.Context) error { + return c.closeCb(ctx) } // TestVectorizedFlowShutdown tests that closing the FlowCoordinator correctly @@ -257,7 +257,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { colexecargs.OpWithMetaInfo{ Root: outboxInput, MetadataSources: outboxMetadataSources, - ToClose: []colexecop.Closer{callbackCloser{closeCb: func() error { + ToClose: []colexecop.Closer{callbackCloser{closeCb: func(context.Context) error { idToClosed.Lock() idToClosed.mapping[id] = true idToClosed.Unlock() @@ -358,7 +358,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { inputInfo := colexecargs.OpWithMetaInfo{ Root: input, MetadataSources: colexecop.MetadataSources{inputMetadataSource}, - ToClose: colexecop.Closers{callbackCloser{closeCb: func() error { + ToClose: colexecop.Closers{callbackCloser{closeCb: func(context.Context) error { closeCalled = true return nil }}}, From ff8e7cf997122bfac4c78bab9eb33009e90a2dda Mon Sep 17 00:00:00 2001 From: richardjcai Date: Mon, 7 Feb 2022 16:05:07 -0500 Subject: [PATCH 4/5] sql: fix nil pointer error in RunPostDeserializationChanges In some restore paths, the privilegeDescriptor would be nil in 
RunPostDeserializationChanges, so we now avoid doing work on a nil pointer.

Release note: None
---
 pkg/sql/catalog/dbdesc/database_desc_builder.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pkg/sql/catalog/dbdesc/database_desc_builder.go b/pkg/sql/catalog/dbdesc/database_desc_builder.go
index eefcf8f9c6b9..86d0230d9bcc 100644
--- a/pkg/sql/catalog/dbdesc/database_desc_builder.go
+++ b/pkg/sql/catalog/dbdesc/database_desc_builder.go
@@ -96,6 +96,12 @@ func (ddb *databaseDescriptorBuilder) RunRestoreChanges(
 func maybeConvertIncompatibleDBPrivilegesToDefaultPrivileges(
 	privileges *descpb.PrivilegeDescriptor, defaultPrivileges *descpb.DefaultPrivilegeDescriptor,
 ) (hasChanged bool) {
+	// If privileges are nil, there is nothing to convert.
+	// This case can happen during restore, where privileges are not yet created.
+	if privileges == nil {
+		return false
+	}
+
 	var pgIncompatibleDBPrivileges = privilege.List{
 		privilege.SELECT, privilege.INSERT, privilege.UPDATE, privilege.DELETE,
 	}

From 5211799967ae6b4bc5a047f1cf560781610f43f9 Mon Sep 17 00:00:00 2001
From: Ricky Stewart
Date: Mon, 7 Feb 2022 15:37:10 -0600
Subject: [PATCH 5/5] dev: fix up the logic computing the line that should be added to `~/.bazelrc`

Have `setUpCache()` return the specific line it wants to see in
`~/.bazelrc`, and have `doctor` check for the presence of that specific
line. Also explicitly specify `http://` and `127.0.0.1` instead of
`localhost`.

Closes #76170.

Release note: None
---
 dev                   |  2 +-
 pkg/cmd/dev/cache.go  | 79 +++++++++++++++++++++++++++----------
 pkg/cmd/dev/doctor.go |  7 ++--
 3 files changed, 55 insertions(+), 33 deletions(-)

diff --git a/dev b/dev
index cd9048b13804..49fd459e6306 100755
--- a/dev
+++ b/dev
@@ -3,7 +3,7 @@ set -euo pipefail

 # Bump this counter to force rebuilding `dev` on all machines.
-DEV_VERSION=10
+DEV_VERSION=11

 THIS_DIR=$(cd "$(dirname "$0")" && pwd)
 BINARY_DIR=$THIS_DIR/bin/dev-versions
diff --git a/pkg/cmd/dev/cache.go b/pkg/cmd/dev/cache.go
index a66c0f94aee4..d585c0dc819c 100644
--- a/pkg/cmd/dev/cache.go
+++ b/pkg/cmd/dev/cache.go
@@ -68,12 +68,20 @@ func (d *dev) cache(cmd *cobra.Command, _ []string) error {
 		if err != nil {
 			log.Printf("%v\n", err)
 		}
-		return d.setUpCache(ctx)
+		bazelRcLine, err := d.setUpCache(ctx)
+		if bazelRcLine != "" {
+			fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine)
+		}
+		return err
 	}
 	if down {
 		return d.tearDownCache(ctx)
 	}
-	return d.setUpCache(ctx)
+	bazelRcLine, err := d.setUpCache(ctx)
+	if bazelRcLine != "" {
+		fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine)
+	}
+	return err
 }

 func bazelRemoteCacheDir() (string, error) {
@@ -118,25 +126,27 @@ func (d *dev) cacheIsUp(ctx context.Context) bool {
 	return err == nil
 }

-func (d *dev) setUpCache(ctx context.Context) error {
+// setUpCache returns the line that should be added to ~/.bazelrc, as well as
+// a non-nil error iff setting up the cache failed.
+func (d *dev) setUpCache(ctx context.Context) (string, error) { if d.cacheIsUp(ctx) { - return nil + return d.getCacheBazelrcLine(ctx) } log.Printf("Configuring cache...\n") if _, err := d.exec.CommandContextSilent(ctx, "bazel", "build", bazelRemoteTarget); err != nil { - return err + return "", err } bazelBin, err := d.getBazelBin(ctx) if err != nil { - return err + return "", err } // write config file unless already exists cacheDir, err := bazelRemoteCacheDir() if err != nil { - return err + return "", err } configFile := filepath.Join(cacheDir, configFilename) _, err = os.Stat(configFile) @@ -144,7 +154,7 @@ func (d *dev) setUpCache(ctx context.Context) error { if os.IsNotExist(err) { err := d.os.MkdirAll(filepath.Join(cacheDir, "cache")) if err != nil { - return err + return "", err } err = d.os.WriteFile(configFile, fmt.Sprintf(`# File generated by dev. You can edit this file in-place. # See https://github.com/buchgr/bazel-remote for additional information. @@ -155,10 +165,10 @@ host: localhost port: 9867 `, filepath.Join(cacheDir, "cache"))) if err != nil { - return err + return "", err } } else { - return err + return "", err } } log.Printf("Using cache configuration file at %s\n", configFile) @@ -168,14 +178,14 @@ port: 9867 // is mostly copied from `bazci`. output, err := d.exec.CommandContextSilent(ctx, "bazel", "cquery", bazelRemoteTarget, "--output=label_kind") if err != nil { - return err + return "", err } configHash := strings.Fields(string(output))[3] configHash = strings.TrimPrefix(configHash, "(") configHash = strings.TrimSuffix(configHash, ")") output, err = d.exec.CommandContextSilent(ctx, "bazel", "config", configHash) if err != nil { - return err + return "", err } var binDirForBazelRemote string for _, line := range strings.Split(string(output), "\n") { @@ -187,44 +197,35 @@ port: 9867 } } if binDirForBazelRemote == "" { - return fmt.Errorf("could not find bazel-remote binary; this is a bug") + return "", fmt.Errorf("could not find bazel-remote binary; this is a bug") } bazelRemoteBinary := filepath.Join(binDirForBazelRemote, bazelutil.OutputOfBinaryRule(bazelRemoteTarget, false)) cmd := exec.Command(bazelRemoteBinary, "--config_file", configFile) stdout, err := os.Create(filepath.Join(cacheDir, "stdout.log")) if err != nil { - return err + return "", err } cmd.Stdout = stdout stderr, err := os.Create(filepath.Join(cacheDir, "stderr.log")) if err != nil { - return err + return "", err } cmd.Stderr = stderr err = cmd.Start() if err != nil { - return err + return "", err } pid := cmd.Process.Pid err = cmd.Process.Release() if err != nil { - return err + return "", err } - // We "should" be using a YAML parser for this, but who's going to stop me? 
- configFileContents, err := d.os.ReadFile(configFile) + err = d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid)) if err != nil { - return err - } - for _, line := range strings.Split(configFileContents, "\n") { - if strings.HasPrefix(line, "port:") { - port := strings.TrimSpace(strings.Split(line, ":")[1]) - fmt.Printf("Add the string `--remote_cache=localhost:%s` to your ~/.bazelrc\n", port) - break - } + return "", err } - - return d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid)) + return d.getCacheBazelrcLine(ctx) } func (d *dev) tearDownCache(ctx context.Context) error { @@ -257,3 +258,23 @@ func (d *dev) cleanCache(ctx context.Context) error { } return os.RemoveAll(dir) } + +func (d *dev) getCacheBazelrcLine(ctx context.Context) (string, error) { + cacheDir, err := bazelRemoteCacheDir() + if err != nil { + return "", err + } + configFile := filepath.Join(cacheDir, configFilename) + // We "should" be using a YAML parser for this, but who's going to stop me? + configFileContents, err := d.os.ReadFile(configFile) + if err != nil { + return "", err + } + for _, line := range strings.Split(configFileContents, "\n") { + if strings.HasPrefix(line, "port:") { + port := strings.TrimSpace(strings.Split(line, ":")[1]) + return fmt.Sprintf("build --remote_cache=http://127.0.0.1:%s", port), nil + } + } + return "", fmt.Errorf("could not determine what to add to ~/.bazelrc to enable cache") +} diff --git a/pkg/cmd/dev/doctor.go b/pkg/cmd/dev/doctor.go index 28ce5292378b..1e99d379f742 100644 --- a/pkg/cmd/dev/doctor.go +++ b/pkg/cmd/dev/doctor.go @@ -214,7 +214,7 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) if !noCache { d.log.Println("doctor: setting up cache") - err = d.setUpCache(ctx) + bazelRcLine, err := d.setUpCache(ctx) if err != nil { return err } @@ -223,8 +223,9 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) return err } bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc")) - if err != nil || !strings.Contains(bazelRcContents, "--remote_cache=") { - log.Printf("Did you remember to add the --remote_cache=... line to your ~/.bazelrc?") + if err != nil || !strings.Contains(bazelRcContents, bazelRcLine) { + log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", bazelRcLine) + log.Printf(" echo \"%s\" >> ~/.bazelrc", bazelRcLine) success = false } }
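To make the new `~/.bazelrc` line computation easy to try in isolation, here
is a minimal standalone sketch of the same line-based scan that
`getCacheBazelrcLine` performs. The helper name `cacheBazelrcLine` and the
sample config contents are made up for illustration; only the scanning and
formatting logic mirrors the patch.

    package main

    import (
    	"fmt"
    	"strings"
    )

    // cacheBazelrcLine extracts the port from the bazel-remote config file
    // contents and builds the --remote_cache line, mirroring the line-based
    // scan in getCacheBazelrcLine (no YAML parser, as the patch jokes).
    func cacheBazelrcLine(configFileContents string) (string, error) {
    	for _, line := range strings.Split(configFileContents, "\n") {
    		if strings.HasPrefix(line, "port:") {
    			port := strings.TrimSpace(strings.Split(line, ":")[1])
    			return fmt.Sprintf("build --remote_cache=http://127.0.0.1:%s", port), nil
    		}
    	}
    	return "", fmt.Errorf("could not find a port: line in the config")
    }

    func main() {
    	// Illustrative contents in the shape written by setUpCache.
    	config := "dir: /tmp/bazel-remote/cache\nhost: localhost\nport: 9867\n"
    	line, err := cacheBazelrcLine(config)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(line) // build --remote_cache=http://127.0.0.1:9867
    }

Running this prints exactly the line `doctor` now checks for, which shows why
pinning the format to `http://127.0.0.1` matters: the doctor check is a plain
substring match against `~/.bazelrc`, so the setup and check paths must agree
on one canonical spelling.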
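Similarly, the `colexecop.Closer` signature change in the third patch is
easiest to see in a standalone sketch. The `Closer` interface below matches
the new shape; `plainCloser`, `spanningCloser`, and `closeAll` are
hypothetical stand-ins (not code from the tree) for, respectively, an
ordinary implementation, operators like `ColBatchScan` that derive their own
span, and `Closers.Close`.

    package main

    import (
    	"context"
    	"fmt"
    )

    // Closer mirrors the updated interface: Close takes a context because the
    // span in the context captured at Init() time may already be finished by
    // the time Close() runs.
    type Closer interface {
    	Close(context.Context) error
    }

    // plainCloser follows the default rule: it uses the argument context,
    // whose span is still live.
    type plainCloser struct{ name string }

    func (c *plainCloser) Close(ctx context.Context) error {
    	// The argument context is also the right one to consult for
    	// cancellation during cleanup.
    	if err := ctx.Err(); err != nil {
    		return err
    	}
    	fmt.Printf("closing %s\n", c.name)
    	return nil
    }

    // spanningCloser stands in for operators that derive their own tracing
    // span: it ignores the argument context and uses the one stored at
    // Init() time, which it owns and will finish itself.
    type spanningCloser struct {
    	initCtx context.Context
    }

    func (c *spanningCloser) Init(ctx context.Context) { c.initCtx = ctx }

    func (c *spanningCloser) Close(context.Context) error {
    	fmt.Printf("closing with own context (set: %t)\n", c.initCtx != nil)
    	return nil
    }

    // closeAll mimics Closers.Close from the patch: close every Closer with
    // the caller's context and keep only the last error.
    func closeAll(ctx context.Context, cs []Closer) error {
    	var lastErr error
    	for _, c := range cs {
    		if err := c.Close(ctx); err != nil {
    			lastErr = err
    		}
    	}
    	return lastErr
    }

    func main() {
    	s := &spanningCloser{}
    	s.Init(context.Background())
    	closers := []Closer{s, &plainCloser{name: "sync input"}}
    	if err := closeAll(context.Background(), closers); err != nil {
    		fmt.Println("close error:", err)
    	}
    }

The design point is the one the new interface comment spells out: a caller
that has finished its Init-time span can still hand Close a context whose
span is alive, and only implementations that own a separate span (and will
finish it themselves) should fall back to their stored context.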