diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 57acf08188ae..74ad19543be1 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -93,6 +93,7 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/retry",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 0514914288a3..79e998bbbfeb 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/build"
+	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -38,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
@@ -439,23 +441,53 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	}
 
 	statsCache := p.ExecCfg().TableStatsCache
-	res, err := backup(
-		ctx,
-		p,
-		details.URI,
-		details.URIsByLocalityKV,
-		p.ExecCfg().DB,
-		p.ExecCfg().Settings,
-		defaultStore,
-		storageByLocalityKV,
-		b.job,
-		backupManifest,
-		p.ExecCfg().DistSQLSrv.ExternalStorage,
-		details.EncryptionOptions,
-		statsCache,
-	)
+	// We retry on pretty generic failures -- any rpc error. If a worker node were
+	// to restart, it would produce this kind of error, but there may be other
+	// errors that are also rpc errors. Don't retry too aggressively.
+	retryOpts := retry.Options{
+		MaxBackoff: 1 * time.Second,
+		MaxRetries: 5,
+	}
+
+	// We want to retry a backup if there are transient failures (i.e. worker nodes
+	// dying), so if we receive a retryable error, re-plan and retry the backup.
+	var res RowCount
+	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+		res, err = backup(
+			ctx,
+			p,
+			details.URI,
+			details.URIsByLocalityKV,
+			p.ExecCfg().DB,
+			p.ExecCfg().Settings,
+			defaultStore,
+			storageByLocalityKV,
+			b.job,
+			backupManifest,
+			p.ExecCfg().DistSQLSrv.ExternalStorage,
+			details.EncryptionOptions,
+			statsCache,
+		)
+		if err == nil {
+			break
+		}
+
+		if !utilccl.IsDistSQLRetryableError(err) {
+			return errors.Wrap(err, "failed to run backup")
+		}
+
+		log.Warningf(ctx, `BACKUP job encountered retryable error: %+v`, err)
+
+		// Reload the backup manifest to pick up any spans we may have completed on
+		// previous attempts.
+		var reloadBackupErr error
+		backupManifest, reloadBackupErr = b.readManifestOnResume(ctx, p.ExecCfg(), defaultStore, details)
+		if reloadBackupErr != nil {
+			log.Warning(ctx, "could not reload backup manifest when retrying, continuing with old progress")
+		}
+	}
 	if err != nil {
-		return errors.Wrap(err, "failed to run backup")
+		return errors.Wrap(err, "exhausted retries")
 	}
 
 	b.deleteCheckpoint(ctx, p.ExecCfg(), p.User())
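The loop above is the heart of the change: a bounded, capped-backoff retry that only keeps going while the failure still looks like a transient DistSQL/rpc error. A minimal self-contained sketch of that shape is below; it approximates the "rpc error" classification with a plain string check and uses hypothetical names (runWithRetry, op) rather than anything from this patch or from pkg/util/retry.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"
)

// isRetryable approximates utilccl.IsDistSQLRetryableError: a node restart
// surfaces as an error whose message contains "rpc error".
func isRetryable(err error) bool {
	return err != nil && strings.Contains(err.Error(), "rpc error")
}

// runWithRetry retries op with capped exponential backoff, but only while the
// failure is classified as retryable; other errors are returned immediately.
func runWithRetry(ctx context.Context, maxRetries int, op func(context.Context) error) error {
	var err error
	backoff := 50 * time.Millisecond
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = op(ctx); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
		backoff *= 2
		if backoff > time.Second {
			backoff = time.Second // mirrors retry.Options{MaxBackoff: 1 * time.Second}
		}
	}
	return fmt.Errorf("exhausted retries: %w", err)
}

func main() {
	attempts := 0
	err := runWithRetry(context.Background(), 5, func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("rpc error: connection refused") // simulated worker restart
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```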
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 8b939307295e..5fd731f48d84 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -13,9 +13,11 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/build"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
+	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -48,6 +50,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/interval"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -500,6 +503,56 @@ func rewriteBackupSpanKey(
 	return newKey, nil
 }
 
+func restoreWithRetry(
+	restoreCtx context.Context,
+	execCtx sql.JobExecContext,
+	backupManifests []BackupManifest,
+	backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
+	endTime hlc.Timestamp,
+	dataToRestore restorationData,
+	job *jobs.Job,
+	encryption *jobspb.BackupEncryptionOptions,
+) (RowCount, error) {
+	// We retry on pretty generic failures -- any rpc error. If a worker node were
+	// to restart, it would produce this kind of error, but there may be other
+	// errors that are also rpc errors. Don't retry too aggressively.
+	retryOpts := retry.Options{
+		MaxBackoff: 1 * time.Second,
+		MaxRetries: 5,
+	}
+
+	// We want to retry a restore if there are transient failures (i.e. worker nodes
+	// dying), so if we receive a retryable error, re-plan and retry the restore.
+	var res RowCount
+	var err error
+	for r := retry.StartWithCtx(restoreCtx, retryOpts); r.Next(); {
+		res, err = restore(
+			restoreCtx,
+			execCtx,
+			backupManifests,
+			backupLocalityInfo,
+			endTime,
+			dataToRestore,
+			job,
+			encryption,
+		)
+		if err == nil {
+			break
+		}
+
+		if !utilccl.IsDistSQLRetryableError(err) {
+			return RowCount{}, err
+		}
+
+		log.Warningf(restoreCtx, `encountered retryable error: %+v`, err)
+	}
+
+	if err != nil {
+		return RowCount{}, errors.Wrap(err, "exhausted retries")
+	}
+	return res, nil
+}
+
 // restore imports a SQL table (or tables) from sets of non-overlapping sstable
 // files.
 func restore(
@@ -1463,7 +1516,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
 
 	var resTotal RowCount
 	if !preData.isEmpty() {
-		res, err := restore(
+		res, err := restoreWithRetry(
 			ctx,
 			p,
 			backupManifests,
@@ -1497,7 +1550,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
 	{
 		// Restore the main data bundle. We notably only restore the system tables
 		// later.
-		res, err := restore(
+		res, err := restoreWithRetry(
 			ctx,
 			p,
 			backupManifests,
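One property the retry call sites rely on, implicitly, is that the retryability check is a substring match on err.Error(): because errors.Wrap keeps the cause's message in the formatted string, an rpc error stays detectable even after extra context has been layered on top. A small illustration, with the helper's check approximated inline:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/cockroachdb/errors"
)

// isDistSQLRetryable approximates utilccl.IsDistSQLRetryableError.
func isDistSQLRetryable(err error) bool {
	return err != nil && strings.Contains(err.Error(), "rpc error")
}

func main() {
	cause := errors.New("rpc error: node unavailable")
	wrapped := errors.Wrap(cause, "distsql flow failed")

	// Both print true: wrapping prepends context but keeps the cause's message.
	fmt.Println(isDistSQLRetryable(cause), isDistSQLRetryable(wrapped))
}
```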
diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go
index bacde502f257..1bf7eac1b987 100644
--- a/pkg/ccl/changefeedccl/errors.go
+++ b/pkg/ccl/changefeedccl/errors.go
@@ -13,6 +13,7 @@ import (
 	"reflect"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/errors"
 )
 
@@ -60,13 +61,8 @@ func IsRetryableError(err error) bool {
 		// unfortunate string comparison.
 		return true
 	}
-	if strings.Contains(errStr, `rpc error`) {
-		// When a crdb node dies, any DistSQL flows with processors scheduled on
-		// it get an error with "rpc error" in the message from the call to
-		// `(*DistSQLPlanner).Run`.
-		return true
-	}
-	return false
+
+	return utilccl.IsDistSQLRetryableError(err)
 }
 
 // MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index 55707bc80ba4..7210bbf794e0 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -19,6 +19,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
@@ -1998,11 +1999,12 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
 		}
 	}
 
-	res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime,
+	res, err := ingestWithRetry(ctx, p, r.job, tables, files, format, details.Walltime,
 		r.testingKnobs.alwaysFlushJobProgress)
 	if err != nil {
 		return err
 	}
+
 	pkIDs := make(map[uint64]struct{}, len(details.Tables))
 	for _, t := range details.Tables {
 		pkIDs[roachpb.BulkOpSummaryID(uint64(t.Desc.ID), uint64(t.Desc.PrimaryIndex.ID))] = struct{}{}
@@ -2060,6 +2062,48 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	return nil
 }
 
+func ingestWithRetry(
+	ctx context.Context,
+	execCtx sql.JobExecContext,
+	job *jobs.Job,
+	tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
+	from []string,
+	format roachpb.IOFileFormat,
+	walltime int64,
+	alwaysFlushProgress bool,
+) (roachpb.BulkOpSummary, error) {
+
+	// We retry on pretty generic failures -- any rpc error. If a worker node were
+	// to restart, it would produce this kind of error, but there may be other
+	// errors that are also rpc errors. Don't retry too aggressively.
+	retryOpts := retry.Options{
+		MaxBackoff: 1 * time.Second,
+		MaxRetries: 5,
+	}
+
+	// We want to retry an import if there are transient failures (i.e. worker nodes
+	// dying), so if we receive a retryable error, re-plan and retry the import.
+	var res roachpb.BulkOpSummary
+	var err error
+	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+		res, err = sql.DistIngest(ctx, execCtx, job, tables, from, format, walltime, alwaysFlushProgress)
+		if err == nil {
+			break
+		}
+
+		if !utilccl.IsDistSQLRetryableError(err) {
+			return roachpb.BulkOpSummary{}, err
+		}
+
+		log.Warningf(ctx, `encountered retryable error: %+v`, err)
+	}
+
+	if err != nil {
+		return roachpb.BulkOpSummary{}, errors.Wrap(err, "exhausted retries")
+	}
+	return res, nil
+}
+
 func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.ExecutorConfig) error {
 	details := r.job.Details().(jobspb.ImportDetails)
 	// Schemas should only be published once.
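backup_job.go, restore_job.go, and import_stmt.go now carry three nearly identical copies of this loop, differing only in the operation invoked and the result type. If that duplication ever becomes a burden, it could plausibly be collapsed into a small shared helper along these lines; runRetryable is hypothetical and not part of this change, and each caller would capture its typed result via the closure:

```go
// runRetryable is a hypothetical consolidation of the retry loops added in
// this patch; callers close over their own typed result value.
func runRetryable(ctx context.Context, op func(ctx context.Context) error) error {
	retryOpts := retry.Options{
		MaxBackoff: 1 * time.Second,
		MaxRetries: 5,
	}
	var err error
	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
		if err = op(ctx); err == nil {
			return nil
		}
		if !utilccl.IsDistSQLRetryableError(err) {
			return err
		}
		log.Warningf(ctx, `encountered retryable error: %+v`, err)
	}
	return errors.Wrap(err, "exhausted retries")
}
```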
diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel
index 5947e49a4f4c..f977d1ef5d3f 100644
--- a/pkg/ccl/utilccl/BUILD.bazel
+++ b/pkg/ccl/utilccl/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "utilccl",
     srcs = [
+        "errors.go",
         "jobutils.go",
         "license_check.go",
     ],
diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go
new file mode 100644
index 000000000000..46ad5d387991
--- /dev/null
+++ b/pkg/ccl/utilccl/errors.go
@@ -0,0 +1,30 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package utilccl
+
+import "strings"
+
+// IsDistSQLRetryableError returns true if the supplied error, or any of its
+// parent causes, is an rpc error.
+// This is an unfortunate implementation that should be looking for a more
+// specific error.
+func IsDistSQLRetryableError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// TODO(knz): this is a bad implementation. Make it go away
+	// by avoiding string comparisons.
+
+	errStr := err.Error()
+	// When a crdb node dies, any DistSQL flows with processors scheduled on
+	// it get an error with "rpc error" in the message from the call to
+	// `(*DistSQLPlanner).Run`.
+	return strings.Contains(errStr, `rpc error`)
+}
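The new helper is small enough that a table-driven test pins down its behavior in a few lines; the patch does not add one, but a sketch would look roughly like this:

```go
package utilccl

import (
	"testing"

	"github.com/cockroachdb/errors"
)

func TestIsDistSQLRetryableError(t *testing.T) {
	testCases := []struct {
		err      error
		expected bool
	}{
		{nil, false},
		{errors.New("rpc error: connection closed"), true},
		{errors.Wrap(errors.New("rpc error: EOF"), "flow failed"), true},
		{errors.New("relation does not exist"), false},
	}
	for i, tc := range testCases {
		if got := IsDistSQLRetryableError(tc.err); got != tc.expected {
			t.Errorf("%d: IsDistSQLRetryableError(%v) = %t, want %t", i, tc.err, got, tc.expected)
		}
	}
}
```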
diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel
index 62a25309cf4d..ca0facdc3dc0 100644
--- a/pkg/cmd/roachtest/BUILD.bazel
+++ b/pkg/cmd/roachtest/BUILD.bazel
@@ -51,6 +51,7 @@ go_library(
         "inverted_index.go",
         "java_helpers.go",
         "jepsen.go",
+        "jobs.go",
         "kv.go",
         "kvbench.go",
         "ledger.go",
diff --git a/pkg/cmd/roachtest/backup.go b/pkg/cmd/roachtest/backup.go
index e3a073df538a..b7765563ec55 100644
--- a/pkg/cmd/roachtest/backup.go
+++ b/pkg/cmd/roachtest/backup.go
@@ -32,37 +32,110 @@ const (
 	KMSRegionBEnvVar = "AWS_KMS_REGION_B"
 	KMSKeyARNAEnvVar = "AWS_KMS_KEY_ARN_A"
 	KMSKeyARNBEnvVar = "AWS_KMS_KEY_ARN_B"
+
+	// rows2TiB is the number of rows to import to load 2TB of data (when
+	// replicated).
+	rows2TiB   = 65_104_166
+	rows100GiB = rows2TiB / 20
+	rows30GiB  = rows2TiB / 66
+	rows15GiB  = rows30GiB / 2
+	rows5GiB   = rows100GiB / 20
+	rows3GiB   = rows30GiB / 10
 )
 
-func registerBackup(r *testRegistry) {
-	importBankData := func(ctx context.Context, rows int, t *test, c *cluster) string {
-		dest := c.name
-		// Randomize starting with encryption-at-rest enabled.
-		c.encryptAtRandom = true
+func importBankDataSplit(ctx context.Context, rows, ranges int, t *test, c *cluster) string {
+	dest := c.name
+	// Randomize starting with encryption-at-rest enabled.
+	c.encryptAtRandom = true
+
+	if local {
+		dest += fmt.Sprintf("%d", timeutil.Now().UnixNano())
+	}
+
+	c.Put(ctx, workload, "./workload")
+	c.Put(ctx, cockroach, "./cockroach")
+
+	// NB: starting the cluster creates the logs dir as a side effect,
+	// needed below.
+	c.Start(ctx, t)
+	c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`)
+	time.Sleep(time.Second) // wait for csv server to open listener
+
+	importArgs := []string{
+		"./workload", "fixtures", "import", "bank",
+		"--db=bank", "--payload-bytes=10240", fmt.Sprintf("--ranges=%d", ranges), "--csv-server", "http://localhost:8081",
+		fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}",
+	}
+	c.Run(ctx, c.Node(1), importArgs...)
+
+	return dest
+}
+
+func importBankData(ctx context.Context, rows int, t *test, c *cluster) string {
+	return importBankDataSplit(ctx, rows, 0 /* ranges */, t, c)
+}
+func registerBackupNodeShutdown(r *testRegistry) {
+	// backupNodeRestartSpec runs a backup and randomly shuts down a node during
+	// the backup.
+	backupNodeRestartSpec := makeClusterSpec(4)
+	loadBackupData := func(ctx context.Context, t *test, c *cluster) string {
+		// This ought to be enough since this isn't a performance test.
+		rows := rows15GiB
 		if local {
-			rows = 100
-			dest += fmt.Sprintf("%d", timeutil.Now().UnixNano())
+			// Needs to be sufficiently large to give each processor a good chunk of
+			// work so the job doesn't complete immediately.
+			rows = rows5GiB
 		}
+		return importBankData(ctx, rows, t, c)
+	}
+
+	r.Add(testSpec{
+		Name:       fmt.Sprintf("backup/nodeShutdown/worker/%s", backupNodeRestartSpec),
+		Owner:      OwnerBulkIO,
+		Cluster:    backupNodeRestartSpec,
+		MinVersion: "v21.1.0",
+		Run: func(ctx context.Context, t *test, c *cluster) {
+			gatewayNode := 2
+			nodeToShutdown := 3
+			dest := loadBackupData(ctx, t, c)
+			backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
+			startBackup := func(c *cluster) (jobID string, err error) {
+				gatewayDB := c.Conn(ctx, gatewayNode)
+				defer gatewayDB.Close()
+
+				err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
+				return
+			}
 
-		c.Put(ctx, workload, "./workload")
-		c.Put(ctx, cockroach, "./cockroach")
+			jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup)
+		},
+	})
+	r.Add(testSpec{
+		Name:       fmt.Sprintf("backup/nodeShutdown/coordinator/%s", backupNodeRestartSpec),
+		Owner:      OwnerBulkIO,
+		Cluster:    backupNodeRestartSpec,
+		MinVersion: "v21.1.0",
+		Run: func(ctx context.Context, t *test, c *cluster) {
+			gatewayNode := 2
+			nodeToShutdown := 2
+			dest := loadBackupData(ctx, t, c)
+			backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
+			startBackup := func(c *cluster) (jobID string, err error) {
+				gatewayDB := c.Conn(ctx, gatewayNode)
+				defer gatewayDB.Close()
+
+				err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
+				return
+			}
 
-		// NB: starting the cluster creates the logs dir as a side effect,
-		// needed below.
-		c.Start(ctx, t)
-		c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`)
-		time.Sleep(time.Second) // wait for csv server to open listener
+			jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup)
+		},
+	})
 
-		importArgs := []string{
-			"./workload", "fixtures", "import", "bank",
-			"--db=bank", "--payload-bytes=10240", "--ranges=0", "--csv-server", "http://localhost:8081",
-			fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}",
-		}
-		c.Run(ctx, c.Node(1), importArgs...)
+}
 
-		return dest
-	}
+func registerBackup(r *testRegistry) {
 	backup2TBSpec := makeClusterSpec(10)
 	r.Add(testSpec{
@@ -71,7 +144,10 @@ func registerBackup(r *testRegistry) {
 		Cluster:    backup2TBSpec,
 		MinVersion: "v2.1.0",
 		Run: func(ctx context.Context, t *test, c *cluster) {
-			rows := 65104166
+			rows := rows2TiB
+			if local {
+				rows = 100
+			}
 			dest := importBankData(ctx, rows, t, c)
 			m := newMonitor(ctx, c)
 			m.Go(func(ctx context.Context) error {
@@ -96,7 +172,10 @@ func registerBackup(r *testRegistry) {
 			}
 
 			// ~10GiB - which is 30Gib replicated.
-			rows := 976562
+			rows := rows30GiB
+			if local {
+				rows = 100
+			}
 			dest := importBankData(ctx, rows, t, c)
 
 			conn := c.Conn(ctx, 1)
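The new row-count constants encode rough size targets: 65,104,166 rows at a 10,240-byte payload is on the order of 667 GB of payload, i.e. roughly 2 TB once 3x replicated, and the smaller constants are simple fractions of that. The split-aware loader also lets a caller ask for pre-split ranges; a hypothetical test that wants the work spread across processors immediately could call it like this (the range count here is illustrative; no registered test uses it):

```go
// Load ~15 GiB of bank data, pre-split into 100 ranges so every DistSQL
// processor has work from the start (illustrative values only).
dest := importBankDataSplit(ctx, rows15GiB, 100 /* ranges */, t, c)
t.l.Printf("bank data loaded into cluster %s", dest)
```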
+} - return dest - } +func registerBackup(r *testRegistry) { backup2TBSpec := makeClusterSpec(10) r.Add(testSpec{ @@ -71,7 +144,10 @@ func registerBackup(r *testRegistry) { Cluster: backup2TBSpec, MinVersion: "v2.1.0", Run: func(ctx context.Context, t *test, c *cluster) { - rows := 65104166 + rows := rows2TiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) m := newMonitor(ctx, c) m.Go(func(ctx context.Context) error { @@ -96,7 +172,10 @@ func registerBackup(r *testRegistry) { } // ~10GiB - which is 30Gib replicated. - rows := 976562 + rows := rows30GiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) conn := c.Conn(ctx, 1) diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go index a06ab34b4d68..9c673109051d 100644 --- a/pkg/cmd/roachtest/import.go +++ b/pkg/cmd/roachtest/import.go @@ -20,6 +20,65 @@ import ( "github.com/cockroachdb/errors" ) +func registerImportNodeShutdown(r *testRegistry) { + getImportRunner := func(ctx context.Context, gatewayNode int) jobStarter { + startImport := func(c *cluster) (jobID string, err error) { + importStmt := ` + IMPORT TABLE partsupp + CREATE USING 'gs://cockroach-fixtures/tpch-csv/schema/partcupp.sql' + CSV DATA ( + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.1', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.2', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.3', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.4', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.5', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.6', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.7', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.8' + ) WITH delimiter='|', detached + ` + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, importStmt).Scan(&jobID) + return + } + + return startImport + } + + r.Add(testSpec{ + Name: "import/nodeShutdown/worker", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 3 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) + r.Add(testSpec{ + Name: "import/nodeShutdown/coordinator", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 2 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) +} + func registerImportTPCC(r *testRegistry) { runImportTPCC := func(ctx context.Context, t *test, c *cluster, warehouses int) { // Randomize starting with encryption-at-rest enabled. diff --git a/pkg/cmd/roachtest/jobs.go b/pkg/cmd/roachtest/jobs.go new file mode 100644 index 000000000000..623a09b6f1b6 --- /dev/null +++ b/pkg/cmd/roachtest/jobs.go @@ -0,0 +1,142 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
diff --git a/pkg/cmd/roachtest/jobs.go b/pkg/cmd/roachtest/jobs.go
new file mode 100644
index 000000000000..623a09b6f1b6
--- /dev/null
+++ b/pkg/cmd/roachtest/jobs.go
@@ -0,0 +1,142 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+)
+
+type jobStarter func(c *cluster) (string, error)
+
+// jobSurvivesNodeShutdown is a helper that tests that a given job,
+// running on the specified gatewayNode, will still complete successfully
+// if nodeToShutdown is shut down partway through execution.
+// This helper assumes:
+// - That the job is long running and will take at least a minute to complete.
+// - That the necessary setup is done (e.g. any data that the job relies on is
+// already loaded) so that `query` can be run on its own to kick off the job.
+// - That the statement running the job is a detached statement, and does not
+// block until the job completes.
+func jobSurvivesNodeShutdown(
+	ctx context.Context, t *test, c *cluster, nodeToShutdown int, startJob jobStarter,
+) {
+	watcherNode := 1 + (nodeToShutdown)%c.spec.NodeCount
+	target := c.Node(nodeToShutdown)
+	t.l.Printf("test has chosen shutdown target node %d, and watcher node %d",
+		nodeToShutdown, watcherNode)
+
+	jobIDCh := make(chan string, 1)
+
+	m := newMonitor(ctx, c)
+	m.Go(func(ctx context.Context) error {
+		defer close(jobIDCh)
+		t.Status(`running job`)
+		var jobID string
+		jobID, err := startJob(c)
+		if err != nil {
+			return errors.Wrap(err, "starting the job")
+		}
+		t.l.Printf("started running job with ID %s", jobID)
+		jobIDCh <- jobID
+
+		pollInterval := 5 * time.Second
+		ticker := time.NewTicker(pollInterval)
+
+		watcherDB := c.Conn(ctx, watcherNode)
+		defer watcherDB.Close()
+
+		var status string
+		for {
+			select {
+			case <-ticker.C:
+				err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
+				if err != nil {
+					return errors.Wrap(err, "getting the job status")
+				}
+				jobStatus := jobs.Status(status)
+				switch jobStatus {
+				case jobs.StatusSucceeded:
+					t.Status("job completed")
+					return nil
+				case jobs.StatusRunning:
+					t.l.Printf("job %s still running, waiting to succeed", jobID)
+				default:
+					// Any other state is unexpected for a job that was just started.
+					return errors.Newf("unexpectedly found job %s in state %s", jobID, status)
+				}
+			case <-ctx.Done():
+				return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish")
+			}
+		}
+	})
+
+	m.Go(func(ctx context.Context) error {
+		jobID, ok := <-jobIDCh
+		if !ok {
+			return errors.New("job never created")
+		}
+
+		// Shutdown a node after a bit, and keep it shutdown for the remainder
+		// of the job.
+		timeToWait := 10 * time.Second
+		timer := timeutil.Timer{}
+		timer.Reset(timeToWait)
+		select {
+		case <-ctx.Done():
+			return errors.Wrapf(ctx.Err(), "stopping test, did not shutdown node")
+		case <-timer.C:
+			timer.Read = true
+		}
+
+		// Sanity check that the job is still running.
+		watcherDB := c.Conn(ctx, watcherNode)
+		defer watcherDB.Close()
+
+		var status string
+		err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
+		if err != nil {
+			return errors.Wrap(err, "getting the job status")
+		}
+		jobStatus := jobs.Status(status)
+		if jobStatus != jobs.StatusRunning {
+			return errors.Newf("job too fast! job got to state %s before the target node could be shutdown",
+				status)
+		}
+
+		t.l.Printf(`stopping node %s`, target)
+		if err := c.StopE(ctx, target); err != nil {
+			return errors.Wrapf(err, "could not stop node %s", target)
+		}
+		t.l.Printf("stopped node %s", target)
+
+		return nil
+	})
+
+	// Before calling `m.Wait()`, do some cleanup.
+	if err := m.g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+
+	// NB: the roachtest harness checks that at the end of the test, all nodes
+	// that have data also have a running process.
+	t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target))
+	if err := c.StartE(ctx, target); err != nil {
+		t.Fatal(errors.Wrapf(err, "could not restart node %s", target))
+	}
+
+	m.Wait()
+}
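With jobs.go in place, wiring up a shutdown test for any other detached bulk statement only requires supplying a jobStarter; the watcher is always the next node in the ring, so with a 4-node cluster, shutting down node 3 makes node 4 the watcher, and shutting down node 4 wraps around to node 1. A hypothetical starter, not part of this patch, would look like:

```go
// Hypothetical jobStarter for some other detached statement; any statement
// that returns a job ID without blocking on completion fits the contract.
startJob := func(c *cluster) (jobID string, err error) {
	db := c.Conn(ctx, gatewayNode)
	defer db.Close()
	err = db.QueryRowContext(
		ctx, `BACKUP DATABASE bank TO 'nodelocal://1/example' WITH DETACHED`,
	).Scan(&jobID)
	return
}
jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startJob)
```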
diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go
index 72cba49d2e7a..434b71fa61eb 100644
--- a/pkg/cmd/roachtest/registry.go
+++ b/pkg/cmd/roachtest/registry.go
@@ -21,6 +21,7 @@ func registerTests(r *testRegistry) {
 	registerAlterPK(r)
 	registerAutoUpgrade(r)
 	registerBackup(r)
+	registerBackupNodeShutdown(r)
 	registerCancel(r)
 	registerCDC(r)
 	registerClearRange(r)
@@ -46,6 +47,7 @@ func registerTests(r *testRegistry) {
 	registerImportMixedVersion(r)
 	registerImportTPCC(r)
 	registerImportTPCH(r)
+	registerImportNodeShutdown(r)
 	registerInconsistency(r)
 	registerIndexes(r)
 	registerInterleaved(r)
@@ -73,6 +75,7 @@ func registerTests(r *testRegistry) {
 	registerRebalanceLoad(r)
 	registerReplicaGC(r)
 	registerRestart(r)
+	registerRestoreNodeShutdown(r)
 	registerRestore(r)
 	registerRoachmart(r)
 	registerScaleData(r)
diff --git a/pkg/cmd/roachtest/restore.go b/pkg/cmd/roachtest/restore.go
index d3957509e7ff..32dbb4786c07 100644
--- a/pkg/cmd/roachtest/restore.go
+++ b/pkg/cmd/roachtest/restore.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
@@ -208,6 +209,101 @@ func (dul *DiskUsageLogger) Runner(ctx context.Context) error {
 		logger.Printf("%s\n", strings.Join(s, ", "))
 	}
 }
+func registerRestoreNodeShutdown(r *testRegistry) {
+	makeRestoreStarter := func(ctx context.Context, t *test, c *cluster, gatewayNode int) jobStarter {
+		return func(c *cluster) (string, error) {
+			t.l.Printf("connecting to gateway")
+			gatewayDB := c.Conn(ctx, gatewayNode)
+			defer gatewayDB.Close()
+
+			t.l.Printf("creating bank database")
+			if _, err := gatewayDB.Exec("CREATE DATABASE bank"); err != nil {
+				return "", err
+			}
+
+			errCh := make(chan error, 1)
+			go func() {
+				defer close(errCh)
+
+				// 10 GiB restore.
+				restoreQuery := `RESTORE bank.bank FROM
+					'gs://cockroach-fixtures/workload/bank/version=1.0.0,payload-bytes=100,ranges=10,rows=10000000,seed=1/bank'`
+
+				t.l.Printf("starting to run the restore job")
+				if _, err := gatewayDB.Exec(restoreQuery); err != nil {
+					errCh <- err
+				}
+				t.l.Printf("done running restore job")
+			}()
+
+			// Wait for the job.
+			retryOpts := retry.Options{
+				MaxRetries:     50,
+				InitialBackoff: 1 * time.Second,
+				MaxBackoff:     5 * time.Second,
+			}
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				var jobCount int
+				if err := gatewayDB.QueryRowContext(ctx, "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobCount); err != nil {
+					return "", err
+				}
+
+				select {
+				case err := <-errCh:
+					// We got an error when starting the job.
+					return "", err
+				default:
+				}
+
+				if jobCount == 0 {
+					t.l.Printf("waiting for restore job")
+				} else if jobCount == 1 {
+					t.l.Printf("found restore job")
+					break
+				} else {
+					t.l.Printf("found multiple restore jobs -- erroring")
+					return "", errors.New("unexpectedly found multiple restore jobs")
+				}
+			}
+
+			var jobID string
+			if err := gatewayDB.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobID); err != nil {
+				return "", errors.Wrap(err, "querying the job ID")
+			}
+			return jobID, nil
+		}
+	}
+
+	r.Add(testSpec{
+		Name:       "restore/nodeShutdown/worker",
+		Owner:      OwnerBulkIO,
+		Cluster:    makeClusterSpec(4),
+		MinVersion: "v21.1.0",
+		Run: func(ctx context.Context, t *test, c *cluster) {
+			gatewayNode := 2
+			nodeToShutdown := 3
+			c.Put(ctx, cockroach, "./cockroach")
+			c.Start(ctx, t)
+
+			jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode))
+		},
+	})
+
+	r.Add(testSpec{
+		Name:       "restore/nodeShutdown/coordinator",
+		Owner:      OwnerBulkIO,
+		Cluster:    makeClusterSpec(4),
+		MinVersion: "v21.1.0",
+		Run: func(ctx context.Context, t *test, c *cluster) {
+			gatewayNode := 2
+			nodeToShutdown := 2
+			c.Put(ctx, cockroach, "./cockroach")
+			c.Start(ctx, t)
+
+			jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode))
+		},
+	})
+}
 
 func registerRestore(r *testRegistry) {
 	for _, item := range []struct {
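The restore starter above cannot simply scan a job ID from the statement because it runs RESTORE synchronously in a goroutine and then polls SHOW JOBS for the ID. If the detached option is available for RESTORE in the version under test -- an assumption this patch does not rely on -- the starter could be reduced to the same shape the backup and import tests use:

```go
// Hypothetical simplification, assuming `RESTORE ... WITH detached` exists in
// the tested version; the patch instead polls SHOW JOBS for the job ID.
return func(c *cluster) (jobID string, err error) {
	gatewayDB := c.Conn(ctx, gatewayNode)
	defer gatewayDB.Close()
	if _, err := gatewayDB.Exec("CREATE DATABASE bank"); err != nil {
		return "", err
	}
	restoreQuery := `RESTORE bank.bank FROM
		'gs://cockroach-fixtures/workload/bank/version=1.0.0,payload-bytes=100,ranges=10,rows=10000000,seed=1/bank'
		WITH detached`
	err = gatewayDB.QueryRowContext(ctx, restoreQuery).Scan(&jobID)
	return
}
```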
diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go
index 8f89950641ed..b0a3d874023f 100644
--- a/pkg/sql/catalog/descs/collection.go
+++ b/pkg/sql/catalog/descs/collection.go
@@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
 		// Read the descriptor from the store in the face of some specific errors
 		// because of a known limitation of AcquireByName. See the known
 		// limitations of AcquireByName for details.
-		if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
+		if catalog.HasInactiveDescriptorError(err) ||
 			errors.Is(err, catalog.ErrDescriptorNotFound) {
 			return nil, true, nil
 		}
diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel
index b76963ccff67..15fe9bc50ae1 100644
--- a/pkg/sql/catalog/lease/BUILD.bazel
+++ b/pkg/sql/catalog/lease/BUILD.bazel
@@ -54,7 +54,6 @@ go_test(
         "//pkg/jobs/jobspb",
         "//pkg/keys",
         "//pkg/kv",
-        "//pkg/kv/kvserver",
         "//pkg/roachpb",
         "//pkg/security",
         "//pkg/security/securitytest",
@@ -68,9 +67,7 @@ go_test(
         "//pkg/sql/catalog/tabledesc",
        "//pkg/sql/pgwire/pgcode",
        "//pkg/sql/sem/tree",
-        "//pkg/sql/sessiondata",
        "//pkg/sql/sqltestutils",
-        "//pkg/sql/sqlutil",
        "//pkg/sql/tests",
        "//pkg/testutils",
        "//pkg/testutils/serverutils",
@@ -87,7 +84,6 @@ go_test(
        "//pkg/util/syncutil",
        "//pkg/util/timeutil",
        "//pkg/util/tracing",
-        "//pkg/util/uuid",
        "@com_github_cockroachdb_cockroach_go//crdb",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_cockroachdb_logtags//:logtags",
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 572af1db8e38..b121babb6112 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -216,7 +216,7 @@ func (s storage) acquire(
 			return err
 		}
 		if err := catalog.FilterDescriptorState(
-			desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
+			desc, tree.CommonLookupFlags{}, // filter all non-public state
 		); err != nil {
 			return err
 		}
@@ -981,7 +981,7 @@ func purgeOldVersions(
 	ctx context.Context,
 	db *kv.DB,
 	id descpb.ID,
-	dropped bool,
+	takenOffline bool,
 	minVersion descpb.DescriptorVersion,
 	m *Manager,
 ) error {
@@ -995,15 +995,15 @@ func purgeOldVersions(
 	}
 	empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
 	t.mu.Unlock()
-	if empty && !dropped {
+	if empty && !takenOffline {
 		// We don't currently have a version on this descriptor, so no need to refresh
 		// anything.
 		return nil
 	}
 
-	removeInactives := func(dropped bool) {
+	removeInactives := func(takenOffline bool) {
 		t.mu.Lock()
-		t.mu.takenOffline = dropped
+		t.mu.takenOffline = takenOffline
 		leases := t.removeInactiveVersions()
 		t.mu.Unlock()
 		for _, l := range leases {
@@ -1011,8 +1011,8 @@ func purgeOldVersions(
 		}
 	}
 
-	if dropped {
-		removeInactives(true /* dropped */)
+	if takenOffline {
+		removeInactives(true /* takenOffline */)
 		return nil
 	}
 
@@ -1028,7 +1028,7 @@ func purgeOldVersions(
 		return errRenewLease
 	}
 	newest.incRefcount()
-	removeInactives(false /* dropped */)
+	removeInactives(false /* takenOffline */)
 	s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
 	if err != nil {
 		return err
 	}
@@ -1398,28 +1398,6 @@ func (m *Manager) AcquireByName(
 	parentSchemaID descpb.ID,
 	name string,
 ) (catalog.Descriptor, hlc.Timestamp, error) {
-	// When offline descriptor leases were not allowed to be cached,
-	// attempt to acquire a lease on them would generate a descriptor
-	// offline error. Recent changes allow offline descriptor leases
-	// to be cached, but callers still need the offline error generated.
-	// This logic will release the lease (the lease manager will still
-	// cache it), and generate the offline descriptor error.
-	validateDescriptorForReturn := func(desc catalog.Descriptor,
-		expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
-		if desc.Offline() {
-			if err := catalog.FilterDescriptorState(
-				desc, tree.CommonLookupFlags{},
-			); err != nil {
-				err2 := m.Release(desc)
-				if err2 != nil {
-					log.Warningf(ctx, "error releasing lease: %s", err2)
-				}
-				return nil, hlc.Timestamp{}, err
-			}
-		}
-		return desc, expiration, nil
-	}
-
 	// Check if we have cached an ID for this name.
 	descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
 	if descVersion != nil {
@@ -1434,7 +1412,7 @@ func (m *Manager) AcquireByName(
 			}
 		}
 	}
-		return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
+		return descVersion.Descriptor, descVersion.expiration, nil
 	}
 	if err := m.Release(descVersion); err != nil {
 		return nil, hlc.Timestamp{}, err
@@ -1444,7 +1422,7 @@ func (m *Manager) AcquireByName(
 	if err != nil {
 		return nil, hlc.Timestamp{}, err
 	}
-	return validateDescriptorForReturn(desc, expiration)
+	return desc, expiration, nil
 }
 
 // We failed to find something in the cache, or what we found is not
@@ -1513,7 +1491,7 @@ func (m *Manager) AcquireByName(
 			return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
 		}
 	}
-	return validateDescriptorForReturn(desc, expiration)
+	return desc, expiration, nil
 }
 
 // resolveName resolves a descriptor name to a descriptor ID at a particular
@@ -1716,11 +1694,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
 				}
 				id, version, name, state := descpb.GetDescriptorMetadata(desc)
-				dropped := state == descpb.DescriptorState_DROP
+				goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
 				// Try to refresh the lease to one >= this version.
-				log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
-					id, version, dropped)
-				if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
+				log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
+					id, version, goingOffline)
+				if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
 					log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
 						id, name, err)
 				}
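The acquire-side change swaps IncludeOffline: true for an empty flag set, so lease acquisition now rejects any descriptor that is not public rather than only dropped ones. A rough illustration of the flag semantics as this patch uses them (the snippet is illustrative, not taken from the code):

```go
// Empty flags: OFFLINE and DROP descriptors both fail the filter, so no lease
// can be acquired on them.
if err := catalog.FilterDescriptorState(desc, tree.CommonLookupFlags{}); err != nil {
	return err
}

// IncludeOffline (the old behavior at this call site) would have let an
// OFFLINE descriptor through and only rejected dropped ones.
_ = catalog.FilterDescriptorState(desc, tree.CommonLookupFlags{IncludeOffline: true})
```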
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index aa5a5c14fbeb..742e60503cfb 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -27,9 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -41,9 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
-	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
-	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -54,12 +50,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
-	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/lib/pq"
@@ -2361,7 +2355,6 @@ func TestLeaseWithOfflineTables(t *testing.T) {
 		func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
 			flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
 			flags.CommonLookupFlags.IncludeOffline = true
-			flags.CommonLookupFlags.IncludeDropped = true
 			desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags)
 			require.NoError(t, err)
 			require.Equal(t, desc.State, expected)
@@ -2405,16 +2398,9 @@ func TestLeaseWithOfflineTables(t *testing.T) {
 	checkLeaseState(true /* shouldBePresent */)
 
 	// Take the table offline and back online again.
-	// This should not relinquish the lease anymore
-	// and offline ones will now be held.
+	// This should relinquish the lease.
 	setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
 	setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
-	checkLeaseState(true /* shouldBePresent */)
-
-	// Take the table dropped and back online again.
-	// This should relinquish the lease.
-	setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP)
-	setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC)
 	checkLeaseState(false /* shouldBePresent */)
 
 	// Query the table, thereby acquiring a lease once again.
@@ -2717,135 +2703,3 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) {
 		return true
 	})
 }
-
-// TestOfflineLeaseRefresh validates that no live lock can occur,
-// after a table is brought offline. Specifically a table a will be
-// brought offline, and then one transaction will attempt to bring it
-// online while another transaction will attempt to do a read. The read
-// transaction could previously push back the lease of transaction
-// trying to online the table perpetually (as seen in issue #61798).
-func TestOfflineLeaseRefresh(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	ctx := context.Background()
-	waitForTxn := make(chan chan struct{})
-	waitForRqstFilter := make(chan chan struct{})
-	errorChan := make(chan error)
-	var txnID uuid.UUID
-	var mu syncutil.RWMutex
-
-	knobs := &kvserver.StoreTestingKnobs{
-		TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
-			mu.RLock()
-			checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID)
-			mu.RUnlock()
-			if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok {
-				notify := make(chan struct{})
-				waitForRqstFilter <- notify
-				<-notify
-			}
-			return nil
-		},
-	}
-	params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}
-	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
-	s := tc.Server(0)
-	defer tc.Stopper().Stop(ctx)
-	conn := tc.ServerConn(0)
-
-	// Create t1 that will be offline, and t2,
-	// that will serve inserts.
-	_, err := conn.Exec(`
-CREATE DATABASE d1;
-CREATE TABLE d1.t1 (name int);
-INSERT INTO d1.t1 values(5);
-INSERT INTO d1.t1 values(5);
-INSERT INTO d1.t1 values(5);
-CREATE TABLE d1.t2 (name int);
-`)
-	require.NoError(t, err)
-
-	tableID := descpb.InvalidID
-
-	// Force the table descriptor into a offline state
-	err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
-		func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
-			_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired())
-			if err != nil {
-				return err
-			}
-			tableDesc.SetOffline("For unit test")
-			err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
-			if err != nil {
-				return err
-			}
-			tableID = tableDesc.ID
-			return nil
-		})
-	require.NoError(t, err)
-
-	_, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
-	require.NoError(t, err)
-
-	go func() {
-		err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
-			s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
-			func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
-				close(waitForRqstFilter)
-				mu.Lock()
-				waitForRqstFilter = make(chan chan struct{})
-				txnID = txn.ID()
-				mu.Unlock()
-
-				// Online the descriptor by making it public
-				_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
-					tree.NewTableNameWithSchema("d1", "public", "t1"),
-					tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{
-						Required:       true,
-						RequireMutable: true,
-						IncludeOffline: true,
-						AvoidCached:    true,
-					}})
-				if err != nil {
-					return err
-				}
-				tableDesc.SetPublic()
-				err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
-				if err != nil {
-					return err
-				}
-				// Allow the select on the table to proceed,
-				// so that it waits on the channel at the appropriate
-				// moment.
-				notify := make(chan struct{})
-				waitForTxn <- notify
-				<-notify
-
-				// Select from an unrelated table
-				_, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
-					sessiondata.InternalExecutorOverride{User: security.RootUserName()},
-					"insert into d1.t2 values (10);")
-				return err
-
-			})
-		close(waitForTxn)
-		close(waitForRqstFilter)
-		errorChan <- err
-	}()
-
-	for notify := range waitForTxn {
-		close(notify)
-		mu.RLock()
-		rqstFilterChannel := waitForRqstFilter
-		mu.RUnlock()
-		for notify2 := range rqstFilterChannel {
-			// Push the query trying to online the table out by
-			// leasing out the table again
-			_, err = conn.Query("select * from d1.t1")
-			require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
-				"Table offline error was not generated as expected")
-			close(notify2)
-		}
-	}
-	require.NoError(t, <-errorChan)
-	close(errorChan)
-}
diff --git a/pkg/ui/src/views/login/loginPage.tsx b/pkg/ui/src/views/login/loginPage.tsx
index a4b14f380049..38003154e0fa 100644
--- a/pkg/ui/src/views/login/loginPage.tsx
+++ b/pkg/ui/src/views/login/loginPage.tsx
@@ -162,7 +162,7 @@ export class LoginPage extends React.Component {
-            Log in to the Admin UI
+            Log in to the DB Console
           {this.renderError()}
@@ -171,8 +171,8 @@ export class LoginPage extends React.Component {
-            A user with a password is required to log in to the Admin UI on
-            secure clusters.
+            A user with a password is required to log in to the DB Console
+            on secure clusters.
           Create a user with this SQL command: