Skip to content

Commit

Permalink
batcheval: add MVCC-compliant RevertRange variant
Browse files Browse the repository at this point in the history
This adds a new parameter `ExperimentalPreserveHistory` which, rather
than clearing keys above the target time, will write new values or
tombstones that reflect the state at the target time. For long runs of
new keys, this will instead drop an MVCC range tombstone. This makes the
command respect e.g. MVCC immutability, the closed timestamp, and
timestamp cache.

Note that MVCC range tombstones are currently experimental, and as such
this parameter is also experimental. Callers must call
`storage.CanUseExperimentalMVCCRangeTombstones()` before using it.

Release note: None
  • Loading branch information
erikgrinaker committed Feb 13, 2022
1 parent dac98d4 commit 61df93e
Show file tree
Hide file tree
Showing 8 changed files with 635 additions and 29 deletions.
45 changes: 30 additions & 15 deletions pkg/kv/kvserver/batcheval/cmd_revert_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ func isEmptyKeyTimeRange(
// that there is *a* key in the SST that is in the time range. Thus we should
// proceed to iteration that actually checks timestamps on each key.
iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
LowerBound: from, UpperBound: to,
MinTimestampHint: since.Next() /* make exclusive */, MaxTimestampHint: until,
// TODO(erikgrinaker): Make sure TBIs respect range keys too.
KeyTypes: storage.IterKeyTypePointsAndRanges, // revert any range keys as well
LowerBound: from,
UpperBound: to,
MinTimestampHint: since.Next(), // exclusive
MaxTimestampHint: until,
})
defer iter.Close()
iter.SeekGE(storage.MVCCKey{Key: from})
Expand All @@ -78,29 +82,40 @@ func RevertRange(

args := cArgs.Args.(*roachpb.RevertRangeRequest)
reply := resp.(*roachpb.RevertRangeResponse)
pd := result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
MVCCHistoryMutation: &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{
Spans: []roachpb.Span{{Key: args.Key, EndKey: args.EndKey}},
},
},
}

if empty, err := isEmptyKeyTimeRange(
readWriter, args.Key, args.EndKey, args.TargetTime, cArgs.Header.Timestamp,
); err != nil {
return result.Result{}, err
} else if empty {
log.VEventf(ctx, 2, "no keys to clear in specified time range")
log.VEventf(ctx, 2, "no keys to revert in specified time range")
return result.Result{}, nil
}

log.VEventf(ctx, 2, "clearing keys with timestamp (%v, %v]", args.TargetTime, cArgs.Header.Timestamp)
log.VEventf(ctx, 2, "reverting keys with timestamp (%v, %v]",
args.TargetTime, cArgs.Header.Timestamp)

resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys,
maxRevertRangeBatchBytes,
args.EnableTimeBoundIteratorOptimization)
var pd result.Result
var resume *roachpb.Span
var err error
if args.ExperimentalPreserveHistory {
const deleteRangeThreshold = 100
maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
// TODO(erikgrinaker): Write a test for this once MVCC range tombstones are
// properly written to batches and replicated.
// TODO(erikgrinaker): Test that this records MVCC logical ops correctly.
resume, err = storage.ExperimentalMVCCRevertRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, cArgs.Header.Timestamp, args.TargetTime, deleteRangeThreshold,
cArgs.Header.MaxSpanRequestKeys, maxRevertRangeBatchBytes, maxIntents)
} else {
resume, err = storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys,
maxRevertRangeBatchBytes,
args.EnableTimeBoundIteratorOptimization)
pd.Replicated.MVCCHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{
Spans: []roachpb.Span{{Key: args.Key, EndKey: args.EndKey}},
}
}
if err != nil {
return result.Result{}, err
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ func (crr *ClearRangeRequest) ShallowCopy() Request {
}

// ShallowCopy implements the Request interface.
func (crr *RevertRangeRequest) ShallowCopy() Request {
shallowCopy := *crr
func (rrr *RevertRangeRequest) ShallowCopy() Request {
	// Copy the struct by value into a fresh allocation so the caller gets a
	// distinct top-level request; nested references are shared, not cloned.
	dup := new(RevertRangeRequest)
	*dup = *rrr
	return dup
}

Expand Down Expand Up @@ -1263,7 +1263,13 @@ func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone }

// Note that RevertRange commands cannot be part of a transaction as
// they clear all MVCC versions above their target time.
func (*RevertRangeRequest) flags() flag { return isWrite | isRange }
func (rrr *RevertRangeRequest) flags() flag {
	if !rrr.ExperimentalPreserveHistory {
		// The default variant clears history in place and only declares itself
		// as a ranged write.
		// TODO(erikgrinaker): add isAlone?
		return isWrite | isRange
	}
	// The history-preserving variant writes new versions instead of clearing
	// old ones, so on top of the write/range flags it also sets isRead,
	// isAlone, updatesTSCache and appliesTSCache.
	return isRead | isWrite | isRange | isAlone | updatesTSCache | appliesTSCache
}

func (sr *ScanRequest) flags() flag {
maybeLocking := flagForLockStrength(sr.KeyLocking)
Expand Down
26 changes: 21 additions & 5 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,34 @@ message ClearRangeResponse {
}


// A RevertRangeRequest specifies a range of keys in which to clear all MVCC
// revisions more recent than some TargetTime from the underlying engine, thus
// reverting the range (from the perspective of an MVCC scan) to that time.
// A RevertRangeRequest specifies a range of keys to revert to some past time.
// By default, it will clear all revisions more recent than TargetTime from the
// underlying engine. However, this violates several guarantees including MVCC
// immutability, the closed timestamp, timestamp cache, and others. See the
// ExperimentalPreserveHistory parameter which will uphold these guarantees.
message RevertRangeRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// TargetTime specifies a the time to which to "revert" the range by clearing
// any MVCC key with a strictly higher timestamp. TargetTime must be higher
// TargetTime specifies the time to which to "revert" the range. Any
// versions later than TargetTime will be undone. TargetTime must be higher
// than the GC Threshold for the replica - so that it is assured that the keys
// for that time are still there — or the request will fail.
util.hlc.Timestamp target_time = 2 [(gogoproto.nullable) = false];

// ExperimentalPreserveHistory will preserve MVCC history by, rather than
// clearing newer versions, deleting them using tombstones or updating them
// back to their original value as of the target time. Long runs of key
// deletions will use an MVCC range tombstone instead. This respects the
// closed timestamp and timestamp cache.
//
// The caller must check storage.CanUseExperimentalMVCCRangeTombstones()
// before enabling this parameter.
//
// This parameter is EXPERIMENTAL: range tombstones are under active
// development, and have severe limitations including being ignored by all
// KV and MVCC APIs and only being stored in memory.
bool experimental_preserve_history = 5;

bool enable_time_bound_iterator_optimization = 3;

// IgnoreGcThreshold can be set by a caller to ignore the target-time when
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func TestFlagCombinations(t *testing.T) {
&DeleteRangeRequest{UseExperimentalRangeTombstone: true},
&GetRequest{KeyLocking: lock.Exclusive},
&ReverseScanRequest{KeyLocking: lock.Exclusive},
&RevertRangeRequest{ExperimentalPreserveHistory: true},
&ScanRequest{KeyLocking: lock.Exclusive},
}

Expand Down
222 changes: 217 additions & 5 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,8 @@ func MVCCMerge(
//
// If the underlying iterator encounters an intent with a timestamp in the span
// (startTime, endTime], or any inline meta, this method will return an error.
//
// TODO(erikgrinaker): This should clear any range keys as well.
func MVCCClearTimeRange(
_ context.Context,
rw ReadWriter,
Expand Down Expand Up @@ -2230,9 +2232,6 @@ func MVCCDeleteRange(
//
// This function is EXPERIMENTAL. Range tombstones are not supported throughout
// the MVCC API, and the on-disk format is unstable.
//
// TODO(erikgrinaker): Needs conflict handling, e.g. WriteTooOldError.
// TODO(erikgrinaker): Needs MVCCStats handling.
func ExperimentalMVCCDeleteRangeUsingTombstone(
ctx context.Context,
rw ReadWriter,
Expand All @@ -2246,8 +2245,16 @@ func ExperimentalMVCCDeleteRangeUsingTombstone(
} else if len(intents) > 0 {
return &roachpb.WriteIntentError{Intents: intents}
}
return rw.ExperimentalPutMVCCRangeKey(MVCCRangeKey{
StartKey: startKey, EndKey: endKey, Timestamp: timestamp}, nil)
return experimentalMVCCDeleteRangeUsingTombstoneInternal(ctx, rw, ms, MVCCRangeKey{
StartKey: startKey, EndKey: endKey, Timestamp: timestamp})
}

// experimentalMVCCDeleteRangeUsingTombstoneInternal writes rangeKey to rw with
// a nil value by calling rw.ExperimentalPutMVCCRangeKey, and returns that
// call's error. It does nothing else: it performs no intent scan of its own,
// and the ctx and ms parameters are currently unused (see the TODOs below).
//
// NOTE(review): the nil value presumably denotes a range tombstone, as the
// function name suggests -- confirm against ExperimentalPutMVCCRangeKey.
//
// TODO(erikgrinaker): Needs MVCCStats handling.
// TODO(erikgrinaker): Needs conflict handling, e.g. WriteTooOldError.
func experimentalMVCCDeleteRangeUsingTombstoneInternal(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, rangeKey MVCCRangeKey,
) error {
return rw.ExperimentalPutMVCCRangeKey(rangeKey, nil)
}

func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) {
Expand All @@ -2266,6 +2273,211 @@ func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) {
}
}

// ExperimentalMVCCRevertRange will revert a range back to its state as of some
// past timestamp, writing tombstones or key updates as appropriate at the given
// write timestamp. Long runs of key deletions will be written using MVCC range
// tombstones.
//
// This function cannot be used in a transaction. However, it will scan for
// existing intents and return a WriteIntentError, and scan for newer writes
// and return WriteTooOldError.
//
// Parameters:
//   - startKey, endKey: the key span to revert.
//   - writeTimestamp: the timestamp at which new values and tombstones are
//     written.
//   - revertTimestamp: versions with a timestamp above this are undone.
//   - deleteRangeThreshold: the number of consecutive key deletions at which
//     the function stops buffering point deletes and switches to a single
//     range tombstone. With a value <= 1 no buffer is allocated and every
//     deletion is written as a point tombstone.
//   - maxBatchSize, maxBatchBytes: limits on the number and encoded size of
//     writes. Once either is reached, pending deletes are flushed and a resume
//     span is returned. NOTE(review): a limit of 0 compares as already reached
//     on the first write -- confirm callers always pass positive limits.
//   - maxIntents: the maximum number of intents collected by the initial
//     ScanIntents call.
//
// It returns a resume span starting at the first key that was not reverted and
// ending at endKey, or nil if the whole span was processed. On error the
// returned span is always nil.
//
// This function is EXPERIMENTAL. Range tombstones are not supported throughout
// the MVCC API, and the on-disk format is unstable.
//
// TODO(erikgrinaker): Handle range keys.
func ExperimentalMVCCRevertRange(
	ctx context.Context,
	rw ReadWriter,
	ms *enginepb.MVCCStats,
	startKey, endKey roachpb.Key,
	writeTimestamp hlc.Timestamp,
	revertTimestamp hlc.Timestamp,
	deleteRangeThreshold int,
	maxBatchSize int64,
	maxBatchBytes int64,
	maxIntents int64,
) (*roachpb.Span, error) {
	// We must resolve any intents within the span, so we may as well scan for
	// separated intents before doing any work.
	if intents, err := ScanIntents(ctx, rw, startKey, endKey, maxIntents, 0); err != nil {
		return nil, err
	} else if len(intents) > 0 {
		return nil, &roachpb.WriteIntentError{Intents: intents}
	}

	// We accumulate point deletes in deleteBuf until we either reach
	// deleteRangeThreshold and switch to using a range deletion tombstone
	// anchored at deleteRangeStart, or until we hit a visible key at which
	// point we flush the deleteBuf as point deletes.
	var deleteRangeStart roachpb.Key
	var deleteBuf []roachpb.Key
	var deleteBufIdx int
	var deleteBufBytes int64
	if deleteRangeThreshold > 1 {
		deleteBuf = make([]roachpb.Key, deleteRangeThreshold-1)
	}

	putBuf := newPutBuffer()
	defer putBuf.release()

	var batchSize, batchBytes int64

	// flushDeletes writes out any pending deletions. If a range tombstone is
	// being tracked it is written as [deleteRangeStart, nonMatch); otherwise
	// any buffered keys are written as individual point tombstones.
	flushDeletes := func(nonMatch roachpb.Key) error {
		if len(deleteRangeStart) > 0 {
			err := experimentalMVCCDeleteRangeUsingTombstoneInternal(ctx, rw, ms, MVCCRangeKey{
				StartKey: deleteRangeStart, EndKey: nonMatch, Timestamp: writeTimestamp})
			deleteRangeStart = nil
			batchBytes += int64(encodedMVCCKeyLength(MVCCKey{Key: nonMatch})) // account for end key
			return err
		}

		if deleteBufIdx > 0 {
			iter := newMVCCIterator(rw, false, IterOptions{Prefix: true})
			defer iter.Close()
			for i := 0; i < deleteBufIdx; i++ {
				err := mvccPutInternal(
					ctx, rw, iter, ms, deleteBuf[i], writeTimestamp, nil, nil, putBuf, nil)
				if err != nil {
					return err
				}
			}
			deleteBufIdx = 0
			deleteBufBytes = 0
		}
		return nil
	}

	// revert writes value v for key k at writeTimestamp (an empty v is a
	// deletion, which may be buffered or folded into a range tombstone). It
	// returns a non-nil resume key if the batch limits were reached before k
	// could be handled.
	revert := func(k roachpb.Key, v []byte) (*roachpb.Key, error) {
		// For non-deletions, we have to flush any pending deletes first. This may also
		// flush a range tombstone, which will add to batchBytes.
		if len(v) > 0 {
			if err := flushDeletes(k); err != nil {
				return nil, err
			}
		}

		// If the batch is full, return a resume key after flushing any deletes.
		if batchSize >= maxBatchSize || batchBytes >= maxBatchBytes {
			err := flushDeletes(k)
			return &k, err
		}
		entryBytes := int64(encodedMVCCKeyLength(MVCCKey{Key: k, Timestamp: writeTimestamp}) + len(v))

		if len(v) > 0 || len(deleteBuf) == 0 {
			batchSize++
			batchBytes += entryBytes
			iter := newMVCCIterator(rw, false, IterOptions{Prefix: true})
			defer iter.Close()
			return nil, mvccPutInternal(ctx, rw, iter, ms, k, writeTimestamp, v, nil, putBuf, nil)

		} else if len(deleteRangeStart) == 0 {
			// We're currently buffering point deletions.
			if deleteBufIdx < len(deleteBuf) {
				deleteBuf[deleteBufIdx] = append(deleteBuf[deleteBufIdx][:0], k...)
				deleteBufIdx++
				deleteBufBytes += entryBytes
				batchSize++
				batchBytes += entryBytes
			} else {
				// Buffer is full -- switch to tracking the start of the range delete. We
				// remove the buffered keys from the batch size, and instead only track
				// the range key.
				batchSize -= int64(deleteBufIdx) - 1 // -1 accounts for the range key
				batchBytes -= deleteBufBytes -
					int64(encodedMVCCKeyLength(MVCCKey{Key: deleteBuf[0], Timestamp: writeTimestamp}))
				deleteRangeStart = deleteBuf[0]
				deleteBufIdx = 0
				deleteBufBytes = 0
			}
		}
		return nil, nil
	}

	// revertKey is the key currently scheduled for reversion, revertValueFrom
	// is its latest value, and revertValue is the value to revert to (empty if
	// the key had no visible version at or below revertTimestamp).
	var revertKey roachpb.Key
	var revertValue, revertValueFrom []byte

	// applyRevert handles the key scheduled in revertKey, if any. It returns a
	// non-nil resume span if the batch limits were reached before the key could
	// be reverted. The span is only ever built from a non-nil resume key, and
	// is never returned alongside an error.
	applyRevert := func() (*roachpb.Span, error) {
		if len(revertKey) == 0 {
			return nil, nil
		}
		if bytes.Equal(revertValue, revertValueFrom) {
			// The latest value already matches the value at the revert timestamp,
			// so nothing needs to be written. However, if the key is visible we
			// must still flush pending deletes up to it, otherwise a range
			// tombstone could later be dropped across it.
			if len(revertValue) > 0 {
				return nil, flushDeletes(revertKey)
			}
			return nil, nil
		}
		resumeKey, err := revert(revertKey, revertValue)
		if err != nil {
			return nil, err
		}
		if resumeKey != nil {
			return &roachpb.Span{Key: *resumeKey, EndKey: endKey}, nil
		}
		return nil, nil
	}

	// We set up an incremental iterator from the revert time to look for any
	// changes that need to be reverted. However, we also need to inspect older
	// values to e.g. find the value to revert to or make sure we don't drop range
	// tombstones across them -- we do this by using the IgnoringTime() methods on
	// the MVCCIncrementalIterator.
	iter := NewMVCCIncrementalIterator(rw, MVCCIncrementalIterOptions{
		EnableTimeBoundIteratorOptimization: true,
		EndKey:                              endKey,
		StartTime:                           revertTimestamp,
		EndTime:                             writeTimestamp, // puts will error on any newer versions
	})
	defer iter.Close()

	iter.SeekGE(MVCCKey{Key: startKey})
	for {
		if ok, err := iter.Valid(); err != nil {
			return nil, err
		} else if !ok {
			break
		}

		key := iter.UnsafeKey()

		if key.Timestamp.IsEmpty() {
			return nil, errors.Errorf("encountered inline key %s", key)
		}

		// If a key was scheduled for reversion, revert it when the key changes,
		// but only if the original value differs from the latest value.
		if len(revertKey) > 0 && !revertKey.Equal(key.Key) {
			if resume, err := applyRevert(); err != nil || resume != nil {
				return resume, err
			}
			revertKey, revertValue, revertValueFrom = nil, nil, nil // TODO(erikgrinaker): reuse slices
		}

		if revertTimestamp.Less(key.Timestamp) {
			// Schedule this key for reversion.
			if len(revertKey) == 0 {
				// TODO(erikgrinaker): reuse byte slices
				revertKey = key.Key.Clone()
				revertValueFrom = append([]byte(nil), iter.Value()...)
			}

			// Move the iterator to the next key, even if <= revertTimestamp. If it
			// finds an old version of this key, it will set the value to revert to.
			iter.NextIgnoringTime()

		} else if bytes.Equal(revertKey, key.Key) {
			// This is the version of revertKey that we should revert back to.
			revertValue = append([]byte(nil), iter.Value()...)
			iter.Next()

		} else {
			// This is a different key at or below the revert timestamp. If it's not a
			// tombstone then we have to flush any deletes up to here to avoid
			// dropping a range tombstone across it.
			if len(iter.Value()) > 0 {
				if err := flushDeletes(key.Key); err != nil {
					return nil, err
				}
				iter.Next()
			} else {
				// If this is a tombstone, we have to move to the next key (ignoring
				// TBI) to check that it is also a tombstone. Otherwise, we can end up
				// dropping a range tombstone across a visible key.
				iter.NextKeyIgnoringTime()
			}
		}
	}

	// Handle a revert at the very end of the iteration.
	if resume, err := applyRevert(); err != nil || resume != nil {
		return resume, err
	}
	return nil, flushDeletes(endKey)
}

func mvccScanToBytes(
ctx context.Context,
iter MVCCIterator,
Expand Down
Loading

0 comments on commit 61df93e

Please sign in to comment.