Skip to content

Commit

Permalink
storage: Add support for TargetBytes for resolve intent + range cmd
Browse files Browse the repository at this point in the history
Intent resolution batches are sequenced on raft and each batch can
consist of 100-200 intents. If an intent key or even value in some cases
are large, it is possible that resolving all intents in the batch would
result in a raft command size exceeding the max raft command size
kv.raft.command.max_size.

To address this, we add support for TargetBytes in resolve intent and
resolve intent range commands, allowing us to stop resolving intents in
the batch as soon as we exceed the TargetBytes max bytes limit.

Adding support for byte size pagination for intent resolver and
RequestBatcher will be added in a seperate PR.

Release note (ops change): Added support for a byte limit on resolve
intent and resolve intent range raft commands to prevent such commands
from exceeding the max raft command size.
  • Loading branch information
KaiSun314 committed Nov 21, 2022
1 parent e98ffcf commit 8ee923b
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 25 deletions.
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,17 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntentWithMaxBytes(ctx, readWriter, ms, update, h.TargetBytes)
if err != nil {
return result.Result{}, err
}
reply := resp.(*roachpb.ResolveIntentResponse)
reply.NumBytes = numBytes
if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
return result.Result{}, nil
}

var res result.Result
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,22 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRangeWithMaxBytes(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys, h.TargetBytes)
if err != nil {
return result.Result{}, err
}
reply := resp.(*roachpb.ResolveIntentRangeResponse)
reply.NumKeys = numKeys
reply.NumBytes = numBytes
if resumeSpan != nil {
update.EndKey = resumeSpan.Key
reply.ResumeSpan = resumeSpan
// The given MaxSpanRequestKeys really specifies the number of intents
// resolved, not the number of keys scanned. We could return
// RESUME_INTENT_LIMIT here, but since the given limit is a key limit we
// return RESUME_KEY_LIMIT for symmetry.
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
reply.ResumeReason = resumeReason
}

var res result.Result
Expand Down
205 changes: 205 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,211 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
})
}

// TestResolveIntentWithTargetBytes tests that ResolveIntent and
// ResolveIntentRange respect the specified TargetBytes i.e. resolve the
// correct set of intents, return the correct data in the response, and ensure
// the underlying write batch is the expected size.
func TestResolveIntentWithTargetBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
ts := hlc.Timestamp{WallTime: 1}
bytes := []byte{'a', 'b', 'c', 'd', 'e'}
nKeys := len(bytes)
testKeys := make([]roachpb.Key, nKeys)
values := make([]roachpb.Value, nKeys)
for i, b := range bytes {
testKeys[i] = make([]byte, 1000)
for j := range testKeys[i] {
testKeys[i][j] = b
}
values[i] = roachpb.MakeValueFromBytes([]byte{b})
}
txn := roachpb.MakeTransaction("test", roachpb.Key("a"), 0, ts, 0, 1)

