From b95fb60ba0721539d82e16cac66b40379f2fd307 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 22 Jan 2020 11:45:33 -0500 Subject: [PATCH] libroach,engine: support pagination of ExportToSst This commit extends the engine interface to take a targetSize parameter in the ExportToSst method. The iteration will stope if the first version of a key to be added to the SST would lead to targetSize being exceeded. If exportAllRevisions is false, the targetSize will not be exceeded unless the first kv pair exceeds it. This commit additionally fixes a bug in the rocksdb implementation of DBExportToSst whereby the first key in the export request would be skipped. This case likely never occurred because the key passed to Export was rarely exactly the first key to be included (see the change related to seek_key in db.cc). The exportccl.TestRandomKeyAndTimestampExport was extended to excercise various targetSize limits. That test run under stress with the tee engine inspires some confidence and did catch the above mentioned bug. More testing would likely be good. This commit leaves the task of adopting the targetSize parameter for later. Fixes #39717. Release note: None --- c-deps/libroach/db.cc | 46 ++++++++++++++++--- c-deps/libroach/include/libroach.h | 15 ++++-- pkg/ccl/storageccl/export.go | 12 +++-- pkg/ccl/storageccl/export_test.go | 73 ++++++++++++++++++------------ pkg/storage/engine/engine.go | 14 +++++- pkg/storage/engine/pebble.go | 46 +++++++++++++------ pkg/storage/engine/pebble_batch.go | 3 +- pkg/storage/engine/rocksdb.go | 29 +++++++----- pkg/storage/engine/tee.go | 42 +++++++++++------ pkg/storage/spanset/batch.go | 5 +- 10 files changed, 198 insertions(+), 87 deletions(-) diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index 414675c7f7d0..5021df27166b 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -1075,9 +1075,9 @@ DBStatus DBUnlockFile(DBFileLock lock) { return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock)); } -DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts, - DBEngine* engine, DBString* data, DBString* write_intent, - DBString* summary) { +DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size, + DBIterOptions iter_opts, DBEngine* engine, DBString* data, + DBString* write_intent, DBString* summary, DBString* resume) { DBSstFileWriter* writer = DBSstFileWriterNew(); DBStatus status = DBSstFileWriterOpen(writer); if (status.data != NULL) { @@ -1092,14 +1092,23 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter bool skip_current_key_versions = !export_all_revisions; DBIterState state; const std::string end_key = EncodeKey(end); - for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) { + // cur_key is used when paginated is true and export_all_revisions is + // true. If we're exporting all revisions and we're returning a paginated + // SST then we need to keep track of when we've finished adding all of the + // versions of a key to the writer. + bool paginated = target_size > 0; + std::string cur_key; + std::string resume_key; + // Seek to the MVCC metadata key for the provided start key and let the + // incremental iterator find the appropriate version. + DBKey seek_key = { .key = start.key }; + for (state = iter.seek(seek_key);; state = iter.next(skip_current_key_versions)) { if (state.status.data != NULL) { DBSstFileWriterClose(writer); return state.status; } else if (!state.valid || kComparator.Compare(iter.key(), end_key) >= 0) { break; } - rocksdb::Slice decoded_key; int64_t wall_time = 0; int32_t logical_time = 0; @@ -1109,6 +1118,14 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter return ToDBString("Unable to decode key"); } + bool is_new_key = !export_all_revisions || decoded_key.compare(rocksdb::Slice(cur_key)) != 0; + if (paginated && export_all_revisions && is_new_key) { + // Reuse the underlying buffer in cur_key. + cur_key.clear(); + cur_key.reserve(decoded_key.size()); + cur_key.assign(decoded_key.data(), decoded_key.size()); + } + // Skip tombstone (len=0) records when start time is zero (non-incremental) // and we are not exporting all versions. bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions; @@ -1116,6 +1133,17 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter continue; } + // Check to see if this is the first version of key and adding it would + // put us over the limit (we might already be over the limit). + int64_t cur_size = bulkop_summary.data_size(); + int64_t new_size = cur_size + decoded_key.size() + iter.value().size(); + bool is_over_target = paginated && cur_size > 0 && new_size > target_size; + if (is_new_key && is_over_target) { + resume_key.reserve(decoded_key.size()); + resume_key.assign(decoded_key.data(), decoded_key.size()); + break; + } + // Insert key into sst and update statistics. status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value()); if (status.data != NULL) { @@ -1126,8 +1154,7 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter if (!row_counter.Count((iter.key()), &bulkop_summary)) { return ToDBString("Error in row counter"); } - bulkop_summary.set_data_size(bulkop_summary.data_size() + decoded_key.size() + - iter.value().size()); + bulkop_summary.set_data_size(new_size); } *summary = ToDBString(bulkop_summary.SerializeAsString()); @@ -1139,6 +1166,11 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter auto res = DBSstFileWriterFinish(writer, data); DBSstFileWriterClose(writer); + // If we're not returning an error, check to see if we need to return the resume key. + if (res.data == NULL && resume_key.length() > 0) { + *resume = ToDBString(rocksdb::Slice(resume_key)); + } + return res; } diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h index ea91f8eb7ee4..ad916cac3893 100644 --- a/c-deps/libroach/include/libroach.h +++ b/c-deps/libroach/include/libroach.h @@ -549,14 +549,23 @@ typedef void* DBFileLock; // DBLockFile sets a lock on the specified file using RocksDB's file locking interface. DBStatus DBLockFile(DBSlice filename, DBFileLock* lock); -// DBUnlockFile unlocks the file asscoiated with the specified lock and GCs any allocated memory for +// DBUnlockFile unlocks the file associated with the specified lock and GCs any allocated memory for // the lock. DBStatus DBUnlockFile(DBFileLock lock); // DBExportToSst exports changes over the keyrange and time interval between the // start and end DBKeys to an SSTable using an IncrementalIterator. -DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts, - DBEngine* engine, DBString* data, DBString* write_intent, DBString* summary); +// If target_size is positive, it indicates that the export should produce SSTs +// which are roughly target size. Specifically, it will produce SSTs which contain +// all relevant versions of a key and will not add the first version of a new +// key if it would lead to the SST exceeding the target_size. If export_all_revisions +// is false, the returned SST will be smaller than target_size so long as the first +// kv pair is smaller than target_size. If export_all_revisions is true then +// target_size may be exceeded. If the SST construction stops due to the target_size, +// then resume will be set to the value of the resume key. +DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size, + DBIterOptions iter_opts, DBEngine* engine, DBString* data, + DBString* write_intent, DBString* summary, DBString* resume); // DBEnvOpenReadableFile opens a DBReadableFile in the given engine. DBStatus DBEnvOpenReadableFile(DBEngine* db, DBSlice path, DBReadableFile* file); diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index ef0786d68ec0..9040934a0577 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" + crdberrors "github.com/cockroachdb/errors" "github.com/pkg/errors" ) @@ -130,9 +131,14 @@ func evalExport( } e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey}) - - data, summary, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, io) - + // TODO(ajwerner): Add a constant or internal cluster setting to control the + // target size for files and then paginate the actual export. The external + // API may need to be modified to deal with the case where ReturnSST is true. + const targetSize = 0 // unlimited + data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, io) + if resume != nil { + return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size") + } if err != nil { return result.Result{}, err } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index 1fc33d513900..56728d5fecb7 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" ) func TestExportCmd(t *testing.T) { @@ -302,6 +303,7 @@ func assertEqualKVs( startTime, endTime hlc.Timestamp, exportAllRevisions bool, enableTimeBoundIteratorOptimization bool, + targetSize uint64, ) func(*testing.T) { return func(t *testing.T) { t.Helper() @@ -328,17 +330,19 @@ func assertEqualKVs( io.MaxTimestampHint = endTime io.MinTimestampHint = startTime.Next() } - sst, _, err := e.ExportToSst(startKey, endKey, startTime, endTime, exportAllRevisions, io) - if err != nil { - t.Fatal(err) + var kvs []engine.MVCCKeyValue + for start := startKey; start != nil; { + var sst []byte + sst, _, start, err = e.ExportToSst(start, endKey, startTime, endTime, exportAllRevisions, targetSize, io) + require.NoError(t, err) + loaded := loadSST(t, sst, startKey, endKey) + kvs = append(kvs, loaded...) } // Compare new C++ implementation against the oracle. expectedKVS := loadSST(t, expected, startKey, endKey) - kvs := loadSST(t, sst, startKey, endKey) - if len(kvs) != len(expectedKVS) { - t.Fatalf("got %d kvs (%+v) but expected %d (%+v)", len(kvs), kvs, len(expected), expected) + t.Fatalf("got %d kvs but expected %d:\n%v\n%v", len(kvs), len(expectedKVS), kvs, expectedKVS) } for i := range kvs { @@ -430,27 +434,40 @@ func TestRandomKeyAndTimestampExport(t *testing.T) { timestamps[i].Logical < timestamps[j].Logical) }) + testWithTargetSize := func(t *testing.T, targetSize uint64) { + if testing.Short() && targetSize > 0 && targetSize < 1<<15 { + t.Skipf("testing with size %d is slow", targetSize) + } + t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize)) + t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize)) + t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize)) + t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize)) + + upperBound := randutil.RandIntInRange(rnd, 1, numKeys) + lowerBound := rnd.Intn(upperBound) + + // Exercise random key ranges. + t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize)) + t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize)) + t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize)) + t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize)) + + upperBound = randutil.RandIntInRange(rnd, 1, numKeys) + lowerBound = rnd.Intn(upperBound) + + // Exercise random timestamps. + t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize)) + t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize)) + t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize)) + t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize)) + } // Exercise min to max time and key ranges. - t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false)) - t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false)) - t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true)) - t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true)) - - upperBound := randutil.RandIntInRange(rnd, 1, numKeys) - lowerBound := rnd.Intn(upperBound) - - // Exercise random key ranges. - t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false)) - t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false)) - t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true)) - t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true)) - - upperBound = randutil.RandIntInRange(rnd, 1, numKeys) - lowerBound = rnd.Intn(upperBound) - - // Exercise random timestamps. - t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false)) - t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false)) - t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true)) - t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true)) + for _, targetSize := range []uint64{ + 0 /* unlimited */, 1 << 10, 1 << 16, 1 << 20, + } { + t.Run(fmt.Sprintf("targetSize=%d", targetSize), func(t *testing.T) { + testWithTargetSize(t, targetSize) + }) + } + } diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index e91b65a8bf44..d2163fb1c1c8 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -200,9 +200,19 @@ type Reader interface { // within the interval is exported. Deletions are included if all revisions are // requested or if the start.Timestamp is non-zero. Returns the bytes of an // SSTable containing the exported keys, the size of exported data, or an error. + // If targetSize is positive, it indicates that the export should produce SSTs + // which are roughly target size. Specifically, it will produce SSTs which contain + // all relevant versions of a key and will not add the first version of a new + // key if it would lead to the SST exceeding the targetSize. If exportAllRevisions + // is false, the returned SST will be smaller than target_size so long as the first + // kv pair is smaller than targetSize. If exportAllRevisions is true then + // targetSize may be exceeded by as much as the size of all of the versions of + // the last key. If the SST construction stops due to the targetSize, + // then a non-nil resumeKey will be returned. ExportToSst( - startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, io IterOptions, - ) ([]byte, roachpb.BulkOpSummary, error) + startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, + exportAllRevisions bool, targetSize uint64, io IterOptions, + ) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error) // Get returns the value for the given key, nil otherwise. // // Deprecated: use MVCCGet instead. diff --git a/pkg/storage/engine/pebble.go b/pkg/storage/engine/pebble.go index 9a6182ca09d7..da0514847d52 100644 --- a/pkg/storage/engine/pebble.go +++ b/pkg/storage/engine/pebble.go @@ -492,9 +492,10 @@ func (p *Pebble) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } // Get implements the Engine interface. @@ -938,9 +939,10 @@ func (p *pebbleReadOnly) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) { @@ -1059,9 +1061,10 @@ func (p *pebbleSnapshot) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } // Get implements the Reader interface. @@ -1113,8 +1116,9 @@ func pebbleExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { sstFile := &MemFile{} sstWriter := MakeBackupSSTWriter(sstFile) defer sstWriter.Close() @@ -1128,12 +1132,15 @@ func pebbleExportToSst( EndTime: endTS, }) defer iter.Close() + var curKey roachpb.Key // only used if exportAllRevisions + var resumeKey roachpb.Key + paginated := targetSize > 0 for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; { ok, err := iter.Valid() if err != nil { // The error may be a WriteIntentError. In which case, returning it will // cause this command to be retried. - return nil, roachpb.BulkOpSummary{}, err + return nil, roachpb.BulkOpSummary{}, nil, err } if !ok { break @@ -1143,18 +1150,29 @@ func pebbleExportToSst( break } unsafeValue := iter.UnsafeValue() + isNewKey := !exportAllRevisions || unsafeKey.Key.Compare(curKey) != 0 + if paginated && exportAllRevisions && isNewKey { + curKey = append(curKey[:0], unsafeKey.Key...) + } // Skip tombstone (len=0) records when start time is zero (non-incremental) // and we are not exporting all versions. skipTombstones := !exportAllRevisions && startTS.IsEmpty() if len(unsafeValue) > 0 || !skipTombstones { if err := rows.Count(unsafeKey.Key); err != nil { - return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "decoding %s", unsafeKey) + return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey) + } + curSize := rows.BulkOpSummary.DataSize + newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue)) + isOverTarget := paginated && curSize > 0 && uint64(newSize) > targetSize + if isNewKey && isOverTarget { + resumeKey = append(roachpb.Key{}, unsafeKey.Key...) // allocate the right size + break } - rows.BulkOpSummary.DataSize += int64(len(unsafeKey.Key) + len(unsafeValue)) if err := sstWriter.Put(unsafeKey, unsafeValue); err != nil { - return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "adding key %s", unsafeKey) + return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey) } + rows.BulkOpSummary.DataSize = newSize } if exportAllRevisions { @@ -1167,12 +1185,12 @@ func pebbleExportToSst( if rows.BulkOpSummary.DataSize == 0 { // If no records were added to the sstable, skip completing it and return a // nil slice – the export code will discard it anyway (based on 0 DataSize). - return nil, roachpb.BulkOpSummary{}, nil + return nil, roachpb.BulkOpSummary{}, nil, nil } if err := sstWriter.Finish(); err != nil { - return nil, roachpb.BulkOpSummary{}, err + return nil, roachpb.BulkOpSummary{}, nil, err } - return sstFile.Data(), rows.BulkOpSummary, nil + return sstFile.Data(), rows.BulkOpSummary, resumeKey, nil } diff --git a/pkg/storage/engine/pebble_batch.go b/pkg/storage/engine/pebble_batch.go index edecf4ba1294..5fb91a80f4cc 100644 --- a/pkg/storage/engine/pebble_batch.go +++ b/pkg/storage/engine/pebble_batch.go @@ -93,8 +93,9 @@ func (p *pebbleBatch) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { panic("unimplemented") } diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 409a24e0ffb9..87266e0f1093 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -780,36 +780,38 @@ func (r *RocksDB) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { start := MVCCKey{Key: startKey, Timestamp: startTS} end := MVCCKey{Key: endKey, Timestamp: endTS} var data C.DBString var intentErr C.DBString var bulkopSummary C.DBString + var resumeKey C.DBString err := statusToError(C.DBExportToSst(goToCKey(start), goToCKey(end), C.bool(exportAllRevisions), - goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary)) + C.uint64_t(targetSize), goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary, &resumeKey)) if err != nil { if err.Error() == "WriteIntentError" { var e roachpb.WriteIntentError if err := protoutil.Unmarshal(cStringToGoBytes(intentErr), &e); err != nil { - return nil, roachpb.BulkOpSummary{}, errors.Wrap(err, "failed to decode write intent error") + return nil, roachpb.BulkOpSummary{}, nil, errors.Wrap(err, "failed to decode write intent error") } - return nil, roachpb.BulkOpSummary{}, &e + return nil, roachpb.BulkOpSummary{}, nil, &e } - return nil, roachpb.BulkOpSummary{}, err + return nil, roachpb.BulkOpSummary{}, nil, err } var summary roachpb.BulkOpSummary if err := protoutil.Unmarshal(cStringToGoBytes(bulkopSummary), &summary); err != nil { - return nil, roachpb.BulkOpSummary{}, errors.Wrap(err, "failed to decode BulkopSummary") + return nil, roachpb.BulkOpSummary{}, nil, errors.Wrap(err, "failed to decode BulkopSummary") } - return cStringToGoBytes(data), summary, nil + return cStringToGoBytes(data), summary, roachpb.Key(cStringToGoBytes(resumeKey)), nil } // Attrs returns the list of attributes describing this engine. This @@ -1003,9 +1005,10 @@ func (r *rocksDBReadOnly) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } func (r *rocksDBReadOnly) Get(key MVCCKey) ([]byte, error) { @@ -1323,9 +1326,10 @@ func (r *rocksDBSnapshot) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } // Get returns the value for the given key, nil otherwise using @@ -1732,8 +1736,9 @@ func (r *rocksDBBatch) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { panic("unimplemented") } diff --git a/pkg/storage/engine/tee.go b/pkg/storage/engine/tee.go index 9c49b1dc8009..1095ac2cfa2d 100644 --- a/pkg/storage/engine/tee.go +++ b/pkg/storage/engine/tee.go @@ -89,18 +89,22 @@ func (t *TeeEngine) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - eng1Sst, bulkOpSummary, err := t.eng1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) - rocksSst, _, err2 := t.eng2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + eng1Sst, bulkOpSummary, resume1, err := t.eng1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) + rocksSst, _, resume2, err2 := t.eng2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil { - return nil, bulkOpSummary, err + return nil, bulkOpSummary, nil, err } if !bytes.Equal(eng1Sst, rocksSst) { log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", eng1Sst, rocksSst) } - return eng1Sst, bulkOpSummary, err + if !resume1.Equal(resume2) { + log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2) + } + return eng1Sst, bulkOpSummary, resume1, err } // Get implements the Engine interface. @@ -668,18 +672,22 @@ func (t *TeeEngineReader) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - sst1, bulkOpSummary, err := t.reader1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) - sst2, _, err2 := t.reader2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + sst1, bulkOpSummary, resume1, err := t.reader1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) + sst2, _, resume2, err2 := t.reader2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil { - return nil, bulkOpSummary, err + return nil, bulkOpSummary, nil, err } if !bytes.Equal(sst1, sst2) { log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", sst1, sst2) } - return sst1, bulkOpSummary, err + if !resume1.Equal(resume2) { + log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2) + } + return sst1, bulkOpSummary, resume1, err } // Get implements the Reader interface. @@ -768,18 +776,22 @@ func (t *TeeEngineBatch) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - sst1, bulkOpSummary, err := t.batch1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) - sst2, _, err2 := t.batch2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + sst1, bulkOpSummary, resume1, err := t.batch1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) + sst2, _, resume2, err2 := t.batch2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil { - return nil, bulkOpSummary, err + return nil, bulkOpSummary, nil, err } if !bytes.Equal(sst1, sst2) { log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", sst1, sst2) } - return sst1, bulkOpSummary, err + if !resume1.Equal(resume2) { + log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2) + } + return sst1, bulkOpSummary, resume1, err } // Get implements the Batch interface. diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go index 8560cffc0eaa..88f775cae082 100644 --- a/pkg/storage/spanset/batch.go +++ b/pkg/storage/spanset/batch.go @@ -284,9 +284,10 @@ func (s spanSetReader) ExportToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, + targetSize uint64, io engine.IterOptions, -) ([]byte, roachpb.BulkOpSummary, error) { - return s.r.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io) +) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { + return s.r.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io) } func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) {