roachtest: during c2c/shutdown, shutdown main driver if shutdown executor fails

In the failure tracked in cockroachdb#110166, the c2c/shutdown test fataled while the job
shutdown executor was running, yet the test kept running for quite a while because the
goroutine that manages the c2c job had not noticed the failure. This patch refactors the
c2c/shutdown tests so that when the job shutdown executor detects a failure, it cancels
the context used by the goroutine managing the c2c job.

Informs cockroachdb#110166

Release note: none
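
The core of the fix is a small coordination pattern: run the main c2c driver under its own
cancellable context, and have the shutdown executor cancel that context the moment it fails,
instead of letting the driver goroutine keep going. Below is a minimal, self-contained Go
sketch of that pattern; runMainDriver and runShutdownExecutor are illustrative stand-ins,
not code from the patch.

// Sketch of the "cancel the main driver when the shutdown executor fails" pattern.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

func runMainDriver(ctx context.Context) {
	// Stand-in for the goroutine that manages the c2c job.
	for {
		select {
		case <-ctx.Done():
			fmt.Println("main driver: context cancelled, exiting")
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("main driver: still managing the c2c job")
		}
	}
}

func runShutdownExecutor() error {
	// Stand-in for the node-shutdown executor; here it simply fails.
	time.Sleep(250 * time.Millisecond)
	return errors.New("node shutdown failed")
}

func main() {
	mainDriverCtx, cancelMain := context.WithCancel(context.Background())
	defer cancelMain()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		runMainDriver(mainDriverCtx)
	}()

	if err := runShutdownExecutor(); err != nil {
		// Mirror of the patch: on shutdown-executor failure, cancel the main
		// driver's context before reporting the failure, so the driver stops
		// promptly instead of running on after the test has already failed.
		cancelMain()
		fmt.Println("shutdown execution failed:", err)
	}
	wg.Wait()
}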
msbutler committed Sep 8, 2023
1 parent 0f34ace commit 01db2d8
Showing 5 changed files with 66 additions and 29 deletions.
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/backup.go
@@ -224,8 +224,8 @@ func registerBackupNodeShutdown(r registry.Registry) {
nodeToShutdown := 3
dest := loadBackupData(ctx, t, c)
backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
startBackup := func(c cluster.Cluster, t test.Test) (jobID jobspb.JobID, err error) {
gatewayDB := c.Conn(ctx, t.L(), gatewayNode)
startBackup := func(c cluster.Cluster, l *logger.Logger) (jobID jobspb.JobID, err error) {
gatewayDB := c.Conn(ctx, l, gatewayNode)
defer gatewayDB.Close()

err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
@@ -246,8 +246,8 @@ func registerBackupNodeShutdown(r registry.Registry) {
nodeToShutdown := 2
dest := loadBackupData(ctx, t, c)
backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED`
startBackup := func(c cluster.Cluster, t test.Test) (jobID jobspb.JobID, err error) {
gatewayDB := c.Conn(ctx, t.L(), gatewayNode)
startBackup := func(c cluster.Cluster, l *logger.Logger) (jobID jobspb.JobID, err error) {
gatewayDB := c.Conn(ctx, l, gatewayNode)
defer gatewayDB.Close()

err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID)
20 changes: 14 additions & 6 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -1322,12 +1322,14 @@ func registerClusterReplicationResilience(r registry.Registry) {
t.L().Printf(`%s configured: Shutdown Node %d; Watcher node %d; Gateway nodes %s`,
rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.gatewayNodes)
}
m := rrd.newMonitor(ctx)
m.Go(func(ctx context.Context) error {
mainDriverCtx, cancelMain := context.WithCancel(ctx)
mainMonitor := rrd.newMonitor(mainDriverCtx)
mainMonitor.Go(func(ctx context.Context) error {
rrd.main(ctx)
return nil
})
defer m.Wait()
defer cancelMain()
defer mainMonitor.Wait()

// Don't begin shutdown process until c2c job is set up.
<-shutdownSetupDone
@@ -1341,8 +1343,10 @@ func registerClusterReplicationResilience(r registry.Registry) {
// DR scenario the src cluster may have gone belly up during a
// successful c2c replication execution.
shutdownStarter := func() jobStarter {
return func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) {
require.NoError(t, waitForTargetPhase(ctx, rrd.replicationDriver, rrd.dstJobID, rrd.phase))
return func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) {
if err := waitForTargetPhase(ctx, rrd.replicationDriver, rrd.dstJobID, rrd.phase); err != nil {
return jobspb.JobID(0), err
}
sleepBeforeResiliencyEvent(rrd.replicationDriver, rrd.phase)
return rrd.dstJobID, nil
}
@@ -1356,8 +1360,12 @@
watcherNode: destinationWatcherNode,
crdbNodes: rrd.crdbNodes(),
restartSettings: []install.ClusterSettingOption{install.SecureOption(true)},
rng: rrd.rng,
}
if err := executeNodeShutdown(ctx, t, c, shutdownCfg, shutdownStarter()); err != nil {
cancelMain()
t.Fatalf("shutdown execution failed: %s", err)
}
executeNodeShutdown(ctx, t, c, shutdownCfg, shutdownStarter())
},
)
}
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/tests/import.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -44,7 +45,7 @@ func readCreateTableFromFixture(fixtureURI string, gatewayDB *gosql.DB) (string,

func registerImportNodeShutdown(r registry.Registry) {
getImportRunner := func(ctx context.Context, t test.Test, gatewayNode int) jobStarter {
startImport := func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) {
startImport := func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) {
var jobID jobspb.JobID
// partsupp is 11.2 GiB.
tableName := "partsupp"
61 changes: 44 additions & 17 deletions pkg/cmd/roachtest/tests/jobs.go
@@ -14,6 +14,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -22,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -30,7 +32,7 @@
"github.com/stretchr/testify/require"
)

type jobStarter func(c cluster.Cluster, t test.Test) (jobspb.JobID, error)
type jobStarter func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error)

// jobSurvivesNodeShutdown is a helper that tests that a given job,
// running on the specified gatewayNode will still complete successfully
@@ -55,7 +57,7 @@ func jobSurvivesNodeShutdown(
waitFor3XReplication: true,
sleepBeforeShutdown: 30 * time.Second,
}
executeNodeShutdown(ctx, t, c, cfg, startJob)
require.NoError(t, executeNodeShutdown(ctx, t, c, cfg, startJob))
}

type nodeShutdownConfig struct {
Expand All @@ -65,11 +67,17 @@ type nodeShutdownConfig struct {
restartSettings []install.ClusterSettingOption
waitFor3XReplication bool
sleepBeforeShutdown time.Duration
rng *rand.Rand
}

// executeNodeShutdown executes a node shutdown and returns all errors back to the caller.
//
// TODO(msbutler): ideally, t.L() is only passed to this function instead of t,
// but WaitFor3xReplication requires t. Once this function only has a logger, we
// can guarantee that all errors return to the caller.
func executeNodeShutdown(
ctx context.Context, t test.Test, c cluster.Cluster, cfg nodeShutdownConfig, startJob jobStarter,
) {
) error {
target := c.Node(cfg.shutdownNode)
t.L().Printf("test has chosen shutdown target node %d, and watcher node %d",
cfg.shutdownNode, cfg.watcherNode)
@@ -83,14 +91,20 @@ func executeNodeShutdown(
// nodes down.
t.Status("waiting for cluster to be 3x replicated")
err := WaitFor3XReplication(ctx, t, watcherDB)
require.NoError(t, err)
if err != nil {
return err
}
}

t.Status("running job")
jobID, err := startJob(c, t)
require.NoError(t, err)
jobID, err := startJob(c, t.L())
if err != nil {
return err
}
t.L().Printf("started running job with ID %s", jobID)
WaitForRunning(t, watcherDB, jobID, time.Minute)
if err := WaitForRunning(ctx, watcherDB, jobID, time.Minute); err != nil {
return err
}

m := c.NewMonitor(ctx, cfg.crdbNodes)
m.ExpectDeath()
@@ -122,34 +136,47 @@ }
}
})
time.Sleep(cfg.sleepBeforeShutdown)
rng, _ := randutil.NewTestRand()
shouldUseSigKill := rng.Float64() > 0.5
if cfg.rng == nil {
rng, _ := randutil.NewTestRand()
cfg.rng = rng
}
shouldUseSigKill := cfg.rng.Float64() > 0.5
if shouldUseSigKill {
t.L().Printf(`stopping node (using SIGKILL) %s`, target)
require.NoError(t, c.StopE(ctx, t.L(), option.DefaultStopOpts(), target), "could not stop node %s", target)
if err := c.StopE(ctx, t.L(), option.DefaultStopOpts(), target); err != nil {
return errors.Wrapf(err, "could not stop node %s", target)
}
} else {
t.L().Printf(`stopping node gracefully %s`, target)
require.NoError(t, c.StopCockroachGracefullyOnNode(ctx, t.L(), cfg.shutdownNode), "could not stop node %s", target)
if err := c.StopCockroachGracefullyOnNode(ctx, t.L(), cfg.shutdownNode); err != nil {
return errors.Wrapf(err, "could not stop node %s", target)
}
}
t.L().Printf("stopped node %s", target)

m.Wait()
if err := m.WaitE(); err != nil {
return err
}
// NB: the roachtest harness checks that at the end of the test, all nodes
// that have data also have a running process.
t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target))
// Don't begin another backup schedule, as the parent test driver has already
// set or disallowed the automatic backup schedule.
if err := c.StartE(ctx, t.L(), option.DefaultStartOptsNoBackups(),
install.MakeClusterSettings(cfg.restartSettings...), target); err != nil {
t.Fatal(errors.Wrapf(err, "could not restart node %s", target))
return errors.Wrapf(err, "could not restart node %s", target)
}
return nil
}

func WaitForRunning(t test.Test, db *gosql.DB, jobID jobspb.JobID, maxWait time.Duration) {
sqlDB := sqlutils.MakeSQLRunner(db)
testutils.SucceedsWithin(t, func() error {
func WaitForRunning(
ctx context.Context, db *gosql.DB, jobID jobspb.JobID, maxWait time.Duration,
) error {
return testutils.SucceedsWithinError(func() error {
var status jobs.Status
sqlDB.QueryRow(t, "SELECT status FROM crdb_internal.system_jobs WHERE id = $1", jobID).Scan(&status)
if err := db.QueryRowContext(ctx, "SELECT status FROM crdb_internal.system_jobs WHERE id = $1", jobID).Scan(&status); err != nil {
return err
}
switch status {
case jobs.StatusPending:
case jobs.StatusRunning:
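
The WaitForRunning rewrite above is representative of the broader change in this file:
helpers that previously called require.NoError or t.Fatal now poll and return an error, so
the caller (here, the c2c shutdown driver) can decide how to react, for example by
cancelling the main driver's context. The snippet below is a standalone sketch of that
poll-and-return-error shape using only the standard library; pollJobRunning, the status
strings, and the fake fetchStatus are illustrative and not part of the patch.

// Sketch of polling a job status until it is running, returning errors to the caller.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollJobRunning polls fetchStatus until the job reports a running state, a
// terminal state, or maxWait elapses. It never fatals the test; it returns an
// error so the caller can propagate it or cancel other work.
func pollJobRunning(ctx context.Context, fetchStatus func(context.Context) (string, error), maxWait time.Duration) error {
	deadline := time.Now().Add(maxWait)
	for {
		status, err := fetchStatus(ctx)
		if err != nil {
			return err
		}
		switch status {
		case "running":
			return nil
		case "failed", "canceled":
			return fmt.Errorf("job reached terminal status %q", status)
		}
		// Any other status (e.g. "pending"): keep polling until the deadline.
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for job to start running")
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}

func main() {
	// Fake status source for illustration: reports "pending" once, then "running".
	calls := 0
	fetch := func(context.Context) (string, error) {
		calls++
		if calls == 1 {
			return "pending", nil
		}
		return "running", nil
	}
	if err := pollJobRunning(context.Background(), fetch, time.Minute); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("job is running")
}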
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/tests/restore.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -57,7 +58,7 @@ func registerRestoreNodeShutdown(r registry.Registry) {

makeRestoreStarter := func(ctx context.Context, t test.Test, c cluster.Cluster,
gatewayNode int, rd restoreDriver) jobStarter {
return func(c cluster.Cluster, t test.Test) (jobspb.JobID, error) {
return func(c cluster.Cluster, l *logger.Logger) (jobspb.JobID, error) {
return rd.runDetached(ctx, "DATABASE tpce", gatewayNode)
}
}
