backupccl: re-backup spans that come online during incremental backups
This commit fixes a bug where backups would miss non-transactional
writes (via AddSSTable) during incremental backups. These writes were
missed because AddSSTable can write at a timestamp that is before the
previous incremental backup. So, if a table was written to while it was
OFFLINE (e.g. by a RESTORE or an IMPORT) in between incremental
backups, the following incremental backup could miss some data.
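
As a rough illustration (not part of this change), an incremental
backup that only scans the window [startTime, endTime) skips an SST
that a RESTORE or IMPORT ingested at a timestamp below startTime. The
names and types below are hypothetical and simplified:

    // Hypothetical sketch, not code from this commit.
    package main

    import "fmt"

    type kv struct {
        key string
        ts  int64 // simplified MVCC timestamp
    }

    // captured returns the writes that a naive time-bounded incremental
    // backup would pick up for the window [startTime, endTime).
    func captured(writes []kv, startTime, endTime int64) []kv {
        var out []kv
        for _, w := range writes {
            if w.ts >= startTime && w.ts < endTime {
                out = append(out, w)
            }
        }
        return out
    }

    func main() {
        // An AddSSTable write from a RESTORE can land at ts=5 even though
        // the previous incremental already covered everything up to ts=10.
        writes := []kv{{"a", 5}, {"b", 12}}
        fmt.Println(captured(writes, 10, 20)) // prints [{b 12}]; {a 5} is missed
    }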

To resolve this, BACKUP now re-backs up all of the data of an OFFLINE
table on the incremental backup that brings the table back online. The
drawback is that such incremental backups (those taken when a restore
or import completes) will be much slower, since they have to recapture
all of the table's data.
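
A simplified, hypothetical sketch of that decision (the committed
implementation is getReintroducedSpans in backup_planning.go, shown in
the diff below):

    // Hypothetical sketch, not code from this commit.
    package sketch

    type tableState struct {
        id      int
        offline bool
    }

    // tablesToReintroduce returns the IDs of tables that were OFFLINE in
    // the previous backup but are PUBLIC now; their spans are backed up
    // again from time zero instead of from the previous incremental's
    // end time.
    func tablesToReintroduce(prev, cur []tableState) []int {
        offlineBefore := make(map[int]bool)
        for _, t := range prev {
            if t.offline {
                offlineBefore[t.id] = true
            }
        }
        var ids []int
        for _, t := range cur {
            if offlineBefore[t.id] && !t.offline {
                ids = append(ids, t.id)
            }
        }
        return ids
    }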

There is planned future work so that these incrementals only capture
the new data written by the RESTORE or IMPORT, rather than resorting to
backing up the entire table again. This will be addressed in a later
PR.

Release note (bug fix): Incremental cluster backups may have missed data
written to tables while they were OFFLINE. In practice, this could
happen if a RESTORE or IMPORT was running across incremental backups.
pbardea committed Apr 9, 2021
1 parent 8e68ba0 commit 8581a4d
Showing 4 changed files with 188 additions and 21 deletions.
78 changes: 78 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
@@ -853,6 +853,13 @@ func backupPlanHook(
if err != nil {
return errors.Wrap(err, "invalid previous backups")
}

tableSpans, err := getReintroducedSpans(ctx, p, prevBackups, tables, revs, endTime)
if err != nil {
return err
}
newSpans = append(newSpans, tableSpans...)

if coveredTime != startTime {
return errors.Wrapf(err, "expected previous backups to cover until time %v, got %v", startTime, coveredTime)
}
@@ -1105,6 +1112,77 @@ func backupPlanHook(
return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
}

// getReintroducedSpans checks to see if any spans need to be re-backed up from
// ts = 0. This may be the case if a span was OFFLINE in the previous backup and
// has come back online since. The entire span needs to be re-backed up because
// we may otherwise miss AddSSTable requests which write to a timestamp older
// than the last incremental.
func getReintroducedSpans(
ctx context.Context,
p sql.PlanHookState,
prevBackups []BackupManifest,
tables []catalog.TableDescriptor,
revs []BackupManifest_DescriptorRevision,
endTime hlc.Timestamp,
) ([]roachpb.Span, error) {
reintroducedTables := make(map[descpb.ID]struct{})

offlineInLastBackup := make(map[descpb.ID]struct{})
lastBackup := prevBackups[len(prevBackups)-1]
for _, desc := range lastBackup.Descriptors {
// TODO(pbardea): Also check that lastWriteTime is set once those are
// populated on the table descriptor.
if table := descpb.TableFromDescriptor(&desc, hlc.Timestamp{}); table != nil && table.Offline() {
offlineInLastBackup[table.GetID()] = struct{}{}
}
}

// If the table was offline in the last backup, but becomes PUBLIC, then it
// needs to be re-included since we may have missed non-transactional writes.
tablesToReinclude := make([]catalog.TableDescriptor, 0)
for _, desc := range tables {
if _, wasOffline := offlineInLastBackup[desc.GetID()]; wasOffline && desc.Public() {
tablesToReinclude = append(tablesToReinclude, desc)
reintroducedTables[desc.GetID()] = struct{}{}
}
}

// Tables should be re-introduced if any revision of the table was PUBLIC. A
// table may have been OFFLINE at the time of the last backup, and OFFLINE at
// the time of the current backup, but may have been PUBLIC at some time in
// between.
for _, rev := range revs {
rawTable := descpb.TableFromDescriptor(rev.Desc, hlc.Timestamp{})
if rawTable == nil {
continue
}
table := tabledesc.NewImmutable(*rawTable)
if _, wasOffline := offlineInLastBackup[table.GetID()]; wasOffline && table.Public() {
tablesToReinclude = append(tablesToReinclude, table)
reintroducedTables[table.GetID()] = struct{}{}
}
}

// All revisions of the table that we're re-introducing must also be
// considered.
allRevs := make([]BackupManifest_DescriptorRevision, 0, len(revs))
for _, rev := range revs {
rawTable := descpb.TableFromDescriptor(rev.Desc, hlc.Timestamp{})
if rawTable == nil {
continue
}
if _, ok := reintroducedTables[rawTable.GetID()]; ok {
allRevs = append(allRevs, rev)
}
}

tableSpans := spansForAllTableIndexes(p.ExecCfg().Codec, tablesToReinclude, allRevs)
return tableSpans, nil
}

func makeNewEncryptionOptions(
ctx context.Context, encryptionParams backupEncryptionParams,
) (*jobspb.BackupEncryptionOptions, *jobspb.EncryptionInfo, error) {
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -6870,7 +6870,7 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
sqlDB.Exec(t, `DROP DATABASE restoredb CASCADE;`)
}

// Restore to a time aftere the index was dropped and double check that we
// Restore to a time after the index was dropped and double check that we
// didn't bring back any keys from the dropped index.
{
blockBackfills = make(chan struct{}) // block the synthesized schema change job
97 changes: 97 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -9,6 +9,7 @@
package backupccl

import (
"context"
"fmt"
"os"
"path/filepath"
@@ -23,12 +24,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// Large test to ensure that all of the system table data is being restored in
@@ -752,3 +756,96 @@ func TestClusterRevisionHistory(t *testing.T) {
}

}

// TestReintroduceOfflineSpans is a regression test for #62564, which tracks a
// bug where AddSSTable requests to OFFLINE tables may be missed by cluster
// incremental backups since they can write at a timestamp older than the last
// backup.
func TestReintroduceOfflineSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "likely slow under race")

// Block restores on the source cluster.
blockDBRestore := make(chan struct{})
dbRestoreStarted := make(chan struct{})
// The data is split such that there will be 10 span entries to process.
restoreBlockEntriesThreshold := 4
entriesCount := 0
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
if entriesCount == 0 {
close(dbRestoreStarted)
}
if entriesCount == restoreBlockEntriesThreshold {
<-blockDBRestore
}

entriesCount++
},
}},
}
params.ServerArgs.Knobs = knobs

const numAccounts = 1000
ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitNone, params)
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupSrc()
defer cleanupDst()

dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"

// Take a backup that we'll use to create an OFFLINE descriptor.
srcDB.Exec(t, `CREATE INDEX new_idx ON data.bank (balance)`)
srcDB.Exec(t, `BACKUP DATABASE data TO $1 WITH revision_history`, dbBackupLoc)

srcDB.Exec(t, `CREATE DATABASE restoredb;`)

// Take a base full backup.
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

var g errgroup.Group
g.Go(func() error {
_, err := srcDB.DB.ExecContext(ctx, `RESTORE data.bank FROM $1 WITH into_db='restoredb'`, dbBackupLoc)
return err
})

// Take an incremental backup after the database restore starts.
<-dbRestoreStarted
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// Allow the restore to finish. This will issue AddSSTable requests at a
// timestamp that is before the last incremental we just took.
close(blockDBRestore)

// Wait for the database restore to finish, and then take another incremental
// backup; without this fix, that backup would miss the AddSSTable writes.
require.NoError(t, g.Wait())

var tsBefore string
srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsBefore)

// Drop an index on the restored table to ensure that the dropped index was
// also re-included.
srcDB.Exec(t, `DROP INDEX new_idx`)

srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)

// Assert that the restored database has the same number
// of rows in both the source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
}
32 changes: 12 additions & 20 deletions pkg/ccl/backupccl/restore_job.go
@@ -223,8 +223,7 @@ func makeImportSpans(
rangeLoop:
for _, importRange := range importRanges {
needed := false
// ts keeps track of the latest time that we've backed up for this span.
var ts hlc.Timestamp
var latestCoveredTime hlc.Timestamp
var files []roachpb.ImportRequest_File
payloads := importRange.Payload.([]interface{})
for _, p := range payloads {
@@ -236,26 +235,19 @@
needed = true
case backupSpan:
// The latest time we've backed up this span may be ahead of the start
// time of this entry. This is because some spans can be re-introduced.
// Spans are re-introduced when they were taken OFFLINE (and therefore
// processed non-transactional writes) and brought back online (PUBLIC).
// They need to be re-introduced so that BACKUP re-captures the
// non-transactional writes which may have a write timestamp at any
// timestamp as far back as when the descriptor was originally taken
// OFFLINE.
// In practice, it's expected that ts == ie.start here. When that is not
// the case ts should be greater than ie.start and ie.start should be 0.
// This is safe because the iterators used to read the data from this
// backup can support reading from files with duplicate data. For more
// information see #62564.
if ts.Less(ie.start) {
// time of this entry. This is because some spans can be
// "re-introduced", meaning that they were previously backed up but
// still appear in introducedSpans. Spans are re-introduced when they
// were taken OFFLINE (and therefore processed non-transactional writes)
// and brought back online (PUBLIC). For more information see #62564.
if latestCoveredTime.Less(ie.start) {
return nil, hlc.Timestamp{}, errors.Errorf(
"no backup covers time [%s,%s) for range [%s,%s) or backups listed out of order (mismatched start time)",
ts, ie.start,
latestCoveredTime, ie.start,
roachpb.Key(importRange.Start), roachpb.Key(importRange.End))
}
if !ie.end.Less(ts) {
ts = ie.end
if !ie.end.Less(latestCoveredTime) {
latestCoveredTime = ie.end
}
case backupFile:
if len(ie.file.Path) > 0 {
@@ -268,8 +260,8 @@
}
}
if needed {
if ts != maxEndTime {
if err := onMissing(importRange, ts, maxEndTime); err != nil {
if latestCoveredTime != maxEndTime {
if err := onMissing(importRange, latestCoveredTime, maxEndTime); err != nil {
return nil, hlc.Timestamp{}, err
}
}
