Skip to content

Commit

Permalink
cliccl/debug_backup.go: add --with-revisions flag to allow exporting…
Browse files Browse the repository at this point in the history
… revisions of data

This patch adds `--with-revisions` on `debug export` to allow users
to export revisions of table data. If `--with-revisions` is specified,
revisions of data are returned to users, with an extra column
displaying the revision time of that record.

Release note (cli change): This is an experimental/beta feature of the backup debug
tool to allow users to export revisions of data from backup. We add `--with-revisions`
on `debug export` to allow users to export revisions of table data.
If `--with-revisions` is specified, revisions of data are returned to users,
with an extra column displaying the revision time of that record.
  • Loading branch information
Elliebababa committed Apr 28, 2021
1 parent 4d8331f commit 78520e5
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 44 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ import (
)

const (
backupOptRevisionHistory = "revision_history"
// BackupOptRevisionHistory is the option name for backup with revision history
// exported to cliccl for backup revision inspection
BackupOptRevisionHistory = "revision_history"
backupOptIncludeInterleaves = "include_deprecated_interleaves"
backupOptEncPassphrase = "encryption_passphrase"
backupOptEncKMS = "kms"
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,13 @@ func resolveBackupManifests(
const errPrefix = "invalid RESTORE timestamp: restoring to arbitrary time requires that BACKUP for requested time be created with '%s' option."
if i == 0 {
return nil, nil, nil, errors.Errorf(
errPrefix+" nearest backup time is %s", backupOptRevisionHistory,
errPrefix+" nearest backup time is %s", BackupOptRevisionHistory,
timeutil.Unix(0, b.EndTime.WallTime).UTC(),
)
}
return nil, nil, nil, errors.Errorf(
errPrefix+" nearest BACKUP times are %s or %s",
backupOptRevisionHistory,
BackupOptRevisionHistory,
timeutil.Unix(0, mainBackupManifests[i-1].EndTime.WallTime).UTC(),
timeutil.Unix(0, b.EndTime.WallTime).UTC(),
)
Expand Down
33 changes: 29 additions & 4 deletions pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package backupccl

import (
"context"
"reflect"
"sort"
"strings"

Expand Down Expand Up @@ -451,9 +452,10 @@ type EntryFiles []roachpb.ImportRequest_File
// from backup manifests.
// exported to cliccl for exporting data directly from backup sst.
type BackupTableEntry struct {
Desc catalog.TableDescriptor
Span roachpb.Span
Files []EntryFiles
Desc catalog.TableDescriptor
Span roachpb.Span
Files []EntryFiles
LastSchemaChangeTime hlc.Timestamp
}

// MakeBackupTableEntry looks up the descriptor of fullyQualifiedTableName
Expand Down Expand Up @@ -481,7 +483,7 @@ func MakeBackupTableEntry(
" or should specify the time to be an exact backup time, nearest backup time is %s"
return BackupTableEntry{}, errors.WithHintf(
errors.Newf("unknown read time: %s", timeutil.Unix(0, endTime.WallTime).UTC()),
errorHints, backupOptRevisionHistory, timeutil.Unix(0, b.EndTime.WallTime).UTC(),
errorHints, BackupOptRevisionHistory, timeutil.Unix(0, b.EndTime.WallTime).UTC(),
)
}
ind = i
Expand Down Expand Up @@ -528,10 +530,33 @@ func MakeBackupTableEntry(
return BackupTableEntry{}, errors.Wrapf(err, "making spans for table %s", fullyQualifiedTableName)
}

lastSchemaChangeTime := endTime
for i := len(backupManifests) - 1; i >= 0; i-- {
manifest := backupManifests[i]
for j := len(manifest.DescriptorChanges) - 1; j >= 0; j-- {
rev := manifest.DescriptorChanges[j]

if endTime.LessEq(rev.Time) {
continue
}

if rev.ID == tbDesc.GetID() {
d := catalogkv.NewBuilder(rev.Desc).BuildExistingMutable()
revDesc, _ := catalog.AsTableDescriptor(d)
if !reflect.DeepEqual(revDesc.PublicColumns(), tbDesc.PublicColumns()) {
goto EXIT
}
lastSchemaChangeTime = rev.Time
}
}
}
EXIT:

backupTableEntry := BackupTableEntry{
tbDesc,
tablePrimaryIndexSpan,
make([]EntryFiles, 0),
lastSchemaChangeTime,
}

for _, e := range entry {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/cliccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/row",
Expand Down
69 changes: 62 additions & 7 deletions pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand Down Expand Up @@ -63,6 +64,7 @@ var debugBackupArgs struct {
nullas string
maxRows int
startKey cli.MVCCKey
withRevisions bool

rowCount int
}
Expand All @@ -80,6 +82,7 @@ func setDebugContextDefault() {
debugBackupArgs.maxRows = 0
debugBackupArgs.startKey = cli.MVCCKey{}
debugBackupArgs.rowCount = 0
debugBackupArgs.withRevisions = false
}

func init() {
Expand Down Expand Up @@ -180,7 +183,18 @@ func init() {
cliflags.StartKey.Name,
cliflags.StartKey.Usage())

cli.DebugCmd.AddCommand(backupCmds)
exportDataCmd.Flags().BoolVar(
&debugBackupArgs.withRevisions,
cliflags.ExportRevisions.Name,
false, /*value*/
cliflags.ExportRevisions.Usage())

exportDataCmd.Flags().StringVarP(
&debugBackupArgs.readTime,
cliflags.ExportRevisionsUpTo.Name,
cliflags.ExportRevisionsUpTo.Shorthand,
"", /*value*/
cliflags.ExportRevisionsUpTo.Usage())

backupSubCmds := []*cobra.Command{
showCmd,
Expand All @@ -193,6 +207,7 @@ func init() {
backupCmds.AddCommand(cmd)
cmd.Flags().AddFlagSet(backupFlags)
}
cli.DebugCmd.AddCommand(backupCmds)
}

func newBlobFactory(ctx context.Context, dialing roachpb.NodeID) (blobs.BlobClient, error) {
Expand Down Expand Up @@ -351,7 +366,14 @@ func runExportDataCmd(cmd *cobra.Command, args []string) error {
manifests = append(manifests, manifest)
}

endTime, err := evalAsOfTimestamp(debugBackupArgs.readTime)
if debugBackupArgs.withRevisions && manifests[0].MVCCFilter != backupccl.MVCCFilter_All {
return errors.WithHintf(
errors.Newf("invalid flag: %s", cliflags.ExportRevisions.Name),
"requires backup created with %q", backupccl.BackupOptRevisionHistory,
)
}

endTime, err := evalAsOfTimestamp(debugBackupArgs.readTime, manifests)
if err != nil {
return errors.Wrapf(err, "eval as of timestamp %s", debugBackupArgs.readTime)
}
Expand All @@ -375,9 +397,11 @@ func runExportDataCmd(cmd *cobra.Command, args []string) error {
return nil
}

func evalAsOfTimestamp(readTime string) (hlc.Timestamp, error) {
func evalAsOfTimestamp(
readTime string, manifests []backupccl.BackupManifest,
) (hlc.Timestamp, error) {
if readTime == "" {
return hlc.Timestamp{}, nil
return manifests[len(manifests)-1].EndTime, nil
}
var err error
// Attempt to parse as timestamp.
Expand Down Expand Up @@ -416,8 +440,14 @@ func showData(
}
defer rf.Close(ctx)

if debugBackupArgs.withRevisions {
startT := entry.LastSchemaChangeTime.GoTime().UTC()
endT := endTime.GoTime().UTC()
fmt.Printf("DETECTED SCHEMA CHANGE AT %s, ONLY SHOWING UPDATES IN RANGE [%s, %s]\n", startT, startT, endT)
}

for _, files := range entry.Files {
if err := processEntryFiles(ctx, rf, files, entry.Span, endTime, writer); err != nil {
if err := processEntryFiles(ctx, rf, files, entry.Span, entry.LastSchemaChangeTime, endTime, writer); err != nil {
return err
}
if debugBackupArgs.maxRows != 0 && debugBackupArgs.rowCount >= debugBackupArgs.maxRows {
Expand Down Expand Up @@ -479,17 +509,31 @@ func makeRowFetcher(
) (row.Fetcher, error) {
var colIdxMap catalog.TableColMap
var valNeededForCol util.FastIntSet
colDescs := make([]catalog.Column, len(entry.Desc.PublicColumns()))
for i, col := range entry.Desc.PublicColumns() {
colIdxMap.Set(col.GetID(), i)
valNeededForCol.Add(i)
colDescs[i] = col
}

if debugBackupArgs.withRevisions {
newIndex := len(entry.Desc.PublicColumns())
newCol, err := entry.Desc.FindColumnWithName(colinfo.MVCCTimestampColumnName)
if err != nil {
return row.Fetcher{}, errors.Wrapf(err, "get mvcc timestamp column")
}
colIdxMap.Set(newCol.GetID(), newIndex)
valNeededForCol.Add(newIndex)
colDescs = append(colDescs, newCol)
}

table := row.FetcherTableArgs{
Spans: []roachpb.Span{entry.Span},
Desc: entry.Desc,
Index: entry.Desc.GetPrimaryIndex(),
ColIdxMap: colIdxMap,
IsSecondaryIndex: false,
Cols: entry.Desc.PublicColumns(),
Cols: colDescs,
ValNeededForCol: valNeededForCol,
}

Expand All @@ -515,6 +559,7 @@ func processEntryFiles(
rf row.Fetcher,
files backupccl.EntryFiles,
span roachpb.Span,
startTime hlc.Timestamp,
endTime hlc.Timestamp,
writer *csv.Writer,
) (err error) {
Expand All @@ -536,7 +581,7 @@ func processEntryFiles(
if len(debugBackupArgs.startKey.Key) != 0 {
startKeyMVCC.Key = debugBackupArgs.startKey.Key
}
kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, endTime)
kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, startTime, endTime, debugBackupArgs.withRevisions)

if err := rf.StartScanFrom(ctx, &kvFetcher); err != nil {
return errors.Wrapf(err, "row fetcher starts scan")
Expand All @@ -552,6 +597,16 @@ func processEntryFiles(
}
rowDisplay := make([]string, datums.Len())
for i, datum := range datums {

if debugBackupArgs.withRevisions && i == datums.Len()-1 {
approx, err := tree.DecimalToInexactDTimestamp(datum.(*tree.DDecimal))
if err != nil {
return errors.Wrapf(err, "convert datum %s to mvcc timestamp", datum)
}
rowDisplay[i] = approx.UTC().String()
break
}

if datum == tree.DNull {
rowDisplay[i] = debugBackupArgs.nullas
} else {
Expand Down
119 changes: 119 additions & 0 deletions pkg/ccl/cliccl/debug_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,125 @@ func TestExportDataAOST(t *testing.T) {
}
}

// TestExportDataWithRevisions exercises `debug backup export --with-revisions`.
// It builds a small table, takes one plain backup and a chain of three
// revision_history backups interleaved with schema changes and writes, and
// then checks the CLI output: an error for the backup taken without
// revision history, and the exported rows (each followed by its revision
// timestamp) for the backups taken with it.
func TestExportDataWithRevisions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The CLI harness runs without its own server; the backups it reads are
// produced by the separate test server started below.
c := cli.NewCLITest(cli.TestCLIParams{T: t, NoServer: true})
defer c.Cleanup()

ctx := context.Background()
dir, cleanFn := testutils.TempDir(t)
defer cleanFn()
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir, Insecure: true})
defer srv.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE DATABASE testDB`)
sqlDB.Exec(t, `USE testDB`)
sqlDB.Exec(t, `CREATE TABLE fooTable (id INT PRIMARY KEY, value INT, tag STRING)`)

// backupPath receives a backup without revision history; backupPathWithRev
// receives every backup taken WITH revision_history.
const backupPath = "nodelocal://0/fooFolder"
const backupPathWithRev = "nodelocal://0/fooFolderRev"

// After each write, record the row's approximate MVCC timestamp so it can
// be compared against the extra revision-time column in the export output.
sqlDB.Exec(t, `INSERT INTO fooTable VALUES (1, 123, 'cat')`)
var tsInsert time.Time
sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=1`).Scan(&tsInsert)

sqlDB.Exec(t, `INSERT INTO fooTable VALUES (2, 223, 'dog')`)
var tsInsert2 time.Time
sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=2`).Scan(&tsInsert2)

// ts1: both backups are taken as of the same timestamp, one without and
// one with revision history.
ts1 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s'`, ts1.AsOfSystemTime()), backupPath)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts1.AsOfSystemTime()), backupPathWithRev)

// ts2: add a column and a row, then back up again to the same
// revision-history path.
sqlDB.Exec(t, `ALTER TABLE fooTable ADD COLUMN active BOOL`)
sqlDB.Exec(t, `INSERT INTO fooTable VALUES (3, 323, 'mickey mouse', true)`)
var tsInsert3 time.Time
sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=3`).Scan(&tsInsert3)
ts2 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts2.AsOfSystemTime()), backupPathWithRev)

// ts3: drop a column (sql_safe_updates is turned off first), update a row,
// then back up once more.
sqlDB.Exec(t, `SET sql_safe_updates=false`)
sqlDB.Exec(t, `ALTER TABLE fooTable DROP COLUMN value`)
// The timestamp is read from row 3 only, yet the expectations below use it
// for rows 1, 2 and 3.
// NOTE(review): presumably the column drop rewrites every row at the same
// timestamp — confirm.
var tsDropColumn time.Time
sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=3`).Scan(&tsDropColumn)

sqlDB.Exec(t, `UPDATE fooTable SET tag=('lion') WHERE id = 1`)
var tsUpdate time.Time
sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=1`).Scan(&tsUpdate)

ts3 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts3.AsOfSystemTime()), backupPathWithRev)

// --with-revisions against the backup taken without revision_history: the
// command itself returns no Go error here; the failure is asserted on the
// captured output text.
t.Run("show-data-revisions-of-backup-without-revision-history", func(t *testing.T) {
setDebugContextDefault()
out, err := c.RunWithCapture(fmt.Sprintf("debug backup export %s --table=%s --with-revisions --external-io-dir=%s",
backupPath,
"testDB.public.fooTable",
dir))
require.NoError(t, err)
expectedError := "ERROR: invalid flag: with-revisions\nHINT: requires backup created with \"revision_history\"\n"
checkExpectedOutput(t, expectedError, out)
})

// Each case lists the backup directories handed to the command, the value
// passed to --up-to, and the expected CSV rows; the last field of every
// expected row is the revision timestamp.
testCases := []struct {
name string
tableName string
backupPaths []string
expectedData string
upToTimestamp string
}{
{
// Only the first revision-history backup, with an empty --up-to value.
"show-data-revisions-of-a-single-full-backup",
"testDB.public.fooTable",
[]string{
backupPathWithRev,
},
fmt.Sprintf("1,123,'cat',%s\n2,223,'dog',%s\n", tsInsert.UTC(), tsInsert2.UTC()),
"",
},
{
// Full chain read up to ts2: only the row inserted after ADD COLUMN is
// expected.
"show-data-revisions-after-adding-an-colum",
"testDB.public.fooTable",
[]string{
backupPathWithRev,
backupPathWithRev + ts2.GoTime().Format(backupccl.DateBasedIncFolderName),
backupPathWithRev + ts3.GoTime().Format(backupccl.DateBasedIncFolderName),
},
fmt.Sprintf("3,323,'mickey mouse',true,%s\n", tsInsert3.UTC()),
ts2.AsOfSystemTime(),
},
{
// Full chain read up to ts3: two revisions of row 1 ('lion' and 'cat')
// plus one revision each of rows 2 and 3, all without the dropped
// `value` column.
"show-data-revisions-after-dropping-an-colum-and-update-value",
"testDB.public.fooTable",
[]string{
backupPathWithRev,
backupPathWithRev + ts2.GoTime().Format(backupccl.DateBasedIncFolderName),
backupPathWithRev + ts3.GoTime().Format(backupccl.DateBasedIncFolderName),
},
fmt.Sprintf("1,'lion',null,%s\n1,'cat',null,%s\n2,'dog',null,%s\n3,'mickey mouse',true,%s\n",
tsUpdate.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC()),
ts3.AsOfSystemTime(),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
setDebugContextDefault()
out, err := c.RunWithCapture(fmt.Sprintf("debug backup export %s --table=%s --with-revisions --up-to=%s --external-io-dir=%s",
strings.Join(tc.backupPaths, " "),
tc.tableName,
tc.upToTimestamp,
dir))
require.NoError(t, err)
// Drop everything up to and including the first newline before
// comparing; checkExpectedOutput then strips one more leading line.
// NOTE(review): presumably these two lines are the echoed command and
// the "DETECTED SCHEMA CHANGE AT ..." banner printed when
// --with-revisions is set — confirm the order.
trimmedOut := out[strings.Index(out, "\n")+1:]
checkExpectedOutput(t, tc.expectedData, trimmedOut)
})
}
}

func checkExpectedOutput(t *testing.T, expected string, out string) {
endOfCmd := strings.Index(out, "\n")
output := out[endOfCmd+1:]
Expand Down
Loading

0 comments on commit 78520e5

Please sign in to comment.