testutils.RunTrueAndFalse(t, "ranged", func(t *testing.T, ranged bool) {
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
st := makeClusterSettingsUsingEngineIntentsSetting(db)

for i, testKey := range testKeys {
err := storage.MVCCPut(ctx, batch, nil, testKey, ts, hlc.ClockTimestamp{}, values[i], &txn)
require.NoError(t, err)
}
initialBytes := batch.Len()

if !ranged {
// Resolve a point intent for testKeys[0].
ri := roachpb.ResolveIntentRequest{
IntentTxn: txn.TxnMeta,
Status: roachpb.COMMITTED,
}
ri.Key = testKeys[0]

{
// Case 1: TargetBytes = -1. In this case, we should not resolve any
// intents.
resp := &roachpb.ResolveIntentResponse{}
_, err := ResolveIntent(ctx, batch,
CommandArgs{
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &ri,
Header: roachpb.Header{
TargetBytes: -1,
},
},
resp,
)
require.NoError(t, err)
require.Equal(t, resp.NumBytes, int64(0))
require.Equal(t, resp.ResumeSpan.Key, testKeys[0])
require.Equal(t, resp.ResumeReason, roachpb.RESUME_BYTE_LIMIT)
require.NoError(t, err)
numBytes := batch.Len()
require.Equal(t, numBytes, initialBytes)

_, _, err = storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
require.Error(t, err)
}

{
// Case 2: TargetBytes = 500. In this case, we should resolve the
// intent for testKeys[0].
resp := &roachpb.ResolveIntentResponse{}
_, err := ResolveIntent(ctx, batch,
CommandArgs{
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &ri,
Header: roachpb.Header{
TargetBytes: 500,
},
},
resp,
)
require.Greater(t, resp.NumBytes, int64(1000))
require.Less(t, resp.NumBytes, int64(1100))
require.Nil(t, resp.ResumeSpan)
require.Equal(t, resp.ResumeReason, roachpb.RESUME_UNKNOWN)
require.NoError(t, err)
numBytes := batch.Len()
require.Greater(t, numBytes, initialBytes+1000)
require.Less(t, numBytes, initialBytes+1100)

value, _, err := storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
require.NoError(t, err)
require.Equal(t, values[0].RawBytes, value.RawBytes,
"the value %s in get result does not match the value %s in request", values[0].RawBytes, value.RawBytes)
}
} else {
// Resolve an intent range for testKeys[0], testKeys[1], ...,
// testKeys[4].
rir := roachpb.ResolveIntentRangeRequest{
IntentTxn: txn.TxnMeta,
Status: roachpb.COMMITTED,
}
rir.Key = testKeys[0]
rir.EndKey = testKeys[nKeys-1].Next()

{
// Case 1: TargetBytes = -1. In this case, we should not resolve any
// intents.
respr := &roachpb.ResolveIntentRangeResponse{}
_, err := ResolveIntentRange(ctx, batch,
CommandArgs{
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &rir,
Header: roachpb.Header{
TargetBytes: -1,
},
},
respr,
)
require.NoError(t, err)
require.Equal(t, respr.NumKeys, int64(0))
require.Equal(t, respr.NumBytes, int64(0))
require.Equal(t, respr.ResumeSpan.Key, testKeys[0])
require.Equal(t, respr.ResumeSpan.EndKey, testKeys[nKeys-1].Next())
require.Equal(t, respr.ResumeReason, roachpb.RESUME_BYTE_LIMIT)
require.NoError(t, err)
numBytes := batch.Len()
require.Equal(t, numBytes, initialBytes)

_, _, err = storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
require.Error(t, err)
}

{
// Case 2: TargetBytes = 2900. In this case, we should resolve the
// first 3 intents - testKey[0], testKeys[1], and testKeys[2] (since we
// resolve intents until we exceed the TargetBytes limit).
respr := &roachpb.ResolveIntentRangeResponse{}
_, err := ResolveIntentRange(ctx, batch,
CommandArgs{
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &rir,
Header: roachpb.Header{
TargetBytes: 2900,
},
},
respr,
)
require.Equal(t, respr.NumKeys, int64(3))
require.Greater(t, respr.NumBytes, int64(3000))
require.Less(t, respr.NumBytes, int64(3300))
require.Equal(t, respr.ResumeSpan.Key, testKeys[2].Next())
require.Equal(t, respr.ResumeSpan.EndKey, testKeys[nKeys-1].Next())
require.Equal(t, respr.ResumeReason, roachpb.RESUME_BYTE_LIMIT)
require.NoError(t, err)
numBytes := batch.Len()
require.Greater(t, numBytes, initialBytes+3000)
require.Less(t, numBytes, initialBytes+3300)

value, _, err := storage.MVCCGet(ctx, batch, testKeys[2], ts, storage.MVCCGetOptions{})
require.NoError(t, err)
require.Equal(t, values[2].RawBytes, value.RawBytes,
"the value %s in get result does not match the value %s in request", values[2].RawBytes, value.RawBytes)
_, _, err = storage.MVCCGet(ctx, batch, testKeys[3], ts, storage.MVCCGetOptions{})
require.Error(t, err)
}

{
// Case 3: TargetBytes = 1100 (on remaining intents - testKeys[3] and
// testKeys[4]). In this case, we should resolve the remaining
// intents - testKey[4] and testKeys[5] (since we resolve intents until
// we exceed the TargetBytes limit).
respr := &roachpb.ResolveIntentRangeResponse{}
_, err := ResolveIntentRange(ctx, batch,
CommandArgs{
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &rir,
Header: roachpb.Header{
TargetBytes: 1100,
},
},
respr,
)
require.Equal(t, respr.NumKeys, int64(2))
require.Greater(t, respr.NumBytes, int64(2000))
require.Less(t, respr.NumBytes, int64(2200))
require.Nil(t, respr.ResumeSpan)
require.Equal(t, respr.ResumeReason, roachpb.RESUME_UNKNOWN)
require.NoError(t, err)
numBytes := batch.Len()
require.Greater(t, numBytes, initialBytes+5000)
require.Less(t, numBytes, initialBytes+5500)

value, _, err := storage.MVCCGet(ctx, batch, testKeys[nKeys-1], ts, storage.MVCCGetOptions{})
require.NoError(t, err)
require.Equal(t, values[nKeys-1].RawBytes, value.RawBytes,
"the value %s in get result does not match the value %s in request", values[nKeys-1].RawBytes, value.RawBytes)
}
}
})
}

