From b6cdc9f4b0da8328b35527b99722d26a05fe58e7 Mon Sep 17 00:00:00 2001
From: Renato Costa
Date: Wed, 1 Feb 2023 16:50:26 -0500
Subject: [PATCH] roachtest: update mixed-version backup to use new framework

This updates the `backup/mixed-version` roachtest to use the recently introduced mixed-version roachtest framework (`mixedversion` package). The main behavior exercised remains the same: backups are taken in mixed-binary state, and those backups are restored and verified at the end of the test.

However, this commit also improves the coverage of mixed-version backup testing in a few ways:

* **Randomization**. By virtue of using the new framework, most runs will be different from one another, since the order of actions taken by the test will vary. Previously, backups were always taken with 2 nodes on the old version and 2 nodes on the new version. Now, backups can be taken when an arbitrary number of nodes is running the new version. As a consequence, it's also possible that some executions will attempt backups when all nodes are running the new binary but the cluster version itself has not been updated. Other new points of randomization include the choice of the node's external directory where backups are stored, which node to connect to when running certain statements, and how long to wait between backups.

* **Backup Options**. Backups will randomly be created with `revision_history` enabled, or with an `encryption_passphrase`.

* **Downgrades**. The cluster is also downgraded in mixed-version tests. No downgrades happened in this test before this commit.

* **Workload**. Instead of using a fixed call to `generate_series` to generate data between backups, the test now runs the `bank` workload continuously for the duration of the test. A random wait between backups gives the workload time to change the underlying table, so backups are taken while writes are in progress.

* **Finalization**. The test _may_ attempt to create a backup as the upgrade is finalizing (i.e., while migrations are running and the cluster version is advancing).

In addition, this test will see improved coverage as we make further improvements to the test plans generated by the `mixedversion` package. These changes will create more backup scenarios in the future without requiring any code changes to this test.
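For illustration, a backup created by the new test might be issued with a statement along the lines of the one below. This is a hypothetical example, not output from an actual run: the collection name, destination node, `AS OF SYSTEM TIME` timestamp, and the randomly chosen options all vary from run to run (`detached` is always included because the test polls `system.jobs` for job completion):

    BACKUP TABLE bank.bank
      INTO 'nodelocal://3/1_22.2.4-to-current_all-planned-on-22.2.4-executed-on-current'
      AS OF SYSTEM TIME '...'
      WITH detached, revision_history;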
Epic: CRDB-19321 Release note: None --- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/backup.go | 390 ++------- .../roachtest/tests/mixed_version_backup.go | 737 ++++++++++++++++++ ...atibility_in_declarative_schema_changer.go | 18 +- 4 files changed, 813 insertions(+), 333 deletions(-) create mode 100644 pkg/cmd/roachtest/tests/mixed_version_backup.go diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 5af63743a155..86575cec06bc 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -88,6 +88,7 @@ go_library( "liquibase_blocklist.go", "loss_of_quorum_recovery.go", "many_splits.go", + "mixed_version_backup.go", "mixed_version_cdc.go", "mixed_version_change_replicas.go", "mixed_version_decl_schemachange_compat.go", diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go index a106ddab50bc..5683a73b69fe 100644 --- a/pkg/cmd/roachtest/tests/backup.go +++ b/pkg/cmd/roachtest/tests/backup.go @@ -20,8 +20,6 @@ import ( "net/url" "os" "path/filepath" - "runtime" - "strconv" "strings" "time" @@ -31,7 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -104,20 +102,58 @@ func importBankDataSplit( return dest } +// setShortJobIntervalsCommon exists because this functionality is +// shared across the `backup/mixed-version` and +// `declarative_schema_changer/job-compatibility-mixed-version`. Delete +// this duplicated code once both tests have been migrated to the new +// mixed-version testing framework. +func setShortJobIntervalsCommon(runQuery func(query string, args ...interface{}) error) error { + settings := map[string]string{ + "jobs.registry.interval.cancel": "1s", + "jobs.registry.interval.adopt": "1s", + } + + for name, val := range settings { + query := fmt.Sprintf("SET CLUSTER SETTING %s = $1", name) + if err := runQuery(query, val); err != nil { + return err + } + } + + return nil +} + +// importBankCSVServerCommand returns the command to start a csv +// server on the specified port, using the given cockroach binary. +func importBankCSVServerCommand(cockroach string, port int) string { + return roachtestutil. + NewCommand("%s workload csv-server", cockroach). + Flag("port", port). + String() +} + +// importBankCommand returns the command to import `bank` fixtures +// according to the parameters passed. +func importBankCommand(cockroach string, rows, ranges, csvPort, node int) string { + return roachtestutil. + NewCommand("%s workload fixtures import bank", cockroach). + Arg("{pgurl:%d}", node). + Flag("db", "bank"). + Flag("payload-bytes", 10240). + Flag("csv-server", fmt.Sprintf("http://localhost:%d", csvPort)). + Flag("seed", 1). + Flag("ranges", ranges). + Flag("rows", rows). 
+ String() +} + func runImportBankDataSplit(ctx context.Context, rows, ranges int, t test.Test, c cluster.Cluster) { - c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) + csvPort := 8081 + csvCmd := importBankCSVServerCommand("./cockroach", csvPort) + c.Run(ctx, c.All(), csvCmd+` &> logs/workload-csv-server.log < /dev/null &`) time.Sleep(time.Second) // wait for csv server to open listener - importArgs := []string{ - "./workload", "fixtures", "import", "bank", - "--db=bank", - "--payload-bytes=10240", - "--csv-server", "http://localhost:8081", - "--seed=1", - fmt.Sprintf("--ranges=%d", ranges), - fmt.Sprintf("--rows=%d", rows), - "{pgurl:1}", - } - c.Run(ctx, c.Node(1), importArgs...) + importNode := 1 + c.Run(ctx, c.Node(importNode), importBankCommand("./cockroach", rows, ranges, csvPort, importNode)) } func importBankData(ctx context.Context, rows int, t test.Test, c cluster.Cluster) string { @@ -184,76 +220,6 @@ func registerBackupNodeShutdown(r registry.Registry) { } -// removeJobClaimsForNodes nullifies the `claim_session_id` for the job -// corresponding to jobID, if it has been incorrectly claimed by one of the -// `nodes`. -func removeJobClaimsForNodes( - ctx context.Context, t test.Test, db *gosql.DB, nodes option.NodeListOption, jobID jobspb.JobID, -) { - if len(nodes) == 0 { - return - } - - n := make([]string, 0) - for _, node := range nodes { - n = append(n, strconv.Itoa(node)) - } - nodesStr := strings.Join(n, ",") - - removeClaimQuery := ` -UPDATE system.jobs - SET claim_session_id = NULL -WHERE claim_instance_id IN (%s) -AND id = $1 -` - _, err := db.ExecContext(ctx, fmt.Sprintf(removeClaimQuery, nodesStr), jobID) - require.NoError(t, err) -} - -// waitForJobToHaveStatus waits for the job with jobID to reach the -// expectedStatus. -func waitForJobToHaveStatus( - ctx context.Context, - t test.Test, - db *gosql.DB, - jobID jobspb.JobID, - expectedStatus jobs.Status, - nodesWithAdoptionDisabled option.NodeListOption, -) { - if err := retry.ForDuration(time.Minute*2, func() error { - // TODO(adityamaru): This is unfortunate and can be deleted once - // https://github.com/cockroachdb/cockroach/pull/79666 is backported to - // 21.2 and the mixed version map for roachtests is bumped to the 21.2 - // patch release with the backport. - // - // The bug above means that nodes for which we have disabled adoption may - // still lay claim on the job, and then not clear their claim on realizing - // that adoption is disabled. To prevent the job from getting wedged, we - // manually clear the claim session on the job for instances where job - // adoption is disabled. This will allow other node's in the cluster to - // adopt the job and run it. - removeJobClaimsForNodes(ctx, t, db, nodesWithAdoptionDisabled, jobID) - - var status string - var payloadBytes []byte - err := db.QueryRow(`SELECT status, payload FROM system.jobs WHERE id = $1`, jobID).Scan(&status, &payloadBytes) - require.NoError(t, err) - if jobs.Status(status) == jobs.StatusFailed { - payload := &jobspb.Payload{} - if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { - t.Fatalf("job failed: %s", payload.Error) - } - t.Fatalf("job failed") - } - if e, a := expectedStatus, jobs.Status(status); e != a { - return errors.Errorf("expected job status %s, but got %s", e, a) - } - return nil - }); err != nil { - t.Fatal(err) - } -} - // fingerprint returns a fingerprint of `db.table`. 
func fingerprint(ctx context.Context, conn *gosql.DB, db, table string) (string, error) { var b strings.Builder @@ -275,26 +241,12 @@ func fingerprint(ctx context.Context, conn *gosql.DB, db, table string) (string, return b.String(), rows.Err() } -// setShortJobIntervalsStep increases the frequency of the adopt and cancel -// loops in the job registry. This enables changes to job state to be observed -// faster, and the test to run quicker. -func setShortJobIntervalsStep(node int) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - db := u.conn(ctx, t, node) - _, err := db.ExecContext(ctx, `SET CLUSTER SETTING jobs.registry.interval.cancel = '1s'`) - if err != nil { - t.Fatal(err) - } - - _, err = db.ExecContext(ctx, `SET CLUSTER SETTING jobs.registry.interval.adopt = '1s'`) - if err != nil { - t.Fatal(err) - } - } -} - // disableJobAdoptionStep writes the sentinel file to prevent a node's // registry from adopting a job. +// +// TODO(renato): remove this duplicated function once +// `declarative_schema_changer/job-compatibility-mixed-version` is +// migrated to the new mixed-version testing framework. func disableJobAdoptionStep(c cluster.Cluster, nodeIDs option.NodeListOption) versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { for _, nodeID := range nodeIDs { @@ -338,6 +290,10 @@ func disableJobAdoptionStep(c cluster.Cluster, nodeIDs option.NodeListOption) ve // enableJobAdoptionStep clears the sentinel file that prevents a node's // registry from adopting a job. +// +// TODO(renato): remove this duplicated function once +// `declarative_schema_changer/job-compatibility-mixed-version` is +// migrated to the new mixed-version testing framework. func enableJobAdoptionStep(c cluster.Cluster, nodeIDs option.NodeListOption) versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { for _, nodeID := range nodeIDs { @@ -358,234 +314,6 @@ func enableJobAdoptionStep(c cluster.Cluster, nodeIDs option.NodeListOption) ver } } -func registerBackupMixedVersion(r registry.Registry) { - loadBackupDataStep := func(c cluster.Cluster) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - rows := rows3GiB - if c.IsLocal() { - rows = 100 - } - runImportBankDataSplit(ctx, rows, 0 /* ranges */, t, u.c) - } - } - - planAndRunBackup := func(t test.Test, c cluster.Cluster, nodeToPlanBackup option.NodeListOption, - nodesWithAdoptionDisabled option.NodeListOption, backupStmt string) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - gatewayDB := c.Conn(ctx, t.L(), nodeToPlanBackup[0]) - defer gatewayDB.Close() - t.Status("Running: ", backupStmt) - var jobID jobspb.JobID - err := gatewayDB.QueryRow(backupStmt).Scan(&jobID) - require.NoError(t, err) - waitForJobToHaveStatus(ctx, t, gatewayDB, jobID, jobs.StatusSucceeded, nodesWithAdoptionDisabled) - } - } - - writeToBankStep := func(node int) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - db := u.conn(ctx, t, node) - _, err := db.ExecContext(ctx, `UPSERT INTO bank.bank (id,balance) SELECT generate_series(1,100), random()*100;`) - if err != nil { - t.Fatal(err) - } - } - } - - // saveFingerprintStep computes the fingerprint of the `bank.bank` and adds it - // to `fingerprints` slice that is passed in. The fingerprints slice should be - // pre-allocated because of the structure of the versionUpgradeTest. 
- saveFingerprintStep := func(node int, fingerprints map[string]string, key string) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - db := u.conn(ctx, t, node) - f, err := fingerprint(ctx, db, "bank", "bank") - if err != nil { - t.Fatal(err) - } - fingerprints[key] = f - } - } - - // verifyBackupStep compares the backed up and restored table fingerprints and - // ensures they're the same. - verifyBackupStep := func( - node option.NodeListOption, - backupLoc string, - dbName, tableName, intoDB string, - fingerprints map[string]string, - ) versionStep { - return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { - db := u.conn(ctx, t, node[0]) - - // Restore the backup. - _, err := db.ExecContext(ctx, fmt.Sprintf(`CREATE DATABASE %s`, intoDB)) - require.NoError(t, err) - _, err = db.ExecContext(ctx, fmt.Sprintf(`RESTORE TABLE %s.%s FROM LATEST IN '%s' WITH into_db = '%s'`, - dbName, tableName, backupLoc, intoDB)) - require.NoError(t, err) - - restoredFingerPrint, err := fingerprint(ctx, db, intoDB, tableName) - require.NoError(t, err) - if fingerprints[backupLoc] != restoredFingerPrint { - log.Infof(ctx, "original %s \n\n restored %s", fingerprints[backupLoc], - restoredFingerPrint) - t.Fatal("expected backup and restore fingerprints to match") - } - } - } - - // backup/mixed-version-basic tests different states of backup in a mixed - // version cluster. - // - // This test can serve as a template for more targeted testing of features - // that require careful consideration of mixed version states. - r.Add(registry.TestSpec{ - Name: "backup/mixed-version-basic", - Owner: registry.OwnerDisasterRecovery, - Cluster: r.MakeClusterSpec(4), - EncryptionSupport: registry.EncryptionMetamorphic, - RequiresLicense: true, - Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - if c.IsLocal() && runtime.GOARCH == "arm64" { - t.Skip("Skip under ARM64. See https://github.com/cockroachdb/cockroach/issues/89268") - } - roachNodes := c.All() - upgradedNodes := c.Nodes(1, 2) - oldNodes := c.Nodes(3, 4) - predV, err := clusterupgrade.PredecessorVersion(*t.BuildVersion()) - require.NoError(t, err) - c.Put(ctx, t.DeprecatedWorkload(), "./workload") - - // fingerprints stores the fingerprint of the `bank.bank` table at - // different points of this test to compare against the restored table at - // the end of the test. - fingerprints := make(map[string]string) - u := newVersionUpgradeTest(c, - uploadAndStartFromCheckpointFixture(roachNodes, predV), - waitForUpgradeStep(roachNodes), - preventAutoUpgradeStep(1), - setShortJobIntervalsStep(1), - loadBackupDataStep(c), - // Upgrade some nodes. - binaryUpgradeStep(upgradedNodes, clusterupgrade.MainVersion), - - // Let us first test planning and executing a backup on different node - // versions. - // - // NB: All backups in this test are writing to node 1's ExternalIODir - // for simplicity. - - // Case 1: plan backup -> old node - // execute backup -> upgraded node - // - // Halt job execution on older nodes. - disableJobAdoptionStep(c, oldNodes), - - // Run a backup from an old node so that it is planned on the old node - // but the job is adopted on a new node. - planAndRunBackup(t, c, oldNodes.RandNode(), oldNodes, - `BACKUP TABLE bank.bank INTO 'nodelocal://1/plan-old-resume-new' WITH detached`), - - // Write some data between backups. - writeToBankStep(1), - - // Run an incremental backup from an old node so that it is planned on - // the old node but the job is adopted on a new node. 
- planAndRunBackup(t, c, oldNodes.RandNode(), oldNodes, - `BACKUP TABLE bank.bank INTO LATEST IN 'nodelocal://1/plan-old-resume-new' WITH detached`), - - // Save fingerprint to compare against restore below. - saveFingerprintStep(1, fingerprints, "nodelocal://1/plan-old-resume-new"), - - enableJobAdoptionStep(c, oldNodes), - - // Case 2: plan backup -> upgraded node - // execute backup -> old node - // - // Halt job execution on upgraded nodes. - disableJobAdoptionStep(c, upgradedNodes), - - // Run a backup from a new node so that it is planned on the new node - // but the job is adopted on an old node. - planAndRunBackup(t, c, upgradedNodes.RandNode(), upgradedNodes, - `BACKUP TABLE bank.bank INTO 'nodelocal://1/plan-new-resume-old' WITH detached`), - - writeToBankStep(1), - - // Run an incremental backup from a new node so that it is planned on - // the new node but the job is adopted on an old node. - planAndRunBackup(t, c, upgradedNodes.RandNode(), upgradedNodes, - `BACKUP TABLE bank.bank INTO LATEST IN 'nodelocal://1/plan-new-resume-old' WITH detached`), - - // Save fingerprint to compare against restore below. - saveFingerprintStep(1, fingerprints, "nodelocal://1/plan-new-resume-old"), - - enableJobAdoptionStep(c, upgradedNodes), - - // Now let us test building an incremental chain on top of a full backup - // created by a node of a different version. - // - // Case 1: full backup -> new nodes - // inc backup -> old nodes - disableJobAdoptionStep(c, oldNodes), - // Plan and run a full backup on the new nodes. - planAndRunBackup(t, c, upgradedNodes.RandNode(), oldNodes, - `BACKUP TABLE bank.bank INTO 'nodelocal://1/new-node-full-backup' WITH detached`), - - writeToBankStep(1), - - // Set up the cluster so that only the old nodes plan and run the - // incremental backup. - enableJobAdoptionStep(c, oldNodes), - disableJobAdoptionStep(c, upgradedNodes), - - // Run an incremental (on old nodes) on top of a full backup taken by - // nodes on the upgraded version. - planAndRunBackup(t, c, oldNodes.RandNode(), upgradedNodes, - `BACKUP TABLE bank.bank INTO LATEST IN 'nodelocal://1/new-node-full-backup' WITH detached`), - - // Save fingerprint to compare against restore below. - saveFingerprintStep(1, fingerprints, "nodelocal://1/new-node-full-backup"), - - // Case 2: full backup -> old nodes - // inc backup -> new nodes - - // Plan and run a full backup on the old nodes. - planAndRunBackup(t, c, oldNodes.RandNode(), upgradedNodes, - `BACKUP TABLE bank.bank INTO 'nodelocal://1/old-node-full-backup' WITH detached`), - - writeToBankStep(1), - - enableJobAdoptionStep(c, upgradedNodes), - - // Allow all the nodes to now finalize their cluster version. - binaryUpgradeStep(oldNodes, clusterupgrade.MainVersion), - allowAutoUpgradeStep(1), - waitForUpgradeStep(roachNodes), - - // Run an incremental on top of a full backup taken by nodes on the - // old version. - planAndRunBackup(t, c, roachNodes.RandNode(), nil, - `BACKUP TABLE bank.bank INTO LATEST IN 'nodelocal://1/old-node-full-backup' WITH detached`), - - // Save fingerprint to compare against restore below. - saveFingerprintStep(1, fingerprints, "nodelocal://1/old-node-full-backup"), - - // Verify all the backups are actually restoreable. 
- verifyBackupStep(roachNodes.RandNode(), "nodelocal://1/plan-old-resume-new", - "bank", "bank", "bank1", fingerprints), - verifyBackupStep(roachNodes.RandNode(), "nodelocal://1/plan-new-resume-old", - "bank", "bank", "bank2", fingerprints), - verifyBackupStep(roachNodes.RandNode(), "nodelocal://1/new-node-full-backup", - "bank", "bank", "bank3", fingerprints), - verifyBackupStep(roachNodes.RandNode(), "nodelocal://1/old-node-full-backup", - "bank", "bank", "bank4", fingerprints), - ) - u.run(ctx, t) - }, - }) -} - // initBulkJobPerfArtifacts registers a histogram, creates a performance // artifact directory and returns a method that when invoked records a tick. func initBulkJobPerfArtifacts(testName string, timeout time.Duration) (func(), *bytes.Buffer) { diff --git a/pkg/cmd/roachtest/tests/mixed_version_backup.go b/pkg/cmd/roachtest/tests/mixed_version_backup.go new file mode 100644 index 000000000000..eda75ef9393a --- /dev/null +++ b/pkg/cmd/roachtest/tests/mixed_version_backup.go @@ -0,0 +1,737 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "reflect" + "regexp" + "runtime" + "sort" + "strings" + "sync/atomic" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "golang.org/x/sync/errgroup" +) + +var ( + invalidVersionRE = regexp.MustCompile(`[^a-zA-Z0-9.]`) + invalidDBNameRE = regexp.MustCompile(`[\-\.:/]`) + + // retry options while waiting for a backup to complete + backupCompletionRetryOptions = retry.Options{ + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 5 * time.Second, + Multiplier: 2, + MaxRetries: 100, + } +) + +func sanitizeVersion(v string) string { + return invalidVersionRE.ReplaceAllString(clusterupgrade.VersionMsg(v), "") +} + +type ( + // backupOption is an option passed to the `BACKUP` command (i.e., + // `WITH ...` portion). + backupOption interface { + fmt.Stringer + } + + // revisionHistory wraps the `revision_history` backup option. + revisionHistory struct{} + + // encryptionPassphrase is the `encryption_passphrase` backup + // option. If passed when a backup is created, the same passphrase + // needs to be passed for incremental backups and for restoring the + // backup as well. 
+ encryptionPassphrase struct { + passphrase string + } + + // backupCollection wraps a backup collection (which may or may not + // contain incremental backups). The associated fingerprint is the + // expected fingerprint when the corresponding table is restored. + backupCollection struct { + uri string + dataFingerprint string + options []backupOption + } + + fullBackup struct { + label string + } + incrementalBackup struct { + collection backupCollection + } + + // labeledNodes allows us to label a set of nodes with the version + // they are running, to allow for human-readable backup names. + labeledNodes struct { + Nodes option.NodeListOption + Version string + } + + // backupSpec indicates where backups are supposed to be planned + // (`BACKUP` statement sent to); and where they are supposed to be + // executed (where the backup job will be picked up). + backupSpec struct { + Plan labeledNodes + Execute labeledNodes + } +) + +func (fb fullBackup) String() string { return "full" } +func (ib incrementalBackup) String() string { return "incremental" } + +func (rh revisionHistory) String() string { + return "revision_history" +} + +func newEncryptionPassphrase(rng *rand.Rand) encryptionPassphrase { + const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" + const passLen = 32 + + pass := make([]byte, passLen) + for i := range pass { + pass[i] = alphabet[rng.Intn(len(alphabet))] + } + + return encryptionPassphrase{string(pass)} +} + +func (ep encryptionPassphrase) String() string { + return fmt.Sprintf("encryption_passphrase = '%s'", ep.passphrase) +} + +// newBackupOptions returns a list of backup options to be used when +// creating a new backup. Each backup option has a 50% chance of being +// included. +func newBackupOptions(rng *rand.Rand) []backupOption { + possibleOpts := []backupOption{ + revisionHistory{}, + newEncryptionPassphrase(rng), + } + + var options []backupOption + for _, opt := range possibleOpts { + if rng.Float64() < 0.5 { + options = append(options, opt) + } + } + + return options +} + +// backupCollectionDesc builds a string that describes how a backup +// collection comprised of a full backup and a follow-up incremental +// backup was generated (in terms of which versions planned vs +// executed the backup). Used to generate descriptive backup names. +func backupCollectionDesc(fullSpec, incSpec backupSpec) string { + specMsg := func(label string, s backupSpec) string { + if s.Plan.Version == s.Execute.Version { + return fmt.Sprintf("%s planned and executed on %s", label, s.Plan.Version) + } + + return fmt.Sprintf("%s planned on %s executed on %s", label, s.Plan.Version, s.Execute.Version) + } + + if reflect.DeepEqual(fullSpec, incSpec) { + return specMsg("all", fullSpec) + } + + return fmt.Sprintf("%s %s", specMsg("full", fullSpec), specMsg("incremental", incSpec)) +} + +// mixedVersionBackup is the struct that contains all the necessary +// state involved in the mixed-version backup test. 
+type mixedVersionBackup struct { + ctx context.Context + cluster cluster.Cluster + roachNodes option.NodeListOption + // backup collections that are created over the course of the test + collections []*backupCollection + // the table being backed up/restored + table string + // counter that is incremented atomically to provide unique + // identifiers to backups created during the test + currentBackupID int64 +} + +func newMixedVersionBackup( + ctx context.Context, c cluster.Cluster, roachNodes option.NodeListOption, table string, +) *mixedVersionBackup { + mvb := &mixedVersionBackup{ctx: ctx, cluster: c, table: table, roachNodes: roachNodes} + return mvb +} + +// setShortJobIntervals increases the frequency of the adopt and +// cancel loops in the job registry. This enables changes to job state +// to be observed faster, and the test to run quicker. +func (*mixedVersionBackup) setShortJobIntervals( + l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, +) error { + return setShortJobIntervalsCommon(func(query string, args ...interface{}) error { + return h.Exec(rng, query, args...) + }) +} + +// loadBackupData starts a CSV server in the background and imports +// a bank fixture. It blocks until the import is finished, at +// which point the CSV server is terminated. +func (mvb *mixedVersionBackup) loadBackupData( + l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, +) error { + var rows int + if mvb.cluster.IsLocal() { + rows = 100 + l.Printf("importing only %d rows (as this is a local run)", rows) + } else { + rows = rows3GiB + l.Printf("importing %d rows", rows) + } + + csvPort := 8081 + importNode := h.RandomNode(rng, mvb.roachNodes) + l.Printf("decided to run import on node %d", importNode) + + currentRoach := mixedversion.CurrentCockroachPath + var stopCSVServer context.CancelFunc + h.Background("csv server", func(bgCtx context.Context, bgLogger *logger.Logger) error { + cmd := importBankCSVServerCommand(currentRoach, csvPort) + bgLogger.Printf("running CSV server in the background: %q", cmd) + var csvServerCtx context.Context + csvServerCtx, stopCSVServer = context.WithCancel(bgCtx) + err := mvb.cluster.RunE(csvServerCtx, mvb.roachNodes, cmd) + + if err == nil || errors.Is(err, context.Canceled) { + bgLogger.Printf("CSV server terminated") + return nil + } + + return fmt.Errorf("error while running csv server: %w", err) + }) + time.Sleep(time.Second) // wait for csv server to open listener + + err := mvb.cluster.RunE( + mvb.ctx, + mvb.cluster.Node(importNode), + importBankCommand(currentRoach, rows, 0 /* ranges */, csvPort, importNode), + ) + stopCSVServer() + return err +} + +// randomWait waits from 1s to 5m, to allow for the background +// workload to update the underlying table we are backing up. +func (mvb *mixedVersionBackup) randomWait(l *logger.Logger, rng *rand.Rand) { + durations := []int{1, 10, 60, 5 * 60} + dur := time.Duration(durations[rng.Intn(len(durations))]) * time.Second + l.Printf("waiting for %s", dur) + time.Sleep(dur) +} + +func (mvb *mixedVersionBackup) now() string { + return hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}.AsOfSystemTime() +} + +func (mvb *mixedVersionBackup) nextID() int64 { + return atomic.AddInt64(&mvb.currentBackupID, 1) +} + +// backupName returns a descriptive name for a backup depending on the +// state of the test we are in. The given label is also used to +// provide more context. 
Example: '3_22.2.4-to-current_final' +func (mvb *mixedVersionBackup) backupName(id int64, h *mixedversion.Helper, label string) string { + testContext := h.Context() + var finalizing string + if testContext.Finalizing { + finalizing = "_finalizing" + } + + fromVersion := sanitizeVersion(testContext.FromVersion) + toVersion := sanitizeVersion(testContext.ToVersion) + sanitizedLabel := strings.ReplaceAll(label, " ", "-") + + return fmt.Sprintf("%d_%s-to-%s_%s%s", id, fromVersion, toVersion, sanitizedLabel, finalizing) +} + +// waitForJobSuccess waits for the job with the given ID to +// succeed (according to `backupCompletionRetryOptions`). Returns an +// error if the job doesn't succeed within the attempted retries. +func (mvb *mixedVersionBackup) waitForJobSuccess( + l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, jobID int, +) error { + var lastErr error + node, db := h.RandomDB(rng, mvb.roachNodes) + l.Printf("querying job status through node %d", node) + for r := retry.StartWithCtx(mvb.ctx, backupCompletionRetryOptions); r.Next(); { + var status string + var payloadBytes []byte + err := db.QueryRow(`SELECT status, payload FROM system.jobs WHERE id = $1`, jobID).Scan(&status, &payloadBytes) + if err != nil { + lastErr = fmt.Errorf("error reading (status, payload) for job %d: %w", jobID, err) + l.Printf("%v", lastErr) + continue + } + + if jobs.Status(status) == jobs.StatusFailed { + payload := &jobspb.Payload{} + if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { + lastErr = fmt.Errorf("job %d failed with error: %s", jobID, payload.Error) + } else { + lastErr = fmt.Errorf("job %d failed, and could not unmarshal payload: %w", jobID, err) + } + + l.Printf("%v", lastErr) + break + } + + if expected, actual := jobs.StatusSucceeded, jobs.Status(status); expected != actual { + lastErr = fmt.Errorf("job %d: current status %q, waiting for %q", jobID, actual, expected) + l.Printf("%v", lastErr) + continue + } + + l.Printf("job %d: success", jobID) + return nil + } + + return fmt.Errorf("waiting for job to finish: %w", lastErr) +} + +// computeFingerprint returns the fingerprint for a given table; if a +// non-empty `timestamp` is passed, the fingerprint is calculated as +// of that timestamp. +func (mvb *mixedVersionBackup) computeFingerprint( + table, timestamp string, rng *rand.Rand, h *mixedversion.Helper, +) (string, error) { + var aost, fprint string + if timestamp != "" { + aost = fmt.Sprintf(" AS OF SYSTEM TIME '%s'", timestamp) + } + + query := fmt.Sprintf("SELECT fingerprint FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE %s]%s", table, aost) + if err := h.QueryRow(rng, query).Scan(&fprint); err != nil { + return "", err + } + + return fprint, nil +} + +// saveFingerprint stores the backup collection in the +// `mixedVersionBackup` struct, along with the associated fingerprint. 
+func (mvb *mixedVersionBackup) saveFingerprint( + l *logger.Logger, + rng *rand.Rand, + collection *backupCollection, + timestamp string, + h *mixedversion.Helper, +) error { + fprint, err := mvb.computeFingerprint(mvb.table, timestamp, rng, h) + if err != nil { + return fmt.Errorf("error computing fingerprint for backup %s: %w", collection.uri, err) + } + + collection.dataFingerprint = fprint + l.Printf("fingerprint(%s) = %s", collection.uri, collection.dataFingerprint) + mvb.collections = append(mvb.collections, collection) + return nil +} + +// runBackup runs a `BACKUP` statement; the backup type `bType` needs +// to be an instance of either `fullBackup` or +// `incrementalBackup`. Returns when the backup job has completed. +func (mvb *mixedVersionBackup) runBackup( + l *logger.Logger, + bType fmt.Stringer, + rng *rand.Rand, + nodes option.NodeListOption, + h *mixedversion.Helper, +) (backupCollection, string, error) { + tc := h.Context() + if !tc.Finalizing { + // don't wait if upgrade is finalizing to increase the chances of + // creating a backup while upgrade migrations are being run. + mvb.randomWait(l, rng) + } + + // NB: we need to run with the `detached` option + poll the + // `system.jobs` table because we are intentionally disabling job + // adoption in some nodes in the mixed-version test. Running the job + // without the `detached` option will cause a `node liveness` error + // to be returned when running the `BACKUP` statement. + options := []string{"detached"} + + var latest string + var collection backupCollection + switch b := bType.(type) { + case fullBackup: + name := mvb.backupName(mvb.nextID(), h, b.label) + destNode := h.RandomNode(rng, mvb.roachNodes) + createOptions := newBackupOptions(rng) + collection = backupCollection{ + uri: fmt.Sprintf("nodelocal://%d/%s", destNode, name), + options: createOptions, + } + case incrementalBackup: + collection = b.collection + latest = " LATEST IN" + } + + for _, opt := range collection.options { + options = append(options, opt.String()) + } + + backupTime := mvb.now() + node, db := h.RandomDB(rng, nodes) + + stmt := fmt.Sprintf( + "BACKUP TABLE %s INTO%s '%s' AS OF SYSTEM TIME '%s' WITH %s", + mvb.table, latest, collection.uri, backupTime, strings.Join(options, ", "), + ) + l.Printf("creating %s backup via node %d: %s", bType, node, stmt) + var jobID int + if err := db.QueryRowContext(mvb.ctx, stmt).Scan(&jobID); err != nil { + return backupCollection{}, "", fmt.Errorf("error while creating %s backup %s: %w", bType, collection.uri, err) + } + + l.Printf("waiting for job %d (%s)", jobID, collection.uri) + if err := mvb.waitForJobSuccess(l, rng, h, jobID); err != nil { + return backupCollection{}, "", err + } + + return collection, backupTime, nil +} + +// runJobOnOneOf disables job adoption on cockroach nodes that are not +// in the `nodes` list. The function passed is then executed and job +// adoption is re-enabled at the end of the function. The function +// passed is expected to run statements that trigger job creation. 
+func (mvb *mixedVersionBackup) runJobOnOneOf( + l *logger.Logger, nodes option.NodeListOption, h *mixedversion.Helper, fn func() error, +) error { + sort.Ints(nodes) + var disabledNodes option.NodeListOption + for _, node := range mvb.roachNodes { + idx := sort.SearchInts(nodes, node) + if idx == len(nodes) || nodes[idx] != node { + disabledNodes = append(disabledNodes, node) + } + } + + if err := mvb.disableJobAdoption(l, disabledNodes, h); err != nil { + return err + } + if err := fn(); err != nil { + return err + } + return mvb.enableJobAdoption(l, disabledNodes, h) +} + +// createBackupCollection creates a new backup collection to be +// restored/verified at the end of the test. A full backup is created, +// and an incremental one is created on top of it. Both backups are +// created according to their respective `backupSpec`, indicating +// where they should be planned and executed. +func (mvb *mixedVersionBackup) createBackupCollection( + l *logger.Logger, + rng *rand.Rand, + fullBackupSpec backupSpec, + incBackupSpec backupSpec, + h *mixedversion.Helper, +) error { + var collection backupCollection + var timestamp string + + if err := mvb.runJobOnOneOf(l, fullBackupSpec.Execute.Nodes, h, func() error { + var err error + label := backupCollectionDesc(fullBackupSpec, incBackupSpec) + collection, timestamp, err = mvb.runBackup(l, fullBackup{label}, rng, fullBackupSpec.Plan.Nodes, h) + return err + }); err != nil { + return err + } + + l.Printf("creating incremental backup for %s", collection.uri) + if err := mvb.runJobOnOneOf(l, incBackupSpec.Execute.Nodes, h, func() error { + var err error + collection, timestamp, err = mvb.runBackup(l, incrementalBackup{collection}, rng, incBackupSpec.Plan.Nodes, h) + return err + }); err != nil { + return err + } + + return mvb.saveFingerprint(l, rng, &collection, timestamp, h) +} + +// sentinelFilePath returns the path to the file that prevents job +// adoption on the given node. +func (mvb *mixedVersionBackup) sentinelFilePath(l *logger.Logger, node int) (string, error) { + result, err := mvb.cluster.RunWithDetailsSingleNode( + mvb.ctx, l, mvb.cluster.Node(node), "echo -n {store-dir}", + ) + if err != nil { + return "", fmt.Errorf("failed to retrieve store directory from node %d: %w", node, err) + } + return filepath.Join(result.Stdout, jobs.PreventAdoptionFile), nil +} + +// disableJobAdoption disables job adoption on the given nodes by +// creating an empty file in `jobs.PreventAdoptionFile`. The function +// returns once any currently running jobs on the nodes terminate. +func (mvb *mixedVersionBackup) disableJobAdoption( + l *logger.Logger, nodes option.NodeListOption, h *mixedversion.Helper, +) error { + l.Printf("disabling job adoption on nodes %v", nodes) + eg, _ := errgroup.WithContext(mvb.ctx) + for _, node := range nodes { + node := node // capture range variable + eg.Go(func() error { + l.Printf("node %d: disabling job adoption", node) + sentinelFilePath, err := mvb.sentinelFilePath(l, node) + if err != nil { + return err + } + if err := mvb.cluster.RunE(mvb.ctx, mvb.cluster.Node(node), "touch", sentinelFilePath); err != nil { + return fmt.Errorf("node %d: failed to touch sentinel file %q: %w", node, sentinelFilePath, err) + } + + // Wait for no jobs to be running on the node that we have halted + // adoption on. 
+ l.Printf("node %d: waiting for all running jobs to terminate", node) + if err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + db := h.Connect(node) + var count int + err := db.QueryRow(`SELECT count(*) FROM [SHOW JOBS] WHERE status = 'running'`).Scan(&count) + if err != nil { + l.Printf("node %d: error querying running jobs (%s)", node, err) + return err + } + + if count != 0 { + l.Printf("node %d: %d running jobs...", node, count) + return fmt.Errorf("node %d is still running %d jobs", node, count) + } + return nil + }); err != nil { + return err + } + + l.Printf("node %d: job adoption disabled", node) + return nil + }) + } + + return eg.Wait() +} + +// enableJobAdoption (re-)enables job adoption on the given nodes. +func (mvb *mixedVersionBackup) enableJobAdoption( + l *logger.Logger, nodes option.NodeListOption, h *mixedversion.Helper, +) error { + l.Printf("enabling job adoption on nodes %v", nodes) + eg, _ := errgroup.WithContext(mvb.ctx) + for _, node := range nodes { + node := node // capture range variable + eg.Go(func() error { + l.Printf("node %d: enabling job adoption", node) + sentinelFilePath, err := mvb.sentinelFilePath(l, node) + if err != nil { + return err + } + + if err := mvb.cluster.RunE(mvb.ctx, mvb.cluster.Node(node), "rm -f", sentinelFilePath); err != nil { + return fmt.Errorf("node %d: failed to remove sentinel file %q: %w", node, sentinelFilePath, err) + } + + l.Printf("node %d: job adoption enabled", node) + return nil + }) + } + + return eg.Wait() +} + +// planAndRunBackups is the function that can be passed to the +// mixed-version test's `InMixedVersion` function. If the cluster is +// in mixed-binary state, four backup collections are created: (see +// four cases in the function body). If all nodes are running the next +// version (which is different from the cluster version), then a +// backup collection is created in an arbitrary node. 
+func (mvb *mixedVersionBackup) planAndRunBackups( + l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, +) error { + tc := h.Context() // test context + l.Printf("current context: %#v", tc) + + onPrevious := labeledNodes{Nodes: tc.FromVersionNodes, Version: sanitizeVersion(tc.FromVersion)} + onNext := labeledNodes{Nodes: tc.ToVersionNodes, Version: sanitizeVersion(tc.ToVersion)} + + if len(tc.FromVersionNodes) > 0 { + // Case 1: plan backups -> previous node + // execute backups -> next node + fullSpec := backupSpec{Plan: onPrevious, Execute: onNext} + incSpec := fullSpec + l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) + if err := mvb.createBackupCollection(l, rng, fullSpec, incSpec, h); err != nil { + return err + } + + // Case 2: plan backups -> next node + // execute backups -> previous node + fullSpec = backupSpec{Plan: onNext, Execute: onPrevious} + incSpec = fullSpec + l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) + if err := mvb.createBackupCollection(l, rng, fullSpec, incSpec, h); err != nil { + return err + } + + // Case 3: plan & execute full backup -> previous node + // plan & execute incremental backup -> next node + fullSpec = backupSpec{Plan: onPrevious, Execute: onPrevious} + incSpec = backupSpec{Plan: onNext, Execute: onNext} + l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) + if err := mvb.createBackupCollection(l, rng, fullSpec, incSpec, h); err != nil { + return err + } + + // Case 4: plan & execute full backup -> next node + // plan & execute incremental backup -> previous node + fullSpec = backupSpec{Plan: onNext, Execute: onNext} + incSpec = backupSpec{Plan: onPrevious, Execute: onPrevious} + l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) + if err := mvb.createBackupCollection(l, rng, fullSpec, incSpec, h); err != nil { + return err + } + return nil + } + + l.Printf("all nodes running next version, running backup on arbitrary node") + fullSpec := backupSpec{Plan: onNext, Execute: onNext} + incSpec := fullSpec + return mvb.createBackupCollection(l, rng, fullSpec, incSpec, h) +} + +// verifyBackups cycles through all the backup collections created +// during the test, and verifies that restoring the backups +// in a new database results in the same data as when the backup was +// taken. This is accomplished by comparing the fingerprints after +// restoring with the expected fingerprint associated with the backup +// collection. +func (mvb *mixedVersionBackup) verifyBackups( + l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, +) error { + l.Printf("verifying %d collections created during this test", len(mvb.collections)) + for _, collection := range mvb.collections { + l.Printf("verifying %s", collection.uri) + intoDB := fmt.Sprintf("restore_%s", invalidDBNameRE.ReplaceAllString(collection.uri, "_")) + + // Restore the backup. + createDB := fmt.Sprintf("CREATE DATABASE %s", intoDB) + if err := h.Exec(rng, createDB); err != nil { + return fmt.Errorf("backup %s: error creating database %s: %w", collection.uri, intoDB, err) + } + // If the backup was created with an encryption passphrase, we + // need to include it when restoring as well. 
+ var passphraseOption string + for _, option := range collection.options { + if ep, ok := option.(encryptionPassphrase); ok { + passphraseOption = fmt.Sprintf(", %s", ep.String()) + } + } + restoreDB := fmt.Sprintf( + "RESTORE TABLE %s FROM LATEST IN '%s' WITH into_db = '%s'%s", + mvb.table, collection.uri, intoDB, passphraseOption, + ) + if err := h.Exec(rng, restoreDB); err != nil { + return fmt.Errorf("backup %s: error restoring %s: %w", collection.uri, mvb.table, err) + } + + origTableName := strings.Split(mvb.table, ".")[1] + restoredTable := fmt.Sprintf("%s.%s", intoDB, origTableName) + restoredFingerprint, err := mvb.computeFingerprint(restoredTable, "" /* timestamp */, rng, h) + if err != nil { + return fmt.Errorf("backup %s: error computing fingerprint for %s: %w", collection.uri, restoredTable, err) + } + + if restoredFingerprint != collection.dataFingerprint { + return fmt.Errorf( + "backup %s: mismatched fingerprints (expected=%s | actual=%s)", + collection.uri, collection.dataFingerprint, restoredFingerprint, + ) + } + + l.Printf("%s: OK", collection.uri) + } + + return nil +} + +func registerBackupMixedVersion(r registry.Registry) { + // backup/mixed-version tests different states of backup in a mixed + // version cluster. The actual state of the cluster when a backup is + // executed is randomized, so each run of the test will exercise a + // different set of events. Reusing the same seed will produce the + // same test. + r.Add(registry.TestSpec{ + Name: "backup/mixed-version", + Owner: registry.OwnerDisasterRecovery, + Cluster: r.MakeClusterSpec(5), + EncryptionSupport: registry.EncryptionMetamorphic, + Timeout: 4 * time.Hour, + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + if c.IsLocal() && runtime.GOARCH == "arm64" { + t.Skip("Skip under ARM64. See https://github.com/cockroachdb/cockroach/issues/89268") + } + + roachNodes := c.Range(1, c.Spec().NodeCount-1) + workloadNode := c.Node(c.Spec().NodeCount) + + uploadVersion(ctx, t, c, workloadNode, clusterupgrade.MainVersion) + bankRun := roachtestutil.NewCommand("./cockroach workload run bank"). + Arg("{pgurl%s}", roachNodes). 
+ Option("tolerate-errors") + + mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes) + mvb := newMixedVersionBackup(ctx, c, roachNodes, "bank.bank") + mvt.OnStartup("set short job interval", mvb.setShortJobIntervals) + mvt.OnStartup("load backup data", mvb.loadBackupData) + mvt.Workload("bank", workloadNode, nil /* initCmd */, bankRun) + + mvt.InMixedVersion("plan and run backups", mvb.planAndRunBackups) + mvt.AfterUpgradeFinalized("verify backups", mvb.verifyBackups) + + mvt.Run() + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go b/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go index 4cd6165fcbb6..725bf4b8a625 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go +++ b/pkg/cmd/roachtest/tests/mixed_version_job_compatibility_in_declarative_schema_changer.go @@ -29,8 +29,8 @@ func testSetupResetStep(c cluster.Cluster) versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { db := c.Conn(ctx, t.L(), 1) setUpQuery := ` -CREATE DATABASE testdb; -CREATE SCHEMA testdb.testsc; +CREATE DATABASE testdb; +CREATE SCHEMA testdb.testsc; CREATE TABLE testdb.testsc.t (i INT PRIMARY KEY); CREATE TYPE testdb.testsc.typ AS ENUM ('a', 'b'); CREATE SEQUENCE testdb.testsc.s; @@ -73,6 +73,20 @@ func planAndRunSchemaChange( } } +func setShortJobIntervalsStep(node int) versionStep { + return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + db := u.conn(ctx, t, node) + runQuery := func(query string, args ...interface{}) error { + _, err := db.ExecContext(ctx, query, args...) + return err + } + + if err := setShortJobIntervalsCommon(runQuery); err != nil { + t.Fatal(err) + } + } +} + func registerDeclarativeSchemaChangerJobCompatibilityInMixedVersion(r registry.Registry) { // declarative_schema_changer/job-compatibility-mixed-version tests that, // in a mixed version cluster, jobs created by the declarative schema changer