storage: limit RevertRange batches to 32mb #59716

Merged 1 commit on Feb 18, 2021
Changes from all commits

3 changes: 3 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_revert_range.go
@@ -57,6 +57,8 @@ func isEmptyKeyTimeRange(
return !ok, err
}

const maxRevertRangeBatchBytes = 32 << 20
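// 32 << 20 bytes == 32 MiB (33,554,432 bytes).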

// RevertRange wipes all MVCC versions more recent than TargetTime (up to the
// command timestamp) of the keys covered by the specified span, adjusting the
// MVCC stats accordingly.
@@ -88,6 +90,7 @@ func RevertRange(

resume, err := storage.MVCCClearTimeRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
args.TargetTime, cArgs.Header.Timestamp, cArgs.Header.MaxSpanRequestKeys,
maxRevertRangeBatchBytes,
args.EnableTimeBoundIteratorOptimization)
if err != nil {
return result.Result{}, err
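
With this change, RevertRange can stop early once it has cleared roughly maxRevertRangeBatchBytes worth of keys and report a resume span, so callers are expected to re-issue the request for the remainder. A minimal sketch of that retry loop, with an assumed clearOnce callback standing in for a single size-limited RevertRange/MVCCClearTimeRange call (this is not the actual kvserver client code):

// revertAll re-issues byte-limited clears until no resume span is returned.
// clearOnce is an illustrative stand-in: it clears as much of [start, end)
// as fits in one batch and reports where to resume, or nil when done.
func revertAll(
    start, end []byte,
    clearOnce func(start, end []byte) (resumeStart []byte, err error),
) error {
    for {
        resume, err := clearOnce(start, end)
        if err != nil {
            return err
        }
        if resume == nil {
            return nil // the whole span has been reverted
        }
        start = resume // continue from where the previous batch stopped
    }
}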
2 changes: 1 addition & 1 deletion pkg/storage/metamorphic/operations.go
@@ -293,7 +293,7 @@ type mvccClearTimeRangeOp struct {
func (m mvccClearTimeRangeOp) run(ctx context.Context) string {
writer := m.m.getReadWriter(m.writer)
span, err := storage.MVCCClearTimeRange(ctx, writer, &enginepb.MVCCStats{}, m.key, m.endKey,
m.startTime, m.endTime, math.MaxInt64, true /* useTBI */)
m.startTime, m.endTime, math.MaxInt64, math.MaxInt64, true /* useTBI */)
if err != nil {
return fmt.Sprintf("error: %s", err)
}
50 changes: 38 additions & 12 deletions pkg/storage/mvcc.go
@@ -2071,6 +2071,11 @@ func MVCCMerge(
// buffer of keys selected for deletion but not yet flushed (as done to detect
// long runs for cleaning in a single ClearRange).
//
// Limiting the number of keys or ranges of keys processed can still cause a
// batch that is too large -- in number of bytes -- for raft to replicate if the
// keys are very large. So if the total length of the keys or key spans cleared
// exceeds maxBatchByteSize it will also stop and return a resume span.
//
// This function handles the stats computations to determine the correct
// incremental deltas of clearing these keys (and correctly determining if it
// does or does not change the live and gc keys).
@@ -2083,10 +2088,11 @@ func MVCCClearTimeRange(
ms *enginepb.MVCCStats,
key, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
maxBatchSize int64,
maxBatchSize, maxBatchByteSize int64,
useTBI bool,
) (*roachpb.Span, error) {
var batchSize int64
var batchByteSize int64
var resume *roachpb.Span

// When iterating, instead of immediately clearing a matching key we can
@@ -2110,7 +2116,6 @@ func MVCCClearTimeRange(
"MVCCStats passed in to MVCCClearTimeRange must be non-nil to ensure proper stats" +
" computation during Clear operations")
}

clearMatchingKey := func(k MVCCKey) {
if len(clearRangeStart.Key) == 0 {
// Currently buffering keys to clear one-by-one.
@@ -2133,23 +2138,40 @@ func MVCCClearTimeRange(
if err := rw.ClearMVCCRange(clearRangeStart, nonMatch); err != nil {
return err
}
batchByteSize += int64(clearRangeStart.EncodedSize() + nonMatch.EncodedSize())
batchSize++
clearRangeStart = MVCCKey{}
} else if bufSize > 0 {
var encodedBufSize int64
for i := 0; i < bufSize; i++ {
encodedBufSize += int64(buf[i].EncodedSize())
}
// Even though we didn't get a large enough number of keys to switch to
// clearrange, the byte size of the keys we did get is now too large to
// encode them all within the byte size limit, so use clearrange anyway.
if batchByteSize+encodedBufSize >= maxBatchByteSize {
if err := rw.ClearMVCCRange(buf[0], nonMatch); err != nil {
return err
}
batchByteSize += int64(buf[0].EncodedSize() + nonMatch.EncodedSize())
batchSize++
} else {
for i := 0; i < bufSize; i++ {
if buf[i].Timestamp.IsEmpty() {
// Inline metadata. Not an intent because iteration below fails
// if it sees an intent.
if err := rw.ClearUnversioned(buf[i].Key); err != nil {
return err
}
} else {
if err := rw.ClearMVCC(buf[i]); err != nil {
return err
}
}
}
batchByteSize += encodedBufSize
batchSize += int64(bufSize)
}
bufSize = 0
}
return nil
@@ -2219,6 +2241,10 @@ func MVCCClearTimeRange(
resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey}
break
}
if batchByteSize > maxBatchByteSize {
resume = &roachpb.Span{Key: append([]byte{}, k.Key...), EndKey: endKey}
break
}
clearMatchingKey(k)
clearedMetaKey.Key = append(clearedMetaKey.Key[:0], k.Key...)
clearedMeta.KeyBytes = MVCCVersionTimestampSize
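
Taken together, the mvcc.go changes track an estimate of the bytes each clear adds to the write batch and either stop with a resume span or fall back to a single range clear once that estimate crosses maxBatchByteSize. The sketch below distills the flush decision into a self-contained function; the plain byte-slice keys and the clearPoint/clearRange callbacks are illustrative assumptions, not the storage package's real MVCCKey/Writer API:

// flushBuffered is a simplified model of the flush decision added here: when
// the encoded size of the buffered keys would push the batch past its byte
// budget, clear the whole buffered run [buf[0], nonMatch) with one range
// tombstone instead of issuing per-key clears. clearPoint and clearRange are
// assumed callbacks standing in for ClearMVCC / ClearMVCCRange.
func flushBuffered(
    buf [][]byte, nonMatch []byte,
    batchBytes, maxBatchBytes int64,
    clearPoint func(key []byte) error,
    clearRange func(start, end []byte) error,
) (int64, error) {
    if len(buf) == 0 {
        return batchBytes, nil
    }
    var encoded int64
    for _, k := range buf {
        encoded += int64(len(k))
    }
    if batchBytes+encoded >= maxBatchBytes {
        // Too many bytes for point clears: drop the whole run at once.
        if err := clearRange(buf[0], nonMatch); err != nil {
            return batchBytes, err
        }
        return batchBytes + int64(len(buf[0])+len(nonMatch)), nil
    }
    for _, k := range buf {
        if err := clearPoint(k); err != nil {
            return batchBytes, err
        }
    }
    return batchBytes + encoded, nil
}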
89 changes: 67 additions & 22 deletions pkg/storage/mvcc_test.go
@@ -2127,12 +2127,35 @@ func TestMVCCClearTimeRange(t *testing.T) {
require.Equal(t, expected, res.KVs)
}

const kb = 1024

resumingClear := func(
t *testing.T,
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
key, endKey roachpb.Key,
ts, endTs hlc.Timestamp,
sz int64,
byteLimit int64,
useTBI bool,
) int {
resume, err := MVCCClearTimeRange(ctx, rw, ms, key, endKey, ts, endTs, sz, byteLimit, useTBI)
require.NoError(t, err)
attempts := 1
for resume != nil {
resume, err = MVCCClearTimeRange(ctx, rw, ms, resume.Key, resume.EndKey, ts, endTs, sz, byteLimit, useTBI)
require.NoError(t, err)
attempts++
}
return attempts
}
for _, useTBI := range []bool{true, false} {
t.Run(fmt.Sprintf("useTBI-%t", useTBI), func(t *testing.T) {
t.Run("clear > ts0", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts0, ts0Content)
@@ -2143,9 +2166,29 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts1 ", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10,
attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, kb, useTBI)
require.Equal(t, 1, attempts)
assertKVs(t, e, ts1, ts1Content)
assertKVs(t, e, ts2, ts1Content)
assertKVs(t, e, ts5, ts1Content)
})
t.Run("clear > ts1 count-size batch", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 1, kb,
useTBI)
require.NoError(t, err)
require.Equal(t, 2, attempts)
assertKVs(t, e, ts1, ts1Content)
assertKVs(t, e, ts2, ts1Content)
assertKVs(t, e, ts5, ts1Content)
})

t.Run("clear > ts1 byte-size batch", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts5, 10, 1,
useTBI)
require.Equal(t, 2, attempts)
assertKVs(t, e, ts1, ts1Content)
assertKVs(t, e, ts2, ts1Content)
assertKVs(t, e, ts5, ts1Content)
@@ -2154,27 +2197,26 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts2", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10,
attempts := resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts2, ts5, 10, kb,
useTBI)
require.NoError(t, err)
require.Equal(t, 1, attempts)
assertKVs(t, e, ts2, ts2Content)
assertKVs(t, e, ts5, ts2Content)
})

t.Run("clear > ts3", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts3, ts3Content)
assertKVs(t, e, ts5, ts3Content)
})

t.Run("clear > ts4 (nothing) ", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts4, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts4, ts4Content)
@@ -2184,7 +2226,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts5 (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts5, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts4, ts4Content)
@@ -2194,17 +2236,16 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear up to k5 to ts0", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, testKey1, testKey5, ts0, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, []roachpb.KeyValue{{Key: testKey5, Value: v2}})
assertKVs(t, e, ts5, []roachpb.KeyValue{{Key: testKey5, Value: v4}})
})

t.Run("clear > ts0 in empty span (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
@@ -2214,7 +2255,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear > ts0 in empty span [k3,k5) (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey5, ts0, ts5, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
@@ -2224,7 +2265,7 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear k3 and up in ts0 > x >= ts1 (nothing)", func(t *testing.T) {
e := setupKVs(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, keyMax, ts0, ts1, 10, 1<<10,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)
@@ -2241,25 +2282,24 @@ func TestMVCCClearTimeRange(t *testing.T) {
t.Run("clear everything hitting intent fails", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts0, ts5, 10, 1<<10,
useTBI)
require.EqualError(t, err, "conflicting intents on \"/db3\"")
})

t.Run("clear exactly hitting intent fails", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10,
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, testKey3, testKey4, ts2, ts3, 10, 1<<10,
useTBI)
require.EqualError(t, err, "conflicting intents on \"/db3\"")
})

t.Run("clear everything above intent", func(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts3, ts5, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts2Content)

// Scan (< k3 to avoid intent) to confirm that k2 was indeed reverted to
@@ -2283,9 +2323,8 @@ func TestMVCCClearTimeRange(t *testing.T) {
e := setupKVsWithIntent(t)
defer e.Close()
assertKVs(t, e, ts2, ts2Content)
_, err := MVCCClearTimeRange(ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10,
resumingClear(t, ctx, e, &enginepb.MVCCStats{}, localMax, keyMax, ts1, ts2, 10, kb,
useTBI)
require.NoError(t, err)
assertKVs(t, e, ts2, ts1Content)
})
})
@@ -2387,7 +2426,11 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) {
}
reverts[0] = swathTime - 1
sort.Ints(reverts)

const byteLimit = 1000
const keyLimit = 100
keyLen := int64(len(roachpb.Key(fmt.Sprintf("%05d", 1)))) + MVCCVersionTimestampSize
maxAttempts := (numKVs * keyLen) / byteLimit
var attempts int64
for i := len(reverts) - 1; i >= 0; i-- {
for _, useTBI := range []bool{false, true} {
t.Run(fmt.Sprintf("useTBI-%t revert-%d", useTBI, i), func(t *testing.T) {
@@ -2399,8 +2442,9 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) {
// Revert to the revert time.
startKey := localMax
for {
attempts++
resume, err := MVCCClearTimeRange(ctx, e, &ms, startKey, keyMax, revertTo, now,
100, useTBI)
keyLimit, byteLimit, useTBI)
require.NoError(t, err)
if resume == nil {
break
@@ -2417,6 +2461,7 @@ func TestMVCCClearTimeRangeOnRandomData(t *testing.T) {
})
}
}
require.LessOrEqual(t, attempts, maxAttempts)
})
}
}
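
The maxAttempts bound asserted above is plain arithmetic on the encoded key size. A worked version with assumed values follows; numKVs and the 12-byte timestamp suffix are illustrative guesses rather than values read from the test file:

// exampleMaxAttempts mirrors the bound computed in the test above, using
// assumed constants: numKVs and the timestamp suffix size are illustrative.
func exampleMaxAttempts() int64 {
    const numKVs = 10000   // assumed number of versions written by the test
    const tsSize = 12      // assumed MVCC version timestamp suffix size
    const byteLimit = 1000 // same byteLimit as the test
    keyLen := int64(len("00001")) + tsSize // ~17 encoded bytes per version
    return (numKVs * keyLen) / byteLimit   // 170 resumes at most with these numbers
}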