diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
index fa97ea78252c..4a34fddc7079 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
@@ -275,6 +275,17 @@ func NewTest(
 	}
 }
 
+// RNG returns the underlying random number generator used by the
+// mixedversion framework to generate a test plan. This rng can be
+// used to make random decisions during test setup.
+//
+// Do NOT use the rng returned by this function in mixedversion hooks
+// (functions passed to `InMixedVersion` and similar). Instead, use
+// the rng instance directly passed as argument to those functions.
+func (t *Test) RNG() *rand.Rand {
+	return t.prng
+}
+
 // InMixedVersion adds a new mixed-version hook to the test. The
 // functionality in the function passed as argument to this function
 // will be tested in arbitrary mixed-version states. If multiple
diff --git a/pkg/cmd/roachtest/tests/mixed_version_backup.go b/pkg/cmd/roachtest/tests/mixed_version_backup.go
index 78c044e9432a..bf8e1c3e706c 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_backup.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_backup.go
@@ -59,6 +59,17 @@ const (
 	// probability that we will attempt to restore a backup in
 	// mixed-version state.
 	mixedVersionRestoreProbability = 0.5
+
+	// suffix added to the names of backups taken while the cluster is
+	// finalizing the upgrade.
+	finalizingSuffix = "_finalizing"
+
+	// probabilities that the test will attempt to pause a backup job.
+	neverPause  = 0
+	alwaysPause = 1
+
+	// the test will not pause a backup job more than `maxPauses` times.
+	maxPauses = 3
 )
 
 var (
@@ -70,7 +81,7 @@ var (
 		InitialBackoff: 10 * time.Second,
 		MaxBackoff:     1 * time.Minute,
 		Multiplier:     1.5,
-		MaxRetries:     50,
+		MaxRetries:     80,
 	}
 
 	v231 = func() *version.Version {
@@ -108,6 +119,52 @@ var (
 		"users": "SHOW USERS",
 		"zones": "SHOW ZONE CONFIGURATIONS",
 	}
+
+	// systemSettingValues is a mapping from system setting names to
+	// possible values they can assume in this test. The system settings
+	// listed here are chosen based on the availability of documentation
+	// (see below), meaning a customer could reasonably have set them,
+	// and on their relationship to backup/restore. The possible values
+	// are generally 50-100% smaller or larger than the documented
+	// default.
+	//
+	// Documentation:
+	// https://www.cockroachlabs.com/docs/stable/cluster-settings.html
+	systemSettingValues = map[string][]string{
+		"bulkio.backup.file_size":                        {"8MiB", "32MiB", "512MiB", "750MiB"},
+		"bulkio.backup.read_timeout":                     {"2m0s", "10m0s"},
+		"bulkio.backup.read_with_priority_after":         {"20s", "5m0s"},
+		"bulkio.stream_ingestion.minimum_flush_interval": {"1s", "10s", "30s"},
+		"kv.bulk_io_write.max_rate":                      {"250MiB", "500MiB", "2TiB"},
+		"kv.bulk_sst.max_allowed_overage":                {"16MiB", "256MiB"},
+		"kv.bulk_sst.target_size":                        {"4MiB", "64MiB", "128MiB"},
+	}
+
+	systemSettingNames = func() []string {
+		names := make([]string, 0, len(systemSettingValues))
+		for name := range systemSettingValues {
+			names = append(names, name)
+		}
+
+		// Sort the settings names so that picking a random setting is
+		// deterministic, given the same random seed.
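// Illustrative sketch, not part of the patch: why the setting names are
// sorted before a random pick. Iterating over a Go map yields a
// nondeterministic order, so indexing the seeded RNG into the sorted
// slice (as done above) is what makes the choice reproducible for a
// given seed. The seed value below is arbitrary; the real one comes
// from the mixedversion test planner.
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

func main() {
	settings := map[string][]string{
		"bulkio.backup.file_size":    {"8MiB", "32MiB", "512MiB", "750MiB"},
		"bulkio.backup.read_timeout": {"2m0s", "10m0s"},
		"kv.bulk_sst.target_size":    {"4MiB", "64MiB", "128MiB"},
	}

	names := make([]string, 0, len(settings))
	for name := range settings {
		names = append(names, name)
	}
	sort.Strings(names) // fixed order => deterministic pick below

	rng := rand.New(rand.NewSource(42))
	name := names[rng.Intn(len(names))]
	value := settings[name][rng.Intn(len(settings[name]))]
	fmt.Printf("SET CLUSTER SETTING %s = '%s'\n", name, value)
}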
+ sort.Strings(names) + return names + }() + + bankPossibleRows = []int{ + 100, // creates keys with long revision history + 1_000, // small backup + 10_000, // larger backups (a few GiB when using 128 KiB payloads) + } + + bankPossiblePayloadBytes = []int{ + 0, // workload default + 9, // 1 random byte (`initial-` + 1) + 500, // 5x default at the time of writing + 16 << 10, // 16 KiB + 128 << 10, // 128 KiB + } ) // sanitizeVersionForBackup takes the string representation of a @@ -320,8 +377,9 @@ type ( // (`BACKUP` statement sent to); and where they are supposed to be // executed (where the backup job will be picked up). backupSpec struct { - Plan labeledNodes - Execute labeledNodes + PauseProbability float64 + Plan labeledNodes + Execute labeledNodes } ) @@ -359,12 +417,21 @@ func (rh revisionHistory) String() string { return "revision_history" } -func newEncryptionPassphrase(rng *rand.Rand) encryptionPassphrase { - minLen := 32 - maxLen := 64 - pwdLen := rng.Intn(maxLen-minLen) + minLen +func randIntBetween(rng *rand.Rand, min, max int) int { + return rng.Intn(max-min) + min +} + +func randString(rng *rand.Rand, strLen int) string { + return randutil.RandString(rng, strLen, randutil.PrintableKeyAlphabet) +} - return encryptionPassphrase{randutil.RandString(rng, pwdLen, randutil.PrintableKeyAlphabet)} +func randWaitDuration(rng *rand.Rand) time.Duration { + durations := []int{1, 10, 60, 5 * 60} + return time.Duration(durations[rng.Intn(len(durations))]) * time.Second +} + +func newEncryptionPassphrase(rng *rand.Rand) encryptionPassphrase { + return encryptionPassphrase{randString(rng, randIntBetween(rng, 32, 64))} } func (ep encryptionPassphrase) String() string { @@ -390,6 +457,24 @@ func newBackupOptions(rng *rand.Rand) []backupOption { return options } +// newCommentTarget returns either a database or a table to be used as +// a target for a `COMMENT ON` statement. Returns the object being +// commented (either 'database' or 'table'), and the name of the +// object itself. +func newCommentTarget(rng *rand.Rand, dbs []string, tables [][]string) (string, string) { + const dbCommentProbability = 0.4 + + targetDBIdx := rng.Intn(len(dbs)) + targetDB := dbs[targetDBIdx] + if rng.Float64() < dbCommentProbability { + return "database", targetDB + } + + dbTables := tables[targetDBIdx] + targetTable := dbTables[rng.Intn(len(dbTables))] + return "table", fmt.Sprintf("%s.%s", targetDB, targetTable) +} + func newTableBackup(rng *rand.Rand, dbs []string, tables [][]string) *tableBackup { var targetDBIdx int var targetDB string @@ -681,6 +766,14 @@ func (sc *systemTableContents) scheduledJobsHandler( Values() } +func (sc *systemTableContents) commentsHandler( + values []interface{}, columns []string, +) ([]interface{}, error) { + return newSystemTableRow(sc.table, values, columns). + WithSentinel("object_id"). // object_id is rekeyed + Values() +} + // handleSpecialCases exists because there are still cases where we // can't assume that the contents of a system table are the same after // a RESTORE. 
Columns that cannot be expected to be the same are
@@ -693,6 +786,8 @@ func (sc *systemTableContents) handleSpecialCases(
 		return sc.settingsHandler(row, columns)
 	case "system.scheduled_jobs":
 		return sc.scheduledJobsHandler(row, columns)
+	case "system.comments":
+		return sc.commentsHandler(row, columns)
 	default:
 		return row, nil
 	}
@@ -845,7 +940,7 @@ func newBackupCollection(name string, btype backupType, options []backupOption)
 		name:    name,
 		tables:  btype.TargetTables(),
 		options: options,
-		nonce:   randutil.RandString(nonceRng, nonceLen, randutil.PrintableKeyAlphabet),
+		nonce:   randString(nonceRng, nonceLen),
 	}
 }
 
@@ -854,7 +949,17 @@ func (bc *backupCollection) uri() string {
 	// global namespace represented by the cockroachdb-backup-testing
 	// bucket. The nonce allows multiple people (or TeamCity builds) to
 	// be running this test without interfering with one another.
-	return fmt.Sprintf("gs://cockroachdb-backup-testing/%s_%s?AUTH=implicit", bc.name, bc.nonce)
+	return fmt.Sprintf("gs://cockroachdb-backup-testing/mixed-version/%s_%s?AUTH=implicit", bc.name, bc.nonce)
+}
+
+func (bc *backupCollection) encryptionOption() *encryptionPassphrase {
+	for _, option := range bc.options {
+		if ep, ok := option.(encryptionPassphrase); ok {
+			return &ep
+		}
+	}
+
+	return nil
 }
 
 // backupCollectionDesc builds a string that describes how a backup
@@ -885,8 +990,9 @@ type mixedVersionBackup struct {
 	// backup collections that are created along the test
 	collections []*backupCollection
 	// databases where user data is being inserted
-	dbs    []string
-	tables [][]string
+	dbs          []string
+	tables       [][]string
+	tablesLoaded *atomic.Bool
 	// counter that is incremented atomically to provide unique
 	// identifiers to backups created during the test
 	currentBackupID int64
@@ -896,17 +1002,22 @@
 	// database backups are restored.
 	currentRestoreID int64
 
-	// stopWorkloads can be called to stop the any workloads started in
-	// this test. Useful when restoring cluster backups, as we don't
-	// want a stream of errors in the workload due to the nodes
-	// stopping.
-	stopWorkloads mixedversion.StopFunc
+	// stopBackground can be called to stop any background functions
+	// (including workloads) started in this test. Useful when restoring
+	// cluster backups, as we don't want a stream of errors in
+	// these functions due to the nodes stopping.
+	stopBackground mixedversion.StopFunc
 }
 
 func newMixedVersionBackup(
 	c cluster.Cluster, roachNodes option.NodeListOption, dbs ...string,
 ) *mixedVersionBackup {
-	return &mixedVersionBackup{cluster: c, dbs: dbs, roachNodes: roachNodes}
+	var tablesLoaded atomic.Bool
+	tablesLoaded.Store(false)
+
+	return &mixedVersionBackup{
+		cluster: c, dbs: dbs, roachNodes: roachNodes, tablesLoaded: &tablesLoaded,
+	}
 }
 
 // newBackupType chooses a random backup type (table, database,
@@ -932,6 +1043,127 @@ func (*mixedVersionBackup) setShortJobIntervals(
 	})
 }
 
+// systemTableWriter will run random statements that lead to data
+// being written to system.* tables. The frequency of these writes
+// (and the data written) are randomized. This function is expected to
+// be run in the background throughout the entire test duration: any
+// errors found while writing data will be logged but will not be
+// considered fatal (after all, nodes are restarted multiple times
+// during this test).
+//
+// The goal is to exercise the backup/restore functionality of some
+// system tables that are typically empty in most tests.
+//
+// TODO(renato): this should be a `workload`.
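// Illustrative sketch, not part of the patch: the coordination pattern
// used by systemTableWriter below. The background writer polls an
// atomic.Bool that the table-loading step flips once user tables are
// known, so the goroutine never reads the table list before it is
// populated. Durations are shortened so the example finishes quickly.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var tablesLoaded atomic.Bool

	done := make(chan struct{})
	go func() {
		defer close(done)
		for !tablesLoaded.Load() {
			fmt.Println("waiting for user tables to be loaded...")
			time.Sleep(10 * time.Millisecond)
		}
		fmt.Println("user tables loaded, starting random inserts")
	}()

	time.Sleep(50 * time.Millisecond) // stand-in for loadTables
	tablesLoaded.Store(true)
	<-done
}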
+func (mvb *mixedVersionBackup) systemTableWriter(
+	ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper,
+) error {
+	for !mvb.tablesLoaded.Load() {
+		l.Printf("waiting for user tables to be loaded...")
+		time.Sleep(10 * time.Second)
+	}
+	l.Printf("user tables loaded, starting random inserts")
+
+	type systemOperation func() error
+	// addComment will run a `COMMENT ON (DATABASE|TABLE)` statement. It
+	// may also randomly set the comment to NULL (which is equivalent to
+	// removing an existing comment, if any).
+	addComment := func() error {
+		const nullProbability = 0.2
+		object, name := newCommentTarget(rng, mvb.dbs, mvb.tables)
+
+		removeComment := rng.Float64() < nullProbability
+		var prefix, commentContents string
+		if removeComment {
+			prefix = fmt.Sprintf("removing comment from %s", object)
+			commentContents = "NULL"
+		} else {
+			prefix = fmt.Sprintf("adding comment to %s", object)
+			commentLen := randIntBetween(rng, 64, 1024)
+			commentContents = fmt.Sprintf("'%s'", randString(rng, commentLen))
+		}
+
+		l.Printf("%s: %s", prefix, name)
+		return h.Exec(rng, fmt.Sprintf("COMMENT ON %s %s IS %s", strings.ToUpper(object), name, commentContents))
+	}
+
+	// addExternalConnection runs a `CREATE EXTERNAL CONNECTION`
+	// statement, creating a named external connection to a nodelocal
+	// location.
+	addExternalConnection := func() error {
+		node := h.RandomNode(rng, mvb.roachNodes)
+		l.Printf("adding external connection to node %d", node)
+		nodeLocal := fmt.Sprintf("nodelocal://%d/%s", node, randString(rng, 16))
+		name := randString(rng, 8)
+		return h.Exec(rng, fmt.Sprintf("CREATE EXTERNAL CONNECTION %q AS '%s'", name, nodeLocal))
+	}
+
+	// addRoleOrUser creates a new user or a role. The logic for both is
+	// the same: both users and roles may have a password associated
+	// with them, password expiration, associated roles, etc.
+	addRoleOrUser := func() error {
+		const roleProbability = 0.5
+		isRole := rng.Float64() < roleProbability
+		entity := "USER"
+		if isRole {
+			entity = "ROLE"
+		}
+
+		name := randString(rng, 6)
+		l.Printf("creating %s %s", strings.ToLower(entity), name)
+
+		const nullPasswordProbability = 0.2
+		const expirationProbability = 0.5
+		numRoles := rng.Intn(3) // up to 3 roles
+		var options []string
+
+		if rng.Float64() < nullPasswordProbability {
+			options = append(options, "PASSWORD NULL")
+		} else {
+			password := randString(rng, randIntBetween(rng, 4, 32))
+			options = append(options, fmt.Sprintf("LOGIN PASSWORD '%s'", password))
+		}
+
+		if rng.Float64() < expirationProbability {
+			possibleExpirations := []time.Duration{
+				10 * time.Second, 1 * time.Minute, 30 * time.Minute,
+			}
+			dur := possibleExpirations[rng.Intn(len(possibleExpirations))]
+			expiresAt := timeutil.Now().Add(dur)
+			options = append(options, fmt.Sprintf("VALID UNTIL '%s'", expiresAt.Format(time.RFC3339)))
+		}
+
+		possibleRoles := []string{
+			"CANCELQUERY", "CONTROLCHANGEFEED", "CONTROLJOB", "CREATEDB", "CREATELOGIN",
+			"CREATEROLE", "MODIFYCLUSTERSETTING",
+		}
+		rng.Shuffle(len(possibleRoles), func(i, j int) {
+			possibleRoles[i], possibleRoles[j] = possibleRoles[j], possibleRoles[i]
+		})
+		options = append(options, possibleRoles[:numRoles]...)
+ return h.Exec(rng, fmt.Sprintf("CREATE %s %q WITH %s", entity, name, strings.Join(options, " "))) + } + + possibleOps := []systemOperation{ + addComment, addExternalConnection, addRoleOrUser, + } + for { + nextDur := randWaitDuration(rng) + l.Printf("will attempt a random insert in %s", nextDur) + + select { + case <-time.After(nextDur): + op := possibleOps[rng.Intn(len(possibleOps))] + if err := op(); err != nil { + l.Printf("error running operation: %v", err) + } + case <-ctx.Done(): + l.Printf("context is canceled, finishing") + return ctx.Err() + } + } +} + // loadTables returns a list of tables that are part of the database // with the given name. func (mvb *mixedVersionBackup) loadTables( @@ -975,48 +1207,40 @@ func (mvb *mixedVersionBackup) loadTables( } mvb.tables = allTables + mvb.tablesLoaded.Store(true) return nil } -// loadBackupData starts a CSV server in the background and runs -// imports a bank fixture. Blocks until the importing is finished, at -// which point the CSV server is terminated. -func (mvb *mixedVersionBackup) loadBackupData( +// setClusterSettings may set up to numCustomSettings cluster settings +// as defined in `systemSettingValues`. The system settings changed +// are logged. This function should be called *before* the upgrade +// begins; the cockroach documentation says explicitly that changing +// cluster settings is not supported in mixed-version, so we don't +// test that scenario. +func (mvb *mixedVersionBackup) setClusterSettings( ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, ) error { - var rows int - if mvb.cluster.IsLocal() { - rows = 100 - l.Printf("importing only %d rows (as this is a local run)", rows) - } else { - rows = rows3GiB - l.Printf("importing %d rows", rows) + const numCustomSettings = 3 + const defaultSettingsProbability = 0.2 + + if rng.Float64() < defaultSettingsProbability { + l.Printf("not setting any custom cluster settings (using defaults)") + return nil } - csvPort := 8081 - importNode := h.RandomNode(rng, mvb.roachNodes) - l.Printf("decided to run import on node %d", importNode) + for j := 0; j < numCustomSettings; j++ { + setting := systemSettingNames[rng.Intn(len(systemSettingNames))] + possibleValues := systemSettingValues[setting] + value := possibleValues[rng.Intn(len(possibleValues))] - currentRoach := mixedversion.CurrentCockroachPath - stopCSVServer := h.Background("csv server", func(bgCtx context.Context, bgLogger *logger.Logger) error { - cmd := importBankCSVServerCommand(currentRoach, csvPort) - bgLogger.Printf("running CSV server in the background: %q", cmd) - if err := mvb.cluster.RunE(bgCtx, mvb.roachNodes, cmd); err != nil { - return fmt.Errorf("error while running csv server: %w", err) + l.Printf("setting cluster setting %q to %q", setting, value) + stmt := fmt.Sprintf("SET CLUSTER SETTING %s = '%s'", setting, value) + if err := h.Exec(rng, stmt); err != nil { + return err } - - return nil - }) - defer stopCSVServer() - if err := waitForPort(ctx, l, mvb.roachNodes, csvPort, mvb.cluster); err != nil { - return err } - return mvb.cluster.RunE( - ctx, - mvb.cluster.Node(importNode), - importBankCommand(currentRoach, rows, 0 /* ranges */, csvPort, importNode), - ) + return nil } // takePreviousVersionBackup creates a backup collection (full + @@ -1051,13 +1275,13 @@ func (mvb *mixedVersionBackup) takePreviousVersionBackup( // Create full backup. 
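// Illustrative sketch, not part of the patch: the shape of the
// background loop in systemTableWriter above -- wait a random interval,
// run one random operation, log (but tolerate) its error, and exit
// cleanly when the test context is canceled. Durations are shortened
// here so the example terminates on its own.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	rng := rand.New(rand.NewSource(1))
	ops := []func() error{
		func() error { fmt.Println("op: add comment"); return nil },
		func() error { fmt.Println("op: create external connection"); return nil },
		func() error { fmt.Println("op: create role or user"); return nil },
	}

	for {
		nextDur := time.Duration(rng.Intn(100)+1) * time.Millisecond
		select {
		case <-time.After(nextDur):
			op := ops[rng.Intn(len(ops))]
			if err := op(); err != nil {
				fmt.Printf("error running operation: %v\n", err) // logged, not fatal
			}
		case <-ctx.Done():
			fmt.Println("context is canceled, finishing")
			return
		}
	}
}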
previousVersion := h.Context().FromVersion label := fmt.Sprintf("before upgrade in %s", sanitizeVersionForBackup(previousVersion)) - collection, _, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, mvb.roachNodes, h) + collection, _, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, mvb.roachNodes, neverPause, h) if err != nil { return err } // Create incremental backup. - collection, timestamp, err = mvb.runBackup(ctx, l, incrementalBackup{collection}, rng, mvb.roachNodes, h) + collection, timestamp, err = mvb.runBackup(ctx, l, incrementalBackup{collection}, rng, mvb.roachNodes, neverPause, h) if err != nil { return err } @@ -1066,10 +1290,9 @@ func (mvb *mixedVersionBackup) takePreviousVersionBackup( } // randomWait waits from 1s to 5m, to allow for the background -// workload to update the underlying table we are backing up. +// workloads to update the databases we are backing up. func (mvb *mixedVersionBackup) randomWait(l *logger.Logger, rng *rand.Rand) { - durations := []int{1, 10, 60, 5 * 60} - dur := time.Duration(durations[rng.Intn(len(durations))]) * time.Second + dur := randWaitDuration(rng) l.Printf("waiting for %s", dur) time.Sleep(dur) } @@ -1095,7 +1318,7 @@ func (mvb *mixedVersionBackup) backupName( testContext := h.Context() var finalizing string if testContext.Finalizing { - finalizing = "_finalizing" + finalizing = finalizingSuffix } fromVersion := sanitizeVersionForBackup(testContext.FromVersion) @@ -1226,6 +1449,7 @@ func (mvb *mixedVersionBackup) saveContents( timestamp string, h *mixedversion.Helper, ) error { + l.Printf("backup %s: loading table contents at timestamp '%s'", collection.name, timestamp) contents, err := mvb.computeTableContents( ctx, l, rng, collection.tables, nil /* previousContents */, timestamp, h, ) @@ -1235,6 +1459,7 @@ func (mvb *mixedVersionBackup) saveContents( collection.contents = contents l.Printf("computed contents for %d tables as part of %s", len(collection.contents), collection.name) + mvb.collections = append(mvb.collections, collection) return nil } @@ -1248,6 +1473,7 @@ func (mvb *mixedVersionBackup) runBackup( bType fmt.Stringer, rng *rand.Rand, nodes option.NodeListOption, + pauseProbability float64, h *mixedversion.Helper, ) (backupCollection, string, error) { tc := h.Context() @@ -1257,6 +1483,15 @@ func (mvb *mixedVersionBackup) runBackup( mvb.randomWait(l, rng) } + pauseAfter := 1024 * time.Hour // infinity + if rng.Float64() < pauseProbability { + possibleDurations := []time.Duration{ + 10 * time.Second, 30 * time.Second, 2 * time.Minute, + } + pauseAfter = possibleDurations[rng.Intn(len(possibleDurations))] + l.Printf("attempting pauses in %s", pauseAfter) + } + // NB: we need to run with the `detached` option + poll the // `system.jobs` table because we are intentionally disabling job // adoption in some nodes in the mixed-version test. 
Running the job @@ -1300,12 +1535,50 @@ func (mvb *mixedVersionBackup) runBackup( return backupCollection{}, "", fmt.Errorf("error while creating %s backup %s: %w", bType, collection.name, err) } - l.Printf("waiting for job %d (%s)", jobID, collection.name) - if err := mvb.waitForJobSuccess(ctx, l, rng, h, jobID); err != nil { - return backupCollection{}, "", err - } + backupErr := make(chan error) + go func() { + defer close(backupErr) + l.Printf("waiting for job %d (%s)", jobID, collection.name) + if err := mvb.waitForJobSuccess(ctx, l, rng, h, jobID); err != nil { + backupErr <- err + } + }() + + var numPauses int + for { + select { + case err := <-backupErr: + if err != nil { + return backupCollection{}, "", err + } + return collection, backupTime, nil + + case <-time.After(pauseAfter): + if numPauses >= maxPauses { + continue + } + + pauseDur := 5 * time.Second + l.Printf("pausing job %d for %s", jobID, pauseDur) + if err := h.Exec(rng, fmt.Sprintf("PAUSE JOB %d", jobID)); err != nil { + // We just log the error if pausing the job fails since we + // cannot guarantee the job is still running by the time we + // attempt to pause it. If that's the case, the next iteration + // of the loop should read from the backupErr channel. + l.Printf("error pausing job %d: %s", jobID, err) + continue + } + + time.Sleep(pauseDur) + + l.Printf("resuming job %d", jobID) + if err := h.Exec(rng, fmt.Sprintf("RESUME JOB %d", jobID)); err != nil { + return backupCollection{}, "", fmt.Errorf("error resuming job %d: %w", jobID, err) + } - return collection, backupTime, nil + numPauses++ + } + } } // runJobOnOneOf disables job adoption on cockroach nodes that are not @@ -1357,7 +1630,9 @@ func (mvb *mixedVersionBackup) createBackupCollection( if err := mvb.runJobOnOneOf(ctx, l, fullBackupSpec.Execute.Nodes, h, func() error { var err error label := backupCollectionDesc(fullBackupSpec, incBackupSpec) - collection, _, err = mvb.runBackup(ctx, l, fullBackup{label}, rng, fullBackupSpec.Plan.Nodes, h) + collection, _, err = mvb.runBackup( + ctx, l, fullBackup{label}, rng, fullBackupSpec.Plan.Nodes, fullBackupSpec.PauseProbability, h, + ) return err }); err != nil { return err @@ -1366,7 +1641,9 @@ func (mvb *mixedVersionBackup) createBackupCollection( // Create incremental backup. if err := mvb.runJobOnOneOf(ctx, l, incBackupSpec.Execute.Nodes, h, func() error { var err error - collection, timestamp, err = mvb.runBackup(ctx, l, incrementalBackup{collection}, rng, incBackupSpec.Plan.Nodes, h) + collection, timestamp, err = mvb.runBackup( + ctx, l, incrementalBackup{collection}, rng, incBackupSpec.Plan.Nodes, incBackupSpec.PauseProbability, h, + ) return err }); err != nil { return err @@ -1467,75 +1744,97 @@ func (mvb *mixedVersionBackup) enableJobAdoption( // planAndRunBackups is the function that can be passed to the // mixed-version test's `InMixedVersion` function. If the cluster is -// in mixed-binary state, four backup collections are created (see -// four cases in the function body). If all nodes are running the next -// version (which is different from the cluster version), then a -// backup collection is created in an arbitrary node. +// in mixed-binary state, 3 (`numCollections`) backup collections are +// created (see possible setups in `collectionSpecs`). If all nodes +// are running the next version (which is different from the cluster +// version), then a backup collection is created in an arbitrary node. 
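// Illustrative sketch, not part of the patch: the control flow used by
// runBackup above -- wait for the detached backup job in a goroutine
// while the main loop occasionally pauses and resumes it, bounded by
// maxPauses. The job is simulated with a timer here; the real code
// issues PAUSE JOB / RESUME JOB through h.Exec and polls system.jobs.
package main

import (
	"fmt"
	"time"
)

func main() {
	const maxPauses = 3
	pauseAfter := 20 * time.Millisecond
	pauseDur := 5 * time.Millisecond

	backupErr := make(chan error)
	go func() {
		defer close(backupErr)
		time.Sleep(100 * time.Millisecond) // stand-in for waitForJobSuccess
	}()

	var numPauses int
	for {
		select {
		case err := <-backupErr:
			if err != nil {
				fmt.Printf("backup failed: %v\n", err)
				return
			}
			fmt.Printf("backup succeeded after %d pause(s)\n", numPauses)
			return

		case <-time.After(pauseAfter):
			if numPauses >= maxPauses {
				continue
			}
			fmt.Println("pausing job")  // PAUSE JOB <id>
			time.Sleep(pauseDur)
			fmt.Println("resuming job") // RESUME JOB <id>
			numPauses++
		}
	}
}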
func (mvb *mixedVersionBackup) planAndRunBackups( ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, ) error { tc := h.Context() // test context l.Printf("current context: %#v", tc) - if len(mvb.tables) == 0 { - l.Printf("planning backups for the first time; loading all user tables") - if err := mvb.loadTables(ctx, l, rng, h); err != nil { - return fmt.Errorf("error loading user tables: %w", err) - } - } - onPrevious := labeledNodes{ Nodes: tc.FromVersionNodes, Version: sanitizeVersionForBackup(tc.FromVersion), } onNext := labeledNodes{ Nodes: tc.ToVersionNodes, Version: sanitizeVersionForBackup(tc.ToVersion), } + onRandom := labeledNodes{Nodes: mvb.roachNodes, Version: "random node"} + defaultPauseProbability := 0.2 - if len(tc.FromVersionNodes) > 0 { + collectionSpecs := [][2]backupSpec{ // Case 1: plan backups -> previous node // execute backups -> next node - fullSpec := backupSpec{Plan: onPrevious, Execute: onNext} - incSpec := fullSpec - l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) - if err := mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h); err != nil { - return err - } - - // Case 2: plan backups -> next node + { + {Plan: onPrevious, Execute: onNext, PauseProbability: defaultPauseProbability}, // full + {Plan: onPrevious, Execute: onNext, PauseProbability: defaultPauseProbability}, // incremental + }, + // Case 2: plan backups -> next node // execute backups -> previous node - fullSpec = backupSpec{Plan: onNext, Execute: onPrevious} - incSpec = fullSpec - l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) - if err := mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h); err != nil { - return err - } - + { + {Plan: onNext, Execute: onPrevious, PauseProbability: defaultPauseProbability}, // full + {Plan: onNext, Execute: onPrevious, PauseProbability: defaultPauseProbability}, // incremental + }, // Case 3: plan & execute full backup -> previous node // plan & execute incremental backup -> next node - fullSpec = backupSpec{Plan: onPrevious, Execute: onPrevious} - incSpec = backupSpec{Plan: onNext, Execute: onNext} - l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) - if err := mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h); err != nil { - return err - } - + { + {Plan: onPrevious, Execute: onPrevious, PauseProbability: defaultPauseProbability}, // full + {Plan: onNext, Execute: onNext, PauseProbability: defaultPauseProbability}, // incremental + }, // Case 4: plan & execute full backup -> next node // plan & execute incremental backup -> previous node - fullSpec = backupSpec{Plan: onNext, Execute: onNext} - incSpec = backupSpec{Plan: onPrevious, Execute: onPrevious} - l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) - if err := mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h); err != nil { - return err + { + {Plan: onNext, Execute: onNext, PauseProbability: defaultPauseProbability}, // full + {Plan: onPrevious, Execute: onPrevious, PauseProbability: defaultPauseProbability}, // incremental + }, + // Case 5: plan backups -> random node + // execute backups -> random node (with pause + resume) + { + {Plan: onRandom, Execute: onRandom, PauseProbability: alwaysPause}, // full + {Plan: onRandom, Execute: onRandom, PauseProbability: alwaysPause}, // incremental + }, + } + + if len(tc.FromVersionNodes) > 0 { + const numCollections = 3 + rng.Shuffle(len(collectionSpecs), func(i, j int) { + collectionSpecs[i], 
collectionSpecs[j] = collectionSpecs[j], collectionSpecs[i] + }) + + for _, specPair := range collectionSpecs[:numCollections] { + fullSpec, incSpec := specPair[0], specPair[1] + l.Printf("planning backup: %s", backupCollectionDesc(fullSpec, incSpec)) + if err := mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h); err != nil { + return err + } } return nil } l.Printf("all nodes running next version, running backup on arbitrary node") - fullSpec := backupSpec{Plan: onNext, Execute: onNext} + fullSpec := backupSpec{Plan: onNext, Execute: onNext, PauseProbability: defaultPauseProbability} incSpec := fullSpec return mvb.createBackupCollection(ctx, l, rng, fullSpec, incSpec, h) } +// checkFiles uses the `check_files` option of `SHOW BACKUP` to verify +// that the latest backup in the collection passed is valid. +func (mvb *mixedVersionBackup) checkFiles( + rng *rand.Rand, collection *backupCollection, h *mixedversion.Helper, +) error { + options := []string{"check_files"} + if opt := collection.encryptionOption(); opt != nil { + options = append(options, opt.String()) + } + + checkFilesStmt := fmt.Sprintf( + "SHOW BACKUP LATEST IN '%s' WITH %s", + collection.uri(), strings.Join(options, ", "), + ) + return h.Exec(rng, checkFilesStmt) +} + // verifyBackupCollection restores the backup collection passed and // verifies that the contents after the restore match the contents // when the backup was taken. @@ -1545,8 +1844,10 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( rng *rand.Rand, h *mixedversion.Helper, collection *backupCollection, + version string, ) error { - l.Printf("verifying %s", collection.name) + v := clusterupgrade.VersionMsg(version) + l.Printf("%s: verifying %s", v, collection.name) // Defaults for the database where the backup will be restored, // along with the expected names of the tables after restore. @@ -1562,25 +1863,29 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( // as the tables we backed up. In addition, we need to wipe the // cluster before attempting a restore. restoredTables = collection.tables - if err := mvb.resetCluster(ctx, l); err != nil { + if err := mvb.resetCluster(ctx, l, version); err != nil { return err } case *tableBackup: // If we are restoring a table backup , we need to create it // first. if err := h.Exec(rng, fmt.Sprintf("CREATE DATABASE %s", restoreDB)); err != nil { - return fmt.Errorf("backup %s: error creating database %s: %w", collection.name, restoreDB, err) + return fmt.Errorf("%s: backup %s: error creating database %s: %w", v, collection.name, restoreDB, err) } } + // As a sanity check, make sure that a `check_files` check passes + // before attempting a restore. + if err := mvb.checkFiles(rng, collection, h); err != nil { + return fmt.Errorf("%s: backup %s: check_files failed: %w", v, collection.name, err) + } + restoreCmd, options := collection.btype.RestoreCommand(restoreDB) restoreOptions := append([]string{"detached"}, options...) // If the backup was created with an encryption passphrase, we // need to include it when restoring as well. 
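// Illustrative sketch, not part of the patch, with simplified types: how
// the WITH clause for SHOW BACKUP (checkFiles above) and RESTORE is
// assembled, appending the encryption passphrase option only when the
// backup was created with one. The real backupOption and
// encryptionPassphrase types live in this file; the passphrase clause
// below uses the standard `encryption_passphrase = '...'` syntax.
package main

import (
	"fmt"
	"strings"
)

type backupOption interface{ fmt.Stringer }

type encryptionPassphrase struct{ passphrase string }

func (ep encryptionPassphrase) String() string {
	return fmt.Sprintf("encryption_passphrase = '%s'", ep.passphrase)
}

func main() {
	uri := "gs://cockroachdb-backup-testing/mixed-version/1_full_abc?AUTH=implicit"
	opts := []backupOption{encryptionPassphrase{"hunter2"}}

	options := []string{"check_files"}
	for _, opt := range opts {
		if ep, ok := opt.(encryptionPassphrase); ok {
			options = append(options, ep.String())
		}
	}

	fmt.Printf("SHOW BACKUP LATEST IN '%s' WITH %s\n", uri, strings.Join(options, ", "))
}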
- for _, option := range collection.options { - if ep, ok := option.(encryptionPassphrase); ok { - restoreOptions = append(restoreOptions, ep.String()) - } + if opt := collection.encryptionOption(); opt != nil { + restoreOptions = append(restoreOptions, opt.String()) } var optionsStr string @@ -1593,18 +1898,18 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( ) var jobID int if err := h.QueryRow(rng, restoreStmt).Scan(&jobID); err != nil { - return fmt.Errorf("backup %s: error in restore statement: %w", collection.name, err) + return fmt.Errorf("%s: backup %s: error in restore statement: %w", v, collection.name, err) } if err := mvb.waitForJobSuccess(ctx, l, rng, h, jobID); err != nil { - return err + return fmt.Errorf("%s: %w", v, err) } restoredContents, err := mvb.computeTableContents( ctx, l, rng, restoredTables, collection.contents, "" /* timestamp */, h, ) if err != nil { - return fmt.Errorf("backup %s: error loading restored contents: %w", collection.name, err) + return fmt.Errorf("%s: backup %s: error loading restored contents: %w", v, collection.name, err) } for j, contents := range collection.contents { @@ -1612,24 +1917,28 @@ func (mvb *mixedVersionBackup) verifyBackupCollection( restoredTableContents := restoredContents[j] l.Printf("%s: verifying %s", collection.name, table) if err := contents.ValidateRestore(ctx, l, restoredTableContents); err != nil { - return fmt.Errorf("backup %s: %w", collection.name, err) + return fmt.Errorf("%s: backup %s: %w", v, collection.name, err) } } - l.Printf("%s: OK", collection.name) + l.Printf("%s: %s: OK", v, collection.name) return nil } // resetCluster wipes the entire cluster and starts it again with the -// current latest binary. This is done before we attempt restoring a +// specified version binary. This is done before we attempt restoring a // full cluster backup. -func (mvb *mixedVersionBackup) resetCluster(ctx context.Context, l *logger.Logger) error { +func (mvb *mixedVersionBackup) resetCluster( + ctx context.Context, l *logger.Logger, version string, +) error { + l.Printf("resetting cluster using version %s", version) if err := mvb.cluster.WipeE(ctx, l, mvb.roachNodes); err != nil { return fmt.Errorf("failed to wipe cluster: %w", err) } + cockroachPath := clusterupgrade.BinaryPathFromVersion(version) return clusterupgrade.StartWithBinary( - ctx, l, mvb.cluster, mvb.roachNodes, mixedversion.CurrentCockroachPath, option.DefaultStartOptsNoBackups(), + ctx, l, mvb.cluster, mvb.roachNodes, cockroachPath, option.DefaultStartOptsNoBackups(), ) } @@ -1652,7 +1961,7 @@ func (mvb *mixedVersionBackup) verifySomeBackups( l.Printf("verifying %d out of %d backups in mixed version", len(toBeRestored), len(mvb.collections)) for _, collection := range toBeRestored { - if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection); err != nil { + if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection, "mixed-version"); err != nil { return err } } @@ -1662,18 +1971,49 @@ func (mvb *mixedVersionBackup) verifySomeBackups( // verifyAllBackups cycles through all the backup collections created // for the duration of the test, and verifies that restoring the -// backups results in the same data as when the backup was taken. +// backups results in the same data as when the backup was taken. We +// attempt to restore all backups taken both in the previous version, +// as well as in the current (latest) version, returning all errors +// found in the process. 
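// Illustrative sketch, not part of the patch: the error-aggregation
// pattern used by verifyAllBackups below. Verification keeps going past
// individual restore failures and reports every error at the end; a
// single failure is returned as-is to keep the report simple.
package main

import (
	"fmt"
	"strings"
)

func combineErrors(restoreErrors []error) error {
	if len(restoreErrors) == 0 {
		return nil
	}
	if len(restoreErrors) == 1 {
		return restoreErrors[0]
	}
	msgs := make([]string, 0, len(restoreErrors))
	for j, err := range restoreErrors {
		msgs = append(msgs, fmt.Sprintf("%d: %s", j, err.Error()))
	}
	return fmt.Errorf("%d errors during restore:\n%s", len(restoreErrors), strings.Join(msgs, "\n"))
}

func main() {
	errs := []error{
		fmt.Errorf("backup 2_full: check_files failed"),
		fmt.Errorf("backup 4_database: error in restore statement"),
	}
	fmt.Println(combineErrors(errs))
}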
func (mvb *mixedVersionBackup) verifyAllBackups( ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper, ) error { - l.Printf("stopping background workloads") - mvb.stopWorkloads() + l.Printf("stopping background functions and workloads") + mvb.stopBackground() + + var restoreErrors []error + verify := func(version string) { + v := clusterupgrade.VersionMsg(version) + l.Printf("%s: verifying %d collections created during this test", v, len(mvb.collections)) + for _, collection := range mvb.collections { + if version != clusterupgrade.MainVersion && strings.Contains(collection.name, finalizingSuffix) { + // Do not attempt to restore, in the previous version, a + // backup that was taken while the cluster was finalizing, as + // that will most likely fail (the backup version will be past + // the cluster version). + continue + } + if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection, version); err != nil { + l.Printf("restore error: %v", err) + restoreErrors = append(restoreErrors, err) + } + } + } - l.Printf("verifying %d collections created during this test", len(mvb.collections)) - for _, collection := range mvb.collections { - if err := mvb.verifyBackupCollection(ctx, l, rng, h, collection); err != nil { - return err + verify(h.Context().FromVersion) + verify(clusterupgrade.MainVersion) + + if len(restoreErrors) > 0 { + if len(restoreErrors) == 1 { + // Simplify error reporting if only one error was found. + return restoreErrors[0] + } + + msgs := make([]string, 0, len(restoreErrors)) + for j, err := range restoreErrors { + msgs = append(msgs, fmt.Sprintf("%d: %s", j, err.Error())) } + return fmt.Errorf("%d errors during restore:\n%s", len(restoreErrors), strings.Join(msgs, "\n")) } return nil @@ -1687,7 +2027,7 @@ func registerBackupMixedVersion(r registry.Registry) { // same test. r.Add(registry.TestSpec{ Name: "backup-restore/mixed-version", - Timeout: 7 * time.Hour, + Timeout: 8 * time.Hour, Owner: registry.OwnerDisasterRecovery, Cluster: r.MakeClusterSpec(5), EncryptionSupport: registry.EncryptionMetamorphic, @@ -1699,6 +2039,8 @@ func registerBackupMixedVersion(r registry.Registry) { roachNodes := c.Range(1, c.Spec().NodeCount-1) workloadNode := c.Node(c.Spec().NodeCount) + mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes) + testRNG := mvt.RNG() uploadVersion(ctx, t, c, workloadNode, clusterupgrade.MainVersion) // numWarehouses is picked as a number that provides enough work @@ -1712,27 +2054,44 @@ func registerBackupMixedVersion(r registry.Registry) { Arg("{pgurl%s}", roachNodes). Flag("warehouses", numWarehouses). Option("tolerate-errors") + + bankRows := bankPossibleRows[testRNG.Intn(len(bankPossibleRows))] + bankPayload := bankPossiblePayloadBytes[testRNG.Intn(len(bankPossiblePayloadBytes))] + bankInit := roachtestutil.NewCommand("./cockroach workload init bank"). + Flag("rows", bankRows). + MaybeFlag(bankPayload != 0, "payload-bytes", bankPayload). + Flag("ranges", 0). + Arg("{pgurl%s}", roachNodes) bankRun := roachtestutil.NewCommand("./cockroach workload run bank"). Arg("{pgurl%s}", roachNodes). 
Option("tolerate-errors") - var stopBank, stopTPCC mixedversion.StopFunc - mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes) - mvb := newMixedVersionBackup(c, roachNodes, "bank", "tpcc") - mvt.OnStartup("set short job interval", mvb.setShortJobIntervals) - mvt.OnStartup("load backup data", mvb.loadBackupData) - mvt.OnStartup("take backup in previous version", mvb.takePreviousVersionBackup) - - stopBank = mvt.Workload("bank", workloadNode, nil /* initCmd */, bankRun) - stopTPCC = mvt.Workload("tpcc", workloadNode, tpccInit, tpccRun) - - mvt.InMixedVersion("plan and run backups", mvb.planAndRunBackups) - mvt.InMixedVersion("verify some backups", mvb.verifySomeBackups) - mvt.AfterUpgradeFinalized("verify all backups", mvb.verifyAllBackups) - - mvb.stopWorkloads = func() { + backupTest := newMixedVersionBackup(c, roachNodes, "bank", "tpcc") + + mvt.OnStartup("set short job interval", backupTest.setShortJobIntervals) + mvt.OnStartup("take backup in previous version", backupTest.takePreviousVersionBackup) + mvt.OnStartup("maybe set custom cluster settings", backupTest.setClusterSettings) + + // We start two workloads in this test: + // - bank: the main purpose of this workload is to test some + // edge cases, such as columns with small or large payloads, + // keys with long revision history, etc. + // - tpcc: tpcc is a much more complex workload, and should keep + // the cluster relatively busy while the backup and restores + // take place. Its schema is also more complex, and the + // operations more closely resemble a customer workload. + stopBank := mvt.Workload("bank", workloadNode, bankInit, bankRun) + stopTPCC := mvt.Workload("tpcc", workloadNode, tpccInit, tpccRun) + stopSystemWriter := mvt.BackgroundFunc("system table writer", backupTest.systemTableWriter) + + mvt.InMixedVersion("plan and run backups", backupTest.planAndRunBackups) + mvt.InMixedVersion("verify some backups", backupTest.verifySomeBackups) + mvt.AfterUpgradeFinalized("verify all backups", backupTest.verifyAllBackups) + + backupTest.stopBackground = func() { stopBank() stopTPCC() + stopSystemWriter() } mvt.Run() }, diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go index 1eb66e854bf4..4b265fe37303 100644 --- a/pkg/cmd/roachtest/tests/restore.go +++ b/pkg/cmd/roachtest/tests/restore.go @@ -338,7 +338,7 @@ func registerRestore(r registry.Registry) { tags: registry.Tags("weekly", "aws-weekly"), }, { - // The weekly 32TB, 400 incremental layer Restore test. + // The weekly 32TB, 400 incremental layer Restore test on AWS. // // NB: Prior to 23.1, restore would OOM on backups that had many // incremental layers and many import spans. This test disables span @@ -358,6 +358,21 @@ func registerRestore(r registry.Registry) { `SET CLUSTER SETTING backup.restore_span.target_size = '0'`, }, }, + { + // The weekly 32TB, 400 incremental layer Restore test on GCP. + hardware: makeHardwareSpecs(hardwareSpecs{nodes: 15, cpus: 16, volumeSize: 5000}), + backup: makeBackupSpecs(backupSpecs{ + version: "v22.2.4", + workload: tpceRestore{customers: 2000000}, + backupProperties: "inc-count=400", + cloud: spec.GCE, + }), + timeout: 30 * time.Hour, + tags: registry.Tags("weekly"), + setUpStmts: []string{ + `SET CLUSTER SETTING backup.restore_span.target_size = '0'`, + }, + }, { // A teeny weeny 15GB restore that could be used to bisect scale agnostic perf regressions. hardware: makeHardwareSpecs(hardwareSpecs{}),