diff --git a/pkg/kv/kvserver/batcheval/cmd_revert_range.go b/pkg/kv/kvserver/batcheval/cmd_revert_range.go index afc1cae3fec2..e93fc8a19880 100644 --- a/pkg/kv/kvserver/batcheval/cmd_revert_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_revert_range.go @@ -57,6 +57,8 @@ func isEmptyKeyTimeRange( return !ok, err } +const maxRevertRangeBatchBytes = 32 << 20 + // RevertRange wipes all MVCC versions more recent than TargetTime (up to the // command timestamp) of the keys covered by the specified span, adjusting the // MVCC stats accordingly. @@ -88,6 +90,7 @@ func RevertRange( resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys, + maxRevertRangeBatchBytes, args.EnableTimeBoundIteratorOptimization) if err != nil { return result.Result{}, err diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 93d22b04974e..9d9e3fb20dfe 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -293,7 +293,7 @@ type mvccClearTimeRangeOp struct { func (m mvccClearTimeRangeOp) run(ctx context.Context) string { writer := m.m.getReadWriter(m.writer) span, err := storage.MVCCClearTimeRange(ctx, writer, &enginepb.MVCCStats{}, m.key, m.endKey, - m.startTime, m.endTime, math.MaxInt64, true /* useTBI */) + m.startTime, m.endTime, math.MaxInt64, math.MaxInt64, true /* useTBI */) if err != nil { return fmt.Sprintf("error: %s", err) } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 95829d36a45c..22e4d83250fb 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2071,6 +2071,11 @@ func MVCCMerge( // buffer of keys selected for deletion but not yet flushed (as done to detect // long runs for cleaning in a single ClearRange). // +// Limiting the number of keys or ranges of keys processed can still cause a +// batch that is too large -- in number of bytes -- for raft to replicate if the +// keys are very large. So if the total length of the keys or key spans cleared +// exceeds maxBatchByteSize it will also stop and return a reusme span. +// // This function handles the stats computations to determine the correct // incremental deltas of clearing these keys (and correctly determining if it // does or not not change the live and gc keys). @@ -2083,10 +2088,11 @@ func MVCCClearTimeRange( ms *enginepb.MVCCStats, key, endKey roachpb.Key, startTime, endTime hlc.Timestamp, - maxBatchSize int64, + maxBatchSize, maxBatchByteSize int64, useTBI bool, ) (*roachpb.Span, error) { var batchSize int64 + var batchByteSize int64 var resume *roachpb.Span // When iterating, instead of immediately clearing a matching key we can @@ -2110,7 +2116,6 @@ func MVCCClearTimeRange( "MVCCStats passed in to MVCCClearTimeRange must be non-nil to ensure proper stats" + " computation during Clear operations") } - clearMatchingKey := func(k MVCCKey) { if len(clearRangeStart.Key) == 0 { // Currently buffering keys to clear one-by-one. @@ -2133,23 +2138,40 @@ func MVCCClearTimeRange( if err := rw.ClearMVCCRange(clearRangeStart, nonMatch); err != nil { return err } + batchByteSize += int64(clearRangeStart.EncodedSize() + nonMatch.EncodedSize()) batchSize++ clearRangeStart = MVCCKey{} } else if bufSize > 0 { + var encodedBufSize int64 for i := 0; i < bufSize; i++ { - if buf[i].Timestamp.IsEmpty() { - // Inline metadata. Not an intent because iteration below fails - // if it sees an intent. - if err := rw.ClearUnversioned(buf[i].Key); err != nil { - return err - } - } else { - if err := rw.ClearMVCC(buf[i]); err != nil { - return err + encodedBufSize += int64(buf[i].EncodedSize()) + } + // Even though we didn't get a large enough number of keys to switch to + // clearrange, the byte size of the keys we did get is now too large to + // encode them all within the byte size limit, so use clearrange anyway. + if batchByteSize+encodedBufSize >= maxBatchByteSize { + if err := rw.ClearMVCCRange(buf[0], nonMatch); err != nil { + return err + } + batchByteSize += int64(buf[0].EncodedSize() + nonMatch.EncodedSize()) + batchSize++ + } else { + for i := 0; i < bufSize; i++ { + if buf[i].Timestamp.IsEmpty() { + // Inline metadata. Not an intent because iteration below fails + // if it sees an intent. + if err := rw.ClearUnversioned(buf[i].Key); err != nil { + return err + } + } else { + if err := rw.ClearMVCC(buf[i]); err != nil { + return err + } } + batchByteSize += encodedBufSize } + batchSize += int64(bufSize) } - batchSize += int64(bufSize) bufSize = 0 } return nil @@ -2219,6 +2241,10 @@ func MVCCClearTimeRange( resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey} break } + if batchByteSize > maxBatchByteSize { + resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey} + break + } clearMatchingKey(k) clearedMetaKey.Key = append(clearedMetaKey.Key[:0], k.Key...) clearedMeta.KeyBytes = MVCCVersionTimestampSize diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 1afba5c1a32f..19f9ca4bbec8 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -2127,12 +2127,29 @@ func TestMVCCClearTimeRange(t *testing.T) { require.Equal(t, expected, res.KVs) } + resumingClear := func( + t *testing.T, + ctx context.Context, + rw ReadWriter, + ms *enginepb.MVCCStats, + key, endKey roachpb.Key, + ts, endTs hlc.Timestamp, + sz int64, + useTBI bool, + ) { + resume, err := MVCCClearTimeRange(ctx, rw, ms, key, endKey, ts, endTs, sz, 1, useTBI) + require.NoError(t, err) + for resume != nil { + resume, err = MVCCClearTimeRange(ctx, rw, ms, resume.Key, resume.EndKey, ts, endTs, sz, 1, useTBI) + require.NoError(t, err) + } + } for _, useTBI := range []bool{true, false} { t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) { t.Run("clear > ts0", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts0, ts0Content) @@ -2143,9 +2160,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts1 ", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, - useTBI) - require.NoError(t, err) + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, useTBI) assertKVs(t, e, ts1, ts1Content) assertKVs(t, e, ts2, ts1Content) assertKVs(t, e, ts5, ts1Content) @@ -2154,9 +2169,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts2", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) assertKVs(t, e, ts5, ts2Content) }) @@ -2164,9 +2178,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts3", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, useTBI) - require.NoError(t, err) assertKVs(t, e, ts3, ts3Content) assertKVs(t, e, ts5, ts3Content) }) @@ -2174,7 +2187,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts4 (nothing) ", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts4, ts4Content) @@ -2184,7 +2197,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts5 (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts4, ts4Content) @@ -2194,9 +2207,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear up to k5 to ts0", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, []roachpb.KeyValue{{Key: testKey5, Value: v2}}) assertKVs(t, e, ts5, []roachpb.KeyValue{{Key: testKey5, Value: v4}}) }) @@ -2204,7 +2216,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts0 in empty span (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2214,7 +2226,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear > ts0 in empty span [k3,k5) (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2224,7 +2236,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear k3 and up in ts0 > x >= ts1 (nothing)", func(t *testing.T) { e := setupKVs(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, 1<<10, useTBI) require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) @@ -2241,7 +2253,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear everything hitting intent fails", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10, useTBI) require.EqualError(t, err, "conflicting intents on \"/db3\"") }) @@ -2249,7 +2261,7 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear exactly hitting intent fails", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, + _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, 1<<10, useTBI) require.EqualError(t, err, "conflicting intents on \"/db3\"") }) @@ -2257,9 +2269,8 @@ func TestMVCCClearTimeRange(t *testing.T) { t.Run("clear everything above intent", func(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, ts2Content) // Scan (< k3 to avoid intent) to confirm that k2 was indeed reverted to @@ -2283,9 +2294,8 @@ func TestMVCCClearTimeRange(t *testing.T) { e := setupKVsWithIntent(t) defer e.Close() assertKVs(t, e, ts2, ts2Content) - _, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10, + resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10, useTBI) - require.NoError(t, err) assertKVs(t, e, ts2, ts1Content) }) }) @@ -2400,7 +2410,7 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) { startKey := localMax for { resume, err := MVCCClearTimeRange(ctx, e, &ms, startKey, keyMax, revertTo, now, - 100, useTBI) + 100, 1, useTBI) require.NoError(t, err) if resume == nil { break