diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index 8b76c42f563d..f421d257a9b0 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -327,6 +328,15 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // + expect-pausepoint: expects the schema change job to end up in a paused state because // of a pausepoint error. // +// - "kv" [args] +// Issues a kv request +// +// Supported arguments: +// +// + type: kv request type. Currently, only DeleteRange is supported +// +// + target: SQL target. Currently, only table names are supported. +// // - "nudge-and-wait-for-temp-cleanup" // Nudges the temporary object reconciliation loop to run, and waits for completion. func TestDataDriven(t *testing.T) { @@ -626,6 +636,15 @@ func TestDataDriven(t *testing.T) { } return "" + case "kv": + var request string + d.ScanArgs(t, "request", &request) + + var target string + d.ScanArgs(t, "target", &target) + handleKVRequest(ctx, t, lastCreatedServer, ds, request, target) + return "" + case "save-cluster-ts": server := lastCreatedServer user := "root" @@ -677,6 +696,32 @@ func TestDataDriven(t *testing.T) { }) } +func handleKVRequest( + ctx context.Context, t *testing.T, server string, ds datadrivenTestState, request, target string, +) { + user := "root" + if request == "DeleteRange" { + var tableID uint32 + err := ds.getSQLDB(t, server, user).QueryRow(`SELECT id FROM system.namespace WHERE name = $1`, + target).Scan(&tableID) + require.NoError(t, err) + bankSpan := makeTableSpan(tableID) + dr := roachpb.DeleteRangeRequest{ + // Bogus span to make it a valid request. + RequestHeader: roachpb.RequestHeader{ + Key: bankSpan.Key, + EndKey: bankSpan.EndKey, + }, + UseRangeTombstone: true, + } + if _, err := kv.SendWrapped(ctx, ds.servers[server].DistSenderI().(*kvcoord.DistSender), &dr); err != nil { + t.Fatal(err) + } + } else { + t.Fatalf("Unknown kv request") + } +} + // findMostRecentJobWithType returns the most recently created job of `job_type` // jobType. func findMostRecentJobWithType( diff --git a/pkg/ccl/backupccl/file_sst_sink.go b/pkg/ccl/backupccl/file_sst_sink.go index 3de271d2974c..b5c49da6d13e 100644 --- a/pkg/ccl/backupccl/file_sst_sink.go +++ b/pkg/ccl/backupccl/file_sst_sink.go @@ -268,32 +268,16 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error { log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName) - // Copy SST content. - sst, err := storage.NewMemSSTIterator(resp.dataSST, false) - if err != nil { + // To speed up SST reading, surface all the point keys first, flush, + // then surface all the range keys and flush. + // + // TODO(msbutler): investigate using single a single iterator that surfaces + // all point keys first and then all range keys + if err := s.copyPointKeys(resp.dataSST); err != nil { return err } - defer sst.Close() - - sst.SeekGE(storage.MVCCKey{Key: keys.MinKey}) - for { - if valid, err := sst.Valid(); !valid || err != nil { - if err != nil { - return err - } - break - } - k := sst.UnsafeKey() - if k.Timestamp.IsEmpty() { - if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil { - return err - } - } else { - if err := s.sst.PutRawMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil { - return err - } - } - sst.Next() + if err := s.copyRangeKeys(resp.dataSST); err != nil { + return err } // If this span extended the last span added -- that is, picked up where it @@ -328,6 +312,66 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error { return nil } +func (s *fileSSTSink) copyPointKeys(dataSST []byte) error { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storage.NewPebbleMemSSTIterator(dataSST, false, iterOpts) + if err != nil { + return err + } + defer iter.Close() + + for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() { + if valid, err := iter.Valid(); !valid || err != nil { + if err != nil { + return err + } + break + } + k := iter.UnsafeKey() + if k.Timestamp.IsEmpty() { + if err := s.sst.PutUnversioned(k.Key, iter.UnsafeValue()); err != nil { + return err + } + } else { + if err := s.sst.PutRawMVCC(iter.UnsafeKey(), iter.UnsafeValue()); err != nil { + return err + } + } + } + return nil +} + +func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypeRangesOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storage.NewPebbleMemSSTIterator(dataSST, false, iterOpts) + if err != nil { + return err + } + defer iter.Close() + + for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return err + } else if !ok { + break + } + for _, rkv := range iter.RangeKeys() { + if err := s.sst.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil { + return err + } + } + } + return nil +} + func generateUniqueSSTName(nodeID base.SQLInstanceID) string { // The data/ prefix, including a /, is intended to group SSTs in most of the // common file/bucket browse UIs. diff --git a/pkg/ccl/backupccl/testdata/backup-restore/rangekeys b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys new file mode 100644 index 000000000000..ba9be196e212 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys @@ -0,0 +1,84 @@ +# Tests that Backups without Revisions History and Restore properly handle +# range keys + +new-server name=s1 +---- + +exec-sql +CREATE DATABASE orig; +USE orig; +CREATE TABLE foo (i INT PRIMARY KEY, s STRING); +INSERT INTO foo VALUES (1, 'x'),(2,'y'); +CREATE TABLE baz (i INT PRIMARY KEY, s STRING); +INSERT INTO baz VALUES (11, 'xx'),(22,'yy'); +---- + +# Ensure a full backup properly captures range keys +# - with foo, delete then insert, and ensure no original data surfaces in restore +# - with baz: chill for now + +kv request=DeleteRange target=foo +---- + +exec-sql +INSERT INTO foo VALUES (3,'z'); +---- + +exec-sql +BACKUP INTO 'nodelocal://0/test-root/'; +---- + +exec-sql +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo; +---- +1 + +query-sql +SELECT count(*) from orig1.baz; +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +# Ensure incremental backup without revision history +# handles range tombstones: +# - with foo, insert and ensure latest data from previous backup surfaces in RESTORE +# - with baz, delete then insert, and ensure no data from previous backup surfaces in RESTORE + +exec-sql +INSERT INTO foo VALUES (4,'a'),(5,'b'); +---- + +kv request=DeleteRange target=baz +---- + +exec-sql +INSERT INTO baz VALUES (33,'zz'); +---- + +exec-sql +BACKUP INTO LATEST IN 'nodelocal://0/test-root/'; +---- + +exec-sql +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +1 + + + diff --git a/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history new file mode 100644 index 000000000000..0516db04b08b --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history @@ -0,0 +1,148 @@ +# Tests that Backups with Revisions History and As Of System Time +# Restore properly handle range keys in tables foo and baz +# - t0: inital dataset +# - t1: delrange on foo +# - t2: one insert in foo +# - full backup +# - t3: 2 inserts in foo; delrange on baz +# - t4: one insert in baz +# - incremental backup + +new-server name=s1 +---- + +exec-sql +CREATE DATABASE orig; +USE orig; +CREATE TABLE foo (i INT PRIMARY KEY, s STRING); +INSERT INTO foo VALUES (1, 'x'),(2,'y'); +CREATE TABLE baz (i INT PRIMARY KEY, s STRING); +INSERT INTO baz VALUES (11, 'xx'),(22,'yy'); +---- + +save-cluster-ts tag=t0 +---- + +kv request=DeleteRange target=foo +---- + +save-cluster-ts tag=t1 +---- + +exec-sql +INSERT INTO foo VALUES (3,'z'); +---- + +save-cluster-ts tag=t2 +---- + +exec-sql +BACKUP INTO 'nodelocal://0/test-root/' with revision_history; +---- + +exec-sql +INSERT INTO foo VALUES (4,'a'),(5,'b'); +---- + +kv request=DeleteRange target=baz +---- + +save-cluster-ts tag=t3 +---- + +exec-sql +INSERT INTO baz VALUES (33,'zz'); +---- + +save-cluster-ts tag=t4 +---- + +exec-sql +BACKUP INTO LATEST IN 'nodelocal://0/test-root/' with revision_history; +---- + +restore aost=t0 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t0 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +2 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t1 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t1 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +0 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t2 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t2 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +1 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t3 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t3 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +0 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t4 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t4 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +1 diff --git a/pkg/kv/kvclient/revision_reader.go b/pkg/kv/kvclient/revision_reader.go index f51292a3e0f2..e3f87eaa8845 100644 --- a/pkg/kv/kvclient/revision_reader.go +++ b/pkg/kv/kvclient/revision_reader.go @@ -48,7 +48,12 @@ func GetAllRevisions( var res []VersionedValues for _, file := range resp.(*roachpb.ExportResponse).Files { - iter, err := storage.NewMemSSTIterator(file.SST, false) + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: file.Span.Key, + UpperBound: file.Span.EndKey, + } + iter, err := storage.NewPebbleMemSSTIterator(file.SST, true, iterOpts) if err != nil { return nil, err }