diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc
index 414675c7f7d0..5021df27166b 100644
--- a/c-deps/libroach/db.cc
+++ b/c-deps/libroach/db.cc
@@ -1075,9 +1075,9 @@ DBStatus DBUnlockFile(DBFileLock lock) {
   return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock));
 }
 
-DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
-                       DBEngine* engine, DBString* data, DBString* write_intent,
-                       DBString* summary) {
+DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
+                       DBIterOptions iter_opts, DBEngine* engine, DBString* data,
+                       DBString* write_intent, DBString* summary, DBString* resume) {
   DBSstFileWriter* writer = DBSstFileWriterNew();
   DBStatus status = DBSstFileWriterOpen(writer);
   if (status.data != NULL) {
@@ -1092,14 +1092,23 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
   bool skip_current_key_versions = !export_all_revisions;
   DBIterState state;
   const std::string end_key = EncodeKey(end);
-  for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) {
+  // cur_key is used when paginated is true and export_all_revisions is
+  // true. If we're exporting all revisions and we're returning a paginated
+  // SST then we need to keep track of when we've finished adding all of the
+  // versions of a key to the writer.
+  bool paginated = target_size > 0;
+  std::string cur_key;
+  std::string resume_key;
+  // Seek to the MVCC metadata key for the provided start key and let the
+  // incremental iterator find the appropriate version.
+  DBKey seek_key = { .key = start.key };
+  for (state = iter.seek(seek_key);; state = iter.next(skip_current_key_versions)) {
     if (state.status.data != NULL) {
       DBSstFileWriterClose(writer);
       return state.status;
     } else if (!state.valid || kComparator.Compare(iter.key(), end_key) >= 0) {
       break;
     }
-
     rocksdb::Slice decoded_key;
     int64_t wall_time = 0;
     int32_t logical_time = 0;
@@ -1109,6 +1118,14 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
       return ToDBString("Unable to decode key");
     }
 
+    bool is_new_key = !export_all_revisions || decoded_key.compare(rocksdb::Slice(cur_key)) != 0;
+    if (paginated && export_all_revisions && is_new_key) {
+      // Reuse the underlying buffer in cur_key.
+      cur_key.clear();
+      cur_key.reserve(decoded_key.size());
+      cur_key.assign(decoded_key.data(), decoded_key.size());
+    }
+
     // Skip tombstone (len=0) records when start time is zero (non-incremental)
     // and we are not exporting all versions.
     bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
@@ -1116,6 +1133,17 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
       continue;
     }
 
+    // Check to see if this is the first version of key and adding it would
+    // put us over the limit (we might already be over the limit).
+    int64_t cur_size = bulkop_summary.data_size();
+    int64_t new_size = cur_size + decoded_key.size() + iter.value().size();
+    bool is_over_target = paginated && cur_size > 0 && new_size > target_size;
+    if (is_new_key && is_over_target) {
+      resume_key.reserve(decoded_key.size());
+      resume_key.assign(decoded_key.data(), decoded_key.size());
+      break;
+    }
+
     // Insert key into sst and update statistics.
     status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value());
     if (status.data != NULL) {
@@ -1126,8 +1154,7 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
     if (!row_counter.Count((iter.key()), &bulkop_summary)) {
       return ToDBString("Error in row counter");
     }
-    bulkop_summary.set_data_size(bulkop_summary.data_size() + decoded_key.size() +
-                                 iter.value().size());
+    bulkop_summary.set_data_size(new_size);
   }
 
   *summary = ToDBString(bulkop_summary.SerializeAsString());
@@ -1139,6 +1166,11 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
   auto res = DBSstFileWriterFinish(writer, data);
   DBSstFileWriterClose(writer);
 
+  // If we're not returning an error, check to see if we need to return the resume key.
+  if (res.data == NULL && resume_key.length() > 0) {
+    *resume = ToDBString(rocksdb::Slice(resume_key));
+  }
+
   return res;
 }
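Reviewer note: the pagination core of the C++ change is the is_new_key/is_over_target pair above. An SST is only cut immediately before the first version of a new key, so all versions of one key always land in the same file, and the key that did not fit becomes the resume key. A minimal standalone Go model of that stop decision (the kv type, the sizes, and splitPoint are invented for illustration, not code from this patch; in latest-only mode every row counts as a new key):

package main

import "fmt"

// kv models one MVCC version; Key repeats for each version of the same key.
type kv struct {
    Key, Val string
}

// splitPoint mimics the loop above: it returns the index of the first
// version of a key at which a paginated export would stop, or -1 if
// everything fits. All versions of one key stay in the same SST.
func splitPoint(kvs []kv, targetSize int) int {
    size := 0
    cur := ""
    for i, e := range kvs {
        isNewKey := e.Key != cur
        if isNewKey {
            cur = e.Key
        }
        newSize := size + len(e.Key) + len(e.Val)
        // Only cut on the first version of a new key, and never cut an
        // SST that is still empty.
        if isNewKey && size > 0 && newSize > targetSize {
            return i // e.Key becomes the resume key
        }
        size = newSize
    }
    return -1
}

func main() {
    kvs := []kv{{"a", "1"}, {"a", "2"}, {"b", "3"}, {"c", "4"}}
    fmt.Println(splitPoint(kvs, 4)) // 2: adding "b" would push the SST past 4 bytes
}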
diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h
index ea91f8eb7ee4..ad916cac3893 100644
--- a/c-deps/libroach/include/libroach.h
+++ b/c-deps/libroach/include/libroach.h
@@ -549,14 +549,23 @@ typedef void* DBFileLock;
 // DBLockFile sets a lock on the specified file using RocksDB's file locking interface.
 DBStatus DBLockFile(DBSlice filename, DBFileLock* lock);
 
-// DBUnlockFile unlocks the file asscoiated with the specified lock and GCs any allocated memory for
+// DBUnlockFile unlocks the file associated with the specified lock and GCs any allocated memory for
 // the lock.
 DBStatus DBUnlockFile(DBFileLock lock);
 
 // DBExportToSst exports changes over the keyrange and time interval between the
 // start and end DBKeys to an SSTable using an IncrementalIterator.
-DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
-                       DBEngine* engine, DBString* data, DBString* write_intent, DBString* summary);
+// If target_size is positive, it indicates that the export should produce SSTs
+// which are roughly target size. Specifically, it will produce SSTs which contain
+// all relevant versions of a key and will not add the first version of a new
+// key if it would lead to the SST exceeding the target_size. If export_all_revisions
+// is false, the returned SST will be smaller than target_size so long as the first
+// kv pair is smaller than target_size. If export_all_revisions is true then
+// target_size may be exceeded. If the SST construction stops due to the target_size,
+// then resume will be set to the value of the resume key.
+DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
+                       DBIterOptions iter_opts, DBEngine* engine, DBString* data,
+                       DBString* write_intent, DBString* summary, DBString* resume);
 
 // DBEnvOpenReadableFile opens a DBReadableFile in the given engine.
 DBStatus DBEnvOpenReadableFile(DBEngine* db, DBSlice path, DBReadableFile* file);
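Reviewer note: the header comment above defines the calling convention; a caller exports, and if resume comes back non-empty it calls again starting at that key. A sketch of that loop against a stand-in interface (the exporter and fakeExporter types below are invented for illustration; the real signature also carries timestamps, iterator options, and the intent and summary outputs):

package main

import "fmt"

type exporter interface {
    // Mirrors DBExportToSst's pagination outputs: the SST bytes and a
    // resume key, which is nil once the key range is exhausted.
    export(start, end []byte, targetSize uint64) (sst, resume []byte, err error)
}

// fakeExporter hands out canned pages and ignores its arguments.
type fakeExporter struct{ pages [][]byte }

func (f *fakeExporter) export(start, end []byte, targetSize uint64) ([]byte, []byte, error) {
    sst := f.pages[0]
    f.pages = f.pages[1:]
    if len(f.pages) == 0 {
        return sst, nil, nil // done: no resume key
    }
    return sst, []byte("next-start"), nil
}

func main() {
    e := &fakeExporter{pages: [][]byte{[]byte("sst1"), []byte("sst2")}}
    for start := []byte("a"); start != nil; {
        sst, resume, err := e.export(start, []byte("z"), 1<<20)
        if err != nil {
            panic(err)
        }
        fmt.Printf("got %d-byte SST\n", len(sst))
        start = resume // loop until the exporter reports no resume key
    }
}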
diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index ef0786d68ec0..9040934a0577 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -24,6 +24,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/storage/spanset"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/cockroach/pkg/util/tracing"
+  crdberrors "github.com/cockroachdb/errors"
   "github.com/pkg/errors"
 )
 
@@ -130,9 +131,14 @@ func evalExport(
   }
 
   e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
-
-  data, summary, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, io)
-
+  // TODO(ajwerner): Add a constant or internal cluster setting to control the
+  // target size for files and then paginate the actual export. The external
+  // API may need to be modified to deal with the case where ReturnSST is true.
+  const targetSize = 0 // unlimited
+  data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, io)
+  if resume != nil {
+    return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
+  }
   if err != nil {
     return result.Result{}, err
   }
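Reviewer note: with targetSize pinned to zero, a non-nil resume key here can only mean a bug in an engine, which is why the code reaches for crdberrors.AssertionFailedf rather than a plain error. A toy standalone use of that pattern (mustBeNil is invented; the only assumption is AssertionFailedf's Printf-style signature, which the patch itself relies on):

package main

import (
    "fmt"

    crdberrors "github.com/cockroachdb/errors"
)

// mustBeNil turns an unexpected resume key into an assertion failure:
// a bug report, not a user-facing error to be retried or swallowed.
func mustBeNil(resume []byte) error {
    if resume != nil {
        return crdberrors.AssertionFailedf("expected nil resume key, got %q", resume)
    }
    return nil
}

func main() {
    fmt.Println(mustBeNil(nil))         // <nil>
    fmt.Println(mustBeNil([]byte("k"))) // assertion failure error
}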
diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go
index 1fc33d513900..56728d5fecb7 100644
--- a/pkg/ccl/storageccl/export_test.go
+++ b/pkg/ccl/storageccl/export_test.go
@@ -31,6 +31,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/randutil"
+  "github.com/stretchr/testify/require"
 )
 
 func TestExportCmd(t *testing.T) {
@@ -302,6 +303,7 @@ func assertEqualKVs(
   startTime, endTime hlc.Timestamp,
   exportAllRevisions bool,
   enableTimeBoundIteratorOptimization bool,
+  targetSize uint64,
 ) func(*testing.T) {
   return func(t *testing.T) {
     t.Helper()
@@ -328,17 +330,19 @@ func assertEqualKVs(
       io.MaxTimestampHint = endTime
       io.MinTimestampHint = startTime.Next()
     }
-    sst, _, err := e.ExportToSst(startKey, endKey, startTime, endTime, exportAllRevisions, io)
-    if err != nil {
-      t.Fatal(err)
+    var kvs []engine.MVCCKeyValue
+    for start := startKey; start != nil; {
+      var sst []byte
+      sst, _, start, err = e.ExportToSst(start, endKey, startTime, endTime, exportAllRevisions, targetSize, io)
+      require.NoError(t, err)
+      loaded := loadSST(t, sst, startKey, endKey)
+      kvs = append(kvs, loaded...)
     }
 
     // Compare new C++ implementation against the oracle.
     expectedKVS := loadSST(t, expected, startKey, endKey)
-    kvs := loadSST(t, sst, startKey, endKey)
     if len(kvs) != len(expectedKVS) {
-      t.Fatalf("got %d kvs (%+v) but expected %d (%+v)", len(kvs), kvs, len(expected),
-        expected)
+      t.Fatalf("got %d kvs but expected %d:\n%v\n%v", len(kvs), len(expectedKVS), kvs, expectedKVS)
     }
 
     for i := range kvs {
@@ -430,27 +434,40 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
       timestamps[i].Logical < timestamps[j].Logical)
   })
 
+  testWithTargetSize := func(t *testing.T, targetSize uint64) {
+    if testing.Short() && targetSize > 0 && targetSize < 1<<15 {
+      t.Skipf("testing with size %d is slow", targetSize)
+    }
+    t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize))
+    t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize))
+    t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize))
+    t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize))
+
+    upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
+    lowerBound := rnd.Intn(upperBound)
+
+    // Exercise random key ranges.
+    t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize))
+    t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize))
+    t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize))
+    t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize))
+
+    upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
+    lowerBound = rnd.Intn(upperBound)
+
+    // Exercise random timestamps.
+    t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize))
+    t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize))
+    t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize))
+    t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize))
+  }
   // Exercise min to max time and key ranges.
-  t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false))
-  t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false))
-  t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true))
-  t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true))
-
-  upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
-  lowerBound := rnd.Intn(upperBound)
-
-  // Exercise random key ranges.
-  t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false))
-  t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false))
-  t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true))
-  t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true))
-
-  upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
-  lowerBound = rnd.Intn(upperBound)
-
-  // Exercise random timestamps.
-  t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false))
-  t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false))
-  t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true))
-  t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true))
+  for _, targetSize := range []uint64{
+    0 /* unlimited */, 1 << 10, 1 << 16, 1 << 20,
+  } {
+    t.Run(fmt.Sprintf("targetSize=%d", targetSize), func(t *testing.T) {
+      testWithTargetSize(t, targetSize)
+    })
+  }
+
 }
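Reviewer note: the rewritten helper now drives the same oracle comparison through the paginated path, looping until ExportToSst returns a nil resume key and concatenating the loaded pages. The invariant it leans on, in miniature (toy data, no SSTs involved):

package main

import (
    "fmt"
    "reflect"
)

func main() {
    // Exporting page by page (here: fixed-size chunks) and concatenating
    // must reproduce the one-shot export (here: the original slice).
    oracle := []string{"a", "b", "c", "d", "e"}
    var paged []string
    for start := 0; start < len(oracle); start += 2 {
        end := start + 2
        if end > len(oracle) {
            end = len(oracle)
        }
        paged = append(paged, oracle[start:end]...)
    }
    fmt.Println(reflect.DeepEqual(paged, oracle)) // true
}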
diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go
index e91b65a8bf44..d2163fb1c1c8 100644
--- a/pkg/storage/engine/engine.go
+++ b/pkg/storage/engine/engine.go
@@ -200,9 +200,19 @@ type Reader interface {
   // within the interval is exported. Deletions are included if all revisions are
   // requested or if the start.Timestamp is non-zero. Returns the bytes of an
   // SSTable containing the exported keys, the size of exported data, or an error.
+  // If targetSize is positive, it indicates that the export should produce SSTs
+  // which are roughly target size. Specifically, it will produce SSTs which contain
+  // all relevant versions of a key and will not add the first version of a new
+  // key if it would lead to the SST exceeding the targetSize. If exportAllRevisions
+  // is false, the returned SST will be smaller than targetSize so long as the first
+  // kv pair is smaller than targetSize. If exportAllRevisions is true then
+  // targetSize may be exceeded by as much as the size of all of the versions of
+  // the last key. If the SST construction stops due to the targetSize,
+  // then a non-nil resumeKey will be returned.
   ExportToSst(
-    startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, io IterOptions,
-  ) ([]byte, roachpb.BulkOpSummary, error)
+    startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
+    exportAllRevisions bool, targetSize uint64, io IterOptions,
+  ) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
   // Get returns the value for the given key, nil otherwise.
   //
   // Deprecated: use MVCCGet instead.
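Reviewer note: to make the documented size bounds concrete, a toy calculation with invented sizes:

package main

import "fmt"

func main() {
    // Suppose targetSize = 100 and the SST holds 90 bytes when the export
    // reaches a new key k that has three 50-byte versions.
    //
    // exportAllRevisions == false: only k's newest version is a candidate;
    // 90+50 > 100, so the export stops first and k becomes the resume key.
    // The SST stays at or under the target, unless the very first kv pair
    // was itself larger than the target.
    fmt.Println(90+50 > 100) // true: k is not added

    // exportAllRevisions == true: the new-key check happens only once per
    // key. If k's first version was admitted when the SST held 60 bytes,
    // the remaining versions follow unconditionally: 60 + 3*50 = 210,
    // overshooting by up to the combined size of the last key's versions.
    fmt.Println(60 + 3*50) // 210
}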
diff --git a/pkg/storage/engine/pebble.go b/pkg/storage/engine/pebble.go
index 9a6182ca09d7..da0514847d52 100644
--- a/pkg/storage/engine/pebble.go
+++ b/pkg/storage/engine/pebble.go
@@ -492,9 +492,10 @@ func (p *Pebble) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 // Get implements the Engine interface.
@@ -938,9 +939,10 @@ func (p *pebbleReadOnly) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1059,9 +1061,10 @@ func (p *pebbleSnapshot) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 // Get implements the Reader interface.
@@ -1113,8 +1116,9 @@ func pebbleExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
   sstFile := &MemFile{}
   sstWriter := MakeBackupSSTWriter(sstFile)
   defer sstWriter.Close()
@@ -1128,12 +1132,15 @@ func pebbleExportToSst(
     EndTime: endTS,
   })
   defer iter.Close()
+  var curKey roachpb.Key // only used if exportAllRevisions
+  var resumeKey roachpb.Key
+  paginated := targetSize > 0
   for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; {
     ok, err := iter.Valid()
     if err != nil {
       // The error may be a WriteIntentError. In which case, returning it will
       // cause this command to be retried.
-      return nil, roachpb.BulkOpSummary{}, err
+      return nil, roachpb.BulkOpSummary{}, nil, err
     }
     if !ok {
       break
@@ -1143,18 +1150,29 @@ func pebbleExportToSst(
       break
     }
     unsafeValue := iter.UnsafeValue()
+    isNewKey := !exportAllRevisions || unsafeKey.Key.Compare(curKey) != 0
+    if paginated && exportAllRevisions && isNewKey {
+      curKey = append(curKey[:0], unsafeKey.Key...)
+    }
 
     // Skip tombstone (len=0) records when start time is zero (non-incremental)
     // and we are not exporting all versions.
     skipTombstones := !exportAllRevisions && startTS.IsEmpty()
    if len(unsafeValue) > 0 || !skipTombstones {
       if err := rows.Count(unsafeKey.Key); err != nil {
-        return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "decoding %s", unsafeKey)
+        return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
+      }
+      curSize := rows.BulkOpSummary.DataSize
+      newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
+      isOverTarget := paginated && curSize > 0 && uint64(newSize) > targetSize
+      if isNewKey && isOverTarget {
+        resumeKey = append(roachpb.Key{}, unsafeKey.Key...) // allocate the right size
+        break
       }
-      rows.BulkOpSummary.DataSize += int64(len(unsafeKey.Key) + len(unsafeValue))
       if err := sstWriter.Put(unsafeKey, unsafeValue); err != nil {
-        return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "adding key %s", unsafeKey)
+        return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
       }
+      rows.BulkOpSummary.DataSize = newSize
     }
 
     if exportAllRevisions {
@@ -1167,12 +1185,12 @@ func pebbleExportToSst(
   if rows.BulkOpSummary.DataSize == 0 {
     // If no records were added to the sstable, skip completing it and return a
     // nil slice – the export code will discard it anyway (based on 0 DataSize).
-    return nil, roachpb.BulkOpSummary{}, nil
+    return nil, roachpb.BulkOpSummary{}, nil, nil
   }
 
   if err := sstWriter.Finish(); err != nil {
-    return nil, roachpb.BulkOpSummary{}, err
+    return nil, roachpb.BulkOpSummary{}, nil, err
   }
-  return sstFile.Data(), rows.BulkOpSummary, nil
+  return sstFile.Data(), rows.BulkOpSummary, resumeKey, nil
 }
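Reviewer note: two different copy idioms appear above. curKey recycles its own backing array each iteration, while resumeKey is copied into a fresh, right-sized allocation because it escapes the function. Both copies are needed because the iterator's unsafe key is only valid until the next step; a minimal standalone illustration:

package main

import "fmt"

func main() {
    buf := []byte("aaa1") // stand-in for the iterator's reused buffer
    unsafeKey := buf[:3]  // "unsafe": a view into memory the iterator will recycle

    // The curKey idiom: reuse curKey's backing array across iterations.
    var curKey []byte
    curKey = append(curKey[:0], unsafeKey...)

    // The resumeKey idiom: a fresh, exactly-sized allocation that can
    // safely outlive the iterator.
    resumeKey := append([]byte{}, unsafeKey...)

    copy(buf, "bbb2") // the iterator advances and overwrites its buffer
    fmt.Println(string(unsafeKey), string(curKey), string(resumeKey)) // bbb aaa aaa
}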
diff --git a/pkg/storage/engine/pebble_batch.go b/pkg/storage/engine/pebble_batch.go
index edecf4ba1294..5fb91a80f4cc 100644
--- a/pkg/storage/engine/pebble_batch.go
+++ b/pkg/storage/engine/pebble_batch.go
@@ -93,8 +93,9 @@ func (p *pebbleBatch) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
   panic("unimplemented")
 }
diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go
index 409a24e0ffb9..87266e0f1093 100644
--- a/pkg/storage/engine/rocksdb.go
+++ b/pkg/storage/engine/rocksdb.go
@@ -780,36 +780,38 @@ func (r *RocksDB) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
   start := MVCCKey{Key: startKey, Timestamp: startTS}
   end := MVCCKey{Key: endKey, Timestamp: endTS}
 
   var data C.DBString
   var intentErr C.DBString
   var bulkopSummary C.DBString
+  var resumeKey C.DBString
 
   err := statusToError(C.DBExportToSst(goToCKey(start), goToCKey(end), C.bool(exportAllRevisions),
-    goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary))
+    C.uint64_t(targetSize), goToCIterOptions(io), r.rdb, &data, &intentErr, &bulkopSummary, &resumeKey))
   if err != nil {
     if err.Error() == "WriteIntentError" {
       var e roachpb.WriteIntentError
       if err := protoutil.Unmarshal(cStringToGoBytes(intentErr), &e); err != nil {
-        return nil, roachpb.BulkOpSummary{}, errors.Wrap(err, "failed to decode write intent error")
+        return nil, roachpb.BulkOpSummary{}, nil, errors.Wrap(err, "failed to decode write intent error")
       }
-      return nil, roachpb.BulkOpSummary{}, &e
+      return nil, roachpb.BulkOpSummary{}, nil, &e
     }
-    return nil, roachpb.BulkOpSummary{}, err
+    return nil, roachpb.BulkOpSummary{}, nil, err
   }
 
   var summary roachpb.BulkOpSummary
   if err := protoutil.Unmarshal(cStringToGoBytes(bulkopSummary), &summary); err != nil {
-    return nil, roachpb.BulkOpSummary{}, errors.Wrap(err, "failed to decode BulkopSummary")
+    return nil, roachpb.BulkOpSummary{}, nil, errors.Wrap(err, "failed to decode BulkopSummary")
   }
 
-  return cStringToGoBytes(data), summary, nil
+  return cStringToGoBytes(data), summary, roachpb.Key(cStringToGoBytes(resumeKey)), nil
 }
 
 // Attrs returns the list of attributes describing this engine. This
@@ -1003,9 +1005,10 @@ func (r *rocksDBReadOnly) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 func (r *rocksDBReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1323,9 +1326,10 @@ func (r *rocksDBSnapshot) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return r.parent.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 // Get returns the value for the given key, nil otherwise using
@@ -1732,8 +1736,9 @@ func (r *rocksDBBatch) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
   panic("unimplemented")
 }
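Reviewer note: at the cgo boundary the wrapper converts the resume DBString into a roachpb.Key, and the pagination contract distinguishes a nil key (done) from a real one. Assuming cStringToGoBytes yields nil when the C++ side never populated the DBString (DBExportToSst above only sets it when it stops early), the distinction that matters on the Go side is:

package main

import "fmt"

func main() {
    // nil means "no resume key, the range is exhausted"; a zero-length but
    // non-nil slice would read as a real (empty) key and could send the
    // caller into another round trip.
    var done []byte   // what the wrapper should return when C sets nothing
    empty := []byte{} // subtly different, and easy to produce by accident
    fmt.Println(done == nil, empty == nil) // true false
}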
diff --git a/pkg/storage/engine/tee.go b/pkg/storage/engine/tee.go
index 9c49b1dc8009..1095ac2cfa2d 100644
--- a/pkg/storage/engine/tee.go
+++ b/pkg/storage/engine/tee.go
@@ -89,18 +89,22 @@ func (t *TeeEngine) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  eng1Sst, bulkOpSummary, err := t.eng1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
-  rocksSst, _, err2 := t.eng2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  eng1Sst, bulkOpSummary, resume1, err := t.eng1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
+  rocksSst, _, resume2, err2 := t.eng2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
   if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
-    return nil, bulkOpSummary, err
+    return nil, bulkOpSummary, nil, err
   }
 
   if !bytes.Equal(eng1Sst, rocksSst) {
     log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", eng1Sst, rocksSst)
   }
-  return eng1Sst, bulkOpSummary, err
+  if !resume1.Equal(resume2) {
+    log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2)
+  }
+  return eng1Sst, bulkOpSummary, resume1, err
 }
 
 // Get implements the Engine interface.
@@ -668,18 +672,22 @@ func (t *TeeEngineReader) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  sst1, bulkOpSummary, err := t.reader1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
-  sst2, _, err2 := t.reader2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  sst1, bulkOpSummary, resume1, err := t.reader1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
+  sst2, _, resume2, err2 := t.reader2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
   if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
-    return nil, bulkOpSummary, err
+    return nil, bulkOpSummary, nil, err
   }
 
   if !bytes.Equal(sst1, sst2) {
     log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", sst1, sst2)
   }
-  return sst1, bulkOpSummary, err
+  if !resume1.Equal(resume2) {
+    log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2)
+  }
+  return sst1, bulkOpSummary, resume1, err
 }
 
 // Get implements the Reader interface.
@@ -768,18 +776,22 @@ func (t *TeeEngineBatch) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  sst1, bulkOpSummary, err := t.batch1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
-  sst2, _, err2 := t.batch2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  sst1, bulkOpSummary, resume1, err := t.batch1.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
+  sst2, _, resume2, err2 := t.batch2.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
   if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
-    return nil, bulkOpSummary, err
+    return nil, bulkOpSummary, nil, err
   }
 
   if !bytes.Equal(sst1, sst2) {
     log.Fatalf(t.ctx, "mismatching SSTs returned by engines: %v != %v", sst1, sst2)
   }
-  return sst1, bulkOpSummary, err
+  if !resume1.Equal(resume2) {
+    log.Fatalf(t.ctx, "mismatching resume key returned by engines: %v != %v", resume1, resume2)
+  }
+  return sst1, bulkOpSummary, resume1, err
 }
 
 // Get implements the Batch interface.
diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go
index 8560cffc0eaa..88f775cae082 100644
--- a/pkg/storage/spanset/batch.go
+++ b/pkg/storage/spanset/batch.go
@@ -284,9 +284,10 @@ func (s spanSetReader) ExportToSst(
   startKey, endKey roachpb.Key,
   startTS, endTS hlc.Timestamp,
   exportAllRevisions bool,
+  targetSize uint64,
   io engine.IterOptions,
-) ([]byte, roachpb.BulkOpSummary, error) {
-  return s.r.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, io)
+) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
+  return s.r.ExportToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
 }
 
 func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) {
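Reviewer note: the tee comparisons check resume keys with roachpb.Key.Equal rather than bytes.Equal directly. Assuming Key.Equal is plain byte equality (a wrapper over bytes.Equal), a nil and an empty resume key compare equal, so the fatal check fires only on genuinely different keys, not on nil-versus-empty representation differences between the two engines:

package main

import (
    "bytes"
    "fmt"
)

func main() {
    fmt.Println(bytes.Equal(nil, []byte{}))    // true: nil and empty compare equal
    fmt.Println(bytes.Equal([]byte("a"), nil)) // false: a real mismatch
}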