Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
75827: sql: fix connExecutor extraTxnState autoRetryCounter race condition r=rafiss a=RichardJCai

Release note: None

Fixes #75752

75906: sql: remove type width enforcement during execution r=mgartner a=mgartner

Assignment casts are now responsible for ensuring that a value written
to a column has a type and width that match the column type. This commit
removes the logic that performed this validation before assignment casts
existed.

Release note: None

76176: colexec: change Close to take in a context r=yuzefovich a=yuzefovich

Previously, all `Closer`s would use their own context (either captured
in `Init` or derived from the one in `Init`) in the implementation of
`Close` (for example, when they wanted to log something). However, due
to the way the draining of the wrapped row-by-row processors and
closing of `Closer`s is structured (the draining happens first), it was
possible for the captured context to have a tracing span which was
already `Finish`ed. This is so because the row-by-row processors derive
separate tracing spans and finish them automatically during draining
whereas the closure of `Closer`s happens later.

This commit fixes this issue by passing a context as an argument to
`Close` function, and most of the implementations now use that. Only
components that derive their own tracing span are allowed to use their
own context since they control when the span is finished.

Fixes: #76096.

Release note: None

76198: sql: fix nil pointer error in RunPostDeserializationChanges r=RichardJCai a=RichardJCai

In some restore paths, the privilegeDescriptor would be nil in
RunPostDeserializationChanges, avoid doing work on a nil pointer.

Release note: None

Fixes #76063
Fixes #76062
Fixes #76042

76201: dev: fix up logic computing line which should be added to `~/.bazelrc` r=rail a=rickystewart

Have `setUpCache()` return the specific line it wants to see in
`~/.bazelrc`, and have `doctor` check for the presence of that specific
line. Also explicitly specify `http://` and `127.0.0.1` instead of
`localhost`.

Closes #76170.

Release note: None

Co-authored-by: richardjcai <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
5 people committed Feb 7, 2022
6 parents 3651e3c + dc5f8a8 + cc24310 + 36f3597 + ff8e7cf + 5211799 commit 9b46a59
Show file tree
Hide file tree
Showing 58 changed files with 292 additions and 267 deletions.
2 changes: 1 addition & 1 deletion dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=10
DEV_VERSION=11

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
Expand Down
79 changes: 50 additions & 29 deletions pkg/cmd/dev/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,20 @@ func (d *dev) cache(cmd *cobra.Command, _ []string) error {
if err != nil {
log.Printf("%v\n", err)
}
return d.setUpCache(ctx)
bazelRcLine, err := d.setUpCache(ctx)
if bazelRcLine != "" {
fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine)
}
return err
}
if down {
return d.tearDownCache(ctx)
}
return d.setUpCache(ctx)
bazelRcLine, err := d.setUpCache(ctx)
if bazelRcLine != "" {
fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine)
}
return err
}

