Skip to content

Commit

Permalink
storage: limit RevertRange batches to 32mb
Browse files Browse the repository at this point in the history
The existing limit in key-count/span-count can produce batches in excess of 64mb,
if, for example, they have very large keys. These batches then are rejected for
exceeding the raft command size limit.

This adds an aditional hard-coded limit of 32mb on the write batch to which keys
or spans to clear are added (if the command is executed against a non-Batch the
limit is ignored). The size of the batch is re-checked once every 32 keys.

Release note (bug fix): avoid creating batches that exceed the raft command limit (64mb)  when reverting ranges that contain very large keys.
  • Loading branch information
dt committed Feb 17, 2021
1 parent db32968 commit dfe6752
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 35 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_revert_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func isEmptyKeyTimeRange(
return !ok, err
}

const maxRevertRangeBatchBytes = 32 << 20

// RevertRange wipes all MVCC versions more recent than TargetTime (up to the
// command timestamp) of the keys covered by the specified span, adjusting the
// MVCC stats accordingly.
Expand Down Expand Up @@ -88,6 +90,7 @@ func RevertRange(

resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys,
maxRevertRangeBatchBytes,
args.EnableTimeBoundIteratorOptimization)
if err != nil {
return result.Result{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ type mvccClearTimeRangeOp struct {
func (m mvccClearTimeRangeOp) run(ctx context.Context) string {
writer := m.m.getReadWriter(m.writer)
span, err := storage.MVCCClearTimeRange(ctx, writer, &enginepb.MVCCStats{}, m.key, m.endKey,
m.startTime, m.endTime, math.MaxInt64, true /* useTBI */)
m.startTime, m.endTime, math.MaxInt64, math.MaxInt64, true /* useTBI */)
if err != nil {
return fmt.Sprintf("error: %s", err)
}
Expand Down
50 changes: 38 additions & 12 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,6 +2071,11 @@ func MVCCMerge(
// buffer of keys selected for deletion but not yet flushed (as done to detect
// long runs for cleaning in a single ClearRange).
//
// Limiting the number of keys or ranges of keys processed can still cause a
// batch that is too large -- in number of bytes -- for raft to replicate if the
// keys are very large. So if the total length of the keys or key spans cleared
// exceeds maxBatchByteSize it will also stop and return a reusme span.
//
// This function handles the stats computations to determine the correct
// incremental deltas of clearing these keys (and correctly determining if it
// does or not not change the live and gc keys).
Expand All @@ -2083,10 +2088,11 @@ func MVCCClearTimeRange(
ms *enginepb.MVCCStats,
key, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
maxBatchSize int64,
maxBatchSize, maxBatchByteSize int64,
useTBI bool,
) (*roachpb.Span, error) {
var batchSize int64
var batchByteSize int64
var resume *roachpb.Span

// When iterating, instead of immediately clearing a matching key we can
Expand All @@ -2110,7 +2116,6 @@ func MVCCClearTimeRange(
"MVCCStats passed in to MVCCClearTimeRange must be non-nil to ensure proper stats" +
" computation during Clear operations")
}

clearMatchingKey := func(k MVCCKey) {
if len(clearRangeStart.Key) == 0 {
// Currently buffering keys to clear one-by-one.
Expand All @@ -2133,23 +2138,40 @@ func MVCCClearTimeRange(
if err := rw.ClearMVCCRange(clearRangeStart, nonMatch); err != nil {
return err
}
batchByteSize += int64(clearRangeStart.EncodedSize() + nonMatch.EncodedSize())
batchSize++
clearRangeStart = MVCCKey{}
} else if bufSize > 0 {
var encodedBufSize int64
for i := 0; i < bufSize; i++ {
if buf[i].Timestamp.IsEmpty() {
// Inline metadata. Not an intent because iteration below fails
// if it sees an intent.
if err := rw.ClearUnversioned(buf[i].Key); err != nil {
return err
}
} else {
if err := rw.ClearMVCC(buf[i]); err != nil {
return err
encodedBufSize += int64(buf[i].EncodedSize())
}
// Even though we didn't get a large enough number of keys to switch to
// clearrange, the byte size of the keys we did get is now too large to
// encode them all within the byte size limit, so use clearrange anyway.
if batchByteSize+encodedBufSize >= maxBatchByteSize {
if err := rw.ClearMVCCRange(buf[0], nonMatch); err != nil {
return err
}
batchByteSize += int64(buf[0].EncodedSize() + nonMatch.EncodedSize())
batchSize++
} else {
for i := 0; i < bufSize; i++ {
if buf[i].Timestamp.IsEmpty() {
// Inline metadata. Not an intent because iteration below fails
// if it sees an intent.
if err := rw.ClearUnversioned(buf[i].Key); err != nil {
return err
}
} else {
if err := rw.ClearMVCC(buf[i]); err != nil {
return err
}
}
batchByteSize += encodedBufSize
}
batchSize += int64(bufSize)
}
batchSize += int64(bufSize)
bufSize = 0
}
return nil
Expand Down Expand Up @@ -2219,6 +2241,10 @@ func MVCCClearTimeRange(
resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey}
break
}
if batchByteSize > maxBatchByteSize {
resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey}
break
}
clearMatchingKey(k)
clearedMetaKey.Key = append(clearedMetaKey.Key[:0], k.Key...)
clearedMeta.KeyBytes = MVCCVersionTimestampSize
Expand Down
54 changes: 32 additions & 22 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,12 +2127,29 @@ func TestMVCCClearTimeRange(t *testing.T) {
require.Equal(t, expected, res.KVs)
}

resumingClear := func(
t *testing.T,
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
key, endKey roachpb.Key,
ts, endTs hlc.Timestamp,
sz int64,
useTBI bool,
) {
resume, err := MVCCClearTimeRange(ctx, rw, ms, key, endKey, ts, endTs, sz, 1, useTBI)
require.NoError(t, err)
for resume != nil {
resume, err = MVCCClearTimeRange(ctx, rw, ms, resume.Key, resume.EndKey, ts, endTs, sz, 1, useTBI)
require.NoError(t, err)
}
}
for _, useTBI := range []bool{true, false} {
t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) {
t.Run("clear > ts0", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts0, ts0Content)
Expand All @@ -2143,9 +2160,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts1 ", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10,
useTBI)
require.NoError(t, err)
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, useTBI)
assertKVs(t, e, ts1, ts1Content)
assertKVs(t, e, ts2, ts1Content)
assertKVs(t, e, ts5, ts1Content)
Expand All @@ -2154,27 +2169,25 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts2", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
assertKVs(t, e, ts5, ts2Content)
})

t.Run("clear > ts3", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts3, ts3Content)
assertKVs(t, e, ts5, ts3Content)
})

t.Run("clear > ts4 (nothing) ", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts4, ts4Content)
Expand All @@ -2184,7 +2197,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts5 (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts4, ts4Content)
Expand All @@ -2194,17 +2207,16 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear up to k5 to ts0", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, []roachpb.KeyValue{{Key: testKey5, Value: v2}})
assertKVs(t, e, ts5, []roachpb.KeyValue{{Key: testKey5, Value: v4}})
})

t.Run("clear > ts0 in empty span (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
Expand All @@ -2214,7 +2226,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts0 in empty span [k3,k5) (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
Expand All @@ -2224,7 +2236,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear k3 and up in ts0 > x >= ts1 (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
Expand All @@ -2241,25 +2253,24 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear everything hitting intent fails", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10,
useTBI)
require.EqualError(t, err, "conflicting intents on \"/db3\"")
})

t.Run("clear exactly hitting intent fails", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, 1<<10,
useTBI)
require.EqualError(t, err, "conflicting intents on \"/db3\"")
})

t.Run("clear everything above intent", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)

// Scan (< k3 to avoid intent) to confirm that k2 was indeed reverted to
Expand All @@ -2283,9 +2294,8 @@ func TestMVCCClearTimeRange(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
assertKVs(t, e, ts2, ts2Content)
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts1Content)
})
})
Expand Down Expand Up @@ -2400,7 +2410,7 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) {
startKey := localMax
for {
resume, err := MVCCClearTimeRange(ctx, e, &ms, startKey, keyMax, revertTo, now,
100, useTBI)
100, 1, useTBI)
require.NoError(t, err)
if resume == nil {
break
Expand Down

0 comments on commit dfe6752

Please sign in to comment.