Skip to content

Commit

Permalink
kvserver: add Time Bound Iteration to DeleteRange
Browse files Browse the repository at this point in the history
Previously, a kv client could only pass an AOST timestamp to a DelRange
request. Now, the user can pass a lower bound timestamp, causing
the kvserver to leverage time bound iteration while issuing delete requests.

Specifically, the server uses an MVCCIncrementalIterator to iterate over the
target span at the client provided time bounds, track a continuous run of keys
in that time bound, and flush the run via point and MVCC range tombstones
depending on the size of the run.

In a future pr, this operation will replace the use of RevertRange during IMPORT
INTO rollbacks to make them MVCC compatible.

Informs cockroachdb#70428

Release note: none
  • Loading branch information
msbutler committed Jul 16, 2022
1 parent b07ceeb commit 90b2956
Show file tree
Hide file tree
Showing 5 changed files with 588 additions and 5 deletions.
33 changes: 29 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package batcheval

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -67,8 +68,8 @@ func DeleteRange(
h := cArgs.Header
reply := resp.(*roachpb.DeleteRangeResponse)

// Use experimental MVCC range tombstone if requested.
if args.UseRangeTombstone {
// Use MVCC range tombstone if requested.
if args.UseRangeTombstone || args.Predicates != nil {
if cArgs.Header.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
Expand All @@ -85,8 +86,32 @@ func DeleteRange(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, maxIntents)
if args.Predicates == nil {
err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, maxIntents)
return result.Result{}, err
}
maxBatchSize := h.MaxSpanRequestKeys
if h.MaxSpanRequestKeys == 0 {
maxBatchSize = math.MaxInt64
}

// The minimum number of keys required in a run to use a range tombstone
//
// TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have
// been further optimized.
defaultRangeTombstoneThreshold := int64(64)
resumeSpan, err := storage.PredicateMVCCDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, maxBatchSize, maxRevertRangeBatchBytes, defaultRangeTombstoneThreshold)

// TODO (msbutler): plumb number of keys deleted into response, if needed
if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}
// Return result is always empty, since the reply is populated into the
// resp pointer that's passed into the function
return result.Result{}, err
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,24 @@ message DeleteRangeRequest {
// The caller must check the MVCCRangeTombstones version gate before using
// this parameter, as it is new in 22.2.
bool use_range_tombstone = 5;

DeleteRangePredicates predicates = 6 [(gogoproto.nullable) = true];
}

// DeleteRangePredicates will conduct predicate based DeleteRange, if specified.
message DeleteRangePredicates {
// StartTime specifies an exclusive lower bound to surface keys
// for deletion. If specified, DeleteRange will issue tombstones to keys
// within the span [startKey, endKey) that also have MVCC versions with
// timestamps between (startTime, endTime].
//
// The main application for this is a rollback of IMPORT INTO on a
// non-empty table. Here, the DeleteRange must only delete keys written by the
// import. In other words, older, pre-import, data cannot be touched. Because
// IMPORT INTO takes a table offline and does not allow masking an existing key,
// this operation will not issue tombstones to pre-import data that were
// written at or below predicateTime.
util.hlc.Timestamp start_time = 6 [(gogoproto.nullable) = false];
}

// A DeleteRangeResponse is the return value from the DeleteRange()
Expand Down
251 changes: 250 additions & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2321,8 +2321,16 @@ func MVCCClearTimeRange(
})
defer iter.Close()

// clearedMetaKey is the latest surfaced key that will get cleared
var clearedMetaKey MVCCKey
var clearedMeta, restoredMeta enginepb.MVCCMetadata

// clearedMeta contains metadata on the clearedMetaKey
var clearedMeta enginepb.MVCCMetadata