func bazelRemoteCacheDir() (string, error) {
Expand Down Expand Up @@ -118,33 +126,35 @@ func (d *dev) cacheIsUp(ctx context.Context) bool {
return err == nil
}

func (d *dev) setUpCache(ctx context.Context) error {
// setUpCache returns a non-nil error iff setting up the cache failed, and a
// string which is a line that should be added to ~/.bazelrc.
func (d *dev) setUpCache(ctx context.Context) (string, error) {
if d.cacheIsUp(ctx) {
return nil
return d.getCacheBazelrcLine(ctx)
}

log.Printf("Configuring cache...\n")

if _, err := d.exec.CommandContextSilent(ctx, "bazel", "build", bazelRemoteTarget); err != nil {
return err
return "", err
}
bazelBin, err := d.getBazelBin(ctx)
if err != nil {
return err
return "", err
}

// write config file unless already exists
cacheDir, err := bazelRemoteCacheDir()
if err != nil {
return err
return "", err
}
configFile := filepath.Join(cacheDir, configFilename)
_, err = os.Stat(configFile)
if err != nil {
if os.IsNotExist(err) {
err := d.os.MkdirAll(filepath.Join(cacheDir, "cache"))
if err != nil {
return err
return "", err
}
err = d.os.WriteFile(configFile, fmt.Sprintf(`# File generated by dev. You can edit this file in-place.
# See https://github.com/buchgr/bazel-remote for additional information.
Expand All @@ -155,10 +165,10 @@ host: localhost
port: 9867
`, filepath.Join(cacheDir, "cache")))
if err != nil {
return err
return "", err
}
} else {
return err
return "", err
}
}
log.Printf("Using cache configuration file at %s\n", configFile)
Expand All @@ -168,14 +178,14 @@ port: 9867
// is mostly copied from `bazci`.
output, err := d.exec.CommandContextSilent(ctx, "bazel", "cquery", bazelRemoteTarget, "--output=label_kind")
if err != nil {
return err
return "", err
}
configHash := strings.Fields(string(output))[3]
configHash = strings.TrimPrefix(configHash, "(")
configHash = strings.TrimSuffix(configHash, ")")
output, err = d.exec.CommandContextSilent(ctx, "bazel", "config", configHash)
if err != nil {
return err
return "", err
}
var binDirForBazelRemote string
for _, line := range strings.Split(string(output), "\n") {
Expand All @@ -187,44 +197,35 @@ port: 9867
}
}
if binDirForBazelRemote == "" {
return fmt.Errorf("could not find bazel-remote binary; this is a bug")
return "", fmt.Errorf("could not find bazel-remote binary; this is a bug")
}
bazelRemoteBinary := filepath.Join(binDirForBazelRemote, bazelutil.OutputOfBinaryRule(bazelRemoteTarget, false))
cmd := exec.Command(bazelRemoteBinary, "--config_file", configFile)
stdout, err := os.Create(filepath.Join(cacheDir, "stdout.log"))
if err != nil {
return err
return "", err
}
cmd.Stdout = stdout
stderr, err := os.Create(filepath.Join(cacheDir, "stderr.log"))
if err != nil {
return err
return "", err
}
cmd.Stderr = stderr
err = cmd.Start()
if err != nil {
return err
return "", err
}
pid := cmd.Process.Pid
err = cmd.Process.Release()
if err != nil {
return err
return "", err
}

// We "should" be using a YAML parser for this, but who's going to stop me?
configFileContents, err := d.os.ReadFile(configFile)
err = d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid))
if err != nil {
return err
}
for _, line := range strings.Split(configFileContents, "\n") {
if strings.HasPrefix(line, "port:") {
port := strings.TrimSpace(strings.Split(line, ":")[1])
fmt.Printf("Add the string `--remote_cache=localhost:%s` to your ~/.bazelrc\n", port)
break
}
return "", err
}

return d.os.WriteFile(filepath.Join(cacheDir, cachePidFilename), strconv.Itoa(pid))
return d.getCacheBazelrcLine(ctx)
}

func (d *dev) tearDownCache(ctx context.Context) error {
Expand Down Expand Up @@ -257,3 +258,23 @@ func (d *dev) cleanCache(ctx context.Context) error {
}
return os.RemoveAll(dir)
}

func (d *dev) getCacheBazelrcLine(ctx context.Context) (string, error) {
cacheDir, err := bazelRemoteCacheDir()
if err != nil {
return "", err
}
configFile := filepath.Join(cacheDir, configFilename)
// We "should" be using a YAML parser for this, but who's going to stop me?
configFileContents, err := d.os.ReadFile(configFile)
if err != nil {
return "", err
}
for _, line := range strings.Split(configFileContents, "\n") {
if strings.HasPrefix(line, "port:") {
port := strings.TrimSpace(strings.Split(line, ":")[1])
return fmt.Sprintf("build --remote_cache=http://127.0.0.1:%s", port), nil
}
}
return "", fmt.Errorf("could not determine what to add to ~/.bazelrc to enable cache")
}
7 changes: 4 additions & 3 deletions pkg/cmd/dev/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace)

