Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
92082: roachtest: Retry failed SCP attempts in the same way as SSH commands. r=smg260 a=smg260

Any error returned is assumed to be retryable (in contrast to SSH, where by default we look for an exit code of 255).

Release note: none
Epic: CRDB-21386

92266: parser: delete RESTORE TENANT ... FROM REPLICATION r=stevendanna a=adityamaru

Release note: None

92270: parser: normalize docs link in TestParseDatadriven r=rafiss a=rafiss

fixes #92201

Release note: None

92271: test: update time on test r=maryliag a=maryliag

The test `pkg/sql/scheduledlogging/TestCaptureIndexUsageStats` was flaky on CI, even though it was passing (including on stress) locally.
This commit increases the time buffer used to account for non-determinism in the logging timings.

Fixes #92006

Release note: None

92272: sqlstats: avoid mutex deadlock in Container.SaveToLog r=maryliag a=nvanbenschoten

This commit fixes the locking in `ssmemstorage.Container.SaveToLog` to avoid a deadlock. Since 06f6874, we've been unlocking the incorrect mutex in the method. Luckily, it doesn't look like this code is called by default, because `sql.metrics.statement_details.dump_to_logs` defaults to false. If a user were to change that to true, they would have a bad time.

This code could use a test. I don't plan to add one here because I don't know the code well and only stumbled upon this during an unrelated support escalation, but I encourage others to consider extending the testing.

Release note (bug fix): When configured to true, the `sql.metrics.statement_details.dump_to_logs` cluster setting no longer causes a mutex deadlock.

Epic: None

Co-authored-by: Miral Gadani <[email protected]>
Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: maryliag <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
6 people committed Nov 21, 2022
6 parents df51702 + d8a697a + ba5f911 + 07397db + 51508e1 + 33299c7 commit e181122
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 119 deletions.
6 changes: 0 additions & 6 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ restore_stmt ::=
| 'RESTORE' backup_targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' 'SYSTEM' 'USERS' 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' 'SYSTEM' 'USERS' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options
| 'RESTORE' backup_targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_tenant_clause

resume_stmt ::=
resume_jobs_stmt
Expand Down Expand Up @@ -728,11 +727,6 @@ backup_targets ::=
| 'TENANT' 'identifier'
| 'DATABASE' name_list

opt_as_tenant_clause ::=
'AS' 'TENANT' iconst64
| 'AS' 'TENANT' 'identifier'
|

resume_jobs_stmt ::=
'RESUME' 'JOB' a_expr
| 'RESUME' 'JOBS' select_stmt
Expand Down
68 changes: 47 additions & 21 deletions pkg/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ var defaultRunRetryOpt = retry.Options{
func runWithMaybeRetry(
l *logger.Logger,
retryOpts retry.Options,
shouldRetryFn func(details *RunResultDetails) bool,
shouldRetryFn func(*RunResultDetails) bool,
f func() (*RunResultDetails, error),
) (*RunResultDetails, error) {
var err error
Expand All @@ -135,11 +135,11 @@ func runWithMaybeRetry(
for r := retry.Start(retryOpts); r.Next(); {
res, err = f()
res.Attempt = r.CurrentAttempt() + 1
// nil err indicates a potentially retryable res.Err
// err denotes a roachprod error; when it is nil, res.Err may hold a potentially retryable error
if err == nil && res.Err != nil {
cmdErr = errors.CombineErrors(cmdErr, res.Err)
if shouldRetryFn != nil && shouldRetryFn(res) {
l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, defaultRunRetryOpt.MaxRetries+1)
l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1)
continue
}
}
Expand All @@ -154,8 +154,26 @@ func runWithMaybeRetry(
return res, err
}

func defaultShouldRetry(res *RunResultDetails) bool {
return errors.Is(res.Err, rperrors.ErrSSH255)
// runWithDefaultSSHRetry invokes f through runWithMaybeRetry using
// defaultRunRetryOpt. An attempt is retried only when its result's Err matches
// rperrors.ErrSSH255, i.e. the SSH command failed with exit code 255.
func runWithDefaultSSHRetry(
	l *logger.Logger, f func() (*RunResultDetails, error),
) (*RunResultDetails, error) {
	isSSH255 := func(res *RunResultDetails) bool {
		return errors.Is(res.Err, rperrors.ErrSSH255)
	}
	return runWithMaybeRetry(l, defaultRunRetryOpt, isSSH255, f)
}

// scpWithDefaultRetry runs scp(src, dest) through runWithMaybeRetry using
// defaultRunRetryOpt. Unlike the SSH variant, every error reported by an scp
// attempt is treated as retryable.
func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) {
	alwaysRetry := func(*RunResultDetails) bool { return true }
	attempt := func() (*RunResultDetails, error) { return scp(src, dest) }
	return runWithMaybeRetry(l, defaultRunRetryOpt, alwaysRetry, attempt)
}

// Host returns the public IP of a node.
Expand Down Expand Up @@ -1222,7 +1240,7 @@ tar cvf %[3]s certs
exit.WithCode(exit.UnspecifiedError())
}

tarfile, cleanup, err := c.getFileFromFirstNode(certsTarName)
tarfile, cleanup, err := c.getFileFromFirstNode(l, certsTarName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1255,7 +1273,7 @@ func (c *SyncedCluster) DistributeTenantCerts(
return err
}

tarfile, cleanup, err := hostCluster.getFileFromFirstNode(tenantCertsTarName)
tarfile, cleanup, err := hostCluster.getFileFromFirstNode(l, tenantCertsTarName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1343,7 +1361,9 @@ func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, nod
// getFileFromFirstNode retrieves the given file from the first node in the
// cluster. The filename is assumed to be relative to the home directory of
// the node's user.
func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error) {
func (c *SyncedCluster) getFileFromFirstNode(
l *logger.Logger, name string,
) (string, func(), error) {
var tmpfileName string
cleanup := func() {}
if c.IsLocal() {
Expand All @@ -1359,9 +1379,9 @@ func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error
}

srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name)
if err := c.scp(srcFileName, tmpfile.Name()); err != nil {
if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil {
cleanup()
return "", nil, err
return "", nil, res.Err
}
tmpfileName = tmpfile.Name()
}
Expand Down Expand Up @@ -1693,10 +1713,10 @@ func (c *SyncedCluster) Put(
return
}

err = c.scp(from, to)
results <- result{i, err}
res, _ := scpWithDefaultRetry(l, from, to)
results <- result{i, res.Err}

if err != nil {
if res.Err != nil {
// The copy failed. Re-add the original source.
pushSource(srcIndex)
} else {
Expand Down Expand Up @@ -2048,8 +2068,8 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err
return
}

err := c.scp(fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest)
if err == nil {
res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest)
if res.Err == nil {
// Make sure all created files and directories are world readable.
// The CRDB process intentionally sets a 0007 umask (resulting in
// non-world-readable files). This creates annoyances during CI
Expand All @@ -2069,10 +2089,10 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err
}
return nil
}
err = filepath.Walk(dest, chmod)
res.Err = filepath.Walk(dest, chmod)
}

results <- result{i, err}
results <- result{i, res.Err}
}(i)
}

