diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 62a25309cf4d..ca0facdc3dc0 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "inverted_index.go", "java_helpers.go", "jepsen.go", + "jobs.go", "kv.go", "kvbench.go", "ledger.go", diff --git a/pkg/cmd/roachtest/backup.go b/pkg/cmd/roachtest/backup.go index e3a073df538a..b7765563ec55 100644 --- a/pkg/cmd/roachtest/backup.go +++ b/pkg/cmd/roachtest/backup.go @@ -32,37 +32,110 @@ const ( KMSRegionBEnvVar = "AWS_KMS_REGION_B" KMSKeyARNAEnvVar = "AWS_KMS_KEY_ARN_A" KMSKeyARNBEnvVar = "AWS_KMS_KEY_ARN_B" + + // rows2TiB is the number of rows to import to load 2TB of data (when + // replicated). + rows2TiB = 65_104_166 + rows100GiB = rows2TiB / 20 + rows30GiB = rows2TiB / 66 + rows15GiB = rows30GiB / 2 + rows5GiB = rows100GiB / 20 + rows3GiB = rows30GiB / 10 ) -func registerBackup(r *testRegistry) { - importBankData := func(ctx context.Context, rows int, t *test, c *cluster) string { - dest := c.name - // Randomize starting with encryption-at-rest enabled. - c.encryptAtRandom = true +func importBankDataSplit(ctx context.Context, rows, ranges int, t *test, c *cluster) string { + dest := c.name + // Randomize starting with encryption-at-rest enabled. + c.encryptAtRandom = true + + if local { + dest += fmt.Sprintf("%d", timeutil.Now().UnixNano()) + } + + c.Put(ctx, workload, "./workload") + c.Put(ctx, cockroach, "./cockroach") + + // NB: starting the cluster creates the logs dir as a side effect, + // needed below. + c.Start(ctx, t) + c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) + time.Sleep(time.Second) // wait for csv server to open listener + + importArgs := []string{ + "./workload", "fixtures", "import", "bank", + "--db=bank", "--payload-bytes=10240", fmt.Sprintf("--ranges=%d", ranges), "--csv-server", "http://localhost:8081", + fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}", + } + c.Run(ctx, c.Node(1), importArgs...) + + return dest +} + +func importBankData(ctx context.Context, rows int, t *test, c *cluster) string { + return importBankDataSplit(ctx, rows, 0 /* ranges */, t, c) +} +func registerBackupNodeShutdown(r *testRegistry) { + // backupNodeRestartSpec runs a backup and randomly shuts down a node during + // the backup. + backupNodeRestartSpec := makeClusterSpec(4) + loadBackupData := func(ctx context.Context, t *test, c *cluster) string { + // This aught to be enough since this isn't a performance test. + rows := rows15GiB if local { - rows = 100 - dest += fmt.Sprintf("%d", timeutil.Now().UnixNano()) + // Needs to be sufficiently large to give each processor a good chunk of + // works so the job doesn't complete immediately. + rows = rows5GiB } + return importBankData(ctx, rows, t, c) + } + + r.Add(testSpec{ + Name: fmt.Sprintf("backup/nodeShutdown/worker/%s", backupNodeRestartSpec), + Owner: OwnerBulkIO, + Cluster: backupNodeRestartSpec, + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 3 + dest := loadBackupData(ctx, t, c) + backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` + startBackup := func(c *cluster) (jobID string, err error) { + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) + return + } - c.Put(ctx, workload, "./workload") - c.Put(ctx, cockroach, "./cockroach") + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup) + }, + }) + r.Add(testSpec{ + Name: fmt.Sprintf("backup/nodeShutdown/coordinator/%s", backupNodeRestartSpec), + Owner: OwnerBulkIO, + Cluster: backupNodeRestartSpec, + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 2 + dest := loadBackupData(ctx, t, c) + backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` + startBackup := func(c *cluster) (jobID string, err error) { + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) + return + } - // NB: starting the cluster creates the logs dir as a side effect, - // needed below. - c.Start(ctx, t) - c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) - time.Sleep(time.Second) // wait for csv server to open listener + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup) + }, + }) - importArgs := []string{ - "./workload", "fixtures", "import", "bank", - "--db=bank", "--payload-bytes=10240", "--ranges=0", "--csv-server", "http://localhost:8081", - fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}", - } - c.Run(ctx, c.Node(1), importArgs...) +} - return dest - } +func registerBackup(r *testRegistry) { backup2TBSpec := makeClusterSpec(10) r.Add(testSpec{ @@ -71,7 +144,10 @@ func registerBackup(r *testRegistry) { Cluster: backup2TBSpec, MinVersion: "v2.1.0", Run: func(ctx context.Context, t *test, c *cluster) { - rows := 65104166 + rows := rows2TiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) m := newMonitor(ctx, c) m.Go(func(ctx context.Context) error { @@ -96,7 +172,10 @@ func registerBackup(r *testRegistry) { } // ~10GiB - which is 30Gib replicated. - rows := 976562 + rows := rows30GiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) conn := c.Conn(ctx, 1) diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go index a06ab34b4d68..9c673109051d 100644 --- a/pkg/cmd/roachtest/import.go +++ b/pkg/cmd/roachtest/import.go @@ -20,6 +20,65 @@ import ( "github.com/cockroachdb/errors" ) +func registerImportNodeShutdown(r *testRegistry) { + getImportRunner := func(ctx context.Context, gatewayNode int) jobStarter { + startImport := func(c *cluster) (jobID string, err error) { + importStmt := ` + IMPORT TABLE partsupp + CREATE USING 'gs://cockroach-fixtures/tpch-csv/schema/partcupp.sql' + CSV DATA ( + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.1', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.2', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.3', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.4', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.5', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.6', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.7', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.8' + ) WITH delimiter='|', detached + ` + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, importStmt).Scan(&jobID) + return + } + + return startImport + } + + r.Add(testSpec{ + Name: "import/nodeShutdown/worker", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 3 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) + r.Add(testSpec{ + Name: "import/nodeShutdown/coordinator", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 2 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) +} + func registerImportTPCC(r *testRegistry) { runImportTPCC := func(ctx context.Context, t *test, c *cluster, warehouses int) { // Randomize starting with encryption-at-rest enabled. diff --git a/pkg/cmd/roachtest/jobs.go b/pkg/cmd/roachtest/jobs.go new file mode 100644 index 000000000000..623a09b6f1b6 --- /dev/null +++ b/pkg/cmd/roachtest/jobs.go @@ -0,0 +1,142 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package main + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +type jobStarter func(c *cluster) (string, error) + +// jobSurvivesNodeShutdown is a helper that tests that a given job, +// running on the specified gatewayNode will still complete successfully +// if nodeToShutdown is shutdown partway through execution. +// This helper assumes: +// - That the job is long running and will take a least a minute to complete. +// - That the necessary setup is done (e.g. any data that the job relies on is +// already loaded) so that `query` can be run on its own to kick off the job. +// - That the statement running the job is a detached statement, and does not +// block until the job completes. +func jobSurvivesNodeShutdown( + ctx context.Context, t *test, c *cluster, nodeToShutdown int, startJob jobStarter, +) { + watcherNode := 1 + (nodeToShutdown)%c.spec.NodeCount + target := c.Node(nodeToShutdown) + t.l.Printf("test has chosen shutdown target node %d, and watcher node %d", + nodeToShutdown, watcherNode) + + jobIDCh := make(chan string, 1) + + m := newMonitor(ctx, c) + m.Go(func(ctx context.Context) error { + defer close(jobIDCh) + t.Status(`running job`) + var jobID string + jobID, err := startJob(c) + if err != nil { + return errors.Wrap(err, "starting the job") + } + t.l.Printf("started running job with ID %s", jobID) + jobIDCh <- jobID + + pollInterval := 5 * time.Second + ticker := time.NewTicker(pollInterval) + + watcherDB := c.Conn(ctx, watcherNode) + defer watcherDB.Close() + + var status string + for { + select { + case <-ticker.C: + err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status) + if err != nil { + return errors.Wrap(err, "getting the job status") + } + jobStatus := jobs.Status(status) + switch jobStatus { + case jobs.StatusSucceeded: + t.Status("job completed") + return nil + case jobs.StatusRunning: + t.l.Printf("job %s still running, waiting to succeed", jobID) + default: + // Waiting for job to complete. + return errors.Newf("unexpectedly found job %s in state %s", jobID, status) + } + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish") + } + } + }) + + m.Go(func(ctx context.Context) error { + jobID, ok := <-jobIDCh + if !ok { + return errors.New("job never created") + } + + // Shutdown a node after a bit, and keep it shutdown for the remainder + // of the job. + timeToWait := 10 * time.Second + timer := timeutil.Timer{} + timer.Reset(timeToWait) + select { + case <-ctx.Done(): + return errors.Wrapf(ctx.Err(), "stopping test, did not shutdown node") + case <-timer.C: + timer.Read = true + } + + // Sanity check that the job is still running. + watcherDB := c.Conn(ctx, watcherNode) + defer watcherDB.Close() + + var status string + err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status) + if err != nil { + return errors.Wrap(err, "getting the job status") + } + jobStatus := jobs.Status(status) + if jobStatus != jobs.StatusRunning { + return errors.Newf("job too fast! job got to state %s before the target node could be shutdown", + status) + } + + t.l.Printf(`stopping node %s`, target) + if err := c.StopE(ctx, target); err != nil { + return errors.Wrapf(err, "could not stop node %s", target) + } + t.l.Printf("stopped node %s", target) + + return nil + }) + + // Before calling `m.Wait()`, do some cleanup. + if err := m.g.Wait(); err != nil { + t.Fatal(err) + } + + // NB: the roachtest harness checks that at the end of the test, all nodes + // that have data also have a running process. + t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target)) + if err := c.StartE(ctx, target); err != nil { + t.Fatal(errors.Wrapf(err, "could not restart node %s", target)) + } + + m.Wait() +} diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go index 8207da716f3f..ba1968a5eccc 100644 --- a/pkg/cmd/roachtest/registry.go +++ b/pkg/cmd/roachtest/registry.go @@ -21,6 +21,7 @@ func registerTests(r *testRegistry) { registerAlterPK(r) registerAutoUpgrade(r) registerBackup(r) + registerBackupNodeShutdown(r) registerCancel(r) registerCDC(r) registerClearRange(r) @@ -45,6 +46,7 @@ func registerTests(r *testRegistry) { registerImportMixedVersion(r) registerImportTPCC(r) registerImportTPCH(r) + registerImportNodeShutdown(r) registerInconsistency(r) registerIndexes(r) registerInterleaved(r) @@ -72,6 +74,7 @@ func registerTests(r *testRegistry) { registerRebalanceLoad(r) registerReplicaGC(r) registerRestart(r) + registerRestoreNodeShutdown(r) registerRestore(r) registerRoachmart(r) registerScaleData(r) diff --git a/pkg/cmd/roachtest/restore.go b/pkg/cmd/roachtest/restore.go index d3957509e7ff..32dbb4786c07 100644 --- a/pkg/cmd/roachtest/restore.go +++ b/pkg/cmd/roachtest/restore.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ts/tspb" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -208,6 +209,101 @@ func (dul *DiskUsageLogger) Runner(ctx context.Context) error { logger.Printf("%s\n", strings.Join(s, ", ")) } } +func registerRestoreNodeShutdown(r *testRegistry) { + makeRestoreStarter := func(ctx context.Context, t *test, c *cluster, gatewayNode int) jobStarter { + return func(c *cluster) (string, error) { + t.l.Printf("connecting to gateway") + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + t.l.Printf("creating bank database") + if _, err := gatewayDB.Exec("CREATE DATABASE bank"); err != nil { + return "", err + } + + errCh := make(chan error, 1) + go func() { + defer close(errCh) + + // 10 GiB restore. + restoreQuery := `RESTORE bank.bank FROM + 'gs://cockroach-fixtures/workload/bank/version=1.0.0,payload-bytes=100,ranges=10,rows=10000000,seed=1/bank'` + + t.l.Printf("starting to run the restore job") + if _, err := gatewayDB.Exec(restoreQuery); err != nil { + errCh <- err + } + t.l.Printf("done running restore job") + }() + + // Wait for the job. + retryOpts := retry.Options{ + MaxRetries: 50, + InitialBackoff: 1 * time.Second, + MaxBackoff: 5 * time.Second, + } + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + var jobCount int + if err := gatewayDB.QueryRowContext(ctx, "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobCount); err != nil { + return "", err + } + + select { + case err := <-errCh: + // We got an error when starting the job. + return "", err + default: + } + + if jobCount == 0 { + t.l.Printf("waiting for restore job") + } else if jobCount == 1 { + t.l.Printf("found restore job") + break + } else { + t.l.Printf("found multiple restore jobs -- erroring") + return "", errors.New("unexpectedly found multiple restore jobs") + } + } + + var jobID string + if err := gatewayDB.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobID); err != nil { + return "", errors.Wrap(err, "querying the job ID") + } + return jobID, nil + } + } + + r.Add(testSpec{ + Name: "restore/nodeShutdown/worker", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 3 + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode)) + }, + }) + + r.Add(testSpec{ + Name: "restore/nodeShutdown/coordinator", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 2 + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode)) + }, + }) +} func registerRestore(r *testRegistry) { for _, item := range []struct {