diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 0d7d9c59bb82..8e896b6c0df3 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1369,6 +1369,7 @@ unreserved_keyword ::= | 'VALIDATE' | 'VALUE' | 'VARYING' + | 'VERIFY_BACKUP_TABLE_DATA' | 'VIEW' | 'VIEWACTIVITY' | 'VIEWACTIVITYREDACTED' @@ -2530,6 +2531,7 @@ restore_options ::= | 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list | 'TENANT' '=' string_or_placeholder | 'SCHEMA_ONLY' + | 'VERIFY_BACKUP_TABLE_DATA' scrub_option_list ::= ( scrub_option ) ( ( ',' scrub_option ) )* diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index fd9b4bde5c49..42d442372408 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -12,7 +12,9 @@ import ( "context" gosql "database/sql" "fmt" + "io/ioutil" "net/url" + "path/filepath" "regexp" "strings" "testing" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -117,13 +120,14 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) { } type serverCfg struct { - name string - iodir string - nodes int - splits int - ioConf base.ExternalIODirConfig - localities string - beforeVersion string + name string + iodir string + nodes int + splits int + ioConf base.ExternalIODirConfig + localities string + beforeVersion string + testingKnobCfg string } func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error { @@ -171,6 +175,18 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error { } params.ServerArgsPerNode = serverArgsPerNode } + if cfg.testingKnobCfg != "" { + switch cfg.testingKnobCfg { + case "RecoverFromIterPanic": + params.ServerArgs.Knobs.DistSQL = &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RecoverFromIterPanic: true, + }, + } + default: + t.Fatalf("TestingKnobCfg %s not found", cfg.testingKnobCfg) + } + } if cfg.iodir == "" { tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits, InitManualReplication, params) @@ -255,10 +271,14 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // version before the passed in key. See cockroach_versions.go // for possible values. // +// + testingKnobCfg: specifies a key to a hardcoded testingKnob configuration +// +// // - "upgrade-server version=" // Upgrade the cluster version of the active server to the passed in // clusterVersion key. See cockroach_versions.go for possible values. // +// // - "exec-sql [server=] [user=] [args]" // Executes the input SQL query on the target server. By default, server is // the last created server. @@ -347,6 +367,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // // + target: SQL target. Currently, only table names are supported. 
//
+//
+// - "corrupt-backup" uri=
+// Finds the latest backup in the provided collection uri and flips a bit in one SST in the backup.
 func TestDataDriven(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -375,7 +398,7 @@ func TestDataDriven(t *testing.T) {
 			return ""

 		case "new-server":
-			var name, shareDirWith, iodir, localities, beforeVersion string
+			var name, shareDirWith, iodir, localities, beforeVersion, testingKnobCfg string
 			var splits int
 			nodes := singleNode
 			var io base.ExternalIODirConfig
@@ -404,16 +427,20 @@ func TestDataDriven(t *testing.T) {
 			if d.HasArg("beforeVersion") {
 				d.ScanArgs(t, "beforeVersion", &beforeVersion)
 			}
+			if d.HasArg("testingKnobCfg") {
+				d.ScanArgs(t, "testingKnobCfg", &testingKnobCfg)
+			}
 			lastCreatedServer = name
 			cfg := serverCfg{
-				name:          name,
-				iodir:         iodir,
-				nodes:         nodes,
-				splits:        splits,
-				ioConf:        io,
-				localities:    localities,
-				beforeVersion: beforeVersion,
+				name:           name,
+				iodir:          iodir,
+				nodes:          nodes,
+				splits:         splits,
+				ioConf:         io,
+				localities:     localities,
+				beforeVersion:  beforeVersion,
+				testingKnobCfg: testingKnobCfg,
 			}
 			err := ds.addServer(t, cfg)
 			if err != nil {
@@ -816,6 +843,27 @@ func TestDataDriven(t *testing.T) {
 			})
 			require.NoError(t, err)
 			return ""
+
+		case "corrupt-backup":
+			server := lastCreatedServer
+			user := "root"
+			var uri string
+			d.ScanArgs(t, "uri", &uri)
+			parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1))
+			require.NoError(t, err)
+			var filePath string
+			filePathQuery := fmt.Sprintf("SELECT path FROM [SHOW BACKUP FILES FROM LATEST IN %s] LIMIT 1", uri)
+			err = ds.getSQLDB(t, server, user).QueryRow(filePathQuery).Scan(&filePath)
+			require.NoError(t, err)
+			fullPath := filepath.Join(ds.getIODir(t, server), parsedURI.Path, filePath)
+			data, err := ioutil.ReadFile(fullPath)
+			require.NoError(t, err)
+			data[20] ^= 1
+			if err := ioutil.WriteFile(fullPath, data, 0644 /* perm */); err != nil {
+				t.Fatal(err)
+			}
+			return ""
+
 		default:
 			return fmt.Sprintf("unknown command: %s", d.Cmd)
 		}
diff --git a/pkg/ccl/backupccl/restoration_data.go b/pkg/ccl/backupccl/restoration_data.go
index b91a775ac379..1bb319e7f6d5 100644
--- a/pkg/ccl/backupccl/restoration_data.go
+++ b/pkg/ccl/backupccl/restoration_data.go
@@ -37,6 +37,9 @@ type restorationData interface {
 	getTenantRekeys() []execinfrapb.TenantRekey
 	getPKIDs() map[uint64]bool

+	// isValidateOnly returns true iff only validation should occur.
+	isValidateOnly() bool
+
 	// addTenant extends the set of data needed to restore to include a new tenant.
 	addTenant(fromID, toID roachpb.TenantID)

@@ -74,6 +77,9 @@ type restorationDataBase struct {
 	// systemTables store the system tables that need to be restored for cluster
 	// backups. Should be nil otherwise.
 	systemTables []catalog.TableDescriptor
+
+	// validateOnly indicates this data should only get read from external storage, not written.
+	validateOnly bool
 }

 // restorationDataBase implements restorationData.
@@ -119,6 +125,10 @@ func (b *restorationDataBase) isEmpty() bool {
 	return len(b.spans) == 0
 }

+func (b *restorationDataBase) isValidateOnly() bool {
+	return b.validateOnly
+}
+
 // isMainBundle implements restorationData.
 func (restorationDataBase) isMainBundle() bool { return false }
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 371e71222bce..02b70414ba5e 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -279,12 +279,25 @@ func (rd *restoreDataProcessor) openSSTs(
 		}
 	}()

+	var recoverFromIterPanic bool
+	if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+		recoverFromIterPanic = restoreKnobs.RecoverFromIterPanic
+	}
+
 	// sendIter sends a multiplexed iterator covering the currently accumulated files over the
 	// channel.
 	sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
 		readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

 		cleanup := func() {
+			if recoverFromIterPanic {
+				defer func() {
+					if r := recover(); r != nil {
+						log.Errorf(ctx, "recovered from Iter panic %v", r)
+					}
+				}()
+			}
+
 			readAsOfIter.Close()

 			for _, dir := range dirsToSend {
@@ -417,6 +430,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 		false, /* splitFilledRanges */
 		rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
 		rd.flowCtx.Cfg.BulkSenderLimiter,
+		rd.spec.ValidateOnly,
 	)
 	if err != nil {
 		return summary, err
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 285661ef5fcc..6007e66dc0d5 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -389,6 +389,7 @@ func restore(
 			dataToRestore.getRekeys(),
 			dataToRestore.getTenantRekeys(),
 			endTime,
+			dataToRestore.isValidateOnly(),
 			progCh,
 		)
 	}
@@ -484,6 +485,8 @@ type restoreResumer struct {
 		// afterPreRestore runs on cluster restores after restoring the "preRestore"
 		// data.
 		afterPreRestore func() error
+		// checksumRecover is a testing hook for checksum-failure recovery.
+		checksumRecover func() error
 	}
 }
@@ -657,15 +660,32 @@ func shouldPreRestore(table *tabledesc.Mutable) bool {
 	return ok
 }

-// createImportingDescriptors create the tables that we will restore into. It also
-// fetches the information from the old tables that we need for the restore.
+// createImportingDescriptors creates the tables that we will restore into and returns up to three
+// configurations for separate restoration flows. The three restoration flows are:
+//
+// 1. dataToPreRestore: a restoration flow cfg to ingest a subset of
+// system tables (e.g. zone configs) during a cluster restore that are
+// required to be set up before the rest of the data gets restored.
+// This should be empty during non-cluster restores.
+//
+// 2. preValidation: a restoration flow cfg to ingest the remainder of system tables
+// during a verify_backup_table_data cluster restore. This should be empty otherwise.
+//
+// 3. trackedRestore: a restoration flow cfg to ingest the remainder of restore targets.
+//
+// It also fetches the information from the old tables that we need for the restore.
 func createImportingDescriptors(
 	ctx context.Context,
 	p sql.JobExecContext,
 	backupCodec keys.SQLCodec,
 	sqlDescs []catalog.Descriptor,
 	r *restoreResumer,
-) (*restorationDataBase, *mainRestorationData, error) {
+) (
+	dataToPreRestore *restorationDataBase,
+	preValidation *restorationDataBase,
+	trackedRestore *mainRestorationData,
+	err error,
+) {
 	details := r.job.Details().(jobspb.RestoreDetails)

 	var allMutableDescs []catalog.MutableDescriptor
@@ -729,12 +749,18 @@ func createImportingDescriptors(
 	// that is, in the 'old' keyspace, before we reassign the table IDs.
preRestoreSpans := spansForAllRestoreTableIndexes(backupCodec, preRestoreTables, nil, details.SchemaOnly) postRestoreSpans := spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, details.SchemaOnly) + var verifySpans []roachpb.Span + if details.VerifyData { + // verifySpans contains the spans that should be read and checksum'd during a + // verify_backup_table_data RESTORE + verifySpans = spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, false) + } log.Eventf(ctx, "starting restore for %d tables", len(mutableTables)) // Assign new IDs to the database descriptors. if err := rewrite.DatabaseDescs(mutableDatabases, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } databaseDescs := make([]*descpb.DatabaseDescriptor, len(mutableDatabases)) @@ -757,11 +783,11 @@ func createImportingDescriptors( } if err := rewrite.SchemaDescs(schemasToWrite, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } if err := remapPublicSchemas(ctx, p, mutableDatabases, &schemasToWrite, &writtenSchemas, &details); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Assign new IDs and privileges to the tables, and update all references to @@ -769,7 +795,7 @@ func createImportingDescriptors( if err := rewrite.TableDescs( mutableTables, details.DescriptorRewrites, details.OverrideDB, ); err != nil { - return nil, nil, err + return nil, nil, nil, err } tableDescs := make([]*descpb.TableDescriptor, len(mutableTables)) for i, table := range mutableTables { @@ -807,13 +833,13 @@ func createImportingDescriptors( // descriptors will not be written to disk, and is only for accurate, // in-memory resolution hereon out. if err := rewrite.TypeDescs(types, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Finally, clean up / update any schema changer state inside descriptors // globally. if err := rewrite.MaybeClearSchemaChangerStateInDescs(allMutableDescs); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Set the new descriptors' states to offline. 
@@ -1104,7 +1130,7 @@ func createImportingDescriptors(
 			return err
 		})
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 	}

@@ -1114,7 +1140,7 @@ func createImportingDescriptors(
 		tableToSerialize := tables[i]
 		newDescBytes, err := protoutil.Marshal(tableToSerialize.DescriptorProto())
 		if err != nil {
-			return nil, nil, errors.NewAssertionErrorWithWrappedErrf(err,
+			return nil, nil, nil, errors.NewAssertionErrorWithWrappedErrf(err,
 				"marshaling descriptor")
 		}
 		rekeys = append(rekeys, execinfrapb.TableRekey{
@@ -1125,7 +1151,7 @@ func createImportingDescriptors(

 	_, backupTenantID, err := keys.DecodeTenantPrefix(backupCodec.TenantPrefix())
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 	if !backupCodec.TenantPrefix().Equal(p.ExecCfg().Codec.TenantPrefix()) {
 		// Ensure old processors fail if this is a previously unsupported restore of
@@ -1162,14 +1188,14 @@ func createImportingDescriptors(
 		pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.GetPrimaryIndexID()))] = true
 	}

-	dataToPreRestore := &restorationDataBase{
+	dataToPreRestore = &restorationDataBase{
 		spans:        preRestoreSpans,
 		tableRekeys:  rekeys,
 		tenantRekeys: tenantRekeys,
 		pkIDs:        pkIDs,
 	}

-	dataToRestore := &mainRestorationData{
+	trackedRestore = &mainRestorationData{
 		restorationDataBase{
 			spans:        postRestoreSpans,
 			tableRekeys:  rekeys,
@@ -1178,6 +1204,27 @@ func createImportingDescriptors(
 		},
 	}

+	preValidation = &restorationDataBase{}
+	// During a RESTORE with verify_backup_table_data, progress on
+	// verifySpans should be the source of job progress (as it will take the most time); therefore,
+	// track them in the mainRestorationData struct and shift postRestoreSpans
+	// (only relevant during a cluster restore) into the untracked preValidation flow.
+	if details.VerifyData {
+		trackedRestore.restorationDataBase.spans = verifySpans
+		trackedRestore.restorationDataBase.validateOnly = true
+
+		// Before the main (validation) flow, during a cluster level restore,
+		// we still need to restore system tables that do NOT get restored in the dataToPreRestore
+		// flow. This restoration will not get tracked during job progress.
+		if (details.DescriptorCoverage != tree.AllDescriptors) && len(postRestoreSpans) != 0 {
+			return nil, nil, nil, errors.AssertionFailedf(
+				"no spans should get restored in a non-cluster, verify_backup_table_data restore")
+		}
+		preValidation.spans = postRestoreSpans
+		preValidation.tableRekeys = rekeys
+		preValidation.pkIDs = pkIDs
+	}
+
 	if tempSystemDBID != descpb.InvalidID {
 		for _, table := range preRestoreTables {
 			if table.GetParentID() == tempSystemDBID {
@@ -1186,11 +1233,19 @@ func createImportingDescriptors(
 		}
 		for _, table := range postRestoreTables {
 			if table.GetParentID() == tempSystemDBID {
-				dataToRestore.systemTables = append(dataToRestore.systemTables, table)
+				if details.VerifyData {
+					// During a verify_backup_table_data RESTORE, system tables are
+					// restored pre validation. Note that the system tables are still
+					// added to the trackedRestore flow because after ingestion, the
+					// restore job uses systemTable metadata hanging from the
+					// trackedRestore object.
+ preValidation.systemTables = append(preValidation.systemTables, table) + } + trackedRestore.systemTables = append(trackedRestore.systemTables, table) } } } - return dataToPreRestore, dataToRestore, nil + return dataToPreRestore, preValidation, trackedRestore, nil } // remapPublicSchemas is used to create a descriptor backed public schema @@ -1327,7 +1382,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } - preData, mainData, err := createImportingDescriptors(ctx, p, backupCodec, sqlDescs, r) + preData, preValidateData, mainData, err := createImportingDescriptors(ctx, p, backupCodec, + sqlDescs, r) if err != nil { return err } @@ -1438,6 +1494,25 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } } + if !preValidateData.isEmpty() { + res, err := restoreWithRetry( + ctx, + p, + numNodes, + backupManifests, + details.BackupLocalityInfo, + details.EndTime, + preValidateData, + r.job, + details.Encryption, + &kmsEnv, + ) + if err != nil { + return err + } + + resTotal.Add(res) + } { // Restore the main data bundle. We notably only restore the system tables // later. diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 729791f61d80..700e37fdbb3d 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -787,6 +787,7 @@ func resolveOptionsForRestoreJobDescription( SkipMissingViews: opts.SkipMissingViews, Detached: opts.Detached, SchemaOnly: opts.SchemaOnly, + VerifyData: opts.VerifyData, } if opts.EncryptionPassphrase != nil { @@ -880,6 +881,10 @@ func restorePlanHook( return nil, nil, nil, false, errors.New("cannot run RESTORE with schema_only until cluster has fully upgraded to 22.2") } + if !restoreStmt.Options.SchemaOnly && restoreStmt.Options.VerifyData { + return nil, nil, nil, false, + errors.New("to set the verify_backup_table_data option, the schema_only option must be set") + } fromFns := make([]func() ([]string, error), len(restoreStmt.From)) for i := range restoreStmt.From { @@ -1694,6 +1699,7 @@ func doRestorePlan( RestoreSystemUsers: restoreStmt.DescriptorCoverage == tree.SystemUsers, PreRewriteTenantId: oldTenantID, SchemaOnly: restoreStmt.Options.SchemaOnly, + VerifyData: restoreStmt.Options.VerifyData, } jr := jobs.Record{ diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 860b4bab7400..3b4880170a8a 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -67,6 +67,7 @@ func distRestore( tableRekeys []execinfrapb.TableRekey, tenantRekeys []execinfrapb.TenantRekey, restoreTime hlc.Timestamp, + validateOnly bool, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, ) error { defer close(progCh) @@ -106,7 +107,8 @@ func distRestore( p := planCtx.NewPhysicalPlan() - splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, tenantRekeys) + splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, + tenantRekeys, validateOnly) if err != nil { return nil, nil, err } @@ -118,6 +120,7 @@ func distRestore( TableRekeys: tableRekeys, TenantRekeys: tenantRekeys, PKIDs: pkIDs, + ValidateOnly: validateOnly, } if len(splitAndScatterSpecs) == 0 { @@ -292,6 +295,7 @@ func makeSplitAndScatterSpecs( chunks [][]execinfrapb.RestoreSpanEntry, tableRekeys []execinfrapb.TableRekey, tenantRekeys []execinfrapb.TenantRekey, + 
validateOnly bool,
 ) (map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec, error) {
 	specsBySQLInstanceID := make(map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec)
 	for i, chunk := range chunks {
@@ -307,6 +311,7 @@ func makeSplitAndScatterSpecs(
 			}},
 			TableRekeys:  tableRekeys,
 			TenantRekeys: tenantRekeys,
+			ValidateOnly: validateOnly,
 		}
 	}
 }
diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go
index 676231f6b3a8..3f34fc42da6f 100644
--- a/pkg/ccl/backupccl/split_and_scatter_processor.go
+++ b/pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -42,7 +42,9 @@ type splitAndScatterer interface {
 	scatter(ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key) (roachpb.NodeID, error)
 }

-type noopSplitAndScatterer struct{}
+type noopSplitAndScatterer struct {
+	scatterNode roachpb.NodeID
+}

 var _ splitAndScatterer = noopSplitAndScatterer{}

@@ -55,7 +57,7 @@ func (n noopSplitAndScatterer) split(_ context.Context, _ keys.SQLCodec, _ roach
 func (n noopSplitAndScatterer) scatter(
 	_ context.Context, _ keys.SQLCodec, _ roachpb.Key,
 ) (roachpb.NodeID, error) {
-	return 0, nil
+	return n.scatterNode, nil
 }

 // dbSplitAndScatter is the production implementation of this processor's
@@ -225,6 +227,10 @@ func newSplitAndScatterProcessor(
 	}

 	scatterer := makeSplitAndScatterer(db, kr)
+	if spec.ValidateOnly {
+		nodeID, _ := flowCtx.NodeID.OptionalNodeID()
+		scatterer = noopSplitAndScatterer{nodeID}
+	}
 	ssp := &splitAndScatterProcessor{
 		flowCtx:   flowCtx,
 		spec:      spec,
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only b/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only
index 53542234456f..623a8c43c917 100644
--- a/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only
@@ -142,3 +142,4 @@ pq: type "d2.public.greeting" already exists
 # into each of the restored tables.
 exec-sql
 INSERT INTO d2.t2 VALUES ('hi');
+----
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only b/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only
new file mode 100644
index 000000000000..0ef515cf8a32
--- /dev/null
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only
@@ -0,0 +1,173 @@
+# Test verify_backup_table_data restore
+# Part 1: test that verify_backup_table_data leaves the cluster in the same state as a schema_only restore
+# Part 2: check that it can catch data corruption errors
+
+##########
+# Part 1
+##########
+
+new-server name=s1 allow-implicit-access
+----
+
+exec-sql
+CREATE DATABASE d;
+CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
+CREATE TABLE d.t1 (x INT);
+INSERT INTO d.t1 VALUES (1), (2), (3);
+CREATE TABLE d.t2 (x d.greeting);
+INSERT INTO d.t2 VALUES ('hello'), ('howdy');
+COMMENT ON TABLE d.t1 IS 'This comment better get restored from the backed up system table!';
+----
+
+query-sql
+SHOW CREATE TABLE d.t1;
+----
+d.public.t1 CREATE TABLE public.t1 (
+  x INT8 NULL,
+  rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
+  CONSTRAINT t1_pkey PRIMARY KEY (rowid ASC)
+);
+COMMENT ON TABLE public.t1 IS 'This comment better get restored from the backed up system table!'
+
+# Drop and recreate defaultdb to ensure it gets a higher ID than its default. We will check that when
+# this cluster is restored, the defaultdb with the higher ID is also restored.
+# By default, defaultdb has an ID of 100.
+query-sql
+SELECT id FROM system.namespace WHERE name = 'defaultdb'
+----
+100
+
+exec-sql
+DROP DATABASE defaultdb;
+CREATE DATABASE defaultdb;
+----
+
+query-sql
+SELECT count(*) FROM system.namespace WHERE name = 'defaultdb' AND id > 100
+----
+1
+
+exec-sql
+BACKUP INTO 'nodelocal://1/full_cluster_backup/';
+----
+
+exec-sql
+BACKUP Database d INTO 'nodelocal://1/full_database_backup/';
+----
+
+
+# A new cluster in prep for a cluster level schema_only restore.
+new-server name=s2 share-io-dir=s1 allow-implicit-access
+----
+
+# First, ensure a cluster level schema_only restore fails fast in the same ways as a regular cluster restore.
+#
+# Fail fast if the user passes new_db_name.
+exec-sql
+RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data, new_db_name='d2';
+----
+pq: new_db_name can only be used for RESTORE DATABASE with a single target database
+
+
+exec-sql server=s2
+CREATE USER testuser
+----
+
+# Non-admins cannot run a schema_only cluster restore.
+exec-sql user=testuser
+RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data
+----
+pq: only users with the admin role are allowed to restore full cluster backups
+
+# Fail fast when using a database backup.
+exec-sql
+RESTORE FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, verify_backup_table_data;
+----
+pq: full cluster RESTORE can only be used on full cluster BACKUP files
+
+exec-sql
+RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data;
+----
+
+# There should be no data in the restored tables.
+query-sql
+SELECT * FROM d.t1;
+----
+
+
+query-sql
+SELECT * FROM d.t2;
+----
+
+# The backed up cluster was initialized with the bank dataset. Ensure it's now empty.
+query-sql
+SELECT * FROM data.bank;
+----
+
+# The backed up table d.t1 had a comment stored in a system table. This should have been restored.
+query-sql
+SHOW CREATE TABLE d.t1;
+----
+d.public.t1 CREATE TABLE public.t1 (
+  x INT8 NULL,
+  rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
+  CONSTRAINT t1_pkey PRIMARY KEY (rowid ASC)
+);
+COMMENT ON TABLE public.t1 IS 'This comment better get restored from the backed up system table!'
+
+# Ensure the defaultdb from the backed up cluster was restored.
+query-sql
+SELECT count(*) FROM system.namespace WHERE name = 'defaultdb' AND id > 100
+----
+1
+
+# Ensure database level schema_only restore logic is sound.
+exec-sql
+RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, new_db_name='d2';
+----
+
+# There should be no data in the user tables.
+query-sql
+SELECT * FROM d2.t1;
+----
+
+query-sql
+SELECT * FROM d2.t2;
+----
+
+
+# Each of the restored types should have namespace entries. Test this by
+# trying to create types that would cause namespace conflicts.
+exec-sql
+CREATE TYPE d2.greeting AS ENUM ('hello', 'hiya')
+----
+pq: type "d2.public.greeting" already exists
+
+# We should be able to resolve each restored type. Test this by inserting
+# into each of the restored tables.
+exec-sql
+INSERT INTO d2.t2 VALUES ('hi');
+----
+
+
+########
+# Part 2: test that corrupt data gets caught
+########
+
+# A new cluster that recovers from a checksum error.
+new-server name=s3 share-io-dir=s1 allow-implicit-access testingKnobCfg=RecoverFromIterPanic
+----
+
+corrupt-backup uri='nodelocal://0/full_database_backup/'
+----
+
+# A schema_only restore misses the data corruption.
+exec-sql
+RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, new_db_name='d3';
+----
+
+# But verify_backup_table_data catches it.
+exec-sql expect-error-regex='pebble/table: invalid table 000000'
+RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, verify_backup_table_data, new_db_name='d4';
+----
+regex matches error
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index 484924087a74..2a9036a32f46 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -383,7 +383,9 @@ message RestoreDetails {
   // SchemaOnly determines whether to only restore the schema in the backup.
   bool schema_only = 25;

-  // NEXT ID: 26.
+  bool VerifyData = 26;
+
+  // NEXT ID: 27.
 }
diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index d1c88e7c6a44..dbebd08da2b7 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -139,6 +139,10 @@ type SSTBatcher struct {
 	stats ingestionPerformanceStats

 	disableSplits bool
+	// noop indicates that the SSTBatcher will not actually write any data to disk or manipulate
+	// any database state. All public operations immediately noop.
+	noop bool
+
 	// The rest of the fields are per-batch and are reset via Reset() before each
 	// batch is started.
 	sstWriter storage.SSTWriter
@@ -183,7 +187,11 @@ func MakeSSTBatcher(
 	splitFilledRanges bool,
 	mem mon.BoundAccount,
 	sendLimiter limit.ConcurrentRequestLimiter,
+	noop bool,
 ) (*SSTBatcher, error) {
+	if noop && splitFilledRanges {
+		return nil, errors.New("cannot create a noop SSTBatcher that allows range splits")
+	}
 	b := &SSTBatcher{
 		name:          name,
 		db:            db,
@@ -193,6 +201,7 @@ func MakeSSTBatcher(
 		disableSplits: !splitFilledRanges,
 		mem:           mem,
 		limiter:       sendLimiter,
+		noop:          noop,
 	}
 	err := b.Reset(ctx)
 	return b, err
@@ -645,6 +654,9 @@ func (b *SSTBatcher) addSSTable(
 	stats enginepb.MVCCStats,
 	updatesLastRange bool,
 ) error {
+	if b.noop {
+		return nil
+	}
 	sendStart := timeutil.Now()

 	// Currently, the SSTBatcher cannot ingest range keys, so it is safe to
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 6874898e7b6b..eaecde896695 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1612,6 +1612,9 @@ type BackupRestoreTestingKnobs struct {
 	// testing. This is typically the bulk mem monitor if not
 	// specified here.
 	BackupMemMonitor *mon.BytesMonitor
+
+	// RecoverFromIterPanic prevents the node from panicking during ReadAsOfIterator.Close.
+	RecoverFromIterPanic bool
 }

 var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index 18e29b3cd6d5..e7d589d388c7 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -303,7 +303,7 @@ message RestoreDataSpec {
   // information passed back to track progress in the backup job.
   map<uint64, bool> pk_ids = 4 [(gogoproto.customname) = "PKIDs"];
   reserved 7;
-  optional bool schema_only = 8 [(gogoproto.nullable) = false];
+  optional bool validate_only = 8 [(gogoproto.nullable) = false];
   // NEXT ID: 9.
} @@ -318,7 +318,7 @@ message SplitAndScatterSpec { repeated TableRekey table_rekeys = 2 [(gogoproto.nullable) = false]; repeated TenantRekey tenant_rekeys = 3 [(gogoproto.nullable) = false]; reserved 5; - optional bool schema_only = 6 [(gogoproto.nullable) = false]; + optional bool validate_only = 6 [(gogoproto.nullable) = false]; // NEXTID: 7. } diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 37d9c8dfab79..65e097002a80 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -939,7 +939,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { %token UNBOUNDED UNCOMMITTED UNION UNIQUE UNKNOWN UNLOGGED UNSPLIT %token UPDATE UPSERT UNSET UNTIL USE USER USERS USING UUID -%token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED VIEWDEBUG +%token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VERIFY_BACKUP_TABLE_DATA VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED VIEWDEBUG %token VIEWCLUSTERMETADATA VIEWCLUSTERSETTING VIRTUAL VISIBLE VOLATILE VOTERS %token WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE @@ -3659,6 +3659,10 @@ restore_options: { $$.val = &tree.RestoreOptions{SchemaOnly: true} } +| VERIFY_BACKUP_TABLE_DATA + { + $$.val = &tree.RestoreOptions{VerifyData: true} + } import_format: name { @@ -15403,6 +15407,7 @@ unreserved_keyword: | VALIDATE | VALUE | VARYING +| VERIFY_BACKUP_TABLE_DATA | VIEW | VIEWACTIVITY | VIEWACTIVITYREDACTED diff --git a/pkg/sql/sem/tree/backup.go b/pkg/sql/sem/tree/backup.go index c82787c7532c..098c2d71834c 100644 --- a/pkg/sql/sem/tree/backup.go +++ b/pkg/sql/sem/tree/backup.go @@ -138,6 +138,7 @@ type RestoreOptions struct { IncrementalStorage StringOrPlaceholderOptList AsTenant Expr SchemaOnly bool + VerifyData bool } var _ NodeFormatter = &RestoreOptions{} @@ -412,6 +413,10 @@ func (o *RestoreOptions) Format(ctx *FmtCtx) { maybeAddSep() ctx.WriteString("schema_only") } + if o.VerifyData { + maybeAddSep() + ctx.WriteString("verify_backup_table_data") + } } // CombineWith merges other backup options into this backup options struct. @@ -514,6 +519,13 @@ func (o *RestoreOptions) CombineWith(other *RestoreOptions) error { } else { o.SchemaOnly = other.SchemaOnly } + if o.VerifyData { + if other.VerifyData { + return errors.New("verify_backup_table_data option specified multiple times") + } + } else { + o.VerifyData = other.VerifyData + } return nil } @@ -533,7 +545,8 @@ func (o RestoreOptions) IsDefault() bool { o.NewDBName == options.NewDBName && cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) && o.AsTenant == options.AsTenant && - o.SchemaOnly == options.SchemaOnly + o.SchemaOnly == options.SchemaOnly && + o.VerifyData == options.VerifyData } // BackupTargetList represents a list of targets.
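
Usage sketch for the new option (not part of the patch itself; the collection URI and database names below are illustrative). Per the restore_planning.go check above, verify_backup_table_data may only be set together with schema_only; the restore then reads and checksums the backed up table data without ingesting it into the cluster:

  BACKUP DATABASE d INTO 'nodelocal://1/full_database_backup/';
  RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/full_database_backup/'
    WITH schema_only, verify_backup_table_data, new_db_name = 'd_verified';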