backupccl: add verify_backup_table_data option to RESTORE
Release note (sql change): this patch adds the verify_backup_table_data flag to
RESTORE. When the user passes this flag along with the required schema_only
flag, a schema_only RESTORE runs _and_ all user data is read from external
storage, checksummed, and discarded before it is ever written to disk.

This flag provides two additional validation steps that neither a regular
schema_only RESTORE nor a SHOW BACKUP with check_files can provide: this
RESTORE verifies that all data can be read and rekeyed to the restoring
cluster, and that all data passes a checksum check.

Release justification: low-risk, high-impact change to improve restore
validation.
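
A minimal usage sketch of the new option from a Go client, assuming a locally
running insecure CockroachDB node, a database named bank, and a backup
collection at nodelocal://1/backups; the github.com/lib/pq driver stands in
for any Postgres-wire-compatible driver:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // placeholder: any Postgres-wire driver works with CockroachDB
)

func main() {
	// Placeholder connection string for a local, insecure CockroachDB node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// verify_backup_table_data requires schema_only: the schema is restored,
	// while table data is read from external storage, rekeyed, and
	// checksummed, then discarded instead of being written to disk.
	const stmt = `RESTORE DATABASE bank FROM LATEST IN 'nodelocal://1/backups'
		WITH schema_only, verify_backup_table_data`
	if _, err := db.Exec(stmt); err != nil {
		log.Fatalf("backup validation failed: %v", err)
	}
}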
msbutler committed Aug 18, 2022
1 parent 0e47c9b commit ca97c04
Showing 16 changed files with 416 additions and 40 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1369,6 +1369,7 @@ unreserved_keyword ::=
| 'VALIDATE'
| 'VALUE'
| 'VARYING'
| 'VERIFY_BACKUP_TABLE_DATA'
| 'VIEW'
| 'VIEWACTIVITY'
| 'VIEWACTIVITYREDACTED'
@@ -2530,6 +2531,7 @@ restore_options ::=
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'TENANT' '=' string_or_placeholder
| 'SCHEMA_ONLY'
| 'VERIFY_BACKUP_TABLE_DATA'

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
79 changes: 64 additions & 15 deletions pkg/ccl/backupccl/datadriven_test.go
@@ -12,7 +12,9 @@ import (
"context"
gosql "database/sql"
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
"regexp"
"strings"
"testing"
@@ -32,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -117,13 +120,14 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) {
}

type serverCfg struct {
name string
iodir string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
beforeVersion string
name string
iodir string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
beforeVersion string
testingKnobCfg string
}

func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
@@ -171,6 +175,18 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
}
params.ServerArgsPerNode = serverArgsPerNode
}
if cfg.testingKnobCfg != "" {
switch cfg.testingKnobCfg {
case "RecoverFromIterPanic":
params.ServerArgs.Knobs.DistSQL = &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RecoverFromIterPanic: true,
},
}
default:
t.Fatalf("TestingKnobCfg %s not found", cfg.testingKnobCfg)
}
}
if cfg.iodir == "" {
tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits,
InitManualReplication, params)
@@ -255,10 +271,14 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
// version before the passed in <beforeVersion> key. See cockroach_versions.go
// for possible values.
//
// + testingKnobCfg: specifies a key to a hardcoded testing knob configuration.
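//   The only key currently wired up is RecoverFromIterPanic, which installs
//   BackupRestoreTestingKnobs that recover from panics raised by iterators
//   over corrupted backup data (see the switch in addServer).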
//
//
// - "upgrade-server version=<version>"
// Upgrade the cluster version of the active server to the passed in
// clusterVersion key. See cockroach_versions.go for possible values.
//
//
// - "exec-sql [server=<name>] [user=<name>] [args]"
// Executes the input SQL query on the target server. By default, server is
// the last created server.
@@ -347,6 +367,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + target: SQL target. Currently, only table names are supported.
//
//
// - "corrupt-backup" uri=<collectionUri>
// Finds the latest backup in the provided collection URI and flips a bit in one SST in the backup.
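//
//   For example (hypothetical snippet; the collection URI is a placeholder):
//
//     corrupt-backup uri='nodelocal://1/full_backup_collection'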
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -375,7 +398,7 @@ func TestDataDriven(t *testing.T) {
return ""

case "new-server":
var name, shareDirWith, iodir, localities, beforeVersion string
var name, shareDirWith, iodir, localities, beforeVersion, testingKnobCfg string
var splits int
nodes := singleNode
var io base.ExternalIODirConfig
@@ -404,16 +427,20 @@ func TestDataDriven(t *testing.T) {
if d.HasArg("beforeVersion") {
d.ScanArgs(t, "beforeVersion", &beforeVersion)
}
if d.HasArg("testingKnobCfg") {
d.ScanArgs(t, "testingKnobCfg", &testingKnobCfg)
}

lastCreatedServer = name
cfg := serverCfg{
name: name,
iodir: iodir,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
beforeVersion: beforeVersion,
name: name,
iodir: iodir,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
beforeVersion: beforeVersion,
testingKnobCfg: testingKnobCfg,
}
err := ds.addServer(t, cfg)
if err != nil {
@@ -816,6 +843,28 @@ func TestDataDriven(t *testing.T) {
})
require.NoError(t, err)
return ""

case "corrupt-backup":
server := lastCreatedServer
user := "root"
var uri string
d.ScanArgs(t, "uri", &uri)
parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1))
require.NoError(t, err)
var filePath string
filePathQuery := fmt.Sprintf("SELECT path FROM [SHOW BACKUP FILES FROM LATEST IN %s] LIMIT 1", uri)
err = ds.getSQLDB(t, server, user).QueryRow(filePathQuery).Scan(&filePath)
require.NoError(t, err)
fullPath := filepath.Join(ds.getIODir(t, server), parsedURI.Path, filePath)
print(fullPath)
data, err := ioutil.ReadFile(fullPath)
require.NoError(t, err)
data[20] ^= 1
if err := ioutil.WriteFile(fullPath, data, 0644 /* perm */); err != nil {
t.Fatal(err)
}
return ""

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
10 changes: 10 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -37,6 +37,9 @@ type restorationData interface {
getTenantRekeys() []execinfrapb.TenantRekey
getPKIDs() map[uint64]bool

// isValidateOnly returns true iff only validation should occur.
isValidateOnly() bool

// addTenant extends the set of data needed to restore to include a new tenant.
addTenant(fromID, toID roachpb.TenantID)

@@ -74,6 +77,9 @@ type restorationDataBase struct {
// systemTables store the system tables that need to be restored for cluster
// backups. Should be nil otherwise.
systemTables []catalog.TableDescriptor

// validateOnly indicates that this data should only be read from external storage, not written to disk.
validateOnly bool
}

// restorationDataBase implements restorationData.
@@ -119,6 +125,10 @@ func (b *restorationDataBase) isEmpty() bool {
return len(b.spans) == 0
}

func (b *restorationDataBase) isValidateOnly() bool {
return b.validateOnly
}

// isMainBundle implements restorationData.
func (restorationDataBase) isMainBundle() bool { return false }

14 changes: 14 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -279,12 +279,25 @@ func (rd *restoreDataProcessor) openSSTs(
}
}()

var recoverFromIterPanic bool
if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
recoverFromIterPanic = restoreKnobs.RecoverFromIterPanic
}

// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
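// If the RecoverFromIterPanic testing knob is set, recover from any panic
// raised while closing the iterator or its underlying storage and log it
// instead of crashing the processor.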
if recoverFromIterPanic {
defer func() {
if r := recover(); r != nil {
log.Errorf(ctx, "recovered from Iter panic %v", r)
}
}()
}

readAsOfIter.Close()

for _, dir := range dirsToSend {
@@ -417,6 +430,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
rd.spec.ValidateOnly,
)
if err != nil {
return summary, err
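
The batcher that receives rd.spec.ValidateOnly above is not shown in this
diff; as a conceptual sketch only, a validate-only sink consumes and counts
every key-value pair without flushing anything to disk. The kvSink and
discardingSink names below are illustrative and do not match the real
SSTBatcher API:

package main

import "fmt"

// kvSink is an illustrative stand-in for the component that receives restored KVs.
type kvSink interface {
	Add(key, value []byte) error
	Flush() error
}

// discardingSink mirrors the validate-only behavior: every KV is read and
// accounted for, but nothing is ever written out.
type discardingSink struct {
	keys  int
	bytes int64
}

func (s *discardingSink) Add(key, value []byte) error {
	s.keys++
	s.bytes += int64(len(key) + len(value))
	return nil // drop the KV instead of buffering it for a write
}

func (s *discardingSink) Flush() error { return nil }

func main() {
	var sink kvSink = &discardingSink{}
	_ = sink.Add([]byte("k"), []byte("v"))
	_ = sink.Flush()
	fmt.Println("read and discarded without writing")
}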