From d8a697aab19b726df9d065ab0e54c4a627cc2bbf Mon Sep 17 00:00:00 2001 From: Miral Gadani Date: Thu, 17 Nov 2022 19:27:37 +0000 Subject: [PATCH 1/5] roachtest: Retry failed SCP attempts in the same way as SSH commands. Any error returned is assumed to be retryable (in contrast to SSH where by default we look for an exit code of 255) Release note: none Epic: CRDB-21386 --- pkg/roachprod/install/cluster_synced.go | 68 ++++++++++++++------ pkg/roachprod/install/cluster_synced_test.go | 2 +- 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 930cfade3628..7dd34b8ff453 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -124,7 +124,7 @@ var defaultRunRetryOpt = retry.Options{ func runWithMaybeRetry( l *logger.Logger, retryOpts retry.Options, - shouldRetryFn func(details *RunResultDetails) bool, + shouldRetryFn func(*RunResultDetails) bool, f func() (*RunResultDetails, error), ) (*RunResultDetails, error) { var err error @@ -135,11 +135,11 @@ func runWithMaybeRetry( for r := retry.Start(retryOpts); r.Next(); { res, err = f() res.Attempt = r.CurrentAttempt() + 1 - // nil err indicates a potentially retryable res.Err + // nil err (denoting a roachprod error) indicates a potentially retryable res.Err if err == nil && res.Err != nil { cmdErr = errors.CombineErrors(cmdErr, res.Err) if shouldRetryFn != nil && shouldRetryFn(res) { - l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, defaultRunRetryOpt.MaxRetries+1) + l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) continue } } @@ -154,8 +154,26 @@ func runWithMaybeRetry( return res, err } -func defaultShouldRetry(res *RunResultDetails) bool { - return errors.Is(res.Err, rperrors.ErrSSH255) +// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255 +func runWithDefaultSSHRetry( + l *logger.Logger, f func() (*RunResultDetails, error), +) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }, + f, + ) +} + +// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable +func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(*RunResultDetails) bool { return true }, + func() (*RunResultDetails, error) { return scp(src, dest) }, + ) } // Host returns the public IP of a node. @@ -1222,7 +1240,7 @@ tar cvf %[3]s certs exit.WithCode(exit.UnspecifiedError()) } - tarfile, cleanup, err := c.getFileFromFirstNode(certsTarName) + tarfile, cleanup, err := c.getFileFromFirstNode(l, certsTarName) if err != nil { return err } @@ -1255,7 +1273,7 @@ func (c *SyncedCluster) DistributeTenantCerts( return err } - tarfile, cleanup, err := hostCluster.getFileFromFirstNode(tenantCertsTarName) + tarfile, cleanup, err := hostCluster.getFileFromFirstNode(l, tenantCertsTarName) if err != nil { return err } @@ -1343,7 +1361,9 @@ func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, nod // getFile retrieves the given file from the first node in the cluster. The // filename is assumed to be relative from the home directory of the node's // user. -func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error) { +func (c *SyncedCluster) getFileFromFirstNode( + l *logger.Logger, name string, +) (string, func(), error) { var tmpfileName string cleanup := func() {} if c.IsLocal() { @@ -1359,9 +1379,9 @@ func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error } srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name) - if err := c.scp(srcFileName, tmpfile.Name()); err != nil { + if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { cleanup() - return "", nil, err + return "", nil, res.Err } tmpfileName = tmpfile.Name() } @@ -1693,10 +1713,10 @@ func (c *SyncedCluster) Put( return } - err = c.scp(from, to) - results <- result{i, err} + res, _ := scpWithDefaultRetry(l, from, to) + results <- result{i, res.Err} - if err != nil { + if res.Err != nil { // The copy failed. Re-add the original source. pushSource(srcIndex) } else { @@ -2048,8 +2068,8 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err return } - err := c.scp(fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) - if err == nil { + res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) + if res.Err == nil { // Make sure all created files and directories are world readable. // The CRDB process intentionally sets a 0007 umask (resulting in // non-world-readable files). This creates annoyances during CI @@ -2069,10 +2089,10 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err } return nil } - err = filepath.Walk(dest, chmod) + res.Err = filepath.Walk(dest, chmod) } - results <- result{i, err} + results <- result{i, res.Err} }(i) } @@ -2243,7 +2263,10 @@ func (c *SyncedCluster) SSH(ctx context.Context, l *logger.Logger, sshArgs, args return syscall.Exec(sshPath, allArgs, os.Environ()) } -func (c *SyncedCluster) scp(src, dest string) error { +// scp return type conforms to what runWithMaybeRetry expects. A nil error +// is always returned here since the only error that can happen is an scp error +// which we do want to be able to retry. +func scp(src, dest string) (*RunResultDetails, error) { args := []string{ "scp", "-r", "-C", "-o", "StrictHostKeyChecking=no", @@ -2253,9 +2276,12 @@ func (c *SyncedCluster) scp(src, dest string) error { cmd := exec.Command(args[0], args[1:]...) out, err := cmd.CombinedOutput() if err != nil { - return errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) + err = errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) } - return nil + + res := newRunResultDetails(-1, err) + res.CombinedOut = out + return res, nil } // ParallelResult captures the result of a user-defined function @@ -2322,7 +2348,7 @@ func (c *SyncedCluster) ParallelE( startNext := func() { go func(i int) { defer wg.Done() - res, err := runWithMaybeRetry(l, defaultRunRetryOpt, defaultShouldRetry, func() (*RunResultDetails, error) { return fn(i) }) + res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) }) results <- ParallelResult{i, res.CombinedOut, err} }(index) index++ diff --git a/pkg/roachprod/install/cluster_synced_test.go b/pkg/roachprod/install/cluster_synced_test.go index c0490e159ef0..a5e58706a69b 100644 --- a/pkg/roachprod/install/cluster_synced_test.go +++ b/pkg/roachprod/install/cluster_synced_test.go @@ -97,7 +97,7 @@ func TestRunWithMaybeRetry(t *testing.T) { attempt := 0 cases := []struct { f func() (*RunResultDetails, error) - shouldRetryFn func(res *RunResultDetails) bool + shouldRetryFn func(*RunResultDetails) bool expectedAttempts int shouldError bool }{ From ba5f9110c2690f07c1ef217edf39bcfaba467b29 Mon Sep 17 00:00:00 2001 From: adityamaru Date: Fri, 18 Nov 2022 10:49:41 -0500 Subject: [PATCH 2/5] parser: delete RESTORE TENANT ... FROM REPLICATION Release note: None --- docs/generated/sql/bnf/stmt_block.bnf | 6 ----- pkg/sql/opaque.go | 1 - pkg/sql/parser/sql.y | 8 ------- pkg/sql/parser/testdata/backup_restore | 32 ------------------------- pkg/sql/sem/tree/BUILD.bazel | 1 - pkg/sql/sem/tree/stmt.go | 13 ---------- pkg/sql/sem/tree/stream_ingestion.go | 33 -------------------------- 7 files changed, 94 deletions(-) delete mode 100644 pkg/sql/sem/tree/stream_ingestion.go diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index d902bd9374b5..554d55a530c6 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -237,7 +237,6 @@ restore_stmt ::= | 'RESTORE' backup_targets 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options | 'RESTORE' 'SYSTEM' 'USERS' 'FROM' string_or_placeholder 'IN' list_of_string_or_placeholder_opt_list opt_as_of_clause opt_with_restore_options - | 'RESTORE' backup_targets 'FROM' 'REPLICATION' 'STREAM' 'FROM' string_or_placeholder_opt_list opt_as_tenant_clause resume_stmt ::= resume_jobs_stmt @@ -728,11 +727,6 @@ backup_targets ::= | 'TENANT' 'identifier' | 'DATABASE' name_list -opt_as_tenant_clause ::= - 'AS' 'TENANT' iconst64 - | 'AS' 'TENANT' 'identifier' - | - resume_jobs_stmt ::= 'RESUME' 'JOB' a_expr | 'RESUME' 'JOBS' select_stmt diff --git a/pkg/sql/opaque.go b/pkg/sql/opaque.go index 6ed538ac7c39..6b936f7df72d 100644 --- a/pkg/sql/opaque.go +++ b/pkg/sql/opaque.go @@ -391,7 +391,6 @@ func init() { &tree.CreateChangefeed{}, &tree.Import{}, &tree.ScheduledBackup{}, - &tree.StreamIngestion{}, &tree.CreateTenantFromReplication{}, } { typ := optbuilder.OpaqueReadOnly diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index a1ca1212ac27..507b76550dc7 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -3556,14 +3556,6 @@ restore_stmt: Options: *($9.restoreOptions()), } } -| RESTORE backup_targets FROM REPLICATION STREAM FROM string_or_placeholder_opt_list opt_as_tenant_clause - { - $$.val = &tree.StreamIngestion{ - Targets: $2.backupTargetList(), - From: $7.stringOrPlaceholderOptList(), - AsTenant: $8.asTenantClause(), - } - } | RESTORE error // SHOW HELP: RESTORE string_or_placeholder_opt_list: diff --git a/pkg/sql/parser/testdata/backup_restore b/pkg/sql/parser/testdata/backup_restore index 021261b7c51d..f346766b81ab 100644 --- a/pkg/sql/parser/testdata/backup_restore +++ b/pkg/sql/parser/testdata/backup_restore @@ -713,38 +713,6 @@ RESTORE TENANT 36 FROM (($1), ($2)) WITH tenant = ('5') -- fully parenthesized RESTORE TENANT _ FROM ($1, $1) WITH tenant = '_' -- literals removed RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '5' -- identifiers removed -parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321 ----- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321 -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') AS TENANT 321 -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' AS TENANT _ -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' AS TENANT 321 -- identifiers removed - -parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 ----- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed - -parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' ----- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ('bar') -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM '_' -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar' -- identifiers removed - -parse -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 ----- -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -RESTORE TENANT 123 FROM REPLICATION STREAM FROM ($1) -- fully parenthesized -RESTORE TENANT _ FROM REPLICATION STREAM FROM $1 -- literals removed -RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1 -- identifiers removed - parse BACKUP TABLE foo TO 'bar' WITH revision_history, detached ---- diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index a86b564cfb87..f848d6a70f15 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -88,7 +88,6 @@ go_library( "show.go", "split.go", "stmt.go", - "stream_ingestion.go", "survival_goal.go", "table_name.go", "table_pattern.go", diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 99b38bbde15d..efc69a5597f7 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -180,7 +180,6 @@ var _ CCLOnlyStatement = &AlterChangefeed{} var _ CCLOnlyStatement = &Import{} var _ CCLOnlyStatement = &Export{} var _ CCLOnlyStatement = &ScheduledBackup{} -var _ CCLOnlyStatement = &StreamIngestion{} var _ CCLOnlyStatement = &CreateTenantFromReplication{} // StatementReturnType implements the Statement interface. @@ -1888,17 +1887,6 @@ func (*Split) StatementType() StatementType { return TypeDML } // StatementTag returns a short string identifying the type of statement. func (*Split) StatementTag() string { return "SPLIT" } -// StatementReturnType implements the Statement interface. -func (*StreamIngestion) StatementReturnType() StatementReturnType { return Rows } - -// StatementType implements the Statement interface. -func (*StreamIngestion) StatementType() StatementType { return TypeDML } - -// StatementTag returns a short string identifying the type of statement. -func (*StreamIngestion) StatementTag() string { return "RESTORE FROM REPLICATION STREAM" } - -func (*StreamIngestion) cclOnlyStatement() {} - // StatementReturnType implements the Statement interface. func (*Unsplit) StatementReturnType() StatementReturnType { return Rows } @@ -2218,7 +2206,6 @@ func (n *ShowFingerprints) String() string { return AsString( func (n *ShowDefaultPrivileges) String() string { return AsString(n) } func (n *ShowCompletions) String() string { return AsString(n) } func (n *Split) String() string { return AsString(n) } -func (n *StreamIngestion) String() string { return AsString(n) } func (n *Unsplit) String() string { return AsString(n) } func (n *Truncate) String() string { return AsString(n) } func (n *UnionClause) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/stream_ingestion.go b/pkg/sql/sem/tree/stream_ingestion.go deleted file mode 100644 index 475ac7c58baa..000000000000 --- a/pkg/sql/sem/tree/stream_ingestion.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package tree - -// StreamIngestion represents a RESTORE FROM REPLICATION STREAM statement. -type StreamIngestion struct { - Targets BackupTargetList - From StringOrPlaceholderOptList - AsTenant TenantID -} - -var _ Statement = &StreamIngestion{} - -// Format implements the NodeFormatter interface. -func (node *StreamIngestion) Format(ctx *FmtCtx) { - ctx.WriteString("RESTORE ") - ctx.FormatNode(&node.Targets) - ctx.WriteString(" ") - ctx.WriteString("FROM REPLICATION STREAM FROM ") - ctx.FormatNode(&node.From) - if node.AsTenant.Specified { - ctx.WriteString(" AS TENANT ") - ctx.FormatNode(&node.AsTenant) - } -} From 51508e10d7963e099a99ca608babd4c3e8658eb3 Mon Sep 17 00:00:00 2001 From: maryliag Date: Mon, 21 Nov 2022 11:39:16 -0500 Subject: [PATCH 3/5] test: update time on test The test `pkg/sql/scheduledlogging/TestCaptureIndexUsageStats` was flaky on CI, even though it was passing (including on stress) locally. This commit increases the time buffer used to account for non-determinism in the logging timings. Fixes #92006 Release note: None --- pkg/sql/scheduledlogging/captured_index_usage_stats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go index 950864abbd92..7c452c45f759 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go @@ -79,7 +79,7 @@ func TestCaptureIndexUsageStats(t *testing.T) { stubLoggingDelay := 0 * time.Second // timeBuffer is a short time buffer to account for non-determinism in the logging timings. - const timeBuffer = 2 * time.Second + const timeBuffer = 3 * time.Second settings := cluster.MakeTestingClusterSettings() // Configure capture index usage statistics to be disabled. This is to test From 07397db9abf0a9d107d45b31e3abb0b0088ec8a4 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Mon, 21 Nov 2022 11:41:26 -0500 Subject: [PATCH 4/5] parser: normalize docs link in TestParseDatadriven Release note: None --- pkg/sql/parser/parse_test.go | 4 ++++ pkg/sql/parser/testdata/create_function | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index bf4186553cc2..6ed2678b7d4e 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -15,6 +15,7 @@ import ( "fmt" "go/constant" "reflect" + "regexp" "strings" "testing" @@ -32,6 +33,8 @@ import ( "github.com/stretchr/testify/assert" ) +var issueLinkRE = regexp.MustCompile("https://go.crdb.dev/issue-v/([0-9]+)/.*") + // TestParseDataDriven verifies that we can parse the supplied SQL and regenerate the SQL // string from the syntax tree. func TestParseDatadriven(t *testing.T) { @@ -98,6 +101,7 @@ func TestParseDatadriven(t *testing.T) { if pgerr.Hint != "" { msg += "\nHINT: " + pgerr.Hint } + msg = issueLinkRE.ReplaceAllString(msg, "https://go.crdb.dev/issue-v/$1/") return msg } d.Fatalf(t, "unsupported command: %s", d.Cmd) diff --git a/pkg/sql/parser/testdata/create_function b/pkg/sql/parser/testdata/create_function index 2fb200538102..298181be6d62 100644 --- a/pkg/sql/parser/testdata/create_function +++ b/pkg/sql/parser/testdata/create_function @@ -263,7 +263,7 @@ DETAIL: source SQL: CREATE OR REPLACE FUNCTION f(VARIADIC a int = 7) RETURNS INT AS 'SELECT 1' LANGUAGE SQL ^ HINT: You have attempted to use a feature that is not yet implemented. -See: https://go.crdb.dev/issue-v/88947/v23.1 +See: https://go.crdb.dev/issue-v/88947/ error CREATE OR REPLACE FUNCTION f(a int = 7) RETURNS INT TRANSFORM AS 'SELECT 1' LANGUAGE SQL From 33299c747da6181dedcb895c68935d99846e1365 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 21 Nov 2022 11:37:31 -0500 Subject: [PATCH 5/5] sqlstats: avoid mutex deadlock in Container.SaveToLog This commit fixes the locking in `ssmemstorage.Container.SaveToLog` to avoid a deadlock. Since 06f6874, we've been unlocking the incorrect mutex in the method. Luckily, it doesn't look like this code is called by default, because `sql.metrics.statement_details.dump_to_logs` defaults to false. If a user was to change that to true, they would have a bad time. This code could use a test. I don't plan to add one here because I don't know the code well and only stumbled upon this during an unrelated support escalation, but I encourage others to consider extending the testing. Release note (bug fix): When configured to true, the `sql.metrics.statement_details.dump_to_logs` cluster setting no longer causes a mutex deadlock. Epic: None --- pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index f6e26f28f1e8..71adc39f744e 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -615,7 +615,7 @@ func (s *Container) SaveToLog(ctx context.Context, appName string) { for key, stats := range s.mu.stmts { stats.mu.Lock() json, err := json.Marshal(stats.mu.data) - s.mu.Unlock() + stats.mu.Unlock() if err != nil { log.Errorf(ctx, "error while marshaling stats for %q // %q: %v", appName, key.String(), err) continue