Skip to content

Commit

Permalink
Merge pull request #63304 from pbardea/backport21.1-63121
Browse files Browse the repository at this point in the history
release-21.1: backupccl: re-backup spans that come online during incremental backups
  • Loading branch information
pbardea authored Apr 12, 2021
2 parents f5cca5b + 3bd095f commit 8f66611
Show file tree
Hide file tree
Showing 6 changed files with 526 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_test(
"create_scheduled_backup_test.go",
"full_cluster_backup_restore_test.go",
"helpers_test.go",
"import_spans_test.go",
"main_test.go",
"partitioned_backup_test.go",
"restore_mid_schema_change_test.go",
Expand Down
84 changes: 80 additions & 4 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,9 +1055,8 @@ func backupPlanHook(
_, coveredTime, err := makeImportSpans(
spans,
prevBackups,
nil, /*backupLocalityInfo*/
keys.MinKey,
p.User(),
nil, /*backupLocalityMaps*/
keys.MinKey, /* lowWatermark */
func(span covering.Range, start, end hlc.Timestamp) error {
if start.IsEmpty() {
newSpans = append(newSpans, roachpb.Span{Key: span.Start, EndKey: span.End})
Expand All @@ -1069,6 +1068,13 @@ func backupPlanHook(
if err != nil {
return errors.Wrap(err, "invalid previous backups")
}

tableSpans, err := getReintroducedSpans(ctx, p, prevBackups, tables, revs, endTime)
if err != nil {
return err
}
newSpans = append(newSpans, tableSpans...)

if coveredTime != startTime {
return errors.Errorf("expected previous backups to cover until time %v, got %v", startTime, coveredTime)
}
Expand Down Expand Up @@ -1111,7 +1117,6 @@ func backupPlanHook(
append(prevBackups, backupManifest),
nil, /*backupLocalityInfo*/
keys.MinKey,
p.User(),
errOnMissingRange,
); err != nil {
return err
Expand Down Expand Up @@ -1323,6 +1328,77 @@ func backupPlanHook(
return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
}

// getReintroducedSpans checks to see if any spans need to be re-backed up from
// ts = 0. This may be the case if a span was OFFLINE in the previous backup and
// has come back online since. The entire span needs to be re-backed up because
// we may otherwise miss AddSSTable requests which write to a timestamp older
// than the last incremental.
func getReintroducedSpans(
ctx context.Context,
p sql.PlanHookState,
prevBackups []BackupManifest,
tables []catalog.TableDescriptor,
revs []BackupManifest_DescriptorRevision,
endTime hlc.Timestamp,
) ([]roachpb.Span, error) {
reintroducedTables := make(map[descpb.ID]struct{})

offlineInLastBackup := make(map[descpb.ID]struct{})
lastBackup := prevBackups[len(prevBackups)-1]
for _, desc := range lastBackup.Descriptors {
// TODO(pbardea): Also check that lastWriteTime is set once those are
// populated on the table descriptor.
if table, _, _, _ := descpb.FromDescriptor(&desc); table != nil && table.Offline() {
offlineInLastBackup[table.GetID()] = struct{}{}
}
}

// If the table was offline in the last backup, but becomes PUBLIC, then it
// needs to be re-included since we may have missed non-transactional writes.
tablesToReinclude := make([]catalog.TableDescriptor, 0)
for _, desc := range tables {
if _, wasOffline := offlineInLastBackup[desc.GetID()]; wasOffline && desc.Public() {
tablesToReinclude = append(tablesToReinclude, desc)
reintroducedTables[desc.GetID()] = struct{}{}
}
}

// Tables should be re-introduced if any revision of the table was PUBLIC. A
// table may have been OFFLINE at the time of the last backup, and OFFLINE at
// the time of the current backup, but may have been PUBLIC at some time in
// between.
for _, rev := range revs {
rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTable == nil {
continue
}
table := tabledesc.NewBuilder(rawTable).BuildImmutableTable()
if _, wasOffline := offlineInLastBackup[table.GetID()]; wasOffline && table.Public() {
tablesToReinclude = append(tablesToReinclude, table)
reintroducedTables[table.GetID()] = struct{}{}
}
}

// All revisions of the table that we're re-introducing must also be
// considered.
allRevs := make([]BackupManifest_DescriptorRevision, 0, len(revs))
for _, rev := range revs {
rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTable == nil {
continue
}
if _, ok := reintroducedTables[rawTable.GetID()]; ok {
allRevs = append(allRevs, rev)
}
}

tableSpans, err := spansForAllTableIndexes(ctx, p.ExecCfg(), endTime, tablesToReinclude, allRevs)
if err != nil {
return nil, err
}
return tableSpans, nil
}

func makeNewEncryptionOptions(
ctx context.Context, encryptionParams backupEncryptionParams,
) (*jobspb.BackupEncryptionOptions, *jobspb.EncryptionInfo, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8007,7 +8007,7 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
sqlDB.Exec(t, `DROP DATABASE restoredb CASCADE;`)
}

// Restore to a time aftere the index was dropped and double check that we
// Restore to a time after the index was dropped and double check that we
// didn't bring back any keys from the dropped index.
{
blockBackfills = make(chan struct{}) // block the synthesized schema change job
Expand Down
96 changes: 96 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package backupccl

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// Large test to ensure that all of the system table data is being restored in
Expand Down Expand Up @@ -801,3 +804,96 @@ func TestClusterRevisionHistory(t *testing.T) {
}

}

// TestReintroduceOfflineSpans is a regression test for #62564, which tracks a
// bug where AddSSTable requests to OFFLINE tables may be missed by cluster
// incremental backups since they can write at a timestamp older than the last
// backup.
func TestReintroduceOfflineSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "likely slow under race")

// Block restores on the source cluster.
blockDBRestore := make(chan struct{})
dbRestoreStarted := make(chan struct{})
// The data is split such that there will be 10 span entries to process.
restoreBlockEntiresThreshold := 4
entriesCount := 0
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
if entriesCount == 0 {
close(dbRestoreStarted)
}
if entriesCount == restoreBlockEntiresThreshold {
<-blockDBRestore
}

entriesCount++
},
}},
}
params.ServerArgs.Knobs = knobs

const numAccounts = 1000
ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupSrc()
defer cleanupDst()

dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)

srcDB.Exec(t, `CREATE DATABASE restoredb;`)

// Take a base full backup.
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

var g errgroup.Group
g.Go(func() error {
_, err := srcDB.DB.ExecContext(ctx, `RESTORE data.bank FROM $1 WITH into_db='restoredb'`, dbBackupLoc)
return err
})

// Take an incremental backup after the database restore starts.
<-dbRestoreStarted
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// All the restore to finish. This will issue AddSSTable requests at a
// timestamp that is before the last incremental we just took.
close(blockDBRestore)

// Wait for the database restore to finish, and take another incremental
// backup that will miss the AddSSTable writes.
require.NoError(t, g.Wait())

var tsBefore string
srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBefore)

// Drop an index on the restored table to ensure that the dropped index was
// also re-included.
srcDB.Exec(t, `DROP INDEX new_idx`)

srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)

// Assert that the restored database has the same number
// of rows in both the source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
}
Loading

0 comments on commit 8f66611

Please sign in to comment.