func makeClusterSettingsUsingEngineIntentsSetting(engine storage.Engine) *cluster.Settings {
version := clusterversion.TestingBinaryVersion
return cluster.MakeTestingClusterSettingsWithVersions(version, version, true)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ func (s spanSetReader) PinEngineStateForIterators() error {
return s.r.PinEngineStateForIterators()
}

func (s spanSetReader) Len() int {
return s.r.Len()
}

type spanSetWriter struct {
w storage.Writer
spans *SpanSet
Expand Down
17 changes: 14 additions & 3 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2404,15 +2404,26 @@ message Header {
// complications discussed in "Unordered requests". For now, that's a good
// enough reason to disallow such batches.
int64 max_span_request_keys = 8;
// TargetBytes will have different behaviour depending on the request type.
//
// Forward and Reverse Scans:
// If set to a non-zero value, sets a target (in bytes) for how large the
// response may grow. This is only supported for (forward and reverse) scans
// and limits the number of rows scanned (and returned). The target will only
// be overshot when the first result is larger than the target, unless
// response may grow. For forward and reverse scans, TargetBytes limits the
// number of rows scanned (and returned). The target will only be overshot
// when the first result is larger than the target, unless
// target_bytes_allow_empty is set. A suitable resume span will be returned.
//
// The semantics around overlapping requests, unordered requests, and
// supported requests from max_span_request_keys apply to the target_bytes
// option as well.
//
// Resolve Intent and Resolve Intent Range:
// If set to a non-zero value, sets a target (in bytes) for how large the
// write batch from intent resolution may grow. For resolve intent and
// resolve intent range, TargetBytes limits the number of intents resolved.
// We will resolve intents until the number of bytes added to the write batch
// by intent resolution exceeds the TargetBytes limit. A suitable resume span
// will be returned.
int64 target_bytes = 15;
// If positive, Scan and ReverseScan requests with limits (MaxSpanRequestKeys
// or TargetBytes) will not return results with partial SQL rows at the end
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ type Reader interface {
// the first call to PinEngineStateForIterators.
// REQUIRES: ConsistentIterators returns true.
PinEngineStateForIterators() error

Len() int
}

// Writer is the write interface to an engine's data.
Expand Down
Loading

0 comments on commit 8ee923b

Please sign in to comment.