// restoredMeta contains metadata on the previous version the clearedMetaKey.
// Once the key in clearedMetaKey is cleared, the key represented in
// restoredMeta becomes the latest version of this MVCC key.
var restoredMeta enginepb.MVCCMetadata
iter.SeekGE(MVCCKey{Key: key})
for {
if ok, err := iter.Valid(); err != nil {
Expand Down Expand Up @@ -2466,6 +2474,247 @@ func MVCCDeleteRange(
return keys, res.ResumeSpan, res.NumKeys, nil
}

// PredicateMVCCDeleteRange issues MVCC tombstones at endTime to keys within the
// span [startKey, endKey) that also have MVCC versions that match the predicate
// filters. Long runs of keys will get deleted with a range Tombstone, while
// smaller runs will get deleted with point tombstones.
//
// This operation is non-transactional, but will check for existing intents in
// the target key span, regardless of timestamp, and return a WriteIntentError
// containing up to maxIntents intents.
//
// If an MVCC key surfaced has a timestamp at or above endTime,
// PredicateMVCCDeleteRange returns an error without a resumeSpan, even if
// tombstones were already written to disk. To resolve, manual intervention is necessary.
//
// Limiting the number of keys or ranges of keys processed, via maxBatchSize,
// can still cause a batch that is too large -- in number of bytes -- for raft
// to replicate if the keys are very large. So if the total length of the keys
// or key spans cleared exceeds maxBatchByteSize it will also stop and return a
// resume span.
func PredicateMVCCDeleteRange(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
startKey, endKey roachpb.Key,
endTime hlc.Timestamp,
localTimestamp hlc.ClockTimestamp,
leftPeekBound, rightPeekBound roachpb.Key,
predicates *roachpb.DeleteRangePredicates,
maxBatchSize, maxBatchByteSize int64,
rangeTombstoneThreshold int64,
) (*roachpb.Span, error) {

var batchSize int64
var batchByteSize int64

// runSize is the number of non-tombstone keys in the run. Since runSize is used to
// track the number of tombstones that will get written in a run and because
// new point tombstones are not written on top of current tombstones, surfaced
// tombstones are not counted in runSize.
var runSize int64

// runByteSize is the number of bytes from non-tombstone keys in the current run
var runByteSize int64
var runStart, runEnd MVCCKey

const maxIntents = 0

if ms == nil {
return nil, errors.AssertionFailedf(
"MVCCStats passed in to PredicateMVCCDeleteRange must be non-nil to ensure proper stats" +
" computation during Delete operations")
}

// Check for any overlapping intents, and return them to be resolved.
if intents, err := ScanIntents(ctx, rw, startKey, endKey, maxIntents, 0); err != nil {
return nil, err
} else if len(intents) > 0 {
return nil, &roachpb.WriteIntentError{Intents: intents}
}

// continueRun returns two bools: the first is true if the current run should
// continue; the second is true if the latest key is a tombstone. If a non-nil
// error is returned, the booleans are invalid. The run should continue if:
//
// 1) The latest version of the key is a point or range tombstone, with a timestamp below
// the client provided EndTime. Since the goal is to create long runs,
// any tombstoned key should continue the run.
//
// 2) The latest key is not a tombstone, matches the predicates,
// and has a timestamp below EndTime.
continueRun := func(k MVCCKey, iter SimpleMVCCIterator) (bool, bool, error) {
vRaw := iter.UnsafeValue()
hasPointKey, hasRangeKey := iter.HasPointAndRange()
if hasRangeKey {
rangeKeys := iter.RangeKeys()
if endTime.LessEq(rangeKeys[0].RangeKey.Timestamp) {
return false, false, roachpb.NewWriteTooOldError(endTime,
rangeKeys[0].RangeKey.Timestamp.Next(), k.Key.Clone())
}
if !hasPointKey {
// landed on bare range key.
return true, true, nil
}
if k.Timestamp.Less(rangeKeys[0].RangeKey.Timestamp) {
// The latest range tombstone shadows the point key; ok to continue run.
return true, true, nil
}
}

// At this point, there exists a point key that shadows all range keys,
// if they exist.
if endTime.LessEq(k.Timestamp) {
return false, false, roachpb.NewWriteTooOldError(endTime, k.Timestamp.Next(), k.Key.Clone())
}
if len(vRaw) == 0 {
// The latest version of the key is a point tombstone.
return true, true, nil
}

// The latest key is a non-tombstoned point key. Conduct predicate filtering.
if k.Timestamp.LessEq(predicates.StartTime) {
return false, false, nil
}

// TODO (msbutler): use MVCCValueHeader to match on job ID predicate
_, err := DecodeMVCCValue(vRaw)
if err != nil {
return false, false, err
}
return true, false, nil
}

flushDeleteKeys := func(nonMatch MVCCKey) error {
if runSize == 0 {
return nil
}
if runSize >= rangeTombstoneThreshold ||
// Even if we didn't get a large enough number of keys to switch to
// using range tombstones, the byte size of the keys we did get is now too large to
// encode them all within the byte size limit, so use a range tombstone anyway.
batchByteSize+runByteSize >= maxBatchByteSize {
if err := MVCCDeleteRangeUsingTombstone(ctx, rw, ms,
runStart.Key, nonMatch.Key, endTime, localTimestamp, leftPeekBound, rightPeekBound,
maxIntents); err != nil {
return err
}
batchByteSize += int64(runStart.EncodedSize() + nonMatch.EncodedSize())
batchSize++
} else if runSize > 0 {
// Use Point tombstones
batchByteSize += runByteSize
batchSize += runSize
_, _, _, err := MVCCDeleteRange(
ctx, rw, ms, runStart.Key, nonMatch.Key,
0, endTime, localTimestamp, nil, false)
if err != nil {
return err
}
}
runSize = 0
runStart = MVCCKey{}
runEnd = MVCCKey{}
return nil
}

// Using the IncrementalIterator with the time-bound iter optimization could
// potentially be a big win here -- the expected use-case for this is to run
// over an entire table's span with a very recent timestamp, issuing tombstones to
// writes of some failed IMPORT and that could very likely only have hit
// some small subset of the table's keyspace.
//
// The MVCCIncrementalIterator uses a non-time-bound iter as its source
// of truth, and only uses the TBI iterator as an optimization when finding
// the next KV to iterate over. This pattern allows us to quickly skip over
// swaths of uninteresting keys, but then iterates over the latest key of each MVCC key.
//
// Notice that the iterator's EndTime is set to hlc.MaxTimestamp, in order to
// detect and fail on any keys written at or after the client provided
// endTime. We don't _expect_ to hit intents or newer keys in the client
// provided span since the PredicateMVCCDeleteRange is only intended for
// non-live key spans, but there could be an intent leftover.
iter := NewMVCCIncrementalIterator(rw, MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: predicates.StartTime,
EndTime: hlc.MaxTimestamp,
RangeKeyMaskingBelow: endTime,
KeyTypes: IterKeyTypePointsAndRanges,
})
defer iter.Close()

iter.SeekGE(MVCCKey{Key: startKey})
for {
if ok, err := iter.Valid(); err != nil {
return nil, err
} else if !ok {
break
}
k := iter.UnsafeKey()
toContinue, isTombstone, err := continueRun(k, iter)
if err != nil {
return nil, errors.CombineErrors(err, flushDeleteKeys(k))
}
if isTombstone {
if hasPoint, hasRange := iter.HasPointAndRange(); hasRange && !hasPoint {
// Because range key information can be inferred at point keys,
// skip over the surfaced range key, and reason about shadowed keys at
// the surfaced point key.
//
// E.g. Scanning the keys below:
// 2 a2
// 1 o---o
// a b
//
// would result in two surfaced keys:
// {a-b}@1;
// a2, {a-b}@1
//
// Note that the range key gets surfaced before the point key,
// even though the point key shadows it.
iter.NextIgnoringTime()
} else {
iter.NextKeyIgnoringTime()
}
} else if toContinue {
if batchSize+runSize >= maxBatchSize || batchByteSize+runByteSize >= maxBatchByteSize {
// The matched key will be the start the resume span.
if err := flushDeleteKeys(MVCCKey{Key: k.Key}); err != nil {
return nil, err
}
return &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey}, nil
}
if runSize == 0 {
runStart.Key = append(runStart.Key[:0], k.Key...)
runStart.Timestamp = k.Timestamp
}

runEnd.Key = append(runEnd.Key[:0], k.Key...)
runEnd.Timestamp = k.Timestamp

runSize++
runByteSize += int64(k.EncodedSize())

// Move the iterator to the next key/value in linear iteration even if it
// lies outside (startTime, endTime), to see if there's a need to flush.
iter.NextKeyIgnoringTime()
} else {
// This key does not match. Flush the run of matching keys,
// to prevent issuing tombstones on keys that do not match the predicates.
if err := flushDeleteKeys(k); err != nil {
return nil, err
}
// Move the incremental iterator to the next valid MVCC key that can be
// deleted. If TBI was enabled when initializing the incremental iterator,
// this step could jump over large swaths of keys that do not qualify for
// clearing.
iter.NextKey()
}
}

return nil, flushDeleteKeys(MVCCKey{Key: endKey})
}

// MVCCDeleteRangeUsingTombstone deletes the given MVCC keyspan at the given
// timestamp using an MVCC range tombstone (rather than MVCC point tombstones).
// This operation is non-transactional, but will check for existing intents and
Expand Down
Loading

0 comments on commit 90b2956

Please sign in to comment.