if !noCache {
d.log.Println("doctor: setting up cache")
err = d.setUpCache(ctx)
bazelRcLine, err := d.setUpCache(ctx)
if err != nil {
return err
}
Expand All @@ -223,8 +223,9 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace)
return err
}
bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc"))
if err != nil || !strings.Contains(bazelRcContents, "--remote_cache=") {
log.Printf("Did you remember to add the --remote_cache=... line to your ~/.bazelrc?")
if err != nil || !strings.Contains(bazelRcContents, bazelRcLine) {
log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", bazelRcLine)
log.Printf(" echo \"%s\" >> ~/.bazelrc", bazelRcLine)
success = false
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/dbdesc/database_desc_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (ddb *databaseDescriptorBuilder) RunRestoreChanges(
func maybeConvertIncompatibleDBPrivilegesToDefaultPrivileges(
privileges *descpb.PrivilegeDescriptor, defaultPrivileges *descpb.DefaultPrivilegeDescriptor,
) (hasChanged bool) {
// If privileges are nil, there is nothing to convert.
// This case can happen during restore where privileges are not yet created.
if privileges == nil {
return false
}

var pgIncompatibleDBPrivileges = privilege.List{
privilege.SELECT, privilege.INSERT, privilege.UPDATE, privilege.DELETE,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ func benchmarkAggregateFunction(
break
}
}
if err = a.(colexecop.Closer).Close(); err != nil {
if err = a.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
source.Reset(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ func (a *default_AGGKINDAggAlloc) newAggFunc() AggregateFunc {
return f
}

func (a *default_AGGKINDAggAlloc) Close() error {
func (a *default_AGGKINDAggAlloc) Close(ctx context.Context) error {
for _, fn := range a.returnedFns {
fn.fn.Close(fn.ctx)
fn.fn.Close(ctx)
}
a.returnedFns = nil
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/hash_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ execinfra.Releasable = &NewColOperatorResult{}
// TestCleanupNoError releases the resources associated with this result and
// asserts that no error is returned. It should only be used in tests.
func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) {
require.NoError(t, r.ToClose.Close())
require.NoError(t, r.ToClose.Close(context.Background()))
}

var newColOperatorResultPool = sync.Pool{
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *crossJoiner) Next() coldata.Batch {
}
willEmit := c.willEmit()
if willEmit == 0 {
if err := c.Close(); err != nil {
if err := c.Close(c.Ctx); err != nil {
colexecerror.InternalError(err)
}
c.done = true
Expand Down Expand Up @@ -462,8 +462,7 @@ func (b *crossJoinerBase) Reset(ctx context.Context) {
b.builderState.numEmittedTotal = 0
}

func (b *crossJoinerBase) Close() error {
ctx := b.initHelper.EnsureCtx()
func (b *crossJoinerBase) Close(ctx context.Context) error {
if b.rightTuples != nil {
return b.rightTuples.Close(ctx)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,20 +889,20 @@ func (o *mergeJoinBase) completeRightBufferedGroup() {
o.finishRightBufferedGroup()
}

func (o *mergeJoinBase) Close() error {
func (o *mergeJoinBase) Close(ctx context.Context) error {
if !o.CloserHelper.Close() {
return nil
}
var lastErr error
for _, op := range []colexecop.Operator{o.left.source, o.right.source} {
if c, ok := op.(colexecop.Closer); ok {
if err := c.Close(); err != nil {
if err := c.Close(ctx); err != nil {
lastErr = err
}
}
}
if h := o.bufferedGroup.helper; h != nil {
if err := h.Close(); err != nil {
if err := h.Close(ctx); err != nil {
lastErr = err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func RunTestsWithOrderedCols(
// setting, the closing happens at the end of the query execution.
func closeIfCloser(t *testing.T, op colexecop.Operator) {
if c, ok := op.(colexecop.Closer); ok {
if err := c.Close(); err != nil {
if err := c.Close(context.Background()); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const (
// buffer all tuples from each partition.
type bufferedWindower interface {
Init(ctx context.Context)
Close()
Close(context.Context)

// seekNextPartition is called during the windowSeeking state on the current
// batch. It gives windowers a chance to perform any necessary pre-processing,
Expand Down Expand Up @@ -357,7 +357,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
colexecerror.InternalError(
errors.AssertionFailedf("window operator in processing state without buffered rows"))
case windowFinished:
if err = b.Close(); err != nil {
if err = b.Close(b.Ctx); err != nil {
colexecerror.InternalError(err)
}
return coldata.ZeroBatch
Expand All @@ -369,16 +369,16 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
}
}

func (b *bufferedWindowOp) Close() error {
func (b *bufferedWindowOp) Close(ctx context.Context) error {
if !b.CloserHelper.Close() || b.Ctx == nil {
// Either Close() has already been called or Init() was never called. In
// both cases there is nothing to do.
return nil
}
if err := b.bufferQueue.Close(b.EnsureCtx()); err != nil {
if err := b.bufferQueue.Close(ctx); err != nil {
return err
}
b.windower.Close()
b.windower.Close(ctx)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/count_rows_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func (a *countRowsWindowAggregator) Init(ctx context.Context) {
}

// Close implements the bufferedWindower interface.
func (a *countRowsWindowAggregator) Close() {
func (a *countRowsWindowAggregator) Close(ctx context.Context) {
if !a.CloserHelper.Close() {
return
}
a.framer.close()
a.buffer.Close(a.EnsureCtx())
a.buffer.Close(ctx)
}

// processBatch implements the bufferedWindower interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) {
}

// Close implements the bufferedWindower interface.
func (b *_OP_NAMEBase) Close() {
func (b *_OP_NAMEBase) Close(ctx context.Context) {
if !b.CloserHelper.Close() {
return
}
b.buffer.Close(b.EnsureCtx())
b.buffer.Close(ctx)
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/first_value.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9b46a59

Please sign in to comment.