diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index 1e66d7bc3a8f..c01ae63fc2e2 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -10,6 +10,7 @@ package backupccl import ( "context" + "reflect" "sort" "strings" @@ -491,9 +492,10 @@ type EntryFiles []roachpb.ImportRequest_File // from backup manifests. // exported to cliccl for exporting data directly from backup sst. type BackupTableEntry struct { - Desc catalog.TableDescriptor - Span roachpb.Span - Files []EntryFiles + Desc catalog.TableDescriptor + Span roachpb.Span + Files []EntryFiles + LastSchemaChangeTime hlc.Timestamp } // MakeBackupTableEntry looks up the descriptor of fullyQualifiedTableName @@ -568,10 +570,13 @@ func MakeBackupTableEntry( return BackupTableEntry{}, errors.Wrapf(err, "making spans for table %s", fullyQualifiedTableName) } + lastSchemaChangeTime := findLastSchemaChangeTime(backupManifests, tbDesc, endTime) + backupTableEntry := BackupTableEntry{ tbDesc, tablePrimaryIndexSpan, make([]EntryFiles, 0), + lastSchemaChangeTime, } for _, e := range entry { @@ -580,3 +585,29 @@ func MakeBackupTableEntry( return backupTableEntry, nil } + +func findLastSchemaChangeTime( + backupManifests []BackupManifest, tbDesc catalog.TableDescriptor, endTime hlc.Timestamp, +) hlc.Timestamp { + lastSchemaChangeTime := endTime + for i := len(backupManifests) - 1; i >= 0; i-- { + manifest := backupManifests[i] + for j := len(manifest.DescriptorChanges) - 1; j >= 0; j-- { + rev := manifest.DescriptorChanges[j] + + if endTime.LessEq(rev.Time) { + continue + } + + if rev.ID == tbDesc.GetID() { + d := catalogkv.NewBuilder(rev.Desc).BuildExistingMutable() + revDesc, _ := catalog.AsTableDescriptor(d) + if !reflect.DeepEqual(revDesc.PublicColumns(), tbDesc.PublicColumns()) { + return lastSchemaChangeTime + } + lastSchemaChangeTime = rev.Time + } + } + } + return lastSchemaChangeTime +} diff --git a/pkg/ccl/cliccl/BUILD.bazel b/pkg/ccl/cliccl/BUILD.bazel index 43af066cdf13..d4845e27d952 100644 --- a/pkg/ccl/cliccl/BUILD.bazel +++ b/pkg/ccl/cliccl/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/tabledesc", "//pkg/sql/row", diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 7301b35bcbd7..0d04dc58de2d 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -54,6 +55,10 @@ import ( "github.com/spf13/cobra" ) +const ( + backupOptRevisionHistory = "revision_history" +) + type key struct { rawByte []byte typ string @@ -115,6 +120,7 @@ var debugBackupArgs struct { nullas string maxRows int startKey key + withRevisions bool rowCount int } @@ -132,6 +138,7 @@ func setDebugContextDefault() { debugBackupArgs.maxRows = 0 debugBackupArgs.startKey = key{} debugBackupArgs.rowCount = 0 + debugBackupArgs.withRevisions = false } func init() { @@ -232,7 +239,18 @@ func init() { cliflags.StartKey.Name, cliflags.StartKey.Usage()) - cli.DebugCmd.AddCommand(backupCmds) + exportDataCmd.Flags().BoolVar( + 
&debugBackupArgs.withRevisions, + cliflags.ExportRevisions.Name, + false, /*value*/ + cliflags.ExportRevisions.Usage()) + + exportDataCmd.Flags().StringVarP( + &debugBackupArgs.readTime, + cliflags.ExportRevisionsUpTo.Name, + cliflags.ExportRevisionsUpTo.Shorthand, + "", /*value*/ + cliflags.ExportRevisionsUpTo.Usage()) backupSubCmds := []*cobra.Command{ showCmd, @@ -245,6 +263,7 @@ func init() { backupCmds.AddCommand(cmd) cmd.Flags().AddFlagSet(backupFlags) } + cli.DebugCmd.AddCommand(backupCmds) } func newBlobFactory(ctx context.Context, dialing roachpb.NodeID) (blobs.BlobClient, error) { @@ -403,7 +422,14 @@ func runExportDataCmd(cmd *cobra.Command, args []string) error { manifests = append(manifests, manifest) } - endTime, err := evalAsOfTimestamp(debugBackupArgs.readTime) + if debugBackupArgs.withRevisions && manifests[0].MVCCFilter != backupccl.MVCCFilter_All { + return errors.WithHintf( + errors.Newf("invalid flag: %s", cliflags.ExportRevisions.Name), + "requires backup created with %q", backupOptRevisionHistory, + ) + } + + endTime, err := evalAsOfTimestamp(debugBackupArgs.readTime, manifests) if err != nil { return errors.Wrapf(err, "eval as of timestamp %s", debugBackupArgs.readTime) } @@ -427,9 +453,11 @@ func runExportDataCmd(cmd *cobra.Command, args []string) error { return nil } -func evalAsOfTimestamp(readTime string) (hlc.Timestamp, error) { +func evalAsOfTimestamp( + readTime string, manifests []backupccl.BackupManifest, +) (hlc.Timestamp, error) { if readTime == "" { - return hlc.Timestamp{}, nil + return manifests[len(manifests)-1].EndTime, nil } var err error // Attempt to parse as timestamp. @@ -468,8 +496,14 @@ func showData( } defer rf.Close(ctx) + if debugBackupArgs.withRevisions { + startT := entry.LastSchemaChangeTime.GoTime().UTC() + endT := endTime.GoTime().UTC() + fmt.Fprintf(os.Stderr, "DETECTED SCHEMA CHANGE AT %s, ONLY SHOWING UPDATES IN RANGE [%s, %s]\n", startT, startT, endT) + } + for _, files := range entry.Files { - if err := processEntryFiles(ctx, rf, files, entry.Span, endTime, writer); err != nil { + if err := processEntryFiles(ctx, rf, files, entry.Span, entry.LastSchemaChangeTime, endTime, writer); err != nil { return err } if debugBackupArgs.maxRows != 0 && debugBackupArgs.rowCount >= debugBackupArgs.maxRows { @@ -531,17 +565,31 @@ func makeRowFetcher( ) (row.Fetcher, error) { var colIdxMap catalog.TableColMap var valNeededForCol util.FastIntSet + colDescs := make([]catalog.Column, len(entry.Desc.PublicColumns())) for i, col := range entry.Desc.PublicColumns() { colIdxMap.Set(col.GetID(), i) valNeededForCol.Add(i) + colDescs[i] = col + } + + if debugBackupArgs.withRevisions { + newIndex := len(entry.Desc.PublicColumns()) + newCol, err := entry.Desc.FindColumnWithName(colinfo.MVCCTimestampColumnName) + if err != nil { + return row.Fetcher{}, errors.Wrapf(err, "get mvcc timestamp column") + } + colIdxMap.Set(newCol.GetID(), newIndex) + valNeededForCol.Add(newIndex) + colDescs = append(colDescs, newCol) } + table := row.FetcherTableArgs{ Spans: []roachpb.Span{entry.Span}, Desc: entry.Desc, Index: entry.Desc.GetPrimaryIndex(), ColIdxMap: colIdxMap, IsSecondaryIndex: false, - Cols: entry.Desc.PublicColumns(), + Cols: colDescs, ValNeededForCol: valNeededForCol, } @@ -567,6 +615,7 @@ func processEntryFiles( rf row.Fetcher, files backupccl.EntryFiles, span roachpb.Span, + startTime hlc.Timestamp, endTime hlc.Timestamp, writer *csv.Writer, ) (err error) { @@ -592,7 +641,7 @@ func processEntryFiles( startKeyMVCC.Key = 
roachpb.Key(debugBackupArgs.startKey.rawByte) } } - kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, endTime) + kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, startTime, endTime, debugBackupArgs.withRevisions) if err := rf.StartScanFrom(ctx, &kvFetcher); err != nil { return errors.Wrapf(err, "row fetcher starts scan") @@ -608,6 +657,16 @@ func processEntryFiles( } rowDisplay := make([]string, datums.Len()) for i, datum := range datums { + + if debugBackupArgs.withRevisions && i == datums.Len()-1 { + approx, err := tree.DecimalToInexactDTimestamp(datum.(*tree.DDecimal)) + if err != nil { + return errors.Wrapf(err, "convert datum %s to mvcc timestamp", datum) + } + rowDisplay[i] = approx.UTC().String() + break + } + if datum == tree.DNull { rowDisplay[i] = debugBackupArgs.nullas } else { diff --git a/pkg/ccl/cliccl/debug_backup_test.go b/pkg/ccl/cliccl/debug_backup_test.go index cb749cc01898..89a9f35e71b4 100644 --- a/pkg/ccl/cliccl/debug_backup_test.go +++ b/pkg/ccl/cliccl/debug_backup_test.go @@ -540,9 +540,10 @@ func TestExportDataAOST(t *testing.T) { require.NoError(t, err) expectedError := fmt.Sprintf( "ERROR: fetching entry: unknown read time: %s\n"+ - "HINT: reading data for requested time requires that BACKUP was created with \"revision_history\" "+ + "HINT: reading data for requested time requires that BACKUP was created with %q "+ "or should specify the time to be an exact backup time, nearest backup time is %s\n", timeutil.Unix(0, ts.WallTime).UTC(), + backupOptRevisionHistory, timeutil.Unix(0, ts1.WallTime).UTC()) checkExpectedOutput(t, expectedError, out) }) @@ -693,6 +694,140 @@ func TestExportDataAOST(t *testing.T) { } } +func TestExportDataWithRevisions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + c := cli.NewCLITest(cli.TestCLIParams{T: t, NoServer: true}) + defer c.Cleanup() + + ctx := context.Background() + dir, cleanFn := testutils.TempDir(t) + defer cleanFn() + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir, Insecure: true}) + defer srv.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE DATABASE testDB`) + sqlDB.Exec(t, `USE testDB`) + sqlDB.Exec(t, `CREATE TABLE fooTable (id INT PRIMARY KEY, value INT, tag STRING)`) + + const backupPath = "nodelocal://0/fooFolder" + const backupPathWithRev = "nodelocal://0/fooFolderRev" + + sqlDB.Exec(t, `INSERT INTO fooTable VALUES (1, 123, 'cat')`) + var tsInsert time.Time + sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=1`).Scan(&tsInsert) + + sqlDB.Exec(t, `INSERT INTO fooTable VALUES (2, 223, 'dog')`) + var tsInsert2 time.Time + sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=2`).Scan(&tsInsert2) + + ts1 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s'`, ts1.AsOfSystemTime()), backupPath) + sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts1.AsOfSystemTime()), backupPathWithRev) + + sqlDB.Exec(t, `ALTER TABLE fooTable ADD COLUMN active BOOL`) + sqlDB.Exec(t, `INSERT INTO fooTable VALUES (3, 323, 'mickey mouse', true)`) + var tsInsert3 time.Time + sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=3`).Scan(&tsInsert3) + ts2 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + 
sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts2.AsOfSystemTime()), backupPathWithRev)
+
+ sqlDB.Exec(t, `SET sql_safe_updates=false`)
+ sqlDB.Exec(t, `ALTER TABLE fooTable DROP COLUMN value`)
+ var tsDropColumn time.Time
+ sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=3`).Scan(&tsDropColumn)
+
+ sqlDB.Exec(t, `UPDATE fooTable SET tag=('lion') WHERE id = 1`)
+ var tsUpdate time.Time
+ sqlDB.QueryRow(t, `SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) from fooTable where id=1`).Scan(&tsUpdate)
+
+ ts3 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+ sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts3.AsOfSystemTime()), backupPathWithRev)
+
+ sqlDB.Exec(t, `CREATE INDEX extra ON fooTable (id)`)
+ ts4 := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+ sqlDB.Exec(t, fmt.Sprintf(`BACKUP TO $1 AS OF SYSTEM TIME '%s' WITH revision_history`, ts4.AsOfSystemTime()), backupPathWithRev)
+
+ t.Run("show-data-revisions-of-backup-without-revision-history", func(t *testing.T) {
+ setDebugContextDefault()
+ out, err := c.RunWithCapture(fmt.Sprintf("debug backup export %s --table=%s --with-revisions --external-io-dir=%s",
+ backupPath,
+ "testDB.public.fooTable",
+ dir))
+ require.NoError(t, err)
+ expectedError := "ERROR: invalid flag: with-revisions\nHINT: requires backup created with \"revision_history\"\n"
+ checkExpectedOutput(t, expectedError, out)
+ })
+
+ testCases := []struct {
+ name string
+ tableName string
+ backupPaths []string
+ expectedData string
+ upToTimestamp string
+ }{
+ {
+ "show-data-revisions-of-a-single-full-backup",
+ "testDB.public.fooTable",
+ []string{
+ backupPathWithRev,
+ },
+ fmt.Sprintf("1,123,'cat',%s\n2,223,'dog',%s\n", tsInsert.UTC(), tsInsert2.UTC()),
+ "",
+ },
+ {
+ "show-data-revisions-after-adding-a-column",
+ "testDB.public.fooTable",
+ []string{
+ backupPathWithRev,
+ backupPathWithRev + ts2.GoTime().Format(backupccl.DateBasedIncFolderName),
+ backupPathWithRev + ts3.GoTime().Format(backupccl.DateBasedIncFolderName),
+ },
+ fmt.Sprintf("3,323,'mickey mouse',true,%s\n", tsInsert3.UTC()),
+ ts2.AsOfSystemTime(),
+ },
+ {
+ "show-data-revisions-after-dropping-a-column-and-updating-a-value",
+ "testDB.public.fooTable",
+ []string{
+ backupPathWithRev,
+ backupPathWithRev + ts2.GoTime().Format(backupccl.DateBasedIncFolderName),
+ backupPathWithRev + ts3.GoTime().Format(backupccl.DateBasedIncFolderName),
+ },
+ fmt.Sprintf("1,'lion',null,%s\n1,'cat',null,%s\n2,'dog',null,%s\n3,'mickey mouse',true,%s\n",
+ tsUpdate.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC()),
+ ts3.AsOfSystemTime(),
+ }, {
+ "show-data-revisions-after-adding-index",
+ "testDB.public.fooTable",
+ []string{
+ backupPathWithRev,
+ backupPathWithRev + ts2.GoTime().Format(backupccl.DateBasedIncFolderName),
+ backupPathWithRev + ts3.GoTime().Format(backupccl.DateBasedIncFolderName),
+ backupPathWithRev + ts4.GoTime().Format(backupccl.DateBasedIncFolderName),
+ },
+ fmt.Sprintf("1,'lion',null,%s\n1,'cat',null,%s\n2,'dog',null,%s\n3,'mickey mouse',true,%s\n",
+ tsUpdate.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC(), tsDropColumn.UTC()),
+ ts4.AsOfSystemTime(),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ setDebugContextDefault()
+ out, err := c.RunWithCapture(fmt.Sprintf("debug backup export %s --table=%s --with-revisions --up-to=%s --external-io-dir=%s",
strings.Join(tc.backupPaths, " "), + tc.tableName, + tc.upToTimestamp, + dir)) + require.NoError(t, err) + checkExpectedOutput(t, tc.expectedData, out) + }) + } +} + func checkExpectedOutput(t *testing.T, expected string, out string) { endOfCmd := strings.Index(out, "\n") output := out[endOfCmd+1:] diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index 8078af627ccc..3cbf54a7e45e 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -1572,4 +1572,14 @@ The bytekey format does not require table-key prefix.`, Name: "max-rows", Description: `Maximum number of rows to return (Default 0 is unlimited).`, } + + ExportRevisions = FlagInfo{ + Name: "with-revisions", + Description: `Export revisions of data from a backup table since the last schema change.`, + } + + ExportRevisionsUpTo = FlagInfo{ + Name: "up-to", + Description: `Export revisions of data from a backup table up to a specific timestamp.`, + } ) diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 1f3578c0371b..63ea0880b4dc 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -169,22 +169,30 @@ func (f *SpanKVFetcher) nextBatch( func (f *SpanKVFetcher) close(context.Context) {} // BackupSSTKVFetcher is a kvBatchFetcher that wraps storage.SimpleMVCCIterator -// and returns a single kv from backupSST. +// and returns a batch of kv from backupSST. type BackupSSTKVFetcher struct { - iter storage.SimpleMVCCIterator - endKeyMVCC storage.MVCCKey - endTime hlc.Timestamp + iter storage.SimpleMVCCIterator + endKeyMVCC storage.MVCCKey + startTime hlc.Timestamp + endTime hlc.Timestamp + withRevisions bool } // MakeBackupSSTKVFetcher creates a BackupSSTKVFetcher and // advances the iter to the first key >= startKeyMVCC func MakeBackupSSTKVFetcher( - startKeyMVCC, endKeyMVCC storage.MVCCKey, iter storage.SimpleMVCCIterator, endTime hlc.Timestamp, + startKeyMVCC, endKeyMVCC storage.MVCCKey, + iter storage.SimpleMVCCIterator, + startTime hlc.Timestamp, + endTime hlc.Timestamp, + withRev bool, ) BackupSSTKVFetcher { res := BackupSSTKVFetcher{ iter, endKeyMVCC, + startTime, endTime, + withRev, } res.iter.SeekGE(startKeyMVCC) return res @@ -193,6 +201,18 @@ func MakeBackupSSTKVFetcher( func (f *BackupSSTKVFetcher) nextBatch( _ context.Context, ) (ok bool, kvs []roachpb.KeyValue, batchResponse []byte, span roachpb.Span, err error) { + res := make([]roachpb.KeyValue, 0) + + copyKV := func(mvccKey storage.MVCCKey, value []byte) roachpb.KeyValue { + keyCopy := make([]byte, len(mvccKey.Key)) + copy(keyCopy, mvccKey.Key) + valueCopy := make([]byte, len(value)) + copy(valueCopy, value) + return roachpb.KeyValue{ + Key: keyCopy, + Value: roachpb.Value{RawBytes: valueCopy, Timestamp: mvccKey.Timestamp}, + } + } for { valid, err := f.iter.Valid() @@ -200,46 +220,49 @@ func (f *BackupSSTKVFetcher) nextBatch( err = errors.Wrapf(err, "iter key value of table data") return false, nil, nil, roachpb.Span{}, err } + if !valid || !f.iter.UnsafeKey().Less(f.endKeyMVCC) { - return false, nil, nil, roachpb.Span{}, nil + break } + if !f.endTime.IsEmpty() { if f.endTime.Less(f.iter.UnsafeKey().Timestamp) { f.iter.Next() continue } } - if len(f.iter.UnsafeValue()) == 0 { - if f.endTime.IsEmpty() || f.iter.UnsafeKey().Timestamp.Less(f.endTime) { - // Value is deleted at endTime. + + if f.withRevisions { + if f.iter.UnsafeKey().Timestamp.Less(f.startTime) { f.iter.NextKey() continue - } else { - // Otherwise we call Next to trace back the correct revision. 
- f.iter.Next() - continue + } + } else { + if len(f.iter.UnsafeValue()) == 0 { + if f.endTime.IsEmpty() || f.iter.UnsafeKey().Timestamp.Less(f.endTime) { + // Value is deleted at endTime. + f.iter.NextKey() + continue + } else { + // Otherwise we call Next to trace back the correct revision. + f.iter.Next() + continue + } } } - break - } - - keyScratch := f.iter.UnsafeKey().Key - keyCopy := make([]byte, len(keyScratch)) - copy(keyCopy, keyScratch) - - valueScratch := f.iter.UnsafeValue() - valueCopy := make([]byte, len(valueScratch)) - copy(valueCopy, valueScratch) - value := roachpb.Value{RawBytes: valueCopy, Timestamp: f.iter.UnsafeKey().Timestamp} + res = append(res, copyKV(f.iter.UnsafeKey(), f.iter.UnsafeValue())) - res := []roachpb.KeyValue{{ - Key: keyCopy, - Value: value, - }} - - f.iter.NextKey() + if f.withRevisions { + f.iter.Next() + } else { + f.iter.NextKey() + } + } + if len(res) == 0 { + return false, nil, nil, roachpb.Span{}, err + } return true, res, nil, roachpb.Span{}, nil }
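
Reviewer note, not part of the patch: a minimal Go sketch of the revision-window semantics the new --with-revisions path implements. It assumes plain time.Time values standing in for hlc.Timestamp, and the names kvRevision and filterRevisions are hypothetical, invented here for illustration only. With the flag set, BackupSSTKVFetcher emits every MVCC revision (including deletion tombstones) whose timestamp falls in [LastSchemaChangeTime, endTime]; without it, only the newest visible value per key is returned.

package main

import (
	"fmt"
	"time"
)

// kvRevision is a stand-in for one MVCC revision surfaced by the backup SST iterator.
type kvRevision struct {
	key     string
	ts      time.Time
	deleted bool // true for a tombstone (empty value)
}

// filterRevisions keeps every revision whose timestamp lies inside
// [lastSchemaChange, end]. Revisions newer than the requested end time or
// older than the last schema change are dropped; tombstones inside the
// window are kept so deletions show up in the exported history.
func filterRevisions(revs []kvRevision, lastSchemaChange, end time.Time) []kvRevision {
	var out []kvRevision
	for _, r := range revs {
		if r.ts.After(end) || r.ts.Before(lastSchemaChange) {
			continue
		}
		out = append(out, r)
	}
	return out
}

func main() {
	schemaChange := time.Date(2021, 6, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2021, 6, 3, 0, 0, 0, 0, time.UTC)
	revs := []kvRevision{
		{"k1", schemaChange.Add(-time.Hour), false},   // predates the last schema change: dropped
		{"k1", schemaChange.Add(time.Hour), false},    // inside the window: kept
		{"k1", schemaChange.Add(2 * time.Hour), true}, // delete inside the window: kept
		{"k1", end.Add(time.Hour), false},             // after the --up-to time: dropped
	}
	for _, r := range filterRevisions(revs, schemaChange, end) {
		fmt.Println(r.key, r.ts.Format(time.RFC3339), r.deleted)
	}
}

The tests above exercise the same window end to end through the CLI, in the form "debug backup export <backup-paths> --table=testDB.public.fooTable --with-revisions --up-to=<timestamp>".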