backupccl: add verify_backup_table_data option to RESTORE #86136

Merged: 2 commits, Aug 21, 2022
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1369,6 +1369,7 @@ unreserved_keyword ::=
| 'VALIDATE'
| 'VALUE'
| 'VARYING'
| 'VERIFY_BACKUP_TABLE_DATA'
| 'VIEW'
| 'VIEWACTIVITY'
| 'VIEWACTIVITYREDACTED'
@@ -2530,6 +2531,7 @@ restore_options ::=
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'TENANT' '=' string_or_placeholder
| 'SCHEMA_ONLY'
| 'VERIFY_BACKUP_TABLE_DATA'

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
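As a usage sketch, the new grammar can be exercised from Go through any SQL driver. Everything below (the connection string, database name, and collection URI) is illustrative rather than taken from this PR, and it assumes the new option pairs with the existing SCHEMA_ONLY option listed in the same production.

// Hedged sketch: run a verification-only RESTORE against a backup collection.
// The DSN, database, and URI are hypothetical; only the WITH options come from
// the grammar added above.
package main

import (
	gosql "database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any postgres-wire driver can reach CockroachDB
)

func main() {
	db, err := gosql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Reads back and checksums all backup table data without ingesting it.
	if _, err := db.Exec(
		`RESTORE DATABASE movr FROM LATEST IN 'nodelocal://1/backup'
		 WITH schema_only, verify_backup_table_data`); err != nil {
		log.Fatal(err)
	}
}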
79 changes: 64 additions & 15 deletions pkg/ccl/backupccl/datadriven_test.go
@@ -12,7 +12,9 @@ import (
"context"
gosql "database/sql"
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
"regexp"
"strings"
"testing"
@@ -32,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -117,13 +120,14 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) {
}

type serverCfg struct {
name string
iodir string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
beforeVersion string
testingKnobCfg string
}

func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
@@ -171,6 +175,18 @@
}
params.ServerArgsPerNode = serverArgsPerNode
}
if cfg.testingKnobCfg != "" {
switch cfg.testingKnobCfg {
case "RecoverFromIterPanic":
params.ServerArgs.Knobs.DistSQL = &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RecoverFromIterPanic: true,
},
}
default:
t.Fatalf("TestingKnobCfg %s not found", cfg.testingKnobCfg)
}
}
if cfg.iodir == "" {
tc, _, cfg.iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, cfg.splits,
InitManualReplication, params)
@@ -255,10 +271,14 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
// version before the passed in <beforeVersion> key. See cockroach_versions.go
// for possible values.
//
// + testingKnobCfg: specifies a key that selects a hardcoded testing knob configuration
//
//
// - "upgrade-server version=<version>"
// Upgrade the cluster version of the active server to the passed in
// clusterVersion key. See cockroach_versions.go for possible values.
//
//
// - "exec-sql [server=<name>] [user=<name>] [args]"
// Executes the input SQL query on the target server. By default, server is
// the last created server.
@@ -347,6 +367,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + target: SQL target. Currently, only table names are supported.
//
//
// - "corrupt-backup" uri=<collectionUri>
Finds the latest backup in the provided collection URI and flips a bit in one SST in the backup.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -375,7 +398,7 @@ func TestDataDriven(t *testing.T) {
return ""

case "new-server":
var name, shareDirWith, iodir, localities, beforeVersion, testingKnobCfg string
var splits int
nodes := singleNode
var io base.ExternalIODirConfig
@@ -404,16 +427,20 @@
if d.HasArg("beforeVersion") {
d.ScanArgs(t, "beforeVersion", &beforeVersion)
}
if d.HasArg("testingKnobCfg") {
d.ScanArgs(t, "testingKnobCfg", &testingKnobCfg)
}

lastCreatedServer = name
cfg := serverCfg{
name: name,
iodir: iodir,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
beforeVersion: beforeVersion,
testingKnobCfg: testingKnobCfg,
}
err := ds.addServer(t, cfg)
if err != nil {
@@ -816,6 +843,28 @@ func TestDataDriven(t *testing.T) {
})
require.NoError(t, err)
return ""

case "corrupt-backup":
server := lastCreatedServer
user := "root"
var uri string
d.ScanArgs(t, "uri", &uri)
parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1))
require.NoError(t, err)
var filePath string
filePathQuery := fmt.Sprintf("SELECT path FROM [SHOW BACKUP FILES FROM LATEST IN %s] LIMIT 1", uri)
err = ds.getSQLDB(t, server, user).QueryRow(filePathQuery).Scan(&filePath)
require.NoError(t, err)
fullPath := filepath.Join(ds.getIODir(t, server), parsedURI.Path, filePath)
data, err := ioutil.ReadFile(fullPath)
require.NoError(t, err)
data[20] ^= 1
if err := ioutil.WriteFile(fullPath, data, 0644 /* perm */); err != nil {
t.Fatal(err)
}
return ""

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
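Putting the new directives together, a hypothetical testdata snippet might look like the following. The directive names and arguments come from the doc comments above; the URI, schema, and the standard datadriven "----" separators are illustrative, and expected-output blocks are elided.

new-server name=s1 testingKnobCfg=RecoverFromIterPanic
----

exec-sql
CREATE DATABASE d;
CREATE TABLE d.t (k INT PRIMARY KEY);
BACKUP INTO 'nodelocal://1/backup';
----

corrupt-backup uri='nodelocal://1/backup'
----

A subsequent RESTORE ... WITH schema_only, verify_backup_table_data against this collection would then be expected to fail, since the bit flipped by corrupt-backup breaks the checksum verification this PR adds.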
10 changes: 10 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -37,6 +37,9 @@ type restorationData interface {
getTenantRekeys() []execinfrapb.TenantRekey
getPKIDs() map[uint64]bool

// isValidateOnly returns true iff only validation should occur.
isValidateOnly() bool

// addTenant extends the set of data needed to restore to include a new tenant.
addTenant(fromID, toID roachpb.TenantID)

@@ -74,6 +77,9 @@ type restorationDataBase struct {
// systemTables store the system tables that need to be restored for cluster
// backups. Should be nil otherwise.
systemTables []catalog.TableDescriptor

// validateOnly indicates that this data should only be read from external storage, not written.
validateOnly bool
}

// restorationDataBase implements restorationData.
@@ -119,6 +125,10 @@ func (b *restorationDataBase) isEmpty() bool {
return len(b.spans) == 0
}

func (b *restorationDataBase) isValidateOnly() bool {
return b.validateOnly
}

// isMainBundle implements restorationData.
func (restorationDataBase) isMainBundle() bool { return false }

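For orientation, a minimal sketch (not part of this diff) of constructing and consuming a validate-only restorationDataBase; all other fields are left at their zero values purely for illustration, and the function assumes the surrounding package so the type is in scope.

// exampleValidateOnly shows the new accessor in use.
func exampleValidateOnly() {
	data := &restorationDataBase{validateOnly: true}
	if data.isValidateOnly() {
		// Downstream, the restore processors read and verify backup data
		// without writing it to the cluster (see restore_data_processor.go).
	}
}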
121 changes: 91 additions & 30 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -279,12 +279,25 @@ func (rd *restoreDataProcessor) openSSTs(
}
}()

var recoverFromIterPanic bool
if restoreKnobs, ok := rd.flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
recoverFromIterPanic = restoreKnobs.RecoverFromIterPanic
}

// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
if recoverFromIterPanic {
[Inline review comment, Member] as discussed offline, we shouldn't need this once we fix the pebble iterator.

defer func() {
if r := recover(); r != nil {
log.Errorf(ctx, "recovered from Iter panic %v", r)
}
}()
}

readAsOfIter.Close()

for _, dir := range dirsToSend {
@@ -389,37 +402,44 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
iter := sst.iter
defer sst.cleanup()

var batcher SSTBatcherExecutor
if rd.spec.ValidateOnly {
batcher = &sstBatcherNoop{}
} else {
// If the system tenant is restoring a guest tenant span, we don't want to
// forward all the restored data to now, as there may be importing tables in
// that span that depend on the difference in timestamps between restored
// existing keys and importing keys in order to roll back.
writeAtBatchTS := true
if writeAtBatchTS && kr.fromSystemTenant &&
(bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) {
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// "Disallowing" shadowing of anything older than logical=1 in effect allows
// all shadowing. We must allow shadowing in case the RESTORE has to retry any
// ingestions, but setting a (permissive) disallow like this serves to force
// evaluation of AddSSTable to check for overlapping keys. That in turn will
// result in it maintaining exact MVCC stats rather than estimates. Of course
// this comes at the cost of said overlap check, but in the common case of
// non-overlapping ingestion into empty spans, that is just one seek.
disallowShadowingBelow := hlc.Timestamp{Logical: 1}

var err error
batcher, err = bulk.MakeSSTBatcher(ctx,
"restore",
db,
evalCtx.Settings,
disallowShadowingBelow,
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
}
}
defer batcher.Close(ctx)

@@ -539,6 +559,47 @@ func (rd *restoreDataProcessor) ConsumerClosed() {
rd.InternalClose()
}

// SSTBatcherExecutor wraps the SSTBatcher methods, allowing a validation-only restore to
// implement a mock SSTBatcher used purely for job progress tracking.
type SSTBatcherExecutor interface {
AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error
Reset(ctx context.Context) error
Flush(ctx context.Context) error
Close(ctx context.Context)
GetSummary() roachpb.BulkOpSummary
}

type sstBatcherNoop struct {
// totalRows counts the rows passed to the batcher; nothing is written.
totalRows storage.RowCounter
}

var _ SSTBatcherExecutor = &sstBatcherNoop{}

// AddMVCCKey merely increments the totalRows counter. No key gets buffered or written.
func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
return b.totalRows.Count(key.Key)
}

// Reset noops; the accumulated row count is kept for the final summary.
func (b *sstBatcherNoop) Reset(ctx context.Context) error {
return nil
}

// Flush noops.
func (b *sstBatcherNoop) Flush(ctx context.Context) error {
return nil
}

// Close noops.
func (b *sstBatcherNoop) Close(ctx context.Context) {
}

// GetSummary returns this batcher's total added rows/bytes/etc.
func (b *sstBatcherNoop) GetSummary() roachpb.BulkOpSummary {
return b.totalRows.BulkOpSummary
}

func init() {
rowexec.NewRestoreDataProcessor = newRestoreDataProcessor
}
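To make the executor abstraction concrete, here is a minimal sketch, not from this PR, of a loop that drives either batcher implementation. The type and method names come from the interface above, and it assumes the surrounding package's imports (context, storage, roachpb).

// consumeKeys drives any SSTBatcherExecutor over a set of keys. Passing
// &sstBatcherNoop{} yields a validation-style run in which only row counts
// accumulate; a real bulk.SSTBatcher would ingest the data.
func consumeKeys(
	ctx context.Context, batcher SSTBatcherExecutor, keys []storage.MVCCKey, values [][]byte,
) (roachpb.BulkOpSummary, error) {
	defer batcher.Close(ctx)
	for i := range keys {
		if err := batcher.AddMVCCKey(ctx, keys[i], values[i]); err != nil {
			return roachpb.BulkOpSummary{}, err
		}
	}
	if err := batcher.Flush(ctx); err != nil {
		return roachpb.BulkOpSummary{}, err
	}
	return batcher.GetSummary(), nil
}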