Skip to content

Commit

Permalink
storage: support MVCC range tombstones in MVCCExportToSST
Browse files Browse the repository at this point in the history
This patch adds support for exporting MVCC range tombstones in
`MVCCExportToSST()`, and by extension in the KV `Export` method.
This will only happen after two version gates are enabled:

* `EnablePebbleFormatVersionRangeKeys`: begins emitting SSTs in
  `Pebblev2` format, which supports Pebble range keys.

* `MVCCRangeTombstones`: allows writing MVCC range tombstones via KV.

MVCC range tombstones are emitted in the same way as point tombstones:
all tombstones if `ExportAllRevisions` is enabled, or the latest visible
tombstone if `StartTS` is given.

MVCC range tombstones are truncated to the SST bounds. For example, if
exporting the span `a-f`, then any range tombstones wider than the span
will be truncated to `[a-f)`. If the export hits a limit, e.g. at `c`,
then any MVCC range tombstones in the returned SST are truncated to
`[a-c)`.

If `StopMidKey` is enabled, then it's possible for two subsequent
exports to contain overlapping MVCC range tombstones. For example, given
the range tombstone `[a-f)@5`, if we return a resume key at `c@3` then
the response will contain a truncated MVCC range tombstone `[a-c\0)@5`
which covers the point keys at `c`, but resuming from `c@3` will contain
the MVCC range tombstone `[c-f)@5` which overlaps with the MVCC range
tombstone in the previous response for the interval `[c-c\0)@5`.
`AddSSTable` will allow this overlap during ingestion once it supports
MVCC range tombstones.

Release note: None
  • Loading branch information
