diff --git a/dev b/dev index cd9048b13804..49fd459e6306 100755 --- a/dev +++ b/dev @@ -3,7 +3,7 @@ set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=10 +DEV_VERSION=11 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions diff --git a/pkg/cmd/dev/cache.go b/pkg/cmd/dev/cache.go index a66c0f94aee4..d585c0dc819c 100644 --- a/pkg/cmd/dev/cache.go +++ b/pkg/cmd/dev/cache.go @@ -68,12 +68,20 @@ func (d *dev) cache(cmd *cobra.Command, _ []string) error { if err != nil { log.Printf("%v\n", err) } - return d.setUpCache(ctx) + bazelRcLine, err := d.setUpCache(ctx) + if bazelRcLine != "" { + fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine) + } + return err } if down { return d.tearDownCache(ctx) } - return d.setUpCache(ctx) + bazelRcLine, err := d.setUpCache(ctx) + if bazelRcLine != "" { + fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine) + } + return err } func bazelRemoteCacheDir() (string, error) { @@ -118,25 +126,27 @@ func (d *dev) cacheIsUp(ctx context.Context) bool { return err == nil } -func (d *dev) setUpCache(ctx context.Context) error { +// setUpCache returns a non-nil error iff setting up the cache failed, and a +// string which is a line that should be added to ~/.bazelrc. +func (d *dev) setUpCache(ctx context.Context) (string, error) { if d.cacheIsUp(ctx) { - return nil + return d.getCacheBazelrcLine(ctx) } log.Printf("Configuring cache...\n") if _, err := d.exec.CommandContextSilent(ctx, "bazel", "build", bazelRemoteTarget); err != nil { - return err + return "", err } bazelBin, err := d.getBazelBin(ctx) if err != nil { - return err + return "", err } // write config file unless already exists cacheDir, err := bazelRemoteCacheDir() if err != nil { - return err + return "", err } configFile := filepath.Join(cacheDir, configFilename) _, err = os.Stat(configFile) @@ -144,7 +154,7 @@ func (d *dev) setUpCache(ctx context.Context) error { if os.IsNotExist(err) { err := d.os.MkdirAll(filepath.Join(cacheDir, "cache")) if err != nil { - return err + return "", err } err = d.os.WriteFile(configFile, fmt.Sprintf(`# File generated by dev. You can edit this file in-place. # See https://github.com/buchgr/bazel-remote for additional information. @@ -155,10 +165,10 @@ host: localhost port: 9867 `, filepath.Join(cacheDir, "cache"))) if err != nil { - return err + return "", err } } else { - return err + return "", err } } log.Printf("Using cache configuration file at %s\n", configFile) @@ -168,14 +178,14 @@ port: 9867 // is mostly copied from `bazci`. output, err := d.exec.CommandContextSilent(ctx, "bazel", "cquery", bazelRemoteTarget, "--output=label_kind") if err != nil { - return err + return "", err } configHash := strings.Fields(string(output))[3] configHash = strings.TrimPrefix(configHash, "(") configHash = strings.TrimSuffix(configHash, ")") output, err = d.exec.CommandContextSilent(ctx, "bazel", "config", configHash) if err != nil { - return err + return "", err } var binDirForBazelRemote string for _, line := range strings.Split(string(output), "\n") { @@ -187,44 +197,35 @@ port: 9867 } } if binDirForBazelRemote == "" { - return fmt.Errorf("could not find bazel-remote binary; this is a bug") + return "", fmt.Errorf("could not find bazel-remote binary; this is a bug") } bazelRemoteBinary := filepath.Join(binDirForBazelRemote, bazelutil.OutputOfBinaryRule(bazelRemoteTarget, false)) cmd := exec.Command(bazelRemoteBinary, "--config_file", configFile) stdout, err := os.Create(filepath.Join(cacheDir, "stdout.log")) if err != nil { - return err + return "", err } cmd.Stdout = stdout stderr, err := os.Create(filepath.Join(cacheDir, "stderr.log")) if err != nil { - return err + return "", err } cmd.Stderr = stderr err = cmd.Start() if err != nil { - return err + return "", err } pid := cmd.Process.Pid err = cmd.Process.Release() if err != nil { - return err + return "", err } - // We "should" be using a YAML parser for this, but who's going to stop me? - configFileContents, err := d.os.ReadFile(configFile) + err = d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid)) if err != nil { - return err - } - for _, line := range strings.Split(configFileContents, "\n") { - if strings.HasPrefix(line, "port:") { - port := strings.TrimSpace(strings.Split(line, ":")[1]) - fmt.Printf("Add the string `--remote_cache=localhost:%s` to your ~/.bazelrc\n", port) - break - } + return "", err } - - return d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid)) + return d.getCacheBazelrcLine(ctx) } func (d *dev) tearDownCache(ctx context.Context) error { @@ -257,3 +258,23 @@ func (d *dev) cleanCache(ctx context.Context) error { } return os.RemoveAll(dir) } + +func (d *dev) getCacheBazelrcLine(ctx context.Context) (string, error) { + cacheDir, err := bazelRemoteCacheDir() + if err != nil { + return "", err + } + configFile := filepath.Join(cacheDir, configFilename) + // We "should" be using a YAML parser for this, but who's going to stop me? + configFileContents, err := d.os.ReadFile(configFile) + if err != nil { + return "", err + } + for _, line := range strings.Split(configFileContents, "\n") { + if strings.HasPrefix(line, "port:") { + port := strings.TrimSpace(strings.Split(line, ":")[1]) + return fmt.Sprintf("build --remote_cache=http://127.0.0.1:%s", port), nil + } + } + return "", fmt.Errorf("could not determine what to add to ~/.bazelrc to enable cache") +} diff --git a/pkg/cmd/dev/doctor.go b/pkg/cmd/dev/doctor.go index 28ce5292378b..1e99d379f742 100644 --- a/pkg/cmd/dev/doctor.go +++ b/pkg/cmd/dev/doctor.go @@ -214,7 +214,7 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) if !noCache { d.log.Println("doctor: setting up cache") - err = d.setUpCache(ctx) + bazelRcLine, err := d.setUpCache(ctx) if err != nil { return err } @@ -223,8 +223,9 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) return err } bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc")) - if err != nil || !strings.Contains(bazelRcContents, "--remote_cache=") { - log.Printf("Did you remember to add the --remote_cache=... line to your ~/.bazelrc?") + if err != nil || !strings.Contains(bazelRcContents, bazelRcLine) { + log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", bazelRcLine) + log.Printf(" echo \"%s\" >> ~/.bazelrc", bazelRcLine) success = false } } diff --git a/pkg/sql/catalog/dbdesc/database_desc_builder.go b/pkg/sql/catalog/dbdesc/database_desc_builder.go index eefcf8f9c6b9..86d0230d9bcc 100644 --- a/pkg/sql/catalog/dbdesc/database_desc_builder.go +++ b/pkg/sql/catalog/dbdesc/database_desc_builder.go @@ -96,6 +96,12 @@ func (ddb *databaseDescriptorBuilder) RunRestoreChanges( func maybeConvertIncompatibleDBPrivilegesToDefaultPrivileges( privileges *descpb.PrivilegeDescriptor, defaultPrivileges *descpb.DefaultPrivilegeDescriptor, ) (hasChanged bool) { + // If privileges are nil, there is nothing to convert. + // This case can happen during restore where privileges are not yet created. + if privileges == nil { + return false + } + var pgIncompatibleDBPrivileges = privilege.List{ privilege.SELECT, privilege.INSERT, privilege.UPDATE, privilege.DELETE, } diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index 56294607353f..1aba308e1d9e 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -1152,7 +1152,7 @@ func benchmarkAggregateFunction( break } } - if err = a.(colexecop.Closer).Close(); err != nil { + if err = a.(colexecop.Closer).Close(ctx); err != nil { b.Fatal(err) } source.Reset(ctx) diff --git a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go index 769b63864581..5242a093e9f2 100644 --- a/pkg/sql/colexec/colexecagg/default_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/default_agg_tmpl.go @@ -201,9 +201,9 @@ func (a *default_AGGKINDAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *default_AGGKINDAggAlloc) Close() error { +func (a *default_AGGKINDAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go index 5a91a13be42d..7dce22f8b257 100644 --- a/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_default_agg.eg.go @@ -172,9 +172,9 @@ func (a *defaultHashAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *defaultHashAggAlloc) Close() error { +func (a *defaultHashAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go index e3dde9f454b8..92cf0fed9f56 100644 --- a/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go @@ -237,9 +237,9 @@ func (a *defaultOrderedAggAlloc) newAggFunc() AggregateFunc { return f } -func (a *defaultOrderedAggAlloc) Close() error { +func (a *defaultOrderedAggAlloc) Close(ctx context.Context) error { for _, fn := range a.returnedFns { - fn.fn.Close(fn.ctx) + fn.fn.Close(ctx) } a.returnedFns = nil return nil diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 0abcda55253d..1949aa8232f4 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -106,7 +106,7 @@ var _ execinfra.Releasable = &NewColOperatorResult{} // TestCleanupNoError releases the resources associated with this result and // asserts that no error is returned. It should only be used in tests. func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) { - require.NoError(t, r.ToClose.Close()) + require.NoError(t, r.ToClose.Close(context.Background())) } var newColOperatorResultPool = sync.Pool{ diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index 7029d9560378..6cad4d43bc88 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -99,7 +99,7 @@ func (c *crossJoiner) Next() coldata.Batch { } willEmit := c.willEmit() if willEmit == 0 { - if err := c.Close(); err != nil { + if err := c.Close(c.Ctx); err != nil { colexecerror.InternalError(err) } c.done = true @@ -462,8 +462,7 @@ func (b *crossJoinerBase) Reset(ctx context.Context) { b.builderState.numEmittedTotal = 0 } -func (b *crossJoinerBase) Close() error { - ctx := b.initHelper.EnsureCtx() +func (b *crossJoinerBase) Close(ctx context.Context) error { if b.rightTuples != nil { return b.rightTuples.Close(ctx) } diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner.go b/pkg/sql/colexec/colexecjoin/mergejoiner.go index ceaf285c54f6..4ab842e34698 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner.go @@ -889,20 +889,20 @@ func (o *mergeJoinBase) completeRightBufferedGroup() { o.finishRightBufferedGroup() } -func (o *mergeJoinBase) Close() error { +func (o *mergeJoinBase) Close(ctx context.Context) error { if !o.CloserHelper.Close() { return nil } var lastErr error for _, op := range []colexecop.Operator{o.left.source, o.right.source} { if c, ok := op.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { lastErr = err } } } if h := o.bufferedGroup.helper; h != nil { - if err := h.Close(); err != nil { + if err := h.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/colexectestutils/utils.go b/pkg/sql/colexec/colexectestutils/utils.go index b0db336ca2ec..f34d5ac624cb 100644 --- a/pkg/sql/colexec/colexectestutils/utils.go +++ b/pkg/sql/colexec/colexectestutils/utils.go @@ -440,7 +440,7 @@ func RunTestsWithOrderedCols( // setting, the closing happens at the end of the query execution. func closeIfCloser(t *testing.T, op colexecop.Operator) { if c, ok := op.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(context.Background()); err != nil { t.Fatal(err) } } diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index a2cfce874a43..2de7b230911c 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -113,7 +113,7 @@ const ( // buffer all tuples from each partition. type bufferedWindower interface { Init(ctx context.Context) - Close() + Close(context.Context) // seekNextPartition is called during the windowSeeking state on the current // batch. It gives windowers a chance to perform any necessary pre-processing, @@ -357,7 +357,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch { colexecerror.InternalError( errors.AssertionFailedf("window operator in processing state without buffered rows")) case windowFinished: - if err = b.Close(); err != nil { + if err = b.Close(b.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -369,16 +369,16 @@ func (b *bufferedWindowOp) Next() coldata.Batch { } } -func (b *bufferedWindowOp) Close() error { +func (b *bufferedWindowOp) Close(ctx context.Context) error { if !b.CloserHelper.Close() || b.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } - if err := b.bufferQueue.Close(b.EnsureCtx()); err != nil { + if err := b.bufferQueue.Close(ctx); err != nil { return err } - b.windower.Close() + b.windower.Close(ctx) return nil } diff --git a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go index 8ecc40a15137..20af8be13cab 100644 --- a/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go +++ b/pkg/sql/colexec/colexecwindow/count_rows_aggregator.go @@ -76,12 +76,12 @@ func (a *countRowsWindowAggregator) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (a *countRowsWindowAggregator) Close() { +func (a *countRowsWindowAggregator) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } // processBatch implements the bufferedWindower interface. diff --git a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go index 419ead7e8cdc..2fa11d0fba04 100644 --- a/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go @@ -201,9 +201,9 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *_OP_NAMEBase) Close() { +func (b *_OP_NAMEBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/first_value.eg.go b/pkg/sql/colexec/colexecwindow/first_value.eg.go index 7182dc35b648..80bd6de1172b 100644 --- a/pkg/sql/colexec/colexecwindow/first_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/first_value.eg.go @@ -567,9 +567,9 @@ func (b *firstValueBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *firstValueBase) Close() { +func (b *firstValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lag.eg.go b/pkg/sql/colexec/colexecwindow/lag.eg.go index 49c474259aa3..97f3122abdff 100644 --- a/pkg/sql/colexec/colexecwindow/lag.eg.go +++ b/pkg/sql/colexec/colexecwindow/lag.eg.go @@ -1595,9 +1595,9 @@ func (b *lagBase) Init(ctx context.Context) { } } -func (b *lagBase) Close() { +func (b *lagBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/last_value.eg.go b/pkg/sql/colexec/colexecwindow/last_value.eg.go index 594c16d99059..424ca01cc9e2 100644 --- a/pkg/sql/colexec/colexecwindow/last_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/last_value.eg.go @@ -567,9 +567,9 @@ func (b *lastValueBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *lastValueBase) Close() { +func (b *lastValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lead.eg.go b/pkg/sql/colexec/colexecwindow/lead.eg.go index bf3fbcff723a..f5f904a139df 100644 --- a/pkg/sql/colexec/colexecwindow/lead.eg.go +++ b/pkg/sql/colexec/colexecwindow/lead.eg.go @@ -1595,9 +1595,9 @@ func (b *leadBase) Init(ctx context.Context) { } } -func (b *leadBase) Close() { +func (b *leadBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go index 3a046f5fc60a..33ea4700a190 100644 --- a/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go @@ -174,11 +174,11 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) { } } -func (b *_OP_NAMEBase) Close() { +func (b *_OP_NAMEBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } // {{/* diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go index 59a9a155e4f1..76306130845a 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go @@ -341,10 +341,10 @@ func (a *minBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *minBoolAggregator) Close() { +func (a *minBoolAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minBoolAggregator{} } @@ -494,10 +494,10 @@ func (a *minBytesAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minBytesAggregator) Close() { +func (a *minBytesAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minBytesAggregator{} } @@ -649,10 +649,10 @@ func (a *minDecimalAggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *minDecimalAggregator) Close() { +func (a *minDecimalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minDecimalAggregator{} } @@ -826,10 +826,10 @@ func (a *minInt16Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt16Aggregator) Close() { +func (a *minInt16Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt16Aggregator{} } @@ -1003,10 +1003,10 @@ func (a *minInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt32Aggregator) Close() { +func (a *minInt32Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt32Aggregator{} } @@ -1180,10 +1180,10 @@ func (a *minInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minInt64Aggregator) Close() { +func (a *minInt64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minInt64Aggregator{} } @@ -1373,10 +1373,10 @@ func (a *minFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *minFloat64Aggregator) Close() { +func (a *minFloat64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minFloat64Aggregator{} } @@ -1542,10 +1542,10 @@ func (a *minTimestampAggregator) aggregateOverIntervals(intervals []windowInterv } } -func (a *minTimestampAggregator) Close() { +func (a *minTimestampAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minTimestampAggregator{} } @@ -1697,10 +1697,10 @@ func (a *minIntervalAggregator) aggregateOverIntervals(intervals []windowInterva } } -func (a *minIntervalAggregator) Close() { +func (a *minIntervalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minIntervalAggregator{} } @@ -1895,10 +1895,10 @@ func (a *minJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *minJSONAggregator) Close() { +func (a *minJSONAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minJSONAggregator{} } @@ -2054,10 +2054,10 @@ func (a *minDatumAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *minDatumAggregator) Close() { +func (a *minDatumAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = minDatumAggregator{} } @@ -2308,10 +2308,10 @@ func (a *maxBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *maxBoolAggregator) Close() { +func (a *maxBoolAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxBoolAggregator{} } @@ -2461,10 +2461,10 @@ func (a *maxBytesAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxBytesAggregator) Close() { +func (a *maxBytesAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxBytesAggregator{} } @@ -2616,10 +2616,10 @@ func (a *maxDecimalAggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *maxDecimalAggregator) Close() { +func (a *maxDecimalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxDecimalAggregator{} } @@ -2793,10 +2793,10 @@ func (a *maxInt16Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt16Aggregator) Close() { +func (a *maxInt16Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt16Aggregator{} } @@ -2970,10 +2970,10 @@ func (a *maxInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt32Aggregator) Close() { +func (a *maxInt32Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt32Aggregator{} } @@ -3147,10 +3147,10 @@ func (a *maxInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxInt64Aggregator) Close() { +func (a *maxInt64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxInt64Aggregator{} } @@ -3340,10 +3340,10 @@ func (a *maxFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval } } -func (a *maxFloat64Aggregator) Close() { +func (a *maxFloat64Aggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxFloat64Aggregator{} } @@ -3509,10 +3509,10 @@ func (a *maxTimestampAggregator) aggregateOverIntervals(intervals []windowInterv } } -func (a *maxTimestampAggregator) Close() { +func (a *maxTimestampAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxTimestampAggregator{} } @@ -3664,10 +3664,10 @@ func (a *maxIntervalAggregator) aggregateOverIntervals(intervals []windowInterva } } -func (a *maxIntervalAggregator) Close() { +func (a *maxIntervalAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxIntervalAggregator{} } @@ -3862,10 +3862,10 @@ func (a *maxJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { } } -func (a *maxJSONAggregator) Close() { +func (a *maxJSONAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxJSONAggregator{} } @@ -4021,10 +4021,10 @@ func (a *maxDatumAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *maxDatumAggregator) Close() { +func (a *maxDatumAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = maxDatumAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go index 36dd5e2442d9..73a8159dff6d 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go @@ -296,10 +296,10 @@ func (a *_AGG_TYPEAggregator) aggregateOverIntervals(intervals []windowInterval) } } -func (a *_AGG_TYPEAggregator) Close() { +func (a *_AGG_TYPEAggregator) Close(ctx context.Context) { a.queue.close() a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) *a = _AGG_TYPEAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/nth_value.eg.go b/pkg/sql/colexec/colexecwindow/nth_value.eg.go index ed3fccf5d57f..48fa6ca801f1 100644 --- a/pkg/sql/colexec/colexecwindow/nth_value.eg.go +++ b/pkg/sql/colexec/colexecwindow/nth_value.eg.go @@ -767,9 +767,9 @@ func (b *nthValueBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (b *nthValueBase) Close() { +func (b *nthValueBase) Close(ctx context.Context) { if !b.CloserHelper.Close() { return } - b.buffer.Close(b.EnsureCtx()) + b.buffer.Close(ctx) } diff --git a/pkg/sql/colexec/colexecwindow/ntile.eg.go b/pkg/sql/colexec/colexecwindow/ntile.eg.go index deed9c367fb5..cb46dc0e9a43 100644 --- a/pkg/sql/colexec/colexecwindow/ntile.eg.go +++ b/pkg/sql/colexec/colexecwindow/ntile.eg.go @@ -276,4 +276,4 @@ func (b *nTileBase) startNewPartition() { func (b *nTileBase) Init(ctx context.Context) {} -func (b *nTileBase) Close() {} +func (b *nTileBase) Close(context.Context) {} diff --git a/pkg/sql/colexec/colexecwindow/ntile_tmpl.go b/pkg/sql/colexec/colexecwindow/ntile_tmpl.go index 6bd9078e2adb..5db90f71eac1 100644 --- a/pkg/sql/colexec/colexecwindow/ntile_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/ntile_tmpl.go @@ -256,4 +256,4 @@ func (b *nTileBase) startNewPartition() { func (b *nTileBase) Init(ctx context.Context) {} -func (b *nTileBase) Close() {} +func (b *nTileBase) Close(context.Context) {} diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go index 9d2e2098be49..8c5abaaca414 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go @@ -310,7 +310,7 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -323,14 +323,14 @@ func (r *percentRankNoPartitionOp) Next() coldata.Batch { } } -func (r *percentRankNoPartitionOp) Close() error { +func (r *percentRankNoPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } return lastErr @@ -589,7 +589,7 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -602,17 +602,17 @@ func (r *percentRankWithPartitionOp) Next() coldata.Batch { } } -func (r *percentRankWithPartitionOp) Close() error { +func (r *percentRankWithPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } return lastErr @@ -856,7 +856,7 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -869,17 +869,17 @@ func (r *cumeDistNoPartitionOp) Next() coldata.Batch { } } -func (r *cumeDistNoPartitionOp) Close() error { +func (r *cumeDistNoPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } return lastErr @@ -1217,7 +1217,7 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -1230,20 +1230,20 @@ func (r *cumeDistWithPartitionOp) Next() coldata.Batch { } } -func (r *cumeDistWithPartitionOp) Close() error { +func (r *cumeDistWithPartitionOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } return lastErr diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go index 9f606141704e..ce4439b0b975 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go @@ -576,7 +576,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { return r.output case relativeRankFinished: - if err := r.Close(); err != nil { + if err := r.Close(r.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -589,23 +589,23 @@ func (r *_RELATIVE_RANK_STRINGOp) Next() coldata.Batch { } } -func (r *_RELATIVE_RANK_STRINGOp) Close() error { +func (r *_RELATIVE_RANK_STRINGOp) Close(ctx context.Context) error { if !r.CloserHelper.Close() || r.Ctx == nil { // Either Close() has already been called or Init() was never called. In // both cases there is nothing to do. return nil } var lastErr error - if err := r.bufferedTuples.Close(r.Ctx); err != nil { + if err := r.bufferedTuples.Close(ctx); err != nil { lastErr = err } // {{if .HasPartition}} - if err := r.partitionsState.Close(r.Ctx); err != nil { + if err := r.partitionsState.Close(ctx); err != nil { lastErr = err } // {{end}} // {{if .IsCumeDist}} - if err := r.peerGroupsState.Close(r.Ctx); err != nil { + if err := r.peerGroupsState.Close(ctx); err != nil { lastErr = err } // {{end}} diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go index 60effe81c329..b20cf3d4b0b2 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator.eg.go @@ -174,15 +174,15 @@ func (a *windowAggregatorBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (a *windowAggregatorBase) Close() { +func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(); err != nil { + if err := a.closers.Close(ctx); err != nil { colexecerror.InternalError(err) } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } func (a *windowAggregator) startNewPartition() { @@ -190,8 +190,8 @@ func (a *windowAggregator) startNewPartition() { a.agg.Reset() } -func (a *windowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *windowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = windowAggregator{} } @@ -236,8 +236,8 @@ func (a *slidingWindowAggregator) startNewPartition() { a.agg.Reset() } -func (a *slidingWindowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *slidingWindowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = slidingWindowAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go index 4c5607fa94e5..6c71db6bfcd0 100644 --- a/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/window_aggregator_tmpl.go @@ -179,15 +179,15 @@ func (a *windowAggregatorBase) Init(ctx context.Context) { } // Close implements the bufferedWindower interface. -func (a *windowAggregatorBase) Close() { +func (a *windowAggregatorBase) Close(ctx context.Context) { if !a.CloserHelper.Close() { return } - if err := a.closers.Close(); err != nil { + if err := a.closers.Close(ctx); err != nil { colexecerror.InternalError(err) } a.framer.close() - a.buffer.Close(a.EnsureCtx()) + a.buffer.Close(ctx) } func (a *windowAggregator) startNewPartition() { @@ -195,8 +195,8 @@ func (a *windowAggregator) startNewPartition() { a.agg.Reset() } -func (a *windowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *windowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = windowAggregator{} } @@ -220,8 +220,8 @@ func (a *slidingWindowAggregator) startNewPartition() { a.agg.Reset() } -func (a *slidingWindowAggregator) Close() { - a.windowAggregatorBase.Close() +func (a *slidingWindowAggregator) Close(ctx context.Context) { + a.windowAggregatorBase.Close(ctx) a.agg.Reset() *a = slidingWindowAggregator{} } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 68905ca5c9a7..cdf637647aa8 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -1083,7 +1083,7 @@ func TestWindowFunctions(t *testing.T) { // Close all closers manually (in production this is done on the // flow cleanup). for _, c := range toClose { - require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) } for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 1faa9a145158..f94f5c742226 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -133,7 +133,7 @@ func newColumnarizer( // Close will call InternalClose(). Note that we don't return // any trailing metadata here because the columnarizers // propagate it in DrainMeta. - if err := c.Close(); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } @@ -289,7 +289,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { } // Close is part of the colexecop.ClosableOperator interface. -func (c *Columnarizer) Close() error { +func (c *Columnarizer) Close(context.Context) error { if c.removedFromFlow { return nil } diff --git a/pkg/sql/colexec/columnarizer_test.go b/pkg/sql/colexec/columnarizer_test.go index f38a9400d818..091341f96389 100644 --- a/pkg/sql/colexec/columnarizer_test.go +++ b/pkg/sql/colexec/columnarizer_test.go @@ -109,7 +109,7 @@ func TestColumnarizerDrainsAndClosesInput(t *testing.T) { if tc.consumerClosed { // Closing the Columnarizer should call ConsumerClosed on the processor. - require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) require.Equal(t, execinfra.ConsumerClosed, rb.ConsumerStatus, "unexpected consumer status %d", rb.ConsumerStatus) } else { // Calling DrainMeta from the vectorized execution engine should propagate to diff --git a/pkg/sql/colexec/disk_spiller.go b/pkg/sql/colexec/disk_spiller.go index bb3388c25384..2e3ad36f3dd8 100644 --- a/pkg/sql/colexec/disk_spiller.go +++ b/pkg/sql/colexec/disk_spiller.go @@ -234,16 +234,16 @@ func (d *diskSpillerBase) Reset(ctx context.Context) { } // Close implements the Closer interface. -func (d *diskSpillerBase) Close() error { +func (d *diskSpillerBase) Close(ctx context.Context) error { if !d.CloserHelper.Close() { return nil } var retErr error if c, ok := d.inMemoryOp.(colexecop.Closer); ok { - retErr = c.Close() + retErr = c.Close(ctx) } if c, ok := d.diskBackedOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 26c741e4a0b9..606b796f37e0 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -406,7 +406,7 @@ func (s *externalSorter) Next() coldata.Batch { for b := merger.Next(); ; b = merger.Next() { partitionDone := s.enqueue(b) if b.Length() == 0 || partitionDone { - if err := merger.Close(); err != nil { + if err := merger.Close(s.Ctx); err != nil { colexecerror.InternalError(err) } break @@ -469,7 +469,7 @@ func (s *externalSorter) Next() coldata.Batch { return b case externalSorterFinished: - if err := s.Close(); err != nil { + if err := s.Close(s.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -584,7 +584,7 @@ func (s *externalSorter) Reset(ctx context.Context) { r.Reset(ctx) } s.state = externalSorterNewPartition - if err := s.Close(); err != nil { + if err := s.Close(ctx); err != nil { colexecerror.InternalError(err) } // Reset the CloserHelper so that the sorter may be closed again. @@ -597,11 +597,10 @@ func (s *externalSorter) Reset(ctx context.Context) { s.emitted = 0 } -func (s *externalSorter) Close() error { +func (s *externalSorter) Close(ctx context.Context) error { if !s.CloserHelper.Close() { return nil } - ctx := s.EnsureCtx() log.VEvent(ctx, 1, "external sorter is closed") var lastErr error if s.partitioner != nil { @@ -609,7 +608,7 @@ func (s *externalSorter) Close() error { s.partitioner = nil } if c, ok := s.emitter.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index f98c24288922..38ad233f6208 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -298,7 +298,7 @@ func TestExternalSortMemoryAccounting(t *testing.T) { for b := sorter.Next(); b.Length() > 0; b = sorter.Next() { } for _, c := range closers { - require.NoError(t, c.Close()) + require.NoError(t, c.Close(ctx)) } require.True(t, spilled) diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 3b38e1f8e168..6ac8e6c1dc6c 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -469,16 +469,16 @@ func (op *hashAggregator) resetBucketsAndTrackingState(ctx context.Context) { op.curOutputBucketIdx = 0 } -func (op *hashAggregator) Close() error { +func (op *hashAggregator) Close(ctx context.Context) error { if !op.CloserHelper.Close() { return nil } op.accountingHelper.Release() var retErr error if op.inputTrackingState.tuples != nil { - retErr = op.inputTrackingState.tuples.Close(op.EnsureCtx()) + retErr = op.inputTrackingState.tuples.Close(ctx) } - if err := op.toClose.Close(); err != nil { + if err := op.toClose.Close(ctx); err != nil { retErr = err } return retErr diff --git a/pkg/sql/colexec/hash_based_partitioner.go b/pkg/sql/colexec/hash_based_partitioner.go index d3396cb30062..adfbca768d03 100644 --- a/pkg/sql/colexec/hash_based_partitioner.go +++ b/pkg/sql/colexec/hash_based_partitioner.go @@ -618,7 +618,7 @@ StateChanged: return b case hbpFinished: - if err := op.Close(); err != nil { + if err := op.Close(op.Ctx); err != nil { colexecerror.InternalError(err) } return coldata.ZeroBatch @@ -629,11 +629,10 @@ StateChanged: } } -func (op *hashBasedPartitioner) Close() error { +func (op *hashBasedPartitioner) Close(ctx context.Context) error { if !op.CloserHelper.Close() { return nil } - ctx := op.EnsureCtx() log.VEventf(ctx, 1, "%s is closed", op.name) var retErr error for i := range op.inputs { @@ -644,7 +643,7 @@ func (op *hashBasedPartitioner) Close() error { // The in-memory main operator might be a Closer (e.g. the in-memory hash // aggregator), and we need to close it if so. if c, ok := op.inMemMainOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } @@ -652,7 +651,7 @@ func (op *hashBasedPartitioner) Close() error { // it will still be closed appropriately because we accumulate all closers // in NewColOperatorResult. if c, ok := op.diskBackedFallbackOp.(colexecop.Closer); ok { - if err := c.Close(); err != nil { + if err := c.Close(ctx); err != nil { retErr = err } } diff --git a/pkg/sql/colexec/invariants_checker.go b/pkg/sql/colexec/invariants_checker.go index 230e13c89d14..9a88fbadcb84 100644 --- a/pkg/sql/colexec/invariants_checker.go +++ b/pkg/sql/colexec/invariants_checker.go @@ -127,10 +127,10 @@ func (i *invariantsChecker) DrainMeta() []execinfrapb.ProducerMetadata { } // Close is part of the colexecop.ClosableOperator interface. -func (i *invariantsChecker) Close() error { +func (i *invariantsChecker) Close(ctx context.Context) error { c, ok := i.Input.(colexecop.Closer) if !ok { return nil } - return c.Close() + return c.Close(ctx) } diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index d5e59fc8b401..c99d9a986060 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -399,6 +399,6 @@ func (a *orderedAggregator) Reset(ctx context.Context) { } } -func (a *orderedAggregator) Close() error { - return a.toClose.Close() +func (a *orderedAggregator) Close(ctx context.Context) error { + return a.toClose.Close(ctx) } diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 1b363e78f4d2..3a8f0f0ab65b 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -299,11 +299,15 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { return bufferedMeta } -func (o *OrderedSynchronizer) Close() error { +func (o *OrderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := o.EnsureCtx() o.accountingHelper.Release() var lastErr error for _, input := range o.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 2b0c9b25120b..e1baf9bb4e3c 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -243,11 +243,15 @@ func (o *OrderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata { return bufferedMeta } -func (o *OrderedSynchronizer) Close() error { +func (o *OrderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := o.EnsureCtx() o.accountingHelper.Release() var lastErr error for _, input := range o.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 59c375b10754..1ca07dd46f0a 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -63,7 +63,8 @@ const ( type ParallelUnorderedSynchronizer struct { colexecop.InitHelper - inputs []colexecargs.OpWithMetaInfo + inputs []colexecargs.OpWithMetaInfo + inputCtxs []context.Context // cancelLocalInput stores context cancellation functions for each of the // inputs. The functions are populated only if LocalPlan is true. cancelLocalInput []context.CancelFunc @@ -139,6 +140,7 @@ func NewParallelUnorderedSynchronizer( } return &ParallelUnorderedSynchronizer{ inputs: inputs, + inputCtxs: make([]context.Context, len(inputs)), cancelLocalInput: make([]context.CancelFunc, len(inputs)), tracingSpans: make([]*tracing.Span, len(inputs)), readNextBatch: readNextBatch, @@ -165,8 +167,7 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) { return } for i, input := range s.inputs { - var inputCtx context.Context - inputCtx, s.tracingSpans[i] = execinfra.ProcessorSpan(s.Ctx, fmt.Sprintf("parallel unordered sync input %d", i)) + s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(s.Ctx, fmt.Sprintf("parallel unordered sync input %d", i)) if s.LocalPlan { // If there plan is local, there are no colrpc.Inboxes in this input // tree, and the synchronizer can cancel the current work eagerly @@ -177,9 +178,9 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) { // because canceling the context would break the gRPC stream and // make it impossible to fetch the remote metadata. Furthermore, it // will result in the remote flow cancellation. - inputCtx, s.cancelLocalInput[i] = context.WithCancel(inputCtx) + s.inputCtxs[i], s.cancelLocalInput[i] = context.WithCancel(s.inputCtxs[i]) } - input.Root.Init(inputCtx) + input.Root.Init(s.inputCtxs[i]) s.nextBatch[i] = func(inputOp colexecop.Operator, inputIdx int) func() { return func() { s.batches[inputIdx] = inputOp.Next() @@ -222,7 +223,7 @@ func (s *ParallelUnorderedSynchronizer) init() { } // We need to close all of the closers of this input before we // notify the wait groups. - input.ToClose.CloseAndLogOnErr(s.Ctx, "parallel unordered synchronizer input") + input.ToClose.CloseAndLogOnErr(s.inputCtxs[inputIdx], "parallel unordered synchronizer input") s.internalWaitGroup.Done() s.externalWaitGroup.Done() }() @@ -460,7 +461,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada } // Close is part of the colexecop.ClosableOperator interface. -func (s *ParallelUnorderedSynchronizer) Close() error { +func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error { if state := s.getState(); state != parallelUnorderedSynchronizerStateUninitialized { // Input goroutines have been started and will take care of closing the // closers from the corresponding input trees, so we don't need to do @@ -483,7 +484,7 @@ func (s *ParallelUnorderedSynchronizer) Close() error { } var lastErr error for _, input := range s.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index 88dc93d21715..7a54de5b930a 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -248,7 +248,7 @@ func TestParallelUnorderedSyncClosesInputs(t *testing.T) { // closure occurred as expected. closed := false firstInput := &colexecop.CallbackOperator{ - CloseCb: func() error { + CloseCb: func(context.Context) error { closed = true return nil }, @@ -273,7 +273,7 @@ func TestParallelUnorderedSyncClosesInputs(t *testing.T) { // In the production setting, the user of the synchronizer is still expected // to close it, even if a panic is encountered in Init, so we do the same // thing here and verify that the first input is properly closed. - require.NoError(t, s.Close()) + require.NoError(t, s.Close(ctx)) require.True(t, closed) } diff --git a/pkg/sql/colexec/serial_unordered_synchronizer.go b/pkg/sql/colexec/serial_unordered_synchronizer.go index ad47f09f196c..02288277f4c6 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer.go @@ -107,10 +107,14 @@ func (s *SerialUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetadata } // Close is part of the colexecop.ClosableOperator interface. -func (s *SerialUnorderedSynchronizer) Close() error { +func (s *SerialUnorderedSynchronizer) Close(context.Context) error { + // Note that we're using the context of the synchronizer rather than the + // argument of Close() because the synchronizer derives its own tracing + // span. + ctx := s.EnsureCtx() var lastErr error for _, input := range s.inputs { - if err := input.ToClose.Close(); err != nil { + if err := input.ToClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index dcad468d7051..86e75d750093 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -145,11 +145,17 @@ type Closer interface { // is an Operator, the implementation of Close must be safe to execute even // if Operator.Init wasn't called. // + // Unless the Closer derives its own context with a separate tracing span, + // the argument context rather than the one from Init() must be used + // (wherever necessary) by the implementation. This is so since the span in + // the context from Init() might be already finished when Close() is called + // whereas the argument context will contain an unfinished span. + // // If this Closer is an execinfra.Releasable, the implementation must be // safe to execute even after Release() was called. // TODO(yuzefovich): refactor this because the Release()'d objects should // not be used anymore. - Close() error + Close(context.Context) error } // Closers is a slice of Closers. @@ -162,7 +168,7 @@ type Closers []Closer func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { if err := colexecerror.CatchVectorizedRuntimeError(func() { for _, closer := range c { - if err := closer.Close(); err != nil && log.V(1) { + if err := closer.Close(ctx); err != nil && log.V(1) { log.Infof(ctx, "%s: error closing Closer: %v", prefix, err) } } @@ -172,10 +178,10 @@ func (c Closers) CloseAndLogOnErr(ctx context.Context, prefix string) { } // Close closes all Closers and returns the last error (if any occurs). -func (c Closers) Close() error { +func (c Closers) Close(ctx context.Context) error { var lastErr error for _, closer := range c { - if err := closer.Close(); err != nil { + if err := closer.Close(ctx); err != nil { lastErr = err } } @@ -331,12 +337,12 @@ type OneInputCloserHelper struct { var _ Closer = &OneInputCloserHelper{} // Close implements the Closer interface. -func (c *OneInputCloserHelper) Close() error { +func (c *OneInputCloserHelper) Close(ctx context.Context) error { if !c.CloserHelper.Close() { return nil } if closer, ok := c.Input.(Closer); ok { - return closer.Close() + return closer.Close(ctx) } return nil } diff --git a/pkg/sql/colexecop/testutils.go b/pkg/sql/colexecop/testutils.go index 8dc2823d28f0..886f3b8df646 100644 --- a/pkg/sql/colexecop/testutils.go +++ b/pkg/sql/colexecop/testutils.go @@ -141,7 +141,7 @@ type CallbackOperator struct { ZeroInputNode InitCb func(context.Context) NextCb func() coldata.Batch - CloseCb func() error + CloseCb func(ctx context.Context) error } var _ ClosableOperator = &CallbackOperator{} @@ -163,11 +163,11 @@ func (o *CallbackOperator) Next() coldata.Batch { } // Close is part of the ClosableOperator interface. -func (o *CallbackOperator) Close() error { +func (o *CallbackOperator) Close(ctx context.Context) error { if o.CloseCb == nil { return nil } - return o.CloseCb() + return o.CloseCb(ctx) } // TestingSemaphore is a semaphore.Semaphore that never blocks and is always diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 557eb97b616f..b9f8cf54e6ca 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -281,8 +281,12 @@ func (s *ColBatchScan) Release() { } // Close implements the colexecop.Closer interface. -func (s *ColBatchScan) Close() error { - s.cf.Close(s.EnsureCtx()) +func (s *ColBatchScan) Close(context.Context) error { + // Note that we're using the context of the ColBatchScan rather than the + // argument of Close() because the ColBatchScan derives its own tracing + // span. + ctx := s.EnsureCtx() + s.cf.Close(ctx) if s.tracingSpan != nil { s.tracingSpan.Finish() s.tracingSpan = nil diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 146b3bc9a568..a523bc0f8930 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -589,7 +589,7 @@ func (s *ColIndexJoin) Release() { } // Close implements the colexecop.Closer interface. -func (s *ColIndexJoin) Close() error { +func (s *ColIndexJoin) Close(context.Context) error { s.closeInternal() if s.tracingSpan != nil { s.tracingSpan.Finish() @@ -601,7 +601,11 @@ func (s *ColIndexJoin) Close() error { // closeInternal is a subset of Close() which doesn't finish the operator's // span. func (s *ColIndexJoin) closeInternal() { - s.cf.Close(s.EnsureCtx()) + // Note that we're using the context of the ColIndexJoin rather than the + // argument of Close() because the ColIndexJoin derives its own tracing + // span. + ctx := s.EnsureCtx() + s.cf.Close(ctx) if s.spanAssembler != nil { // spanAssembler can be nil if Release() has already been called. s.spanAssembler.Close() diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 37dde04bc17e..894ad4589886 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -266,7 +266,7 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // Make sure that we close the coordinator and notify the batch receiver in // all cases. defer func() { - if err := f.close(); err != nil && status != execinfra.ConsumerClosed { + if err := f.close(ctx); err != nil && status != execinfra.ConsumerClosed { f.pushError(err) } f.output.ProducerDone() @@ -332,11 +332,11 @@ func (f *BatchFlowCoordinator) Run(ctx context.Context) { // close cancels the flow and closes all colexecop.Closers the coordinator is // responsible for. -func (f *BatchFlowCoordinator) close() error { +func (f *BatchFlowCoordinator) close(ctx context.Context) error { f.cancelFlow() var lastErr error for _, toClose := range f.input.ToClose { - if err := toClose.Close(); err != nil { + if err := toClose.Close(ctx); err != nil { lastErr = err } } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index f67d0bc82507..98eef7bc7844 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -1037,14 +1037,14 @@ func (s *vectorizedFlowCreator) setupOutput( // callbackCloser is a utility struct that implements the Closer interface by // calling the provided callback. type callbackCloser struct { - closeCb func() error + closeCb func(context.Context) error } var _ colexecop.Closer = &callbackCloser{} // Close implements the Closer interface. -func (c *callbackCloser) Close() error { - return c.closeCb() +func (c *callbackCloser) Close(ctx context.Context) error { + return c.closeCb(ctx) } func (s *vectorizedFlowCreator) setupFlow( @@ -1131,12 +1131,12 @@ func (s *vectorizedFlowCreator) setupFlow( for i := range toCloseCopy { func(idx int) { closed := false - result.ToClose[idx] = &callbackCloser{closeCb: func() error { + result.ToClose[idx] = &callbackCloser{closeCb: func(ctx context.Context) error { if !closed { closed = true atomic.AddInt32(&s.numClosed, 1) } - return toCloseCopy[idx].Close() + return toCloseCopy[idx].Close(ctx) }} }(i) } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index a01f66ec9075..0536f038f1a4 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -59,13 +59,13 @@ var ( ) type callbackCloser struct { - closeCb func() error + closeCb func(context.Context) error } var _ colexecop.Closer = callbackCloser{} -func (c callbackCloser) Close() error { - return c.closeCb() +func (c callbackCloser) Close(ctx context.Context) error { + return c.closeCb(ctx) } // TestVectorizedFlowShutdown tests that closing the FlowCoordinator correctly @@ -257,7 +257,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { colexecargs.OpWithMetaInfo{ Root: outboxInput, MetadataSources: outboxMetadataSources, - ToClose: []colexecop.Closer{callbackCloser{closeCb: func() error { + ToClose: []colexecop.Closer{callbackCloser{closeCb: func(context.Context) error { idToClosed.Lock() idToClosed.mapping[id] = true idToClosed.Unlock() @@ -358,7 +358,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { inputInfo := colexecargs.OpWithMetaInfo{ Root: input, MetadataSources: colexecop.MetadataSources{inputMetadataSource}, - ToClose: colexecop.Closers{callbackCloser{closeCb: func() error { + ToClose: colexecop.Closers{callbackCloser{closeCb: func(context.Context) error { closeCalled = true return nil }}}, diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 5c2802b8634c..6b3e539e1ace 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -884,6 +884,8 @@ func (s *Server) newConnExecutor( ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} + ex.extraTxnState.atomicAutoRetryCounter = new(int32) + ex.initPlanner(ctx, &ex.planner) return ex @@ -1185,10 +1187,10 @@ type connExecutor struct { // transaction and it is cleared after the transaction is committed. schemaChangeJobRecords map[descpb.ID]*jobs.Record - // autoRetryCounter keeps track of the which iteration of a transaction + // atomicAutoRetryCounter keeps track of the which iteration of a transaction // auto-retry we're currently in. It's 0 whenever the transaction state is not // stateOpen. - autoRetryCounter int + atomicAutoRetryCounter *int32 // autoRetryReason records the error causing an auto-retryable error event if // the current transaction is being automatically retried. This is used in @@ -2687,7 +2689,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( advInfo := ex.state.consumeAdvanceInfo() if advInfo.code == rewind { - ex.extraTxnState.autoRetryCounter++ + atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1) } // If we had an error from DDL statement execution due to the presence of @@ -2716,7 +2718,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } } case txnStart: - ex.extraTxnState.autoRetryCounter = 0 + atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0) ex.extraTxnState.autoRetryReason = nil ex.recordTransactionStart(advInfo.txnEvent.txnID) // Bump the txn counter for logging. @@ -2886,7 +2888,7 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.state.mu.txnStart, NumStatementsExecuted: int32(ex.state.mu.stmtCount), NumRetries: int32(txn.Epoch()), - NumAutoRetries: int32(ex.extraTxnState.autoRetryCounter), + NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter), TxnDescription: txn.String(), Implicit: ex.implicitTxn(), AllocBytes: ex.state.mon.AllocBytes(), diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index fb36d8deadf8..925f09df5ed1 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -16,6 +16,7 @@ import ( "fmt" "runtime/pprof" "strings" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1006,7 +1007,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.maybeLogStatement( ctx, ex.executorType, - ex.extraTxnState.autoRetryCounter, + int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.txnCounter, res.RowsAffected(), res.Err(), @@ -1082,7 +1083,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.curPlan.flags.Set(planFlagNotDistributed) } - ex.sessionTracing.TraceRetryInformation(ctx, ex.extraTxnState.autoRetryCounter, ex.extraTxnState.autoRetryReason) + ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason) if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil { ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext()) } @@ -1115,7 +1116,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( // plan has not been closed earlier. ex.recordStatementSummary( ctx, planner, - ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(), stats, + int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats, ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) @@ -2035,7 +2036,7 @@ func (ex *connExecutor) recordTransactionFinish( TransactionTimeSec: txnTime.Seconds(), Committed: ev.eventType == txnCommit, ImplicitTxn: implicit, - RetryCount: int64(ex.extraTxnState.autoRetryCounter), + RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs, ServiceLatency: txnServiceLat, RetryLatency: txnRetryLat, diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 9e2b1bdf0095..89ff72028c29 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -123,7 +123,7 @@ func (r *insertRun) initRowContainer(params runParams, columns colinfo.ResultCol // processSourceRow processes one row from the source for insertion and, if // result rows are needed, saves it in the result row container. func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) error { - if err := enforceLocalColumnConstraints(rowVals, r.insertCols, false /* isUpdate */); err != nil { + if err := enforceLocalColumnConstraints(rowVals, r.insertCols); err != nil { return err } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index d5ab46f502e7..50a2d15f4e8f 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -148,18 +148,9 @@ func GenerateInsertRow( // Verify the column constraints. // - // We would really like to use enforceLocalColumnConstraints() here, - // but this is not possible because of some brain damage in the - // Insert() constructor, which causes insertCols to contain - // duplicate columns descriptors: computed columns are listed twice, - // one will receive a NULL value and one will receive a comptued - // value during execution. It "works out in the end" because the - // latter (non-NULL) value overwrites the earlier, but - // enforceLocalColumnConstraints() does not know how to reason about - // this. - // - // In the end it does not matter much, this code is going away in - // favor of the (simpler, correct) code in the CBO. + // During mutations (INSERT, UPDATE, UPSERT), this is checked by + // sql.enforceLocalColumnConstraints. These checks are required for IMPORT + // statements. // Check to see if NULL is being inserted into any non-nullable column. for _, col := range tableDesc.WritableColumns() { diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go index c8a5048cd94d..e48eb9fcf0e2 100644 --- a/pkg/sql/tablewriter_upsert_opt.go +++ b/pkg/sql/tablewriter_upsert_opt.go @@ -268,11 +268,7 @@ func (tu *optTableUpserter) updateConflictingRow( // via GenerateInsertRow(). // - for the fetched part, we assume that the data in the table is // correct already. - if err := enforceLocalColumnConstraints( - updateValues, - tu.updateCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(updateValues, tu.updateCols); err != nil { return err } diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 44a728e48c09..a7f4bb3b0e68 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -278,11 +278,7 @@ func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums) // Verify the schema constraints. For consistency with INSERT/UPSERT // and compatibility with PostgreSQL, we must do this before // processing the CHECK constraints. - if err := enforceLocalColumnConstraints( - u.run.updateValues, - u.run.tu.ru.UpdateCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(u.run.updateValues, u.run.tu.ru.UpdateCols); err != nil { return err } @@ -416,20 +412,11 @@ func (ss scalarSlot) checkColumnTypes(row []tree.TypedExpr) error { // enforceLocalColumnConstraints asserts the column constraints that do not // require data validation from other sources than the row data itself. This // currently only includes checking for null values in non-nullable columns. -func enforceLocalColumnConstraints(row tree.Datums, cols []catalog.Column, isUpdate bool) error { +func enforceLocalColumnConstraints(row tree.Datums, cols []catalog.Column) error { for i, col := range cols { if !col.IsNullable() && row[i] == tree.DNull { return sqlerrors.NewNonNullViolationError(col.GetName()) } - if isUpdate { - // TODO(mgartner): Remove this once assignment casts are supported - // for UPSERTs and UPDATEs. - outVal, err := tree.AdjustValueToType(col.GetType(), row[i]) - if err != nil { - return err - } - row[i] = outVal - } } return nil } diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 5d57ff37bb73..24d76fd8491c 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -137,11 +137,7 @@ func (n *upsertNode) BatchedNext(params runParams) (bool, error) { // processSourceRow processes one row from the source for upsertion. // The table writer is in charge of accumulating the result rows. func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) error { - if err := enforceLocalColumnConstraints( - rowVals, - n.run.insertCols, - true, /* isUpdate */ - ); err != nil { + if err := enforceLocalColumnConstraints(rowVals, n.run.insertCols); err != nil { return err }