Expand Down Expand Up @@ -2243,7 +2263,10 @@ func (c *SyncedCluster) SSH(ctx context.Context, l *logger.Logger, sshArgs, args
return syscall.Exec(sshPath, allArgs, os.Environ())
}

func (c *SyncedCluster) scp(src, dest string) error {
// scp return type conforms to what runWithMaybeRetry expects. A nil error
// is always returned here since the only error that can happen is an scp error
// which we do want to be able to retry.
func scp(src, dest string) (*RunResultDetails, error) {
args := []string{
"scp", "-r", "-C",
"-o", "StrictHostKeyChecking=no",
Expand All @@ -2253,9 +2276,12 @@ func (c *SyncedCluster) scp(src, dest string) error {
cmd := exec.Command(args[0], args[1:]...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out)
err = errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out)
}
return nil

res := newRunResultDetails(-1, err)
res.CombinedOut = out
return res, nil
}

// ParallelResult captures the result of a user-defined function
Expand Down Expand Up @@ -2322,7 +2348,7 @@ func (c *SyncedCluster) ParallelE(
startNext := func() {
go func(i int) {
defer wg.Done()
res, err := runWithMaybeRetry(l, defaultRunRetryOpt, defaultShouldRetry, func() (*RunResultDetails, error) { return fn(i) })
res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) })
results <- ParallelResult{i, res.CombinedOut, err}
}(index)
index++
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachprod/install/cluster_synced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestRunWithMaybeRetry(t *testing.T) {
attempt := 0
cases := []struct {
f func() (*RunResultDetails, error)
shouldRetryFn func(res *RunResultDetails) bool
shouldRetryFn func(*RunResultDetails) bool
expectedAttempts int
shouldError bool
}{
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/opaque.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ func init() {
&tree.CreateChangefeed{},
&tree.Import{},
&tree.ScheduledBackup{},
&tree.StreamIngestion{},
&tree.CreateTenantFromReplication{},
} {
typ := optbuilder.OpaqueReadOnly
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"go/constant"
"reflect"
"regexp"
"strings"
"testing"

Expand All @@ -32,6 +33,8 @@ import (
"github.com/stretchr/testify/assert"
)

// issueLinkRE matches an issue link of the form
// https://go.crdb.dev/issue-v/<issue number>/<rest of line> and captures the
// issue number, so that whatever follows the number (e.g. a version such as
// v23.1) can be stripped from expected test output. The dots in the host name
// are escaped so that they match only a literal '.'.
var issueLinkRE = regexp.MustCompile(`https://go\.crdb\.dev/issue-v/([0-9]+)/.*`)

// TestParseDatadriven verifies that we can parse the supplied SQL and regenerate the SQL
// string from the syntax tree.
func TestParseDatadriven(t *testing.T) {
Expand Down Expand Up @@ -98,6 +101,7 @@ func TestParseDatadriven(t *testing.T) {
if pgerr.Hint != "" {
msg += "\nHINT: " + pgerr.Hint
}
msg = issueLinkRE.ReplaceAllString(msg, "https://go.crdb.dev/issue-v/$1/")
return msg
}
d.Fatalf(t, "unsupported command: %s", d.Cmd)
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -3556,14 +3556,6 @@ restore_stmt:
Options: *($9.restoreOptions()),
}
}
| RESTORE backup_targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_tenant_clause
{
$$.val = &tree.StreamIngestion{
Targets: $2.backupTargetList(),
From: $7.stringOrPlaceholderOptList(),
AsTenant: $8.asTenantClause(),
}
}
| RESTORE error // SHOW HELP: RESTORE

string_or_placeholder_opt_list:
Expand Down
32 changes: 0 additions & 32 deletions pkg/sql/parser/testdata/backup_restore
Original file line number Diff line number Diff line change
Expand Up @@ -713,38 +713,6 @@ RESTORE TENANT 36 FROM (($1), ($2)) WITH tenant = ('5') -- fully parenthesized
RESTORE TENANT _ FROM ($1, $1) WITH tenant = '_' -- literals removed
RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '5' -- identifiers removed

parse
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321
----
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321
RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') AS TENANT 321 -- fully parenthesized
RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' AS TENANT _ -- literals removed
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321 -- identifiers removed

parse
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1
----
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1
RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) -- fully parenthesized
RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed

parse
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'
----
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'
RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') -- fully parenthesized
RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' -- literals removed
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' -- identifiers removed

parse
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1
----
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1
RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) -- fully parenthesized
RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed
RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed

