Merge #86136
86136: backupccl: add verify_backup_table_data option to RESTORE r=dt a=msbutler

Release note (sql change): this patch adds the verify_backup_table_data flag to
RESTORE. When the user passes this flag, along with the required schema_only
flag, a schema_only RESTORE will run _and_ all user data will be read from
external storage, checksummed, and discarded without being written to disk.

This flag provides two additional validation steps that neither a regular
schema_only RESTORE nor a SHOW BACKUP with check_files can provide: this
RESTORE verifies that all data can be read and rekeyed for the restoring
cluster, and that all data passes a checksum check.

Release justification: low risk, high impact change to improve restore
validation
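
For illustration, a minimal sketch of invoking the new option; the database
name and collection URI here are hypothetical:

    RESTORE DATABASE bank FROM LATEST IN 'nodelocal://1/backups'
        WITH schema_only, verify_backup_table_data;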

Co-authored-by: Michael Butler <[email protected]>
craig[bot] and msbutler committed Aug 21, 2022
2 parents e6a7dc2 + 58df28d commit 8ed07d1
Showing 15 changed files with 528 additions and 109 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1369,6 +1369,7 @@ unreserved_keyword ::=
| 'VALIDATE'
| 'VALUE'
| 'VARYING'
| 'VERIFY_BACKUP_TABLE_DATA'
| 'VIEW'
| 'VIEWACTIVITY'
| 'VIEWACTIVITYREDACTED'
@@ -2530,6 +2531,7 @@ restore_options ::=
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'TENANT' '=' string_or_placeholder
| 'SCHEMA_ONLY'
| 'VERIFY_BACKUP_TABLE_DATA'

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
79 changes: 64 additions & 15 deletions pkg/ccl/backupccl/datadriven_test.go
@@ -12,7 +12,9 @@ import (
"context"
gosql "database/sql"
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
"regexp"
"strings"
"testing"
@@ -32,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -117,13 +120,14 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) {
}

type serverCfg struct {
name string
iodir string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
beforeVersion string
name string
iodir string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
beforeVersion string
testingKnobCfg string
}

func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
@@ -171,6 +175,18 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
}
params.ServerArgsPerNode = serverArgsPerNode
}
if cfg.testingKnobCfg != "" {
switch cfg.testingKnobCfg {
case "RecoverFromIterPanic":
params.ServerArgs.Knobs.DistSQL = &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RecoverFromIterPanic: true,
},
}
default:
t.Fatalf("TestingKnobCfg %s not found", cfg.testingKnobCfg)
}
}
if cfg.iodir == "" {
tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits,
InitManualReplication, params)
@@ -255,10 +271,14 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
// version before the passed in <beforeVersion> key. See cockroach_versions.go
// for possible values.
//
// + testingKnobCfg: specifies a key to a hardcoded testingKnob configuration
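// For example, a hypothetical directive (the server name is made up;
// RecoverFromIterPanic is the only knob key currently recognized):
//
//   new-server name=s1 testingKnobCfg=RecoverFromIterPanic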
//
//
// - "upgrade-server version=<version>"
// Upgrade the cluster version of the active server to the passed in
// clusterVersion key. See cockroach_versions.go for possible values.
//
//
// - "exec-sql [server=<name>] [user=<name>] [args]"
// Executes the input SQL query on the target server. By default, server is
// the last created server.
@@ -347,6 +367,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + target: SQL target. Currently, only table names are supported.
//
//
// - "corrupt-backup" uri=<collectionUri>
// Finds the latest backup in the provided collection URI and flips a bit in one SST in the backup.
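//
// For example (hypothetical collection URI):
//
//   corrupt-backup uri='nodelocal://1/backup'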
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -375,7 +398,7 @@ func TestDataDriven(t *testing.T) {
return ""

case "new-server":
var name, shareDirWith, iodir, localities, beforeVersion string
var name, shareDirWith, iodir, localities, beforeVersion, testingKnobCfg string
var splits int
nodes := singleNode
var io base.ExternalIODirConfig
@@ -404,16 +427,20 @@ func TestDataDriven(t *testing.T) {
if d.HasArg("beforeVersion") {
d.ScanArgs(t, "beforeVersion", &beforeVersion)
}
if d.HasArg("testingKnobCfg") {
d.ScanArgs(t, "testingKnobCfg", &testingKnobCfg)
}

lastCreatedServer = name
cfg := serverCfg{
name: name,
iodir: iodir,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
beforeVersion: beforeVersion,
name: name,
iodir: iodir,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
beforeVersion: beforeVersion,
testingKnobCfg: testingKnobCfg,
}
err := ds.addServer(t, cfg)
if err != nil {
@@ -816,6 +843,28 @@ func TestDataDriven(t *testing.T) {
})
require.NoError(t, err)
return ""

case "corrupt-backup":
server := lastCreatedServer
user := "root"
var uri string
d.ScanArgs(t, "uri", &uri)
parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1))
require.NoError(t, err)
var filePath string
filePathQuery := fmt.Sprintf("SELECT path FROM [SHOW BACKUP FILES FROM LATEST IN %s] LIMIT 1", uri)
err = ds.getSQLDB(t, server, user).QueryRow(filePathQuery).Scan(&filePath)
require.NoError(t, err)
fullPath := filepath.Join(ds.getIODir(t, server), parsedURI.Path, filePath)
data, err := ioutil.ReadFile(fullPath)
require.NoError(t, err)
// Flip the low bit of byte 20 to corrupt this SST.
data[20] ^= 1
if err := ioutil.WriteFile(fullPath, data, 0644 /* perm */); err != nil {
t.Fatal(err)
}
return ""

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
10 changes: 10 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -37,6 +37,9 @@ type restorationData interface {
getTenantRekeys() []execinfrapb.TenantRekey
getPKIDs() map[uint64]bool

// isValidateOnly returns true iff only validation should occur.
isValidateOnly() bool

// addTenant extends the set of data needed to restore to include a new tenant.
addTenant(fromID, toID roachpb.TenantID)

@@ -74,6 +77,9 @@ type restorationDataBase struct {
// systemTables store the system tables that need to be restored for cluster
// backups. Should be nil otherwise.
systemTables []catalog.TableDescriptor

// validateOnly indicates that this data should only be read from external storage, not written.
validateOnly bool
}

// restorationDataBase implements restorationData.
@@ -119,6 +125,10 @@ func (b *restorationDataBase) isEmpty() bool {
return len(b.spans) == 0
}

func (b *restorationDataBase) isValidateOnly() bool {
return b.validateOnly
}

// isMainBundle implements restorationData.
func (restorationDataBase) isMainBundle() bool { return false }

121 changes: 91 additions & 30 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -279,12 +279,25 @@ func (rd *restoreDataProcessor) openSSTs(
}
}()

var recoverFromIterPanic bool
if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
recoverFromIterPanic = restoreKnobs.RecoverFromIterPanic
}

// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
if recoverFromIterPanic {
defer func() {
if r := recover(); r != nil {
log.Errorf(ctx, "recovered from Iter panic %v", r)
}
}()
}

readAsOfIter.Close()

for _, dir := range dirsToSend {
@@ -389,37 +402,44 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
iter := sst.iter
defer sst.cleanup()

// If the system tenant is restoring a guest tenant span, we don't want to
// forward all the restored data to now, as there may be importing tables in
// that span that depend on the difference in timestamps on restored existing
// vs importing keys to roll back.
writeAtBatchTS := true
if writeAtBatchTS && kr.fromSystemTenant &&
(bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) {
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}
var batcher SSTBatcherExecutor
if rd.spec.ValidateOnly {
batcher = &sstBatcherNoop{}
} else {
// If the system tenant is restoring a guest tenant span, we don't want to
// forward all the restored data to now, as there may be importing tables in
// that span that depend on the difference in timestamps on restored existing
// vs importing keys to roll back.
writeAtBatchTS := true
if writeAtBatchTS && kr.fromSystemTenant &&
(bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) {
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// "disallowing" shadowing of anything older than logical=1 is i.e. allow all
// shadowing. We must allow shadowing in case the RESTORE has to retry any
// ingestions, but setting a (permissive) disallow like this serves to force
// evaluation of AddSSTable to check for overlapping keys. That in turn will
// result in it maintaining exact MVCC stats rather than estimates. Of course
// this comes at the cost of said overlap check, but in the common case of
// non-overlapping ingestion into empty spans, that is just one seek.
disallowShadowingBelow := hlc.Timestamp{Logical: 1}
batcher, err := bulk.MakeSSTBatcher(ctx,
"restore",
db,
evalCtx.Settings,
disallowShadowingBelow,
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
// "disallowing" shadowing of anything older than logical=1 is i.e. allow all
// shadowing. We must allow shadowing in case the RESTORE has to retry any
// ingestions, but setting a (permissive) disallow like this serves to force
// evaluation of AddSSTable to check for overlapping keys. That in turn will
// result in it maintaining exact MVCC stats rather than estimates. Of course
// this comes at the cost of said overlap check, but in the common case of
// non-overlapping ingestion into empty spans, that is just one seek.
disallowShadowingBelow := hlc.Timestamp{Logical: 1}

var err error
batcher, err = bulk.MakeSSTBatcher(ctx,
"restore",
db,
evalCtx.Settings,
disallowShadowingBelow,
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
}
}
defer batcher.Close(ctx)

@@ -539,6 +559,47 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
rd.InternalClose()
}

// SSTBatcherExecutor wraps the SSTBatcher methods, allowing a validation-only
// restore to use a mock SSTBatcher purely for job progress tracking.
type SSTBatcherExecutor interface {
AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error
Reset(ctx context.Context) error
Flush(ctx context.Context) error
Close(ctx context.Context)
GetSummary() roachpb.BulkOpSummary
}

type sstBatcherNoop struct {
// totalRows counts the keys passed to the batcher; nothing is written.
totalRows storage.RowCounter
}

var _ SSTBatcherExecutor = &sstBatcherNoop{}

// AddMVCCKey merely increments the totalRows counter. No key gets buffered or written.
func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
return b.totalRows.Count(key.Key)
}

// Reset noops.
func (b *sstBatcherNoop) Reset(ctx context.Context) error {
return nil
}

// Flush noops.
func (b *sstBatcherNoop) Flush(ctx context.Context) error {
return nil
}

// Close noops.
func (b *sstBatcherNoop) Close(ctx context.Context) {
}

// GetSummary returns this batcher's total added rows/bytes/etc.
func (b *sstBatcherNoop) GetSummary() roachpb.BulkOpSummary {
return b.totalRows.BulkOpSummary
}

func init() {
rowexec.NewRestoreDataProcessor = newRestoreDataProcessor
}