roachtest: add nodeShutdown roachtests
This commit adds roachtests that run a backup, restore, or import and
randomly shut down a node during the job, ensuring that the job still
completes.

Release justification: test only change
Release note: None
pbardea committed Mar 22, 2021
1 parent d704baa commit 7fd01c5
Showing 6 changed files with 404 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/BUILD.bazel
@@ -51,6 +51,7 @@ go_library(
"inverted_index.go",
"java_helpers.go",
"jepsen.go",
"jobs.go",
"kv.go",
"kvbench.go",
"ledger.go",
127 changes: 103 additions & 24 deletions pkg/cmd/roachtest/backup.go
@@ -32,37 +32,110 @@ const (
KMSRegionBEnvVar = "AWS_KMS_REGION_B"
KMSKeyARNAEnvVar = "AWS_KMS_KEY_ARN_A"
KMSKeyARNBEnvVar = "AWS_KMS_KEY_ARN_B"

// rows2TiB is the number of rows to import to load 2TB of data (when
// replicated).
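// Rough arithmetic: 65,104,166 rows * 10,240 payload bytes (the bank fixture
// setting used below) is roughly 0.67 TB of logical data, or roughly 2 TB
// assuming the default 3x replication factor.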
rows2TiB = 65_104_166
rows100GiB = rows2TiB / 20
rows30GiB = rows2TiB / 66
rows15GiB = rows30GiB / 2
rows5GiB = rows100GiB / 20
rows3GiB = rows30GiB / 10
)

func importBankDataSplit(ctx context.Context, rows, ranges int, t *test, c *cluster) string {
dest := c.name
// Randomize starting with encryption-at-rest enabled.
c.encryptAtRandom = true

if local {
dest += fmt.Sprintf("%d", timeutil.Now().UnixNano())
}

c.Put(ctx, workload, "./workload")
c.Put(ctx, cockroach, "./cockroach")

// NB: starting the cluster creates the logs dir as a side effect,
// needed below.
c.Start(ctx, t)
c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`)
time.Sleep(time.Second) // wait for csv server to open listener

importArgs := []string{
"./workload", "fixtures", "import", "bank",
"--db=bank", "--payload-bytes=10240", fmt.Sprintf("--ranges=%d", ranges), "--csv-server", "http://localhost:8081",
fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}",
}
c.Run(ctx, c.Node(1), importArgs...)

return dest
}

func importBankData(ctx context.Context, rows int, t *test, c *cluster) string {
return importBankDataSplit(ctx, rows, 0 /* ranges */, t, c)
}

func registerBackupNodeShutdown(r *testRegistry) {
// backupNodeRestartSpec runs a backup and randomly shuts down a node during
// the backup.
backupNodeRestartSpec := makeClusterSpec(4)
loadBackupData := func(ctx context.Context, t *test, c *cluster) string {
// This ought to be enough since this isn't a performance test.
rows := rows15GiB
if local {
// Needs to be sufficiently large to give each processor a good chunk of
// work so the job doesn't complete immediately.
rows = rows5GiB
}
return importBankData(ctx, rows, t, c)
}

r.Add(testSpec{
Name: fmt.Sprintf("backup/nodeShutdown/worker/%s", backupNodeRestartSpec),
Owner: OwnerBulkIO,
Cluster: backupNodeRestartSpec,
MinVersion: "v21.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
gatewayNode := 2
nodeToShutdown := 3
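// Shut down a node other than the gateway node that the backup statement
// is issued from.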
dest := loadBackupData(ctx, t, c)
backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
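// DETACHED makes BACKUP return its job ID immediately instead of blocking
// until the backup finishes, which lets the test shut a node down while the
// job is still running.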
startBackup := func(c *cluster) (jobID string, err error) {
gatewayDB := c.Conn(ctx, gatewayNode)
defer gatewayDB.Close()

err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
return
}

jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup)
},
})
r.Add(testSpec{
Name: fmt.Sprintf("backup/nodeShutdown/coordinator/%s", backupNodeRestartSpec),
Owner: OwnerBulkIO,
Cluster: backupNodeRestartSpec,
MinVersion: "v21.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
gatewayNode := 2
nodeToShutdown := 2
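// Unlike the worker variant above, shut down the gateway node itself, i.e.
// the node the backup statement is issued from.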
dest := loadBackupData(ctx, t, c)
backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
startBackup := func(c *cluster) (jobID string, err error) {
gatewayDB := c.Conn(ctx, gatewayNode)
defer gatewayDB.Close()

err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
return
}

jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup)
},
})

}
func registerBackup(r *testRegistry) {

backup2TBSpec := makeClusterSpec(10)
r.Add(testSpec{
@@ -71,7 +144,10 @@ func registerBackup(r *testRegistry) {
Cluster: backup2TBSpec,
MinVersion: "v2.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
rows := rows2TiB
if local {
rows = 100
}
dest := importBankData(ctx, rows, t, c)
m := newMonitor(ctx, c)
m.Go(func(ctx context.Context) error {
@@ -96,7 +172,10 @@ func registerBackup(r *testRegistry) {
}

// ~10GiB - which is 30GiB replicated.
rows := rows30GiB
if local {
rows = 100
}
dest := importBankData(ctx, rows, t, c)

conn := c.Conn(ctx, 1)
59 changes: 59 additions & 0 deletions pkg/cmd/roachtest/import.go
@@ -20,6 +20,65 @@ import (
"github.com/cockroachdb/errors"
)

func registerImportNodeShutdown(r *testRegistry) {
getImportRunner := func(ctx context.Context, gatewayNode int) jobStarter {
startImport := func(c *cluster) (jobID string, err error) {
importStmt := `
IMPORT TABLE partsupp
CREATE USING 'gs://cockroach-fixtures/tpch-csv/schema/partcupp.sql'
CSV DATA (
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.1',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.2',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.3',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.4',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.5',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.6',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.7',
'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.8'
) WITH delimiter='|', detached
`
gatewayDB := c.Conn(ctx, gatewayNode)
defer gatewayDB.Close()

err = gatewayDB.QueryRowContext(ctx, importStmt).Scan(&jobID)
return
}

return startImport
}

r.Add(testSpec{
Name: "import/nodeShutdown/worker",
Owner: OwnerBulkIO,
Cluster: makeClusterSpec(4),
MinVersion: "v21.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
c.Put(ctx, cockroach, "./cockroach")
c.Start(ctx, t)
gatewayNode := 2
nodeToShutdown := 3
startImport := getImportRunner(ctx, gatewayNode)

jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport)
},
})
r.Add(testSpec{
Name: "import/nodeShutdown/coordinator",
Owner: OwnerBulkIO,
Cluster: makeClusterSpec(4),
MinVersion: "v21.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
c.Put(ctx, cockroach, "./cockroach")
c.Start(ctx, t)
gatewayNode := 2
nodeToShutdown := 2
startImport := getImportRunner(ctx, gatewayNode)

jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport)
},
})
}

func registerImportTPCC(r *testRegistry) {
runImportTPCC := func(ctx context.Context, t *test, c *cluster, warehouses int) {
// Randomize starting with encryption-at-rest enabled.
Expand Down
142 changes: 142 additions & 0 deletions pkg/cmd/roachtest/jobs.go
@@ -0,0 +1,142 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

type jobStarter func(c *cluster) (string, error)

// jobSurvivesNodeShutdown is a helper that tests that a given job,
// running on the specified gatewayNode, will still complete successfully
// if nodeToShutdown is shut down partway through execution.
// This helper assumes:
// - That the job is long running and will take at least a minute to complete.
// - That the necessary setup is done (e.g. any data that the job relies on is
// already loaded) so that the statement issued by startJob can be run on its
// own to kick off the job.
// - That the statement running the job is a detached statement, and does not
// block until the job completes.
func jobSurvivesNodeShutdown(
ctx context.Context, t *test, c *cluster, nodeToShutdown int, startJob jobStarter,
) {
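// The watcher node polls the job status and must stay up for the whole test,
// so pick the node "after" the one being shut down, wrapping around the
// cluster (e.g. with 4 nodes, shutting down node 3 yields watcher node 4 and
// shutting down node 4 yields watcher node 1).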
watcherNode := 1 + (nodeToShutdown)%c.spec.NodeCount
target := c.Node(nodeToShutdown)
t.l.Printf("test has chosen shutdown target node %d, and watcher node %d",
nodeToShutdown, watcherNode)

jobIDCh := make(chan string, 1)

m := newMonitor(ctx, c)
m.Go(func(ctx context.Context) error {
defer close(jobIDCh)
t.Status(`running job`)
var jobID string
jobID, err := startJob(c)
if err != nil {
return errors.Wrap(err, "starting the job")
}
t.l.Printf("started running job with ID %s", jobID)
jobIDCh <- jobID

pollInterval := 5 * time.Second
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

watcherDB := c.Conn(ctx, watcherNode)
defer watcherDB.Close()

var status string
for {
select {
case <-ticker.C:
err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
if err != nil {
return errors.Wrap(err, "getting the job status")
}
jobStatus := jobs.Status(status)
switch jobStatus {
case jobs.StatusSucceeded:
t.Status("job completed")
return nil
case jobs.StatusRunning:
t.l.Printf("job %s still running, waiting to succeed", jobID)
default:
// Any other state (e.g. paused, failed, or canceled) is unexpected;
// fail the test.
return errors.Newf("unexpectedly found job %s in state %s", jobID, status)
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish")
}
}
})

m.Go(func(ctx context.Context) error {
jobID, ok := <-jobIDCh
if !ok {
return errors.New("job never created")
}

// Shut down a node after a bit, and keep it shut down for the remainder
// of the job.
timeToWait := 10 * time.Second
timer := timeutil.Timer{}
timer.Reset(timeToWait)
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "stopping test, did not shutdown node")
case <-timer.C:
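// timeutil.Timer requires marking the timer as read after receiving from
// its channel so that a subsequent Reset behaves correctly.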
timer.Read = true
}

// Sanity check that the job is still running.
watcherDB := c.Conn(ctx, watcherNode)
defer watcherDB.Close()

var status string
err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status)
if err != nil {
return errors.Wrap(err, "getting the job status")
}
jobStatus := jobs.Status(status)
if jobStatus != jobs.StatusRunning {
return errors.Newf("job too fast! job got to state %s before the target node could be shutdown",
status)
}

t.l.Printf(`stopping node %s`, target)
if err := c.StopE(ctx, target); err != nil {
return errors.Wrapf(err, "could not stop node %s", target)
}
t.l.Printf("stopped node %s", target)

return nil
})

// Before calling `m.Wait()`, do some cleanup.
if err := m.g.Wait(); err != nil {
t.Fatal(err)
}

// NB: the roachtest harness checks that at the end of the test, all nodes
// that have data also have a running process.
t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target))
if err := c.StartE(ctx, target); err != nil {
t.Fatal(errors.Wrapf(err, "could not restart node %s", target))
}

m.Wait()
}
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/registry.go
@@ -21,6 +21,7 @@ func registerTests(r *testRegistry) {
registerAlterPK(r)
registerAutoUpgrade(r)
registerBackup(r)
registerBackupNodeShutdown(r)
registerCancel(r)
registerCDC(r)
registerClearRange(r)
@@ -45,6 +46,7 @@ func registerTests(r *testRegistry) {
registerImportMixedVersion(r)
registerImportTPCC(r)
registerImportTPCH(r)
registerImportNodeShutdown(r)
registerInconsistency(r)
registerIndexes(r)
registerInterleaved(r)
@@ -72,6 +74,7 @@ func registerTests(r *testRegistry) {
registerRebalanceLoad(r)
registerReplicaGC(r)
registerRestart(r)
registerRestoreNodeShutdown(r)
registerRestore(r)
registerRoachmart(r)
registerScaleData(r)