erikgrinaker committed Jun 24, 2022
1 parent 851e329 commit a1ecb11
Show file tree
Hide file tree
Showing 4 changed files with 1,036 additions and 45 deletions.
12 changes: 11 additions & 1 deletion pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,15 @@ message ExportRequest {
// large history being broken up into target_file_size chunks and prevent
// blowing up on memory usage. This option is only allowed together with
// return_sst since caller should reconstruct full tables.
//
// NB: If the result contains MVCC range tombstones, this can cause MVCC range
// tombstones in two subsequent SSTs to overlap. For example, given the range
// tombstone [a-f)@5, if we return a resume key at c@3 then the response will
// contain a truncated MVCC range tombstone [a-c\0)@5 which covers the point
// keys at c, but resuming from c@3 will contain the MVCC range tombstone
// [c-f)@5 which overlaps with the MVCC range tombstone in the previous
// response for the interval [c-c\0)@5. AddSSTable will allow this overlap
// during ingestion.
bool split_mid_key = 13;

// Return the exported SST data in the response.
Expand Down Expand Up @@ -1583,7 +1592,8 @@ message BulkOpSummary {
reserved 4;
// EntryCounts contains the number of keys processed for each tableID/indexID
// pair, stored under the key (tableID << 32) | indexID. This EntryCount key
// generation logic is also available in the BulkOpSummaryID helper.
// generation logic is also available in the BulkOpSummaryID helper. It does
// not take MVCC range tombstones into account.
map<uint64, int64> entry_counts = 5;
}

Expand Down
253 changes: 209 additions & 44 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4210,17 +4210,17 @@ func ComputeStatsForRange(
}

// MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the
// interval (StartTS, EndTS] as a Pebble SST. Deletions are included if all
// revisions are requested or if the StartTS is non-zero. This function looks at
// MVCC versions and intents, and returns an error if an intent is found.
// MVCCExportOptions determine ranges as well as additional export options, see
// struct definition for details.
// interval (StartTS, EndTS] as a Pebble SST. See MVCCExportOptions for options.
//
// Data is written to dest as it is collected. If an error is returned then
// dest contents are undefined.
// Tombstones are included if all revisions are requested (all tombstones) or if
// the StartTS is non-zero (latest tombstone), including both MVCC point
// tombstones and MVCC range tombstones. Intents within the time interval will
// return a WriteIntentError, while intents outside the time interval are
// ignored.
//
// Returns an export summary and a resume key that allows resuming the export if
// it reached a limit.
// it reached a limit. Data is written to dest as it is collected. If an error
// is returned then dest contents are undefined.
func MVCCExportToSST(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, error) {
Expand All @@ -4230,39 +4230,79 @@ func MVCCExportToSST(
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()

var rows RowCounter
iter := NewMVCCIncrementalIterator(
reader,
MVCCIncrementalIterOptions{
EndKey: opts.EndKey,
StartTime: opts.StartTS,
EndTime: opts.EndTS,
IntentPolicy: MVCCIncrementalIterIntentPolicyAggregate,
})
// If we're not exporting all revisions then we can mask point keys below any
// MVCC range tombstones, since we don't care about them.
var rangeKeyMasking hlc.Timestamp
if !opts.ExportAllRevisions {
rangeKeyMasking = opts.EndTS
}

iter := NewMVCCIncrementalIterator(reader, MVCCIncrementalIterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
StartKey: opts.StartKey.Key,
EndKey: opts.EndKey,
StartTime: opts.StartTS,
EndTime: opts.EndTS,
RangeKeyMaskingBelow: rangeKeyMasking,
IntentPolicy: MVCCIncrementalIterIntentPolicyAggregate,
})
defer iter.Close()
var curKey roachpb.Key // only used if exportAllRevisions
var resumeKey roachpb.Key
var resumeTS hlc.Timestamp

paginated := opts.TargetSize > 0
trackKeyBoundary := paginated || opts.ResourceLimiter != nil
firstIteration := true
for iter.SeekGE(opts.StartKey); ; {
ok, err := iter.Valid()
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
if !ok {
break
skipTombstones := !opts.ExportAllRevisions && opts.StartTS.IsEmpty()

var rows RowCounter
var curKey roachpb.Key // only used if exportAllRevisions
var resumeKey MVCCKey
var rangeKeys []MVCCRangeKeyValue
var rangeKeysEnd roachpb.Key
var rangeKeysSize int64

// maxRangeKeysSizeIfTruncated calculates the worst-case size of the currently
// buffered rangeKeys if we were to stop iteration after the current resumeKey
// and flush the pending range keys with the new truncation bound given by
// resumeKey.
//
// For example, if we've buffered the range keys [a-c)@2 and [a-c)@1 with
// total size 4 bytes, and then hit a byte limit between the point keys bbb@3
// and bbb@1, we have to truncate the two range keys to [a-bbb\0)@1 and
// [a-bbb\0)@2. The size of the flushed range keys is now 10 bytes, not 4
// bytes. Since we're never allowed to exceed MaxSize, we have to check before
// adding bbb@3 that we have room for both bbb@3 and the pending range keys if
// they were to be truncated and flushed after it.
//
// The range keys could end up being truncated at resumeKey, at
// resumeKey.Next() if StopMidKey is enabled, or not at all if no further point
// keys end up being covered by them and we go on to flush them as-is at their
// actual end key. We need to make sure we have enough MaxSize budget to flush
// them in all of these cases.
maxRangeKeysSizeIfTruncated := func(resumeKey roachpb.Key) int64 {
if rangeKeysSize == 0 {
return 0
}
unsafeKey := iter.UnsafeKey()
if unsafeKey.Key.Compare(opts.EndKey) >= 0 {
break
// We could be truncated in the middle of a point key version series, which
// would require adding on a \0 byte via Key.Next(), so let's assume that.
maxSize := rangeKeysSize
if s := maxSize + int64(len(rangeKeys)*(len(resumeKey)-len(rangeKeysEnd)+1)); s > maxSize {
maxSize = s
}
return maxSize
}

if iter.NumCollectedIntents() > 0 {
iter.SeekGE(opts.StartKey)
for {
if ok, err := iter.Valid(); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
} else if !ok {
break
} else if iter.NumCollectedIntents() > 0 {
break
}

unsafeKey := iter.UnsafeKey()

isNewKey := !opts.ExportAllRevisions || !unsafeKey.Key.Equal(curKey)
if trackKeyBoundary && opts.ExportAllRevisions && isNewKey {
curKey = append(curKey[:0], unsafeKey.Key...)
Expand All @@ -4286,15 +4326,93 @@ func MVCCExportToSST(
// split is allowed.
if limit >= ResourceLimitReachedSoft && isNewKey || limit == ResourceLimitReachedHard && opts.StopMidKey {
// Reached iteration limit, stop with resume span
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
if !isNewKey {
resumeTS = unsafeKey.Timestamp
resumeKey = unsafeKey.Clone()
if isNewKey {
resumeKey.Timestamp = hlc.Timestamp{}
}
break
}
}
}

// When we encounter an MVCC range tombstone stack, we buffer it in
// rangeKeys until we've moved past it or iteration ends (e.g. due to a
// limit). If we return a resume key then we need to truncate the final
// range key stack (and thus the SST) to the resume key, so we can't flush
// the stack until we've moved past it.
hasPoint, hasRange := iter.HasPointAndRange()

// First, flush any pending range tombstones when we reach their end key.
//
// TODO(erikgrinaker): Optimize the range key case here, to avoid these
// comparisons. Pebble should expose an API to cheaply detect range key
// changes.
if len(rangeKeysEnd) > 0 && bytes.Compare(unsafeKey.Key, rangeKeysEnd) >= 0 {
for _, rkv := range rangeKeys {
mvccValue, ok, err := tryDecodeSimpleMVCCValue(rkv.Value)
if !ok && err == nil {
mvccValue, err = decodeExtendedMVCCValue(rkv.Value)
}
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err,
"decoding mvcc value %s", rkv.Value)
}
// Export only the inner roachpb.Value, not the MVCCValue header.
mvccValue = MVCCValue{Value: mvccValue.Value}
if err := sstWriter.ExperimentalPutMVCCRangeKey(rkv.RangeKey, mvccValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
rows.BulkOpSummary.DataSize += rangeKeysSize
rangeKeys, rangeKeysEnd, rangeKeysSize = rangeKeys[:0], nil, 0
}

// If we find any new range keys and we haven't buffered any range keys yet,
// buffer them.
if hasRange && !skipTombstones && len(rangeKeys) == 0 {
rangeBounds := iter.RangeBounds()
rangeKeysEnd = append(rangeKeysEnd[:0], rangeBounds.EndKey...)

for _, rkv := range iter.RangeKeys() {
rangeKeys = append(rangeKeys, rkv.Clone())
rangeKeysSize += int64(len(rkv.RangeKey.StartKey) + len(rkv.RangeKey.EndKey) + len(rkv.Value))
if !opts.ExportAllRevisions {
break
}
}

// Check if the range keys exceed a limit, using similar logic as point
// keys. We have to check both the size of the range keys as they are (in
// case we emit them as-is), and the size of the range keys if they were
// to be truncated at the start key due to a resume span (which could
// happen if the next point key exceeds the max size).
//
// TODO(erikgrinaker): The limit logic here is a bit of a mess, but we're
// complying with the existing point key logic for now. We should get rid
// of some of the options and clean this up.
curSize := rows.BulkOpSummary.DataSize
reachedTargetSize := opts.TargetSize > 0 && uint64(curSize) >= opts.TargetSize
newSize := curSize + maxRangeKeysSizeIfTruncated(rangeBounds.Key)
reachedMaxSize := opts.MaxSize > 0 && newSize > int64(opts.MaxSize)
if paginated && (reachedTargetSize || reachedMaxSize) {
rangeKeys, rangeKeysEnd, rangeKeysSize = rangeKeys[:0], nil, 0
resumeKey = unsafeKey.Clone()
break
}
if reachedMaxSize {
return roachpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{
reached: newSize, maxSize: opts.MaxSize}
}
}

// If we're on a bare range key, step forward. We can't use NextKey()
// because there may be a point key version at the range key's start bound.
if !hasPoint {
iter.Next()
continue
}

// Process point keys.
unsafeValue := iter.UnsafeValue()
skip := false
if unsafeKey.IsValue() {
Expand All @@ -4311,7 +4429,6 @@ func MVCCExportToSST(

// Skip tombstone records when start time is zero (non-incremental)
// and we are not exporting all versions.
skipTombstones := !opts.ExportAllRevisions && opts.StartTS.IsEmpty()
skip = skipTombstones && mvccValue.IsTombstone()
}

Expand All @@ -4320,22 +4437,26 @@ func MVCCExportToSST(
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "decoding %s", unsafeKey)
}
curSize := rows.BulkOpSummary.DataSize
reachedTargetSize := curSize > 0 && uint64(curSize) >= opts.TargetSize
newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
reachedMaxSize := opts.MaxSize > 0 && newSize > int64(opts.MaxSize)
curSizeWithRangeKeys := curSize + maxRangeKeysSizeIfTruncated(unsafeKey.Key)
reachedTargetSize := curSizeWithRangeKeys > 0 &&
uint64(curSizeWithRangeKeys) >= opts.TargetSize
kvSize := int64(len(unsafeKey.Key) + len(unsafeValue))
newSize := curSize + kvSize
newSizeWithRangeKeys := curSizeWithRangeKeys + kvSize
reachedMaxSize := opts.MaxSize > 0 && newSizeWithRangeKeys > int64(opts.MaxSize)
// When paginating we stop writing in two cases:
// - target size is reached and we wrote all versions of a key
// - maximum size reached and we are allowed to stop mid key
if paginated && (isNewKey && reachedTargetSize || opts.StopMidKey && reachedMaxSize) {
// Allocate the right size for resumeKey rather than using curKey.
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
if opts.StopMidKey && !isNewKey {
resumeTS = unsafeKey.Timestamp
resumeKey = unsafeKey.Clone()
if isNewKey || !opts.StopMidKey {
resumeKey.Timestamp = hlc.Timestamp{}
}
break
}
if reachedMaxSize {
return roachpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{reached: newSize, maxSize: opts.MaxSize}
return roachpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{
reached: newSizeWithRangeKeys, maxSize: opts.MaxSize}
}
if unsafeKey.Timestamp.IsEmpty() {
// This should never be an intent since the incremental iterator returns
Expand Down Expand Up @@ -4375,6 +4496,41 @@ func MVCCExportToSST(
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}

// Flush any pending buffered range keys, truncated to the resume key (if
// any). If there is a resume timestamp, i.e. when resuming in between two
// versions, the range keys must cover the resume key too. This will cause the
// next export's range keys to overlap with this one, e.g.: [a-f) with resume
// key c@7 will export range keys [a-c\0) first, and then [c-f) when resuming,
// which overlaps at [c-c\0).
if len(rangeKeys) > 0 {
// Calculate the new rangeKeysSize due to the new resume bounds.
if len(resumeKey.Key) > 0 && rangeKeysEnd.Compare(resumeKey.Key) > 0 {
oldEndLen := len(rangeKeysEnd)
rangeKeysEnd = resumeKey.Key
if resumeKey.Timestamp.IsSet() {
rangeKeysEnd = rangeKeysEnd.Next()
}
rangeKeysSize += int64(len(rangeKeys) * (len(rangeKeysEnd) - oldEndLen))
}
for _, rkv := range rangeKeys {
rkv.RangeKey.EndKey = rangeKeysEnd
mvccValue, ok, err := tryDecodeSimpleMVCCValue(rkv.Value)
if !ok && err == nil {
mvccValue, err = decodeExtendedMVCCValue(rkv.Value)
}
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err,
"decoding mvcc value %s", rkv.Value)
}
// Export only the inner roachpb.Value, not the MVCCValue header.
mvccValue = MVCCValue{Value: mvccValue.Value}
if err := sstWriter.ExperimentalPutMVCCRangeKey(rkv.RangeKey, mvccValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
rows.BulkOpSummary.DataSize += rangeKeysSize
}

if rows.BulkOpSummary.DataSize == 0 {
// If no records were added to the sstable, skip completing it and return a
// nil slice – the export code will discard it anyway (based on 0 DataSize).
Expand All @@ -4385,7 +4541,7 @@ func MVCCExportToSST(
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}

return rows.BulkOpSummary, MVCCKey{Key: resumeKey, Timestamp: resumeTS}, nil
return rows.BulkOpSummary, resumeKey, nil
}

// MVCCExportOptions contains options for MVCCExportToSST.
Expand Down Expand Up @@ -4423,6 +4579,15 @@ type MVCCExportOptions struct {
// adding all versions until it reaches the next key or the end of the range. If
// true, it would stop immediately when targetSize is reached and return the next
// version's timestamp in resumeTs so that a subsequent operation can pass it to
// firstKeyTs.
//
// NB: If the result contains MVCC range tombstones, this can cause MVCC range
// tombstones in two subsequent SSTs to overlap. For example, given the range
// tombstone [a-f)@5, if we return a resume key at c@3 then the response will
// contain a truncated MVCC range tombstone [a-c\0)@5 which covers the point
// keys at c, but resuming from c@3 will contain the MVCC range tombstone
// [c-f)@5 which overlaps with the MVCC range tombstone in the previous
// response for the interval [c-c\0)@5. AddSSTable will allow this overlap
// during ingestion.
StopMidKey bool
// ResourceLimiter limits how long iterator could run until it exhausts allocated
// resources. Export queries limiter in its iteration loop to break out once
Expand Down
Loading

0 comments on commit a1ecb11

Please sign in to comment.