parse
BACKUP TABLE foo TO 'bar' WITH revision_history, detached
----
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/parser/testdata/create_function
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ DETAIL: source SQL:
CREATE OR REPLACE FUNCTION f(VARIADIC a int = 7) RETURNS INT AS 'SELECT 1' LANGUAGE SQL
^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/88947/v23.1
See: https://go.crdb.dev/issue-v/88947/

error
CREATE OR REPLACE FUNCTION f(a int = 7) RETURNS INT TRANSFORM AS 'SELECT 1' LANGUAGE SQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestCaptureIndexUsageStats(t *testing.T) {
stubLoggingDelay := 0 * time.Second

// timeBuffer is a short time buffer to account for non-determinism in the logging timings.
const timeBuffer = 2 * time.Second
const timeBuffer = 3 * time.Second

settings := cluster.MakeTestingClusterSettings()
// Configure capture index usage statistics to be disabled. This is to test
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ go_library(
"show.go",
"split.go",
"stmt.go",
"stream_ingestion.go",
"survival_goal.go",
"table_name.go",
"table_pattern.go",
Expand Down
13 changes: 0 additions & 13 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ var _ CCLOnlyStatement = &AlterChangefeed{}
var _ CCLOnlyStatement = &Import{}
var _ CCLOnlyStatement = &Export{}
var _ CCLOnlyStatement = &ScheduledBackup{}
var _ CCLOnlyStatement = &StreamIngestion{}
var _ CCLOnlyStatement = &CreateTenantFromReplication{}

// StatementReturnType implements the Statement interface.
Expand Down Expand Up @@ -1888,17 +1887,6 @@ func (*Split) StatementType() StatementType { return TypeDML }
// StatementTag returns a short string identifying the type of statement.
func (*Split) StatementTag() string { return "SPLIT" }

// StatementReturnType implements the Statement interface.
func (*StreamIngestion) StatementReturnType() StatementReturnType { return Rows }

// StatementType implements the Statement interface.
func (*StreamIngestion) StatementType() StatementType { return TypeDML }

// StatementTag returns a short string identifying the type of statement.
func (*StreamIngestion) StatementTag() string { return "RESTORE FROM REPLICATION STREAM" }

func (*StreamIngestion) cclOnlyStatement() {}

// StatementReturnType implements the Statement interface.
func (*Unsplit) StatementReturnType() StatementReturnType { return Rows }

Expand Down Expand Up @@ -2218,7 +2206,6 @@ func (n *ShowFingerprints) String() string { return AsString(
func (n *ShowDefaultPrivileges) String() string { return AsString(n) }
func (n *ShowCompletions) String() string { return AsString(n) }
func (n *Split) String() string { return AsString(n) }
func (n *StreamIngestion) String() string { return AsString(n) }
func (n *Unsplit) String() string { return AsString(n) }
func (n *Truncate) String() string { return AsString(n) }
func (n *UnionClause) String() string { return AsString(n) }
Expand Down
33 changes: 0 additions & 33 deletions pkg/sql/sem/tree/stream_ingestion.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) {
for key, stats := range s.mu.stmts {
stats.mu.Lock()
json, err := json.Marshal(stats.mu.data)
s.mu.Unlock()
stats.mu.Unlock()
if err != nil {
log.Errorf(ctx, "error while marshaling stats for %q // %q: %v", appName, key.String(), err)
continue
Expand Down

0 comments on commit e181122

Please sign in to comment.