From d38b734c57b5777ef13d603e9b63232f5e8301dc Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 17 Feb 2022 09:56:58 +0000 Subject: [PATCH 1/3] roachpb: add `isAlone` for `RevertRangeRequest` Since `RevertRange` mutates MVCC history, we want them to be alone in a batch. The DistSender will split any batches that have multiple such requests. Release note: None --- pkg/roachpb/api.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 1e80c4ccf582..7d484b1d65b3 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1272,7 +1272,9 @@ func (*ClearRangeRequest) flags() flag { // Note that RevertRange commands cannot be part of a transaction as // they clear all MVCC versions above their target time. -func (*RevertRangeRequest) flags() flag { return isWrite | isRange | bypassesReplicaCircuitBreaker } +func (*RevertRangeRequest) flags() flag { + return isWrite | isRange | isAlone | bypassesReplicaCircuitBreaker +} func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLocking) From 18ab26e4b129b0e53a8759d362d70c7b1588c574 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 17 Feb 2022 10:27:05 +0000 Subject: [PATCH 2/3] storage: add `NextKeyIgnoringTime()` for `MVCCIncrementalIterator` This patch adds a method `NextKeyIgnoringTime()` for `MVCCIncrementalIterator`. This can be used to find the next key (as opposed to version) of the iterator, ignoring the time bounds. It's similar to `NextIgnoringTime()`, but calls `NextKey()` instead of `Next()` on the underlying iterator. 
Release note: None --- pkg/storage/mvcc_incremental_iterator.go | 32 ++- pkg/storage/mvcc_incremental_iterator_test.go | 208 ++++++++++++++++++ 2 files changed, 238 insertions(+), 2 deletions(-) diff --git a/pkg/storage/mvcc_incremental_iterator.go b/pkg/storage/mvcc_incremental_iterator.go index a199c7329cd9..2aa1a2a0d07b 100644 --- a/pkg/storage/mvcc_incremental_iterator.go +++ b/pkg/storage/mvcc_incremental_iterator.go @@ -508,8 +508,8 @@ func (i *MVCCIncrementalIterator) UnsafeValue() []byte { // NextIgnoringTime returns the next key/value that would be encountered in a // non-incremental iteration by moving the underlying non-TBI iterator forward. -// This method throws an error if it encounters an intent in the time range -// (startTime, endTime] or sees an inline value. +// Intents in the time range (startTime,EndTime] and inline values are handled +// according to the iterator policy. func (i *MVCCIncrementalIterator) NextIgnoringTime() { for { i.iter.Next() @@ -533,6 +533,34 @@ func (i *MVCCIncrementalIterator) NextIgnoringTime() { } } +// NextKeyIgnoringTime returns the next distinct key that would be encountered +// in a non-incremental iteration by moving the underlying non-TBI iterator +// forward. Intents in the time range (startTime,EndTime] and inline values are +// handled according to the iterator policy. +func (i *MVCCIncrementalIterator) NextKeyIgnoringTime() { + i.iter.NextKey() + for { + if !i.checkValidAndSaveErr() { + return + } + + if err := i.initMetaAndCheckForIntentOrInlineError(); err != nil { + return + } + + // We have encountered an intent but it does not lie in the timestamp span + // (startTime, endTime] so we do not throw an error, and attempt to move to + // the next valid KV. + if i.meta.Txn != nil && i.intentPolicy != MVCCIncrementalIterIntentPolicyEmit { + i.Next() + continue + } + + // We have a valid KV or an intent to emit. + return + } +} + // NumCollectedIntents returns number of intents encountered during iteration. 
// This is only the case when intent aggregation is enabled, otherwise it is // always 0. diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 53d60d36b18a..cdc8bf454dfb 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -297,6 +297,37 @@ func nextIgnoreTimeExpectErr( } } +func nextKeyIgnoreTimeExpectErr( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + errString string, +) { + // The semantics of the methods NextIgnoringTime() should not change whether + // or not we enable the TBI optimization. + for _, useTBI := range []bool{true, false} { + t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + EnableTimeBoundIteratorOptimization: useTBI, + StartTime: startTime, + EndTime: endTime, + }) + defer iter.Close() + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iter.NextKeyIgnoringTime() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + // pass + } + if _, err := iter.Valid(); !testutils.IsError(err, errString) { + t.Fatalf("expected error %q but got %v", errString, err) + } + }) + } +} + func assertNextIgnoreTimeIteratedKVs( t *testing.T, e Engine, @@ -340,6 +371,49 @@ func assertNextIgnoreTimeIteratedKVs( } } +func assertNextKeyIgnoreTimeIteratedKVs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + expected []MVCCKeyValue, +) { + // The semantics of the methods NextKeyIgnoringTime() should not change whether + // or not we enable the TBI optimization. 
+ for _, useTBI := range []bool{true, false} { + t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + EnableTimeBoundIteratorOptimization: useTBI, + StartTime: startTime, + EndTime: endTime, + }) + defer iter.Close() + var kvs []MVCCKeyValue + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iter.NextKeyIgnoringTime() { + if ok, err := iter.Valid(); err != nil { + t.Fatalf("unexpected error: %+v", err) + } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + kvs = append(kvs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) + } + + if len(kvs) != len(expected) { + t.Fatalf("got %d kvs but expected %d: %v", len(kvs), len(expected), kvs) + } + for i := range kvs { + if !kvs[i].Key.Equal(expected[i].Key) { + t.Fatalf("%d key: got %v but expected %v", i, kvs[i].Key, expected[i].Key) + } + if !bytes.Equal(kvs[i].Value, expected[i].Value) { + t.Fatalf("%d value: got %x but expected %x", i, kvs[i].Value, expected[i].Value) + } + } + }) + } +} + func assertIteratedKVs( t *testing.T, e Engine, @@ -557,6 +631,140 @@ func TestMVCCIncrementalIteratorNextIgnoringTime(t *testing.T) { } } +// TestMVCCIncrementalIteratorNextKeyIgnoringTime tests the iteration semantics +// of the method NextKeyIgnoreTime(). This method is supposed to return all new +// keys that would be encountered in a non-incremental iteration. +func TestMVCCIncrementalIteratorNextKeyIgnoringTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + var ( + keyMax = roachpb.KeyMax + testKey1 = roachpb.Key("/db1") + testKey2 = roachpb.Key("/db2") + + testValue1 = []byte("val1") + testValue2 = []byte("val2") + testValue3 = []byte("val3") + testValue4 = []byte("val4") + + // Use a non-zero min, since we use IsEmpty to decide if a ts should be used + // as upper/lower-bound during iterator initialization. 
+ tsMin = hlc.Timestamp{WallTime: 0, Logical: 1} + ts1 = hlc.Timestamp{WallTime: 1, Logical: 0} + ts2 = hlc.Timestamp{WallTime: 2, Logical: 0} + ts3 = hlc.Timestamp{WallTime: 3, Logical: 0} + ts4 = hlc.Timestamp{WallTime: 4, Logical: 0} + tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0} + ) + + kv1_1_1 := makeKVT(testKey1, testValue1, ts1) + kv1_2_2 := makeKVT(testKey1, testValue2, ts2) + kv2_2_2 := makeKVT(testKey2, testValue3, ts2) + kv1_3Deleted := makeKVT(testKey1, nil, ts3) + + for _, engineImpl := range mvccEngineImpls { + t.Run(engineImpl.name, func(t *testing.T) { + e := engineImpl.create() + defer e.Close() + + t.Run("empty", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, tsMin, ts3, nil) + }) + + for _, kv := range kvs(kv1_1_1, kv1_2_2, kv2_2_2) { + v := roachpb.Value{RawBytes: kv.Value} + if err := MVCCPut(ctx, e, nil, kv.Key.Key, kv.Key.Timestamp, v, nil); err != nil { + t.Fatal(err) + } + } + + // Exercise time ranges. + t.Run("ts (0-0]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, tsMin, tsMin, nil) + }) + // Returns the kv_2_2_2 even though it is outside (startTime, endTime]. + t.Run("ts (0-1]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, tsMin, ts1, kvs(kv1_1_1, kv2_2_2)) + }) + t.Run("ts (0-∞]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, tsMin, tsMax, kvs(kv1_2_2, kv2_2_2)) + }) + t.Run("ts (1-1]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts1, ts1, nil) + }) + t.Run("ts (1-2]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts1, ts2, kvs(kv1_2_2, kv2_2_2)) + }) + t.Run("ts (2-2]", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts2, ts2, nil) + }) + + // Exercise key ranges. 
+ t.Run("kv [1-1)", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, testKey1, testKey1, tsMin, tsMax, nil) + }) + t.Run("kv [1-2)", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, testKey1, testKey2, tsMin, tsMax, kvs(kv1_2_2)) + }) + + // Exercise deletion. + if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { + t.Fatal(err) + } + // Returns the kv_1_1_1 even though it is outside (startTime, endTime]. + t.Run("del", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts1, tsMax, kvs(kv1_3Deleted, + kv2_2_2)) + }) + + // Insert an intent of testKey2. + txn1ID := uuid.MakeV4() + txn1 := roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + Key: testKey2, + ID: txn1ID, + Epoch: 1, + WriteTimestamp: ts4, + }, + ReadTimestamp: ts4, + } + txn1Val := roachpb.Value{RawBytes: testValue4} + if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { + t.Fatal(err) + } + + // We have to be careful that we are testing the intent handling logic of + // NextIgnoreTime() rather than the first SeekGE(). We do this by + // ensuring that the SeekGE() doesn't encounter an intent. + t.Run("intents", func(t *testing.T) { + nextKeyIgnoreTimeExpectErr(t, e, testKey1, testKey2.PrefixEnd(), tsMin, tsMax, "conflicting intents") + }) + t.Run("intents", func(t *testing.T) { + nextKeyIgnoreTimeExpectErr(t, e, localMax, keyMax, tsMin, ts4, "conflicting intents") + }) + // Intents above the upper time bound or beneath the lower time bound must + // be ignored. Note that the lower time bound is exclusive while the upper + // time bound is inclusive. + // + // The intent at ts=4 for kv2 lies outside the timespan + // (startTime, endTime] so we do not raise an error and just move on to + // its versioned KV. 
+ t.Run("intents", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, tsMin, ts3, kvs(kv1_3Deleted, + kv2_2_2)) + }) + t.Run("intents", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts4, tsMax, kvs()) + }) + t.Run("intents", func(t *testing.T) { + assertNextKeyIgnoreTimeIteratedKVs(t, e, localMax, keyMax, ts4.Next(), tsMax, kvs()) + }) + }) + } +} + func TestMVCCIncrementalIteratorInlinePolicy(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From e1acd7529ae8cf7b699c55cb99d340cf7a030a8d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 12 Feb 2022 17:11:58 +0000 Subject: [PATCH 3/3] batcheval: add MVCC-compliant `RevertRange` variant This adds a new parameter `ExperimentalPreserveHistory` to `RevertRange` which, rather than clearing keys above the target time, will write new values or tombstones that reflect the state at the target time. For long runs of deleted keys, this will instead drop an MVCC range tombstone. This makes the command respect e.g. MVCC immutability, the closed timestamp, and timestamp cache. Note that MVCC range tombstones are currently experimental, and as such this parameter is also experimental. Callers must call `storage.CanUseExperimentalMVCCRangeTombstones()` before using it. 
Release note: None --- pkg/kv/kvserver/batcheval/cmd_revert_range.go | 44 ++- pkg/roachpb/api.go | 15 +- pkg/roachpb/api.proto | 26 +- pkg/roachpb/api_test.go | 1 + pkg/storage/mvcc.go | 233 ++++++++++++++- pkg/storage/mvcc_history_test.go | 57 +++- .../testdata/mvcc_histories/revert_range | 275 ++++++++++++++++++ 7 files changed, 620 insertions(+), 31 deletions(-) create mode 100644 pkg/storage/testdata/mvcc_histories/revert_range diff --git a/pkg/kv/kvserver/batcheval/cmd_revert_range.go b/pkg/kv/kvserver/batcheval/cmd_revert_range.go index 7e7fe6e05a58..1afcfbd39f87 100644 --- a/pkg/kv/kvserver/batcheval/cmd_revert_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_revert_range.go @@ -51,8 +51,12 @@ func isEmptyKeyTimeRange( // that there is *a* key in the SST that is in the time range. Thus we should // proceed to iteration that actually checks timestamps on each key. iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ - LowerBound: from, UpperBound: to, - MinTimestampHint: since.Next() /* make exclusive */, MaxTimestampHint: until, + // TODO(erikgrinaker): Make sure TBIs respect range keys too. 
+ KeyTypes: storage.IterKeyTypePointsAndRanges, // revert any range keys as well + LowerBound: from, + UpperBound: to, + MinTimestampHint: since.Next(), // exclusive + MaxTimestampHint: until, }) defer iter.Close() iter.SeekGE(storage.MVCCKey{Key: from}) @@ -78,29 +82,39 @@ func RevertRange( args := cArgs.Args.(*roachpb.RevertRangeRequest) reply := resp.(*roachpb.RevertRangeResponse) - pd := result.Result{ - Replicated: kvserverpb.ReplicatedEvalResult{ - MVCCHistoryMutation: &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ - Spans: []roachpb.Span{{Key: args.Key, EndKey: args.EndKey}}, - }, - }, - } if empty, err := isEmptyKeyTimeRange( readWriter, args.Key, args.EndKey, args.TargetTime, cArgs.Header.Timestamp, ); err != nil { return result.Result{}, err } else if empty { - log.VEventf(ctx, 2, "no keys to clear in specified time range") + log.VEventf(ctx, 2, "no keys to revert in specified time range") return result.Result{}, nil } - log.VEventf(ctx, 2, "clearing keys with timestamp (%v, %v]", args.TargetTime, cArgs.Header.Timestamp) + log.VEventf(ctx, 2, "reverting keys with timestamp (%v, %v]", + args.TargetTime, cArgs.Header.Timestamp) - resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, - args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys, - maxRevertRangeBatchBytes, - args.EnableTimeBoundIteratorOptimization) + var pd result.Result + var resume *roachpb.Span + var err error + if args.ExperimentalPreserveHistory { + const deleteRangeThreshold = 100 + maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + // TODO(erikgrinaker): Write a test for this once MVCC range tombstones are + // properly written to batches and replicated. + // TODO(erikgrinaker): Test that this records MVCC logical ops correctly. 
+ resume, err = storage.ExperimentalMVCCRevertRange(ctx, readWriter, cArgs.Stats, + args.Key, args.EndKey, cArgs.Header.Timestamp, args.TargetTime, deleteRangeThreshold, + cArgs.Header.MaxSpanRequestKeys, maxRevertRangeBatchBytes, maxIntents) + } else { + resume, err = storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, + args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys, + maxRevertRangeBatchBytes, args.EnableTimeBoundIteratorOptimization) + pd.Replicated.MVCCHistoryMutation = &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ + Spans: []roachpb.Span{{Key: args.Key, EndKey: args.EndKey}}, + } + } if err != nil { return result.Result{}, err } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 7d484b1d65b3..2514b5ef09e3 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -778,8 +778,8 @@ func (crr *ClearRangeRequest) ShallowCopy() Request { } // ShallowCopy implements the Request interface. -func (crr *RevertRangeRequest) ShallowCopy() Request { - shallowCopy := *crr +func (rrr *RevertRangeRequest) ShallowCopy() Request { + shallowCopy := *rrr return &shallowCopy } @@ -1270,9 +1270,14 @@ func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone | bypassesReplicaCircuitBreaker } -// Note that RevertRange commands cannot be part of a transaction as -// they clear all MVCC versions above their target time. -func (*RevertRangeRequest) flags() flag { +// Note that RevertRange commands cannot be part of a transaction, as they +// either clear MVCC versions or write MVCC range tombstones, neither of which +// is supported within transactions. 
+func (rrr *RevertRangeRequest) flags() flag { + if rrr.ExperimentalPreserveHistory { + return isRead | isWrite | isRange | isAlone | updatesTSCache | appliesTSCache | + bypassesReplicaCircuitBreaker + } return isWrite | isRange | isAlone | bypassesReplicaCircuitBreaker } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 2fb632af53f9..23ffd29e018a 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -402,18 +402,34 @@ message ClearRangeResponse { } -// A RevertRangeRequest specifies a range of keys in which to clear all MVCC -// revisions more recent than some TargetTime from the underlying engine, thus -// reverting the range (from the perspective of an MVCC scan) to that time. +// A RevertRangeRequest specifies a range of keys to revert to some past time. +// By default, it will clear all revisions more recent than TargetTime from the +// underlying engine. However, this violates several guarantees including MVCC +// immutability, the closed timestamp, timestamp cache, and others. See the +// ExperimentalPreserveHistory parameter which will uphold these guarantees. message RevertRangeRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; - // TargetTime specifies a the time to which to "revert" the range by clearing - // any MVCC key with a strictly higher timestamp. TargetTime must be higher + // TargetTime specifies the time to which to "revert" the range. Any + // versions later than TargetTime will be undone. TargetTime must be higher // than the GC Threshold for the replica - so that it is assured that the keys // for that time are still there — or the request will fail. util.hlc.Timestamp target_time = 2 [(gogoproto.nullable) = false]; + // ExperimentalPreserveHistory will preserve MVCC history by, rather than + // clearing newer versions, deleting them using tombstones or updating them + // back to their original value as of the target time.
Long runs of key + // deletions will use an MVCC range tombstone instead. This respects the + // closed timestamp and timestamp cache. + // + // The caller must check storage.CanUseExperimentalMVCCRangeTombstones() + // before enabling this parameter. + // + // This parameter is EXPERIMENTAL: range tombstones are under active + // development, and have severe limitations including being ignored by all + // KV and MVCC APIs and only being stored in memory. + bool experimental_preserve_history = 5; + bool enable_time_bound_iterator_optimization = 3; // IgnoreGcThreshold can be set by a caller to ignore the target-time when diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go index dea42892987a..9abfa2cace00 100644 --- a/pkg/roachpb/api_test.go +++ b/pkg/roachpb/api_test.go @@ -321,6 +321,7 @@ func TestFlagCombinations(t *testing.T) { &DeleteRangeRequest{UseExperimentalRangeTombstone: true}, &GetRequest{KeyLocking: lock.Exclusive}, &ReverseScanRequest{KeyLocking: lock.Exclusive}, + &RevertRangeRequest{ExperimentalPreserveHistory: true}, &ScanRequest{KeyLocking: lock.Exclusive}, } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 4fddb07b7515..3bd3548e1e3e 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -1967,6 +1967,8 @@ func MVCCMerge( // // If the underlying iterator encounters an intent with a timestamp in the span // (startTime, endTime], or any inline meta, this method will return an error. +// +// TODO(erikgrinaker): This should clear any range keys as well. func MVCCClearTimeRange( _ context.Context, rw ReadWriter, @@ -2231,9 +2233,6 @@ func MVCCDeleteRange( // This method is EXPERIMENTAL: range keys are under active development, and // have severe limitations including being ignored by all KV and MVCC APIs and // only being stored in memory. -// -// TODO(erikgrinaker): Needs handling of conflicts (e.g. WriteTooOldError), -// MVCCStats, and tests. 
func ExperimentalMVCCDeleteRangeUsingTombstone( ctx context.Context, rw ReadWriter, @@ -2247,8 +2246,16 @@ func ExperimentalMVCCDeleteRangeUsingTombstone( } else if len(intents) > 0 { return &roachpb.WriteIntentError{Intents: intents} } - return rw.ExperimentalPutMVCCRangeKey(MVCCRangeKey{ - StartKey: startKey, EndKey: endKey, Timestamp: timestamp}, nil) + return experimentalMVCCDeleteRangeUsingTombstoneInternal(ctx, rw, ms, MVCCRangeKey{ + StartKey: startKey, EndKey: endKey, Timestamp: timestamp}) +} + +// TODO(erikgrinaker): Needs handling of conflicts (e.g. WriteTooOldError), +// MVCCStats, and tests. +func experimentalMVCCDeleteRangeUsingTombstoneInternal( + ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, rangeKey MVCCRangeKey, +) error { + return rw.ExperimentalPutMVCCRangeKey(rangeKey, nil) } func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) { @@ -2267,6 +2274,222 @@ func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) { } } +// ExperimentalMVCCRevertRange will revert a range back to its state as of some +// past timestamp, writing tombstones or key updates as appropriate at the given +// write timestamp. Long runs of key deletions will be written using MVCC range +// tombstones. +// +// This function cannot be used in a transaction. However, it will scan for +// existing intents and return a WriteIntentError, and scan for newer writes +// and return WriteTooOldError. +// +// This function is EXPERIMENTAL. Range tombstones are not supported throughout +// the MVCC API, and the on-disk format is unstable. +// +// TODO(erikgrinaker): Handle range keys. 
+func ExperimentalMVCCRevertRange(
+	ctx context.Context,
+	rw ReadWriter,
+	ms *enginepb.MVCCStats,
+	startKey, endKey roachpb.Key,
+	writeTimestamp hlc.Timestamp,
+	revertTimestamp hlc.Timestamp,
+	deleteRangeThreshold int,
+	maxBatchSize int64,
+	maxBatchBytes int64,
+	maxIntents int64,
+) (*roachpb.Span, error) {
+	// We must resolve any intents within the span, so we may as well scan for
+	// separated intents before doing any work.
+	if intents, err := ScanIntents(ctx, rw, startKey, endKey, maxIntents, 0); err != nil {
+		return nil, err
+	} else if len(intents) > 0 {
+		return nil, &roachpb.WriteIntentError{Intents: intents}
+	}
+
+	// We accumulate point deletes in deleteBuf until we either reach
+	// deleteRangeThreshold and switch to using a range deletion tombstone
+	// anchored at deleteRangeStart, or until we hit a visible key at which
+	// point we flush the deleteBuf as point deletes.
+	var deleteRangeStart roachpb.Key
+	var deleteBuf []roachpb.Key
+	var deleteBufIdx int
+	var deleteBufBytes int64
+	if deleteRangeThreshold > 1 {
+		deleteBuf = make([]roachpb.Key, deleteRangeThreshold-1)
+	}
+
+	putBuf := newPutBuffer()
+	defer putBuf.release()
+
+	var batchSize, batchBytes int64
+
+	flushDeletes := func(nonMatch roachpb.Key) error {
+		if len(deleteRangeStart) > 0 {
+			err := experimentalMVCCDeleteRangeUsingTombstoneInternal(ctx, rw, ms, MVCCRangeKey{
+				StartKey: deleteRangeStart, EndKey: nonMatch, Timestamp: writeTimestamp})
+			deleteRangeStart = nil
+			batchBytes += int64(encodedMVCCKeyLength(MVCCKey{Key: nonMatch})) // account for end key
+			return err
+		}
+
+		if deleteBufIdx > 0 {
+			iter := newMVCCIterator(rw, false, IterOptions{Prefix: true})
+			defer iter.Close()
+			for i := 0; i < deleteBufIdx; i++ {
+				err := mvccPutInternal(
+					ctx, rw, iter, ms, deleteBuf[i], writeTimestamp, nil, nil, putBuf, nil)
+				if err != nil {
+					return err
+				}
+			}
+			deleteBufIdx = 0
+			deleteBufBytes = 0
+		}
+		return nil
+	}
+
+	revert := func(k roachpb.Key, v []byte) (*roachpb.Span, error) {
+		// For non-deletions, we have to flush any pending deletes first. This may also
+		// flush a range tombstone, which will add to batchBytes.
+		if len(v) > 0 {
+			if err := flushDeletes(k); err != nil {
+				return nil, err
+			}
+		}
+
+		// If the batch is full, return a resume span after flushing any deletes.
+		if batchSize >= maxBatchSize || batchBytes >= maxBatchBytes {
+			err := flushDeletes(k)
+			return &roachpb.Span{Key: k, EndKey: endKey}, err
+		}
+		bytes := int64(encodedMVCCKeyLength(MVCCKey{Key: k, Timestamp: writeTimestamp}) + len(v))
+
+		if len(v) > 0 || len(deleteBuf) == 0 {
+			batchSize++
+			batchBytes += bytes
+			iter := newMVCCIterator(rw, false, IterOptions{Prefix: true})
+			defer iter.Close()
+			return nil, mvccPutInternal(ctx, rw, iter, ms, k, writeTimestamp, v, nil, putBuf, nil)
+
+		} else if len(deleteRangeStart) == 0 {
+			// We're currently buffering point deletions.
+			if deleteBufIdx < len(deleteBuf) {
+				deleteBuf[deleteBufIdx] = append(deleteBuf[deleteBufIdx][:0], k...)
+				deleteBufIdx++
+				deleteBufBytes += bytes
+				batchSize++
+				batchBytes += bytes
+			} else {
+				// Buffer is full -- switch to tracking the start of the range delete. We
+				// remove the buffered keys from the batch size, and instead only track
+				// the range key.
+				batchSize -= int64(deleteBufIdx) - 1 // -1 accounts for the range key
+				batchBytes -= deleteBufBytes -
+					int64(encodedMVCCKeyLength(MVCCKey{Key: deleteBuf[0], Timestamp: writeTimestamp}))
+				deleteRangeStart = deleteBuf[0]
+				deleteBufIdx = 0
+				deleteBufBytes = 0
+			}
+		}
+		return nil, nil
+	}
+
+	// We set up an incremental iterator from the revert time to look for any
+	// changes that need to be reverted. However, we also need to inspect older
+	// values to e.g. find the value to revert to or make sure we don't drop range
+	// tombstones across them -- we do this by using the IgnoringTime() methods on
+	// the MVCCIncrementalIterator.
+	iter := NewMVCCIncrementalIterator(rw, MVCCIncrementalIterOptions{
+		EnableTimeBoundIteratorOptimization: true,
+		EndKey:                              endKey,
+		StartTime:                           revertTimestamp,
+		EndTime:                             writeTimestamp, // puts will error on any newer versions
+	})
+	defer iter.Close()
+
+	// TODO(erikgrinaker): Consider rewriting the below to iterate over keys and
+	// versions separately, which might make the logic clearer. Also consider
+	// using a struct to manage the deletion state.
+	var revertKey roachpb.Key
+	var revertValue, revertValueFrom []byte
+	iter.SeekGE(MVCCKey{Key: startKey})
+	for {
+		if ok, err := iter.Valid(); err != nil {
+			return nil, err
+		} else if !ok {
+			break
+		}
+
+		key := iter.UnsafeKey()
+
+		if key.Timestamp.IsEmpty() {
+			return nil, errors.Errorf("encountered inline key %s", key)
+		}
+
+		// If a key was scheduled for reversion, revert it when the key changes,
+		// but only if the original value differs from the latest value.
+		if len(revertKey) > 0 && !revertKey.Equal(key.Key) {
+			if !bytes.Equal(revertValue, revertValueFrom) {
+				if resumeSpan, err := revert(revertKey, revertValue); err != nil || resumeSpan != nil {
+					return resumeSpan, err
+				}
+			}
+			revertKey, revertValue, revertValueFrom = nil, nil, nil // TODO(erikgrinaker): reuse slices
+		}
+
+		if revertTimestamp.Less(key.Timestamp) {
+			// Schedule this key for reversion.
+			if len(revertKey) == 0 {
+				// TODO(erikgrinaker): reuse byte slices
+				revertKey = key.Key.Clone()
+				revertValueFrom = append([]byte(nil), iter.Value()...)
+			}
+
+			// Move the iterator to the next key, even if <= revertTimestamp. If it
+			// finds an old version of this key, it will set the value to revert to.
+			iter.NextIgnoringTime()
+
+		} else if bytes.Equal(revertKey, key.Key) {
+			// This is the version of revertKey that we should revert back to. If it
+			// is visible we can move to the next TBI key, because it's going to flush
+			// any pending deletes. However, if it's a tombstone we have to move to
+			// the next key even if it's outside of the TBI bounds to see if it's
+			// visible and avoid dropping a range tombstone across it.
+			revertValue = append([]byte(nil), iter.Value()...)
+			if len(revertValue) > 0 {
+				iter.Next()
+			} else {
+				iter.NextKeyIgnoringTime()
+			}
+
+		} else if len(iter.Value()) > 0 {
+			// This is a different visible key at or below the revert timestamp. We
+			// have to flush any deletes up to here to avoid dropping a range
+			// tombstone across it.
+			if err := flushDeletes(key.Key); err != nil {
+				return nil, err
+			}
+			iter.Next()
+
+		} else {
+			// This is a tombstone for a different key. We have to move to the next
+			// key (ignoring TBI) to check whether it could be a visible key outside
+			// of the time bounds -- if it is, we don't want to drop a range tombstone
+			// across it.
+			iter.NextKeyIgnoringTime()
+		}
+	}
+
+	// Handle a revert at the very end of the iteration.
+	if len(revertKey) > 0 && !bytes.Equal(revertValue, revertValueFrom) {
+		if resumeSpan, err := revert(revertKey, revertValue); err != nil || resumeSpan != nil {
+			return resumeSpan, err
+		}
+	}
+	return nil, flushDeletes(endKey)
+}
+
 func mvccScanToBytes(
 	ctx context.Context,
 	iter MVCCIterator,
diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go
index 9f6b865591c1..9451f0beb91d 100644
--- a/pkg/storage/mvcc_history_test.go
+++ b/pkg/storage/mvcc_history_test.go
@@ -69,6 +69,7 @@ import (
 //
 // clear_range k= end=
 // clear_range_key k= end= [ts=[,]]
+// revert_range [ts=[,]] k= end= revertTS=[,int] [deleteRangeThreshold=] [maxBatchSize=] [maxBatchBytes=]
 //
 // Where `` can be a simple string, or a string
 // prefixed by the following characters:
@@ -427,6 +428,7 @@ var commands = map[string]cmd{
 	"iter_range_keys": {typReadOnly, cmdIterRangeKeys},
 	"merge": {typDataUpdate, cmdMerge},
 	"put": {typDataUpdate, cmdPut},
+	"revert_range": {typDataUpdate, cmdRevertRange},
"scan": {typReadOnly, cmdScan}, } @@ -604,7 +606,27 @@ func cmdCheckIntent(e *evalCtx) error { func cmdClearRange(e *evalCtx) error { key, endKey := e.getKeyRange() - return e.engine.ClearMVCCRangeAndIntents(key, endKey) + if err := e.engine.ClearMVCCRangeAndIntents(key, endKey); err != nil { + return err + } + // TODO(erikgrinaker): Consider removing range keys in ClearMVCCRangeAndIntents. + iter := NewMVCCRangeKeyIterator(e.engine, MVCCRangeKeyIterOptions{ + LowerBound: key, + UpperBound: endKey, + }) + defer iter.Close() + for { + if ok, err := iter.Valid(); err != nil { + return err + } else if !ok { + break + } + if err := e.engine.ExperimentalClearMVCCRangeKey(iter.Key()); err != nil { + return err + } + iter.Next() + } + return nil } func cmdClearRangeKey(e *evalCtx) error { @@ -814,6 +836,39 @@ func cmdPut(e *evalCtx) error { }) } +func cmdRevertRange(e *evalCtx) error { + ts := e.getTs(nil) + revertTS := e.getTsWithName(nil, "revertTS") + key, endKey := e.getKeyRange() + + deleteRangeThreshold := 100 + if e.hasArg("deleteRangeThreshold") { + e.scanArg("deleteRangeThreshold", &deleteRangeThreshold) + } + + maxBatchSize := 1000 + if e.hasArg("maxBatchSize") { + e.scanArg("maxBatchSize", &maxBatchSize) + } + + maxBatchBytes := int(1e6) + if e.hasArg("maxBatchBytes") { + e.scanArg("maxBatchBytes", &maxBatchBytes) + } + + return e.withWriter("revertRange", func(rw ReadWriter) error { + resumeSpan, err := ExperimentalMVCCRevertRange(e.ctx, rw, nil, key, endKey, ts, revertTS, + deleteRangeThreshold, int64(maxBatchSize), int64(maxBatchBytes), 1000) + if err != nil { + return err + } + if resumeSpan != nil { + e.results.buf.Printf("revert_range: resume span [%s,%s)\n", resumeSpan.Key, resumeSpan.EndKey) + } + return err + }) +} + func cmdScan(e *evalCtx) error { txn := e.getTxn(optional) key, endKey := e.getKeyRange() diff --git a/pkg/storage/testdata/mvcc_histories/revert_range b/pkg/storage/testdata/mvcc_histories/revert_range new file mode 100644 index 
000000000000..9b39bbadf828 --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/revert_range @@ -0,0 +1,275 @@ +# TODO(erikgrinaker): Test this with existing range keys too. +# TODO(erikgrinaker): Conflict and intent tests. +# TODO(erikgrinaker): Instead of duplicating the data set, we can use +# clear_range to clear the revert_range write timestamp. However, this requires +# support for clearing range keys in MVCCClearTimeRange. + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=a end=z ts=5 revertTS=1 deleteRangeThreshold=2 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +scan: "a" -> /BYTES/1 @5.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/1 @5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> /BYTES/2 @2.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +range key: {e-z}/5.000000000,0 -> [] +data: "a"/5.000000000,0 -> /BYTES/1 +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> /BYTES/1 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=a end=z ts=5 revertTS=2 deleteRangeThreshold=2 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +scan: "a" -> /BYTES/2 @2.000000000,0 +scan: "b" -> /BYTES/2 @5.000000000,0 +scan: "c" -> / 
@5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> /BYTES/2 @2.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> / @5.000000000,0 +>> at end: +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/5.000000000,0 -> /BYTES/2 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> / +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/5.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=a end=z ts=5 revertTS=3 deleteRangeThreshold=2 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +scan: "a" -> /BYTES/2 @2.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/3 @3.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> /BYTES/2 @2.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=c end=z ts=5 revertTS=1 deleteRangeThreshold=2 maxBatchSize=1 +scan ts=5 k=a end=z # NB: does not respect range 
tombstones yet +---- +revert_range: resume span ["e","z") +scan: "a" -> /BYTES/2 @2.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/1 @5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> /BYTES/2 @2.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> /BYTES/1 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=a end=z ts=5 revertTS=1 deleteRangeThreshold=10 maxBatchSize=3 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +revert_range: resume span ["g","z") +scan: "a" -> /BYTES/1 @5.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/1 @5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> / @5.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +data: "a"/5.000000000,0 -> /BYTES/1 +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> /BYTES/1 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/5.000000000,0 -> / +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 
+put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=a end=z ts=5 revertTS=1 deleteRangeThreshold=2 maxBatchSize=3 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +revert_range: resume span ["g","z") +scan: "a" -> /BYTES/1 @5.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/1 @5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> / @5.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +data: "a"/5.000000000,0 -> /BYTES/1 +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> /BYTES/1 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> /BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/5.000000000,0 -> / +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3 + +run ok +clear_range k=a end=z +put k=a v=1 ts=1 +put k=a v=2 ts=2 +put k=b v=2 ts=2 +del k=b ts=3 +put k=c v=1 ts=1 +del k=c ts=2 +put k=c v=3 ts=3 +put k=d v=1 ts=1 +put k=e v=2 ts=2 +del k=f ts=1 +put k=g v=3 ts=3 +revert_range k=c end=z ts=5 revertTS=1 deleteRangeThreshold=3 maxBatchBytes=1 +scan ts=5 k=a end=z # NB: does not respect range tombstones yet +---- +revert_range: resume span ["e","z") +scan: "a" -> /BYTES/2 @2.000000000,0 +scan: "b" -> / @3.000000000,0 +scan: "c" -> /BYTES/1 @5.000000000,0 +scan: "d" -> /BYTES/1 @1.000000000,0 +scan: "e" -> /BYTES/2 @2.000000000,0 +scan: "f" -> / @1.000000000,0 +scan: "g" -> /BYTES/3 @3.000000000,0 +>> at end: +data: "a"/2.000000000,0 -> /BYTES/2 +data: "a"/1.000000000,0 -> /BYTES/1 +data: "b"/3.000000000,0 -> / +data: "b"/2.000000000,0 -> /BYTES/2 +data: "c"/5.000000000,0 -> /BYTES/1 +data: "c"/3.000000000,0 -> /BYTES/3 +data: "c"/2.000000000,0 -> / +data: "c"/1.000000000,0 -> 
/BYTES/1 +data: "d"/1.000000000,0 -> /BYTES/1 +data: "e"/2.000000000,0 -> /BYTES/2 +data: "f"/1.000000000,0 -> / +data: "g"/3.000000000,0 -> /BYTES/3