From 96c9491351d68257e54343ea74c93e8cc646df4b Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sun, 14 Aug 2022 14:07:47 -0400 Subject: [PATCH 1/2] backupccl: push tenant rekeying planning into createImportingDescriptors This small refactor pushes tenant rekeying logic from the main restore_job Resume() function into createImportingDescriptors. Release note: None --- pkg/ccl/backupccl/restore_job.go | 83 +++++++++++++++++--------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 52e56e2f672d..3f4c60d2c8cb 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1129,22 +1129,58 @@ func createImportingDescriptors( }) } + _, backupTenantID, err := keys.DecodeTenantPrefix(backupCodec.TenantPrefix()) + if err != nil { + return nil, nil, err + } + if !backupCodec.TenantPrefix().Equal(p.ExecCfg().Codec.TenantPrefix()) { + // Ensure old processors fail if this is a previously unsupported restore of + // a tenant backup by the system tenant, which the old rekey processor would + // mishandle since it assumed the system tenant always restored tenant keys + // to tenant prefixes, i.e. as tenant restore. + if backupTenantID != roachpb.SystemTenantID && p.ExecCfg().Codec.ForSystemTenant() { + // This empty table rekey acts as a poison-pill, which will be ignored by + // a current processor but reliably cause an older processor, which would + // otherwise mishandle tenant-made backup keys, to fail as it will be + // unable to decode the zero ID table desc. + rekeys = append(rekeys, execinfrapb.TableRekey{}) + } + } + + // If, and only if, the backup was made by a system tenant, can it contain + // backed up tenants, which the processor needs to know when is rekeying -- if + // the backup contains tenants, then a key with a tenant prefix should be + // restored if, and only if, we're restoring that tenant, and restored to a + // tenant. Otherwise, if this backup was not made by a system tenant, it does + // not contain tenants, so the rekey will assume if a key has a tenant prefix, + // it is because the tenant produced the backup, and it should be removed to + // then decode the remainder of the key. We communicate this distinction to + // the processor with a special tenant rekey _into_ the system tenant, which + // would never otherwise be valid. It will discard this rekey but it signals + // to it that we're rekeying a system-made backup. + var tenantRekeys []execinfrapb.TenantRekey + if backupTenantID == roachpb.SystemTenantID { + tenantRekeys = append(tenantRekeys, isBackupFromSystemTenantRekey) + } + pkIDs := make(map[uint64]bool) for _, tbl := range tables { pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.GetPrimaryIndexID()))] = true } dataToPreRestore := &restorationDataBase{ - spans: preRestoreSpans, - tableRekeys: rekeys, - pkIDs: pkIDs, + spans: preRestoreSpans, + tableRekeys: rekeys, + tenantRekeys: tenantRekeys, + pkIDs: pkIDs, } dataToRestore := &mainRestorationData{ restorationDataBase{ - spans: postRestoreSpans, - tableRekeys: rekeys, - pkIDs: pkIDs, + spans: postRestoreSpans, + tableRekeys: rekeys, + tenantRekeys: tenantRekeys, + pkIDs: pkIDs, }, } @@ -1272,13 +1308,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro // backupCodec is the codec that was used to encode the keys in the backup. It // is the tenant in which the backup was taken. 
backupCodec := keys.SystemSQLCodec
-	backupTenantID := roachpb.SystemTenantID
-
 	if len(sqlDescs) != 0 {
 		if len(latestBackupManifest.Spans) != 0 && !latestBackupManifest.HasTenants() {
 			// If there are no tenant targets, then the entire keyspace covered by
 			// Spans must lie in 1 tenant.
-			_, backupTenantID, err = keys.DecodeTenantPrefix(latestBackupManifest.Spans[0].Key)
+			_, backupTenantID, err := keys.DecodeTenantPrefix(latestBackupManifest.Spans[0].Key)
 			if err != nil {
 				return err
 			}
@@ -1304,37 +1338,6 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
 		return err
 	}

-	if !backupCodec.TenantPrefix().Equal(p.ExecCfg().Codec.TenantPrefix()) {
-		// Ensure old processors fail if this is a previously unsupported restore of
-		// a tenant backup by the system tenant, which the old rekey processor would
-		// mishandle since it assumed the system tenant always restored tenant keys
-		// to tenant prefixes, i.e. as tenant restore.
-		if backupTenantID != roachpb.SystemTenantID && p.ExecCfg().Codec.ForSystemTenant() {
-			// This empty table rekey acts as a poison-pill, which will be ignored by
-			// a current processor but reliably cause an older processor, which would
-			// otherwise mishandle tenant-made backup keys, to fail as it will be
-			// unable to decode the zero ID table desc.
-			preData.tableRekeys = append(preData.tableRekeys, execinfrapb.TableRekey{})
-			mainData.tableRekeys = append(preData.tableRekeys, execinfrapb.TableRekey{})
-		}
-	}
-
-	// If, and only if, the backup was made by a system tenant, can it contain
-	// backed up tenants, which the processor needs to know when is rekeying -- if
-	// the backup contains tenants, then a key with a tenant prefix should be
-	// restored if, and only if, we're restoring that tenant, and restored to a
-	// tenant. Otherwise, if this backup was not made by a system tenant, it does
-	// not contain tenants, so the rekey will assume if a key has a tenant prefix,
-	// it is because the tenant produced the backup, and it should be removed to
-	// then decode the remainder of the key. We communicate this distinction to
-	// the processor with a special tenant rekey _into_ the system tenant, which
-	// would never otherwise be valid. It will discard this rekey but it signals
-	// to it that we're rekeying a system-made backup.
-	if backupTenantID == roachpb.SystemTenantID {
-		preData.tenantRekeys = append(preData.tenantRekeys, isBackupFromSystemTenantRekey)
-		mainData.tenantRekeys = append(preData.tenantRekeys, isBackupFromSystemTenantRekey)
-	}
-
 	// Refresh the job details since they may have been updated when creating the
 	// importing descriptors.
 	details = r.job.Details().(jobspb.RestoreDetails)

From 58df28df82ed12bb624f071b52782dbf4c4f19ee Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Sun, 10 Jul 2022 21:38:26 -0400
Subject: [PATCH 2/2] backupccl: add verify_backup_table_data option to RESTORE

Release note (sql change): this patch adds the verify_backup_table_data flag
to RESTORE. When the user passes this flag along with the required
schema_only flag, a schema_only RESTORE will run _and_ all user data will be
read from external storage, checksummed, and discarded rather than written to
disk. This flag provides two additional validation steps that a regular
schema_only RESTORE and a SHOW BACKUP with check_files cannot provide: it
verifies that all data can be read and rekeyed into the restoring cluster,
and that all data passes a checksum check.
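To make the new syntax concrete, here is a minimal client-side sketch of how a
user might exercise the option once this patch is in place. The connection
string, backup URI, and the d_verified database name are illustrative
assumptions and are not part of this patch; only the WITH options reflect the
syntax added here.

// Hypothetical smoke test of the new RESTORE options against a running
// cluster; all connection and naming details below are assumptions.
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // postgres-wire driver commonly used with CockroachDB
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// verify_backup_table_data requires schema_only; passing it alone is
	// rejected at planning time by the check added in restore_planning.go.
	if _, err := db.Exec(
		`RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/full_database_backup/'
		 WITH schema_only, verify_backup_table_data, new_db_name = 'd_verified'`,
	); err != nil {
		log.Fatal(err)
	}
}

As with a plain schema_only restore, no user table data is written; the
verify_backup_table_data variant additionally reads and checksums every
backed-up key before discarding it.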
Release justification: low risk, high impact change to improve restore validation --- docs/generated/sql/bnf/stmt_block.bnf | 2 + pkg/ccl/backupccl/datadriven_test.go | 79 ++++++-- pkg/ccl/backupccl/restoration_data.go | 10 + pkg/ccl/backupccl/restore_data_processor.go | 121 +++++++++--- pkg/ccl/backupccl/restore_job.go | 109 +++++++++-- pkg/ccl/backupccl/restore_planning.go | 6 + .../backupccl/restore_processor_planning.go | 7 +- .../backupccl/split_and_scatter_processor.go | 10 +- .../backup-restore/restore-schema-only | 1 + .../backup-restore/restore-validation-only | 178 ++++++++++++++++++ pkg/jobs/jobspb/jobs.proto | 4 +- pkg/sql/exec_util.go | 3 + pkg/sql/execinfrapb/processors_bulk_io.proto | 4 +- pkg/sql/parser/sql.y | 7 +- pkg/sql/sem/tree/backup.go | 15 +- 15 files changed, 486 insertions(+), 70 deletions(-) create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 0d7d9c59bb82..8e896b6c0df3 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1369,6 +1369,7 @@ unreserved_keyword ::= | 'VALIDATE' | 'VALUE' | 'VARYING' + | 'VERIFY_BACKUP_TABLE_DATA' | 'VIEW' | 'VIEWACTIVITY' | 'VIEWACTIVITYREDACTED' @@ -2530,6 +2531,7 @@ restore_options ::= | 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list | 'TENANT' '=' string_or_placeholder | 'SCHEMA_ONLY' + | 'VERIFY_BACKUP_TABLE_DATA' scrub_option_list ::= ( scrub_option ) ( ( ',' scrub_option ) )* diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index fd9b4bde5c49..42d442372408 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -12,7 +12,9 @@ import ( "context" gosql "database/sql" "fmt" + "io/ioutil" "net/url" + "path/filepath" "regexp" "strings" "testing" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -117,13 +120,14 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) { } type serverCfg struct { - name string - iodir string - nodes int - splits int - ioConf base.ExternalIODirConfig - localities string - beforeVersion string + name string + iodir string + nodes int + splits int + ioConf base.ExternalIODirConfig + localities string + beforeVersion string + testingKnobCfg string } func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error { @@ -171,6 +175,18 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error { } params.ServerArgsPerNode = serverArgsPerNode } + if cfg.testingKnobCfg != "" { + switch cfg.testingKnobCfg { + case "RecoverFromIterPanic": + params.ServerArgs.Knobs.DistSQL = &execinfra.TestingKnobs{ + BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ + RecoverFromIterPanic: true, + }, + } + default: + t.Fatalf("TestingKnobCfg %s not found", cfg.testingKnobCfg) + } + } if cfg.iodir == "" { tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits, InitManualReplication, params) @@ -255,10 +271,14 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // version before 
the passed in key. See cockroach_versions.go // for possible values. // +// + testingKnobCfg: specifies a key to a hardcoded testingKnob configuration +// +// // - "upgrade-server version=" // Upgrade the cluster version of the active server to the passed in // clusterVersion key. See cockroach_versions.go for possible values. // +// // - "exec-sql [server=] [user=] [args]" // Executes the input SQL query on the target server. By default, server is // the last created server. @@ -347,6 +367,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // // + target: SQL target. Currently, only table names are supported. // +// +// - "corrupt-backup" uri= +// Finds the latest backup in the provided collection uri an flips a bit in one SST in the backup func TestDataDriven(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -375,7 +398,7 @@ func TestDataDriven(t *testing.T) { return "" case "new-server": - var name, shareDirWith, iodir, localities, beforeVersion string + var name, shareDirWith, iodir, localities, beforeVersion, testingKnobCfg string var splits int nodes := singleNode var io base.ExternalIODirConfig @@ -404,16 +427,20 @@ func TestDataDriven(t *testing.T) { if d.HasArg("beforeVersion") { d.ScanArgs(t, "beforeVersion", &beforeVersion) } + if d.HasArg("testingKnobCfg") { + d.ScanArgs(t, "testingKnobCfg", &testingKnobCfg) + } lastCreatedServer = name cfg := serverCfg{ - name: name, - iodir: iodir, - nodes: nodes, - splits: splits, - ioConf: io, - localities: localities, - beforeVersion: beforeVersion, + name: name, + iodir: iodir, + nodes: nodes, + splits: splits, + ioConf: io, + localities: localities, + beforeVersion: beforeVersion, + testingKnobCfg: testingKnobCfg, } err := ds.addServer(t, cfg) if err != nil { @@ -816,6 +843,28 @@ func TestDataDriven(t *testing.T) { }) require.NoError(t, err) return "" + + case "corrupt-backup": + server := lastCreatedServer + user := "root" + var uri string + d.ScanArgs(t, "uri", &uri) + parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1)) + require.NoError(t, err) + var filePath string + filePathQuery := fmt.Sprintf("SELECT path FROM [SHOW BACKUP FILES FROM LATEST IN %s] LIMIT 1", uri) + err = ds.getSQLDB(t, server, user).QueryRow(filePathQuery).Scan(&filePath) + require.NoError(t, err) + fullPath := filepath.Join(ds.getIODir(t, server), parsedURI.Path, filePath) + print(fullPath) + data, err := ioutil.ReadFile(fullPath) + require.NoError(t, err) + data[20] ^= 1 + if err := ioutil.WriteFile(fullPath, data, 0644 /* perm */); err != nil { + t.Fatal(err) + } + return "" + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } diff --git a/pkg/ccl/backupccl/restoration_data.go b/pkg/ccl/backupccl/restoration_data.go index b91a775ac379..1bb319e7f6d5 100644 --- a/pkg/ccl/backupccl/restoration_data.go +++ b/pkg/ccl/backupccl/restoration_data.go @@ -37,6 +37,9 @@ type restorationData interface { getTenantRekeys() []execinfrapb.TenantRekey getPKIDs() map[uint64]bool + // isValidateOnly returns ture iff only validation should occur + isValidateOnly() bool + // addTenant extends the set of data needed to restore to include a new tenant. addTenant(fromID, toID roachpb.TenantID) @@ -74,6 +77,9 @@ type restorationDataBase struct { // systemTables store the system tables that need to be restored for cluster // backups. Should be nil otherwise. 
systemTables []catalog.TableDescriptor + + // validateOnly indicates this data should only get read from external storage, not written + validateOnly bool } // restorationDataBase implements restorationData. @@ -119,6 +125,10 @@ func (b *restorationDataBase) isEmpty() bool { return len(b.spans) == 0 } +func (b *restorationDataBase) isValidateOnly() bool { + return b.validateOnly +} + // isMainBundle implements restorationData. func (restorationDataBase) isMainBundle() bool { return false } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 371e71222bce..89a79103cab6 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -279,12 +279,25 @@ func (rd *restoreDataProcessor) openSSTs( } }() + var recoverFromIterPanic bool + if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + recoverFromIterPanic = restoreKnobs.RecoverFromIterPanic + } + // sendIter sends a multiplexed iterator covering the currently accumulated files over the // channel. sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { + if recoverFromIterPanic { + defer func() { + if r := recover(); r != nil { + log.Errorf(ctx, "recovered from Iter panic %v", r) + } + }() + } + readAsOfIter.Close() for _, dir := range dirsToSend { @@ -389,37 +402,44 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( iter := sst.iter defer sst.cleanup() - // If the system tenant is restoring a guest tenant span, we don't want to - // forward all the restored data to now, as there may be importing tables in - // that span, that depend on the difference in timestamps on restored existing - // vs importing keys to rollback. - writeAtBatchTS := true - if writeAtBatchTS && kr.fromSystemTenant && - (bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) { - log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span) - writeAtBatchTS = false - } + var batcher SSTBatcherExecutor + if rd.spec.ValidateOnly { + batcher = &sstBatcherNoop{} + } else { + // If the system tenant is restoring a guest tenant span, we don't want to + // forward all the restored data to now, as there may be importing tables in + // that span, that depend on the difference in timestamps on restored existing + // vs importing keys to rollback. + writeAtBatchTS := true + if writeAtBatchTS && kr.fromSystemTenant && + (bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) { + log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span) + writeAtBatchTS = false + } - // "disallowing" shadowing of anything older than logical=1 is i.e. allow all - // shadowing. We must allow shadowing in case the RESTORE has to retry any - // ingestions, but setting a (permissive) disallow like this serves to force - // evaluation of AddSSTable to check for overlapping keys. That in turn will - // result in it maintaining exact MVCC stats rather than estimates. Of course - // this comes at the cost of said overlap check, but in the common case of - // non-overlapping ingestion into empty spans, that is just one seek. 
- disallowShadowingBelow := hlc.Timestamp{Logical: 1} - batcher, err := bulk.MakeSSTBatcher(ctx, - "restore", - db, - evalCtx.Settings, - disallowShadowingBelow, - writeAtBatchTS, - false, /* splitFilledRanges */ - rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), - rd.flowCtx.Cfg.BulkSenderLimiter, - ) - if err != nil { - return summary, err + // "disallowing" shadowing of anything older than logical=1 is i.e. allow all + // shadowing. We must allow shadowing in case the RESTORE has to retry any + // ingestions, but setting a (permissive) disallow like this serves to force + // evaluation of AddSSTable to check for overlapping keys. That in turn will + // result in it maintaining exact MVCC stats rather than estimates. Of course + // this comes at the cost of said overlap check, but in the common case of + // non-overlapping ingestion into empty spans, that is just one seek. + disallowShadowingBelow := hlc.Timestamp{Logical: 1} + + var err error + batcher, err = bulk.MakeSSTBatcher(ctx, + "restore", + db, + evalCtx.Settings, + disallowShadowingBelow, + writeAtBatchTS, + false, /* splitFilledRanges */ + rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), + rd.flowCtx.Cfg.BulkSenderLimiter, + ) + if err != nil { + return summary, err + } } defer batcher.Close(ctx) @@ -539,6 +559,47 @@ func (rd *restoreDataProcessor) ConsumerClosed() { rd.InternalClose() } +// SSTBatcherExecutor wraps the SSTBatcher methods, allowing a validation only restore to +// implement a mock SSTBatcher used purely for job progress tracking. +type SSTBatcherExecutor interface { + AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error + Reset(ctx context.Context) error + Flush(ctx context.Context) error + Close(ctx context.Context) + GetSummary() roachpb.BulkOpSummary +} + +type sstBatcherNoop struct { + // totalRows written by the batcher + totalRows storage.RowCounter +} + +var _ SSTBatcherExecutor = &sstBatcherNoop{} + +// AddMVCCKey merely increments the totalRow Counter. No key gets buffered or written. +func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error { + return b.totalRows.Count(key.Key) +} + +// Reset resets the counter +func (b *sstBatcherNoop) Reset(ctx context.Context) error { + return nil +} + +// Flush noops. +func (b *sstBatcherNoop) Flush(ctx context.Context) error { + return nil +} + +// Close noops. +func (b *sstBatcherNoop) Close(ctx context.Context) { +} + +// GetSummary returns this batcher's total added rows/bytes/etc. +func (b *sstBatcherNoop) GetSummary() roachpb.BulkOpSummary { + return b.totalRows.BulkOpSummary +} + func init() { rowexec.NewRestoreDataProcessor = newRestoreDataProcessor } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 3f4c60d2c8cb..5b2dc3e9c191 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -389,6 +389,7 @@ func restore( dataToRestore.getRekeys(), dataToRestore.getTenantRekeys(), endTime, + dataToRestore.isValidateOnly(), progCh, ) } @@ -484,6 +485,8 @@ type restoreResumer struct { // afterPreRestore runs on cluster restores after restoring the "preRestore" // data. afterPreRestore func() error + // checksumRecover + checksumRecover func() error } } @@ -657,15 +660,32 @@ func shouldPreRestore(table *tabledesc.Mutable) bool { return ok } -// createImportingDescriptors create the tables that we will restore into. It also -// fetches the information from the old tables that we need for the restore. 
+// createImportingDescriptors creates the tables that we will restore into and returns up to three +// configurations for separate restoration flows. The three restoration flows are +// +// 1. dataToPreRestore: a restoration flow cfg to ingest a subset of +// system tables (e.g. zone configs) during a cluster restore that are +// required to be set up before the rest of the data gets restored. +// This should be empty during non-cluster restores. +// +// 2. preValidation: a restoration flow cfg to ingest the remainder of system tables, +// during a verify_backup_table_data, cluster level, restores. This should be empty otherwise. +// +// 3. trackedRestore: a restoration flow cfg to ingest the remainder of +// restore targets. This flow should get executed last and should contain the +// bulk of the work, as it is used for job progress tracking. func createImportingDescriptors( ctx context.Context, p sql.JobExecContext, backupCodec keys.SQLCodec, sqlDescs []catalog.Descriptor, r *restoreResumer, -) (*restorationDataBase, *mainRestorationData, error) { +) ( + dataToPreRestore *restorationDataBase, + preValidation *restorationDataBase, + trackedRestore *mainRestorationData, + err error, +) { details := r.job.Details().(jobspb.RestoreDetails) var allMutableDescs []catalog.MutableDescriptor @@ -729,12 +749,18 @@ func createImportingDescriptors( // that is, in the 'old' keyspace, before we reassign the table IDs. preRestoreSpans := spansForAllRestoreTableIndexes(backupCodec, preRestoreTables, nil, details.SchemaOnly) postRestoreSpans := spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, details.SchemaOnly) + var verifySpans []roachpb.Span + if details.VerifyData { + // verifySpans contains the spans that should be read and checksum'd during a + // verify_backup_table_data RESTORE + verifySpans = spansForAllRestoreTableIndexes(backupCodec, postRestoreTables, nil, false) + } log.Eventf(ctx, "starting restore for %d tables", len(mutableTables)) // Assign new IDs to the database descriptors. if err := rewrite.DatabaseDescs(mutableDatabases, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } databaseDescs := make([]*descpb.DatabaseDescriptor, len(mutableDatabases)) @@ -757,11 +783,11 @@ func createImportingDescriptors( } if err := rewrite.SchemaDescs(schemasToWrite, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } if err := remapPublicSchemas(ctx, p, mutableDatabases, &schemasToWrite, &writtenSchemas, &details); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Assign new IDs and privileges to the tables, and update all references to @@ -769,7 +795,7 @@ func createImportingDescriptors( if err := rewrite.TableDescs( mutableTables, details.DescriptorRewrites, details.OverrideDB, ); err != nil { - return nil, nil, err + return nil, nil, nil, err } tableDescs := make([]*descpb.TableDescriptor, len(mutableTables)) for i, table := range mutableTables { @@ -807,13 +833,13 @@ func createImportingDescriptors( // descriptors will not be written to disk, and is only for accurate, // in-memory resolution hereon out. if err := rewrite.TypeDescs(types, details.DescriptorRewrites); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Finally, clean up / update any schema changer state inside descriptors // globally. 
if err := rewrite.MaybeClearSchemaChangerStateInDescs(allMutableDescs); err != nil { - return nil, nil, err + return nil, nil, nil, err } // Set the new descriptors' states to offline. @@ -1110,7 +1136,7 @@ func createImportingDescriptors( return err }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } @@ -1120,7 +1146,7 @@ func createImportingDescriptors( tableToSerialize := tables[i] newDescBytes, err := protoutil.Marshal(tableToSerialize.DescriptorProto()) if err != nil { - return nil, nil, errors.NewAssertionErrorWithWrappedErrf(err, + return nil, nil, nil, errors.NewAssertionErrorWithWrappedErrf(err, "marshaling descriptor") } rekeys = append(rekeys, execinfrapb.TableRekey{ @@ -1131,7 +1157,7 @@ func createImportingDescriptors( _, backupTenantID, err := keys.DecodeTenantPrefix(backupCodec.TenantPrefix()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if !backupCodec.TenantPrefix().Equal(p.ExecCfg().Codec.TenantPrefix()) { // Ensure old processors fail if this is a previously unsupported restore of @@ -1168,14 +1194,14 @@ func createImportingDescriptors( pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.GetPrimaryIndexID()))] = true } - dataToPreRestore := &restorationDataBase{ + dataToPreRestore = &restorationDataBase{ spans: preRestoreSpans, tableRekeys: rekeys, tenantRekeys: tenantRekeys, pkIDs: pkIDs, } - dataToRestore := &mainRestorationData{ + trackedRestore = &mainRestorationData{ restorationDataBase{ spans: postRestoreSpans, tableRekeys: rekeys, @@ -1184,6 +1210,27 @@ func createImportingDescriptors( }, } + preValidation = &restorationDataBase{} + // During a RESTORE with verify_backup_table_data data, progress on + // verifySpans should be the source of job progress (as it will take the most time); therefore, + // wrap them in a mainRestoration struct and unwrap postRestoreSpans + // (only relevant during a cluster restore). + if details.VerifyData { + trackedRestore.restorationDataBase.spans = verifySpans + trackedRestore.restorationDataBase.validateOnly = true + + // Before the main (validation) flow, during a cluster level restore, + // we still need to restore system tables that do NOT get restored in the dataToPreRestore + // flow. This restoration will not get tracked during job progress. + if (details.DescriptorCoverage != tree.AllDescriptors) && len(postRestoreSpans) != 0 { + return nil, nil, nil, errors.AssertionFailedf( + "no spans should get restored in a non cluster, verify_backup_table_data restore") + } + preValidation.spans = postRestoreSpans + preValidation.tableRekeys = rekeys + preValidation.pkIDs = pkIDs + } + if tempSystemDBID != descpb.InvalidID { for _, table := range preRestoreTables { if table.GetParentID() == tempSystemDBID { @@ -1192,11 +1239,19 @@ func createImportingDescriptors( } for _, table := range postRestoreTables { if table.GetParentID() == tempSystemDBID { - dataToRestore.systemTables = append(dataToRestore.systemTables, table) + if details.VerifyData { + // During a verify_backup_table_data RESTORE, system tables are + // restored pre validation. Note that the system tables are still + // added to the trackedRestore flow because after ingestion, the + // restore job uses systemTable metadata hanging from the + // trackedRestore object. 
+ preValidation.systemTables = append(preValidation.systemTables, table) + } + trackedRestore.systemTables = append(trackedRestore.systemTables, table) } } } - return dataToPreRestore, dataToRestore, nil + return dataToPreRestore, preValidation, trackedRestore, nil } // remapPublicSchemas is used to create a descriptor backed public schema @@ -1333,7 +1388,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } - preData, mainData, err := createImportingDescriptors(ctx, p, backupCodec, sqlDescs, r) + preData, preValidateData, mainData, err := createImportingDescriptors(ctx, p, backupCodec, + sqlDescs, r) if err != nil { return err } @@ -1444,6 +1500,25 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } } + if !preValidateData.isEmpty() { + res, err := restoreWithRetry( + ctx, + p, + numNodes, + backupManifests, + details.BackupLocalityInfo, + details.EndTime, + preValidateData, + r.job, + details.Encryption, + &kmsEnv, + ) + if err != nil { + return err + } + + resTotal.Add(res) + } { // Restore the main data bundle. We notably only restore the system tables // later. diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 729791f61d80..700e37fdbb3d 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -787,6 +787,7 @@ func resolveOptionsForRestoreJobDescription( SkipMissingViews: opts.SkipMissingViews, Detached: opts.Detached, SchemaOnly: opts.SchemaOnly, + VerifyData: opts.VerifyData, } if opts.EncryptionPassphrase != nil { @@ -880,6 +881,10 @@ func restorePlanHook( return nil, nil, nil, false, errors.New("cannot run RESTORE with schema_only until cluster has fully upgraded to 22.2") } + if !restoreStmt.Options.SchemaOnly && restoreStmt.Options.VerifyData { + return nil, nil, nil, false, + errors.New("to set the verify_backup_table_data option, the schema_only option must be set") + } fromFns := make([]func() ([]string, error), len(restoreStmt.From)) for i := range restoreStmt.From { @@ -1694,6 +1699,7 @@ func doRestorePlan( RestoreSystemUsers: restoreStmt.DescriptorCoverage == tree.SystemUsers, PreRewriteTenantId: oldTenantID, SchemaOnly: restoreStmt.Options.SchemaOnly, + VerifyData: restoreStmt.Options.VerifyData, } jr := jobs.Record{ diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 860b4bab7400..3b4880170a8a 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -67,6 +67,7 @@ func distRestore( tableRekeys []execinfrapb.TableRekey, tenantRekeys []execinfrapb.TenantRekey, restoreTime hlc.Timestamp, + validateOnly bool, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, ) error { defer close(progCh) @@ -106,7 +107,8 @@ func distRestore( p := planCtx.NewPhysicalPlan() - splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, tenantRekeys) + splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, + tenantRekeys, validateOnly) if err != nil { return nil, nil, err } @@ -118,6 +120,7 @@ func distRestore( TableRekeys: tableRekeys, TenantRekeys: tenantRekeys, PKIDs: pkIDs, + ValidateOnly: validateOnly, } if len(splitAndScatterSpecs) == 0 { @@ -292,6 +295,7 @@ func makeSplitAndScatterSpecs( chunks [][]execinfrapb.RestoreSpanEntry, tableRekeys []execinfrapb.TableRekey, tenantRekeys []execinfrapb.TenantRekey, + 
validateOnly bool, ) (map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec, error) { specsBySQLInstanceID := make(map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec) for i, chunk := range chunks { @@ -307,6 +311,7 @@ func makeSplitAndScatterSpecs( }}, TableRekeys: tableRekeys, TenantRekeys: tenantRekeys, + ValidateOnly: validateOnly, } } } diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index 676231f6b3a8..3f34fc42da6f 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -42,7 +42,9 @@ type splitAndScatterer interface { scatter(ctx context.Context, codec keys.SQLCodec, scatterKey roachpb.Key) (roachpb.NodeID, error) } -type noopSplitAndScatterer struct{} +type noopSplitAndScatterer struct { + scatterNode roachpb.NodeID +} var _ splitAndScatterer = noopSplitAndScatterer{} @@ -55,7 +57,7 @@ func (n noopSplitAndScatterer) split(_ context.Context, _ keys.SQLCodec, _ roach func (n noopSplitAndScatterer) scatter( _ context.Context, _ keys.SQLCodec, _ roachpb.Key, ) (roachpb.NodeID, error) { - return 0, nil + return n.scatterNode, nil } // dbSplitAndScatter is the production implementation of this processor's @@ -225,6 +227,10 @@ func newSplitAndScatterProcessor( } scatterer := makeSplitAndScatterer(db, kr) + if spec.ValidateOnly { + nodeID, _ := flowCtx.NodeID.OptionalNodeID() + scatterer = noopSplitAndScatterer{nodeID} + } ssp := &splitAndScatterProcessor{ flowCtx: flowCtx, spec: spec, diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only b/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only index 53542234456f..623a8c43c917 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-schema-only @@ -142,3 +142,4 @@ pq: type "d2.public.greeting" already exists # into each of the restored tables. exec-sql INSERT INTO d2.t2 VALUES ('hi'); +---- diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only b/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only new file mode 100644 index 000000000000..6434cc8b15f6 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-validation-only @@ -0,0 +1,178 @@ +# Test verify_backup_table_data restore +# Part 1: test that verify_backup_table_data leaves the cluster in the same state as a schema_only +# Part 2: check that it can catch data corruption errors + +########## +# Part 1 +########## + +new-server name=s1 allow-implicit-access +---- + +exec-sql +CREATE DATABASE d; +CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi'); +CREATE TABLE d.t1 (x INT); +INSERT INTO d.t1 VALUES (1), (2), (3); +CREATE TABLE d.t2 (x d.greeting); +INSERT INTO d.t2 VALUES ('hello'), ('howdy'); +COMMENT ON TABLE d.t1 IS 'This comment better get restored from the backed up system table!'; +---- + +query-sql +SHOW CREATE TABLE d.t1; +---- +d.public.t1 CREATE TABLE public.t1 ( + x INT8 NULL, + rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(), + CONSTRAINT t1_pkey PRIMARY KEY (rowid ASC) +); +COMMENT ON TABLE public.t1 IS 'This comment better get restored from the backed up system table!' + +# drop and create defaultDB to ensure it has a higher ID than by default. We will check that when +# this cluster is restored, the default db with the higher id was also restored +# by default, default db has an id of 100. 
+query-sql +SELECT id FROM system.namespace WHERE name = 'defaultdb' +---- +100 + +exec-sql +DROP DATABASE defaultdb; +CREATE DATABASE defaultdb; +---- + +query-sql +SELECT count(*) FROM system.namespace WHERE name = 'defaultdb' AND id > 100 +---- +1 + +exec-sql +BACKUP INTO 'nodelocal://1/full_cluster_backup/'; +---- + +exec-sql +BACKUP Database d INTO 'nodelocal://1/full_database_backup/'; +---- + + +# A new cluster in prep for a cluster level schema_only restore. +new-server name=s2 share-io-dir=s1 allow-implicit-access +---- + +# First, ensure cluster level schema_only restore fails fast in same ways as a cluster level restore. +# +# Fail fast if the user passes new_db_name. +exec-sql +RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data, new_db_name='d2'; +---- +pq: new_db_name can only be used for RESTORE DATABASE with a single target database + + +exec-sql server=s2 +CREATE USER testuser +---- + +# Non admins cannot run schema_only cluster restore +exec-sql user=testuser +RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data +---- +pq: only users with the admin role are allowed to restore full cluster backups + +# Fail fast using a database backup +exec-sql +RESTORE FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, verify_backup_table_data; +---- +pq: full cluster RESTORE can only be used on full cluster BACKUP files + +exec-sql +RESTORE FROM LATEST IN 'nodelocal://0/full_cluster_backup/' with schema_only, verify_backup_table_data; +---- + +# there should be no data in the restored tables +query-sql +SELECT * FROM d.t1; +---- + + +query-sql +SELECT * FROM d.t2; +---- + +# The backed up cluster was initiated with bank. Ensure it's now empty. +query-sql +SELECT * FROM data.bank; +---- + +# The backed table d.t1 had a comment stored in a system table. This should have been restored. +query-sql +SHOW CREATE TABLE d.t1; +---- +d.public.t1 CREATE TABLE public.t1 ( + x INT8 NULL, + rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(), + CONSTRAINT t1_pkey PRIMARY KEY (rowid ASC) +); +COMMENT ON TABLE public.t1 IS 'This comment better get restored from the backed up system table!' + +# Ensure the defaultdb from the backed up cluster was restored. +query-sql +SELECT count(*) FROM system.namespace WHERE name = 'defaultdb' AND id > 100 +---- +1 + +# Ensure Database Level schema_only restore logic is sound +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, new_db_name='d2'; +---- + +# There should be no data in the user tables. +query-sql +SELECT * FROM d2.t1; +---- + +query-sql +SELECT * FROM d2.t2; +---- + + +# Each of the restored types should have namespace entries. Test this by +# trying to create types that would cause namespace conflicts. +exec-sql +CREATE TYPE d2.greeting AS ENUM ('hello', 'hiya') +---- +pq: type "d2.public.greeting" already exists + +# We should be able to resolve each restored type. Test this by inserting +# into each of the restored tables. +exec-sql +INSERT INTO d2.t2 VALUES ('hi'); +---- + + +######## +# Part 2: test this checks corrupt data +######## + +# A new cluster that recovers from checksum error. +# A pebble bug causes the readAsOfIterator.Close() to panic. This should not occur. +# TODO(msbutler): refactor test once this is addressed. 
+new-server name=s3 share-io-dir=s1 allow-implicit-access testingKnobCfg=RecoverFromIterPanic +---- + +corrupt-backup uri='nodelocal://0/full_database_backup/' +---- + +# Schema only restore misses the data corruption +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, new_db_name='d3'; +---- + +# But verify-backup-table-data catches it +# A pebble bug is preventing the proper surfacing of the +# corruption error. +# TODO(msbutler): refactor the test once this is addressed +exec-sql +RESTORE DATABASE d FROM LATEST IN 'nodelocal://0/full_database_backup/' with schema_only, verify_backup_table_data, new_db_name='d4'; +---- + diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 484924087a74..2a9036a32f46 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -383,7 +383,9 @@ message RestoreDetails { // SchemaOnly determines whether to only restore the schema in the backup. bool schema_only = 25; - // NEXT ID: 26. + bool VerifyData = 26; + + // NEXT ID: 27. } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 2d645d678395..7c03a06a9674 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1616,6 +1616,9 @@ type BackupRestoreTestingKnobs struct { // testing. This is typically the bulk mem monitor if not // specified here. BackupMemMonitor *mon.BytesMonitor + + // RecoverFromIterClosePanic prevents the node from panicing during ReadAsOfIterator.Close + RecoverFromIterPanic bool } var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{} diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 18e29b3cd6d5..e7d589d388c7 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -303,7 +303,7 @@ message RestoreDataSpec { // information passed back to track progress in the backup job. map pk_ids = 4 [(gogoproto.customname) = "PKIDs"]; reserved 7; - optional bool schema_only = 8 [(gogoproto.nullable) = false]; + optional bool validate_only = 8 [(gogoproto.nullable) = false]; // NEXT ID: 9. } @@ -318,7 +318,7 @@ message SplitAndScatterSpec { repeated TableRekey table_rekeys = 2 [(gogoproto.nullable) = false]; repeated TenantRekey tenant_rekeys = 3 [(gogoproto.nullable) = false]; reserved 5; - optional bool schema_only = 6 [(gogoproto.nullable) = false]; + optional bool validate_only = 6 [(gogoproto.nullable) = false]; // NEXTID: 7. 
} diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 37d9c8dfab79..65e097002a80 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -939,7 +939,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { %token UNBOUNDED UNCOMMITTED UNION UNIQUE UNKNOWN UNLOGGED UNSPLIT %token UPDATE UPSERT UNSET UNTIL USE USER USERS USING UUID -%token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED VIEWDEBUG +%token VALID VALIDATE VALUE VALUES VARBIT VARCHAR VARIADIC VERIFY_BACKUP_TABLE_DATA VIEW VARYING VIEWACTIVITY VIEWACTIVITYREDACTED VIEWDEBUG %token VIEWCLUSTERMETADATA VIEWCLUSTERSETTING VIRTUAL VISIBLE VOLATILE VOTERS %token WHEN WHERE WINDOW WITH WITHIN WITHOUT WORK WRITE @@ -3659,6 +3659,10 @@ restore_options: { $$.val = &tree.RestoreOptions{SchemaOnly: true} } +| VERIFY_BACKUP_TABLE_DATA + { + $$.val = &tree.RestoreOptions{VerifyData: true} + } import_format: name { @@ -15403,6 +15407,7 @@ unreserved_keyword: | VALIDATE | VALUE | VARYING +| VERIFY_BACKUP_TABLE_DATA | VIEW | VIEWACTIVITY | VIEWACTIVITYREDACTED diff --git a/pkg/sql/sem/tree/backup.go b/pkg/sql/sem/tree/backup.go index c82787c7532c..098c2d71834c 100644 --- a/pkg/sql/sem/tree/backup.go +++ b/pkg/sql/sem/tree/backup.go @@ -138,6 +138,7 @@ type RestoreOptions struct { IncrementalStorage StringOrPlaceholderOptList AsTenant Expr SchemaOnly bool + VerifyData bool } var _ NodeFormatter = &RestoreOptions{} @@ -412,6 +413,10 @@ func (o *RestoreOptions) Format(ctx *FmtCtx) { maybeAddSep() ctx.WriteString("schema_only") } + if o.VerifyData { + maybeAddSep() + ctx.WriteString("verify_backup_table_data") + } } // CombineWith merges other backup options into this backup options struct. @@ -514,6 +519,13 @@ func (o *RestoreOptions) CombineWith(other *RestoreOptions) error { } else { o.SchemaOnly = other.SchemaOnly } + if o.VerifyData { + if other.VerifyData { + return errors.New("verify_backup_table_data option specified multiple times") + } + } else { + o.VerifyData = other.VerifyData + } return nil } @@ -533,7 +545,8 @@ func (o RestoreOptions) IsDefault() bool { o.NewDBName == options.NewDBName && cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) && o.AsTenant == options.AsTenant && - o.SchemaOnly == options.SchemaOnly + o.SchemaOnly == options.SchemaOnly && + o.VerifyData == options.VerifyData } // BackupTargetList represents a list of targets.
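For readers skimming the diff, the core mechanism of the validation-only flow
is easy to miss inside restore_data_processor.go: the ingestion loop is written
against a small executor interface, and a verify_backup_table_data restore
swaps in a no-op implementation that only counts keys. The sketch below is a
distilled, self-contained illustration of that pattern; the types and names
are simplified stand-ins, not the actual SSTBatcher, SSTBatcherExecutor, or
BulkOpSummary APIs.

// A minimal sketch of the executor-interface pattern used for
// validation-only restores. Simplified types for illustration only.
package main

import (
	"context"
	"fmt"
)

type kv struct{ key, value string }

// batcherExecutor mirrors the shape of the interface the processor needs:
// add a key, flush, close, and report a summary for job progress.
type batcherExecutor interface {
	Add(ctx context.Context, k kv) error
	Flush(ctx context.Context) error
	Close(ctx context.Context)
	Summary() int
}

// noopBatcher plays the role of sstBatcherNoop: every key is read and
// counted, but nothing is buffered or written, so the run only validates.
type noopBatcher struct{ rows int }

func (b *noopBatcher) Add(ctx context.Context, k kv) error { b.rows++; return nil }
func (b *noopBatcher) Flush(ctx context.Context) error     { return nil }
func (b *noopBatcher) Close(ctx context.Context)           {}
func (b *noopBatcher) Summary() int                        { return b.rows }

// processSpan is the shared ingestion loop; it cannot tell whether it is
// writing for real or only validating, which is exactly the point.
func processSpan(ctx context.Context, b batcherExecutor, entries []kv) (int, error) {
	defer b.Close(ctx)
	for _, e := range entries {
		if err := b.Add(ctx, e); err != nil {
			return 0, err
		}
	}
	if err := b.Flush(ctx); err != nil {
		return 0, err
	}
	return b.Summary(), nil
}

func main() {
	n, err := processSpan(context.Background(), &noopBatcher{}, []kv{{"a", "1"}, {"b", "2"}})
	fmt.Println(n, err) // 2 <nil>
}

In the real processor the write path is bulk.MakeSSTBatcher and the no-op path
is sstBatcherNoop backed by a row counter; only the interface boundary shown
above separates a normal restore from a validation-only one.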