Skip to content

Commit

Permalink
Merge pull request #94814 from KaiSun314/resolve-intent-pagination-mvcc
Browse files Browse the repository at this point in the history
storage: Add support for TargetBytes for resolve intent + range cmd
  • Loading branch information
nvanbenschoten authored Jan 12, 2023
2 parents 57d9a82 + 8d58e37 commit 5bf7569
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 37 deletions.
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,18 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
if err != nil {
return result.Result{}, err
}
reply := resp.(*roachpb.ResolveIntentResponse)
reply.NumBytes = numBytes
if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
return result.Result{}, nil
}

var res result.Result
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,22 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys})
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes})
if err != nil {
return result.Result{}, err
}
reply := resp.(*roachpb.ResolveIntentRangeResponse)
reply.NumKeys = numKeys
reply.NumBytes = numBytes
if resumeSpan != nil {
update.EndKey = resumeSpan.Key
reply.ResumeSpan = resumeSpan
// The given MaxSpanRequestKeys really specifies the number of intents
// resolved, not the number of keys scanned. We could return
// RESUME_INTENT_LIMIT here, but since the given limit is a key limit we
// return RESUME_KEY_LIMIT for symmetry.
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
reply.ResumeReason = resumeReason
}

var res result.Result
Expand Down
205 changes: 205 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,211 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
})
}

// TestResolveIntentWithTargetBytes tests that ResolveIntent and
// ResolveIntentRange respect the specified TargetBytes, i.e. that they resolve
// the correct set of intents, return the correct data in the response, and
// leave the underlying write batch at the expected size.
//
// The "ranged" subtest cases build on each other: each case resolves intents
// left unresolved by the previous one, all within the same batch.
func TestResolveIntentWithTargetBytes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	ts := hlc.Timestamp{WallTime: 1}
	// Every test key is 1000 copies of one distinct byte. The assertions below
	// expect each resolved intent to account for a bit more than 1000 bytes.
	// The slice is deliberately not named "bytes" so that it does not shadow
	// the standard library package of the same name.
	keyBytes := []byte{'a', 'b', 'c', 'd', 'e'}
	nKeys := len(keyBytes)
	testKeys := make([]roachpb.Key, nKeys)
	values := make([]roachpb.Value, nKeys)
	for i, b := range keyBytes {
		testKeys[i] = make([]byte, 1000)
		for j := range testKeys[i] {
			testKeys[i][j] = b
		}
		values[i] = roachpb.MakeValueFromBytes([]byte{b})
	}
	txn := roachpb.MakeTransaction("test", roachpb.Key("a"), 0, ts, 0, 1)

	testutils.RunTrueAndFalse(t, "ranged", func(t *testing.T, ranged bool) {
		db := storage.NewDefaultInMemForTesting()
		defer db.Close()
		batch := db.NewBatch()
		defer batch.Close()
		st := makeClusterSettingsUsingEngineIntentsSetting(db)

		// Write one intent per test key.
		for i, testKey := range testKeys {
			err := storage.MVCCPut(ctx, batch, nil, testKey, ts, hlc.ClockTimestamp{}, values[i], &txn)
			require.NoError(t, err)
		}
		// Size of the batch before any intent resolution; later assertions
		// measure growth relative to this baseline.
		initialBytes := batch.Len()

		if !ranged {
			// Resolve a point intent for testKeys[0].
			ri := roachpb.ResolveIntentRequest{
				IntentTxn: txn.TxnMeta,
				Status:    roachpb.COMMITTED,
			}
			ri.Key = testKeys[0]

			{
				// Case 1: TargetBytes = -1. In this case, we should not resolve any
				// intents.
				resp := &roachpb.ResolveIntentResponse{}
				_, err := ResolveIntent(ctx, batch,
					CommandArgs{
						EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
						Args:    &ri,
						Header: roachpb.Header{
							TargetBytes: -1,
						},
					},
					resp,
				)
				require.NoError(t, err)
				require.Equal(t, int64(0), resp.NumBytes)
				require.NotNil(t, resp.ResumeSpan)
				require.Equal(t, testKeys[0], resp.ResumeSpan.Key)
				require.Equal(t, roachpb.RESUME_BYTE_LIMIT, resp.ResumeReason)
				numBytes := batch.Len()
				require.Equal(t, initialBytes, numBytes)

				// The intent is still unresolved, so reading the key must fail.
				_, _, err = storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
				require.Error(t, err)
			}

			{
				// Case 2: TargetBytes = 500. In this case, we should resolve the
				// intent for testKeys[0].
				resp := &roachpb.ResolveIntentResponse{}
				_, err := ResolveIntent(ctx, batch,
					CommandArgs{
						EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
						Args:    &ri,
						Header: roachpb.Header{
							TargetBytes: 500,
						},
					},
					resp,
				)
				// Check the error before inspecting the response.
				require.NoError(t, err)
				require.Greater(t, resp.NumBytes, int64(1000))
				require.Less(t, resp.NumBytes, int64(1100))
				require.Nil(t, resp.ResumeSpan)
				require.Equal(t, roachpb.RESUME_UNKNOWN, resp.ResumeReason)
				numBytes := batch.Len()
				require.Greater(t, numBytes, initialBytes+1000)
				require.Less(t, numBytes, initialBytes+1100)

				value, _, err := storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
				require.NoError(t, err)
				require.NotNil(t, value)
				require.Equal(t, values[0].RawBytes, value.RawBytes,
					"the value %s in get result does not match the value %s in request", value.RawBytes, values[0].RawBytes)
			}
		} else {
			// Resolve an intent range for testKeys[0], testKeys[1], ...,
			// testKeys[4].
			rir := roachpb.ResolveIntentRangeRequest{
				IntentTxn: txn.TxnMeta,
				Status:    roachpb.COMMITTED,
			}
			rir.Key = testKeys[0]
			rir.EndKey = testKeys[nKeys-1].Next()

			{
				// Case 1: TargetBytes = -1. In this case, we should not resolve any
				// intents.
				respr := &roachpb.ResolveIntentRangeResponse{}
				_, err := ResolveIntentRange(ctx, batch,
					CommandArgs{
						EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
						Args:    &rir,
						Header: roachpb.Header{
							TargetBytes: -1,
						},
					},
					respr,
				)
				require.NoError(t, err)
				require.Equal(t, int64(0), respr.NumKeys)
				require.Equal(t, int64(0), respr.NumBytes)
				require.NotNil(t, respr.ResumeSpan)
				require.Equal(t, testKeys[0], respr.ResumeSpan.Key)
				require.Equal(t, testKeys[nKeys-1].Next(), respr.ResumeSpan.EndKey)
				require.Equal(t, roachpb.RESUME_BYTE_LIMIT, respr.ResumeReason)
				numBytes := batch.Len()
				require.Equal(t, initialBytes, numBytes)

				// The intent is still unresolved, so reading the key must fail.
				_, _, err = storage.MVCCGet(ctx, batch, testKeys[0], ts, storage.MVCCGetOptions{})
				require.Error(t, err)
			}

			{
				// Case 2: TargetBytes = 2900. In this case, we should resolve the
				// first 3 intents - testKeys[0], testKeys[1], and testKeys[2] (since
				// we resolve intents until we exceed the TargetBytes limit).
				respr := &roachpb.ResolveIntentRangeResponse{}
				_, err := ResolveIntentRange(ctx, batch,
					CommandArgs{
						EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
						Args:    &rir,
						Header: roachpb.Header{
							TargetBytes: 2900,
						},
					},
					respr,
				)
				// Check the error before inspecting the response.
				require.NoError(t, err)
				require.Equal(t, int64(3), respr.NumKeys)
				require.Greater(t, respr.NumBytes, int64(3000))
				require.Less(t, respr.NumBytes, int64(3300))
				require.NotNil(t, respr.ResumeSpan)
				require.Equal(t, testKeys[2].Next(), respr.ResumeSpan.Key)
				require.Equal(t, testKeys[nKeys-1].Next(), respr.ResumeSpan.EndKey)
				require.Equal(t, roachpb.RESUME_BYTE_LIMIT, respr.ResumeReason)
				numBytes := batch.Len()
				require.Greater(t, numBytes, initialBytes+3000)
				require.Less(t, numBytes, initialBytes+3300)

				value, _, err := storage.MVCCGet(ctx, batch, testKeys[2], ts, storage.MVCCGetOptions{})
				require.NoError(t, err)
				require.NotNil(t, value)
				require.Equal(t, values[2].RawBytes, value.RawBytes,
					"the value %s in get result does not match the value %s in request", value.RawBytes, values[2].RawBytes)
				// testKeys[3] is past the resume point and must still be an intent.
				_, _, err = storage.MVCCGet(ctx, batch, testKeys[3], ts, storage.MVCCGetOptions{})
				require.Error(t, err)
			}

			{
				// Case 3: TargetBytes = 1100 (on remaining intents - testKeys[3] and
				// testKeys[4]). In this case, we should resolve the remaining
				// intents - testKeys[3] and testKeys[4] (since we resolve intents
				// until we exceed the TargetBytes limit).
				respr := &roachpb.ResolveIntentRangeResponse{}
				_, err := ResolveIntentRange(ctx, batch,
					CommandArgs{
						EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
						Args:    &rir,
						Header: roachpb.Header{
							TargetBytes: 1100,
						},
					},
					respr,
				)
				// Check the error before inspecting the response.
				require.NoError(t, err)
				require.Equal(t, int64(2), respr.NumKeys)
				require.Greater(t, respr.NumBytes, int64(2000))
				require.Less(t, respr.NumBytes, int64(2200))
				require.Nil(t, respr.ResumeSpan)
				require.Equal(t, roachpb.RESUME_UNKNOWN, respr.ResumeReason)
				// The batch now holds the resolutions from Case 2 and Case 3.
				numBytes := batch.Len()
				require.Greater(t, numBytes, initialBytes+5000)
				require.Less(t, numBytes, initialBytes+5500)

				value, _, err := storage.MVCCGet(ctx, batch, testKeys[nKeys-1], ts, storage.MVCCGetOptions{})
				require.NoError(t, err)
				require.NotNil(t, value)
				require.Equal(t, values[nKeys-1].RawBytes, value.RawBytes,
					"the value %s in get result does not match the value %s in request", value.RawBytes, values[nKeys-1].RawBytes)
			}
		}
	})
}

func makeClusterSettingsUsingEngineIntentsSetting(engine storage.Engine) *cluster.Settings {
version := clusterversion.TestingBinaryVersion
return cluster.MakeTestingClusterSettingsWithVersions(version, version, true)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ func (s spanSetWriter) ShouldWriteLocalTimestamps(ctx context.Context) bool {
return s.w.ShouldWriteLocalTimestamps(ctx)
}

// BufferedSize returns the value reported by the wrapped writer's
// BufferedSize method. It only forwards the call to s.w and performs no
// additional work of its own.
func (s spanSetWriter) BufferedSize() int {
return s.w.BufferedSize()
}

// ReadWriter is used outside of the spanset package internally, in ccl.
type ReadWriter struct {
spanSetReader
Expand Down
17 changes: 14 additions & 3 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2492,12 +2492,23 @@ message Header {
// complications discussed in "Unordered requests". For now, that's a good
// enough reason to disallow such batches.
int64 max_span_request_keys = 8;
// TargetBytes will have different behaviour depending on the request type.
//
// Forward and Reverse Scans:
// If set to a non-zero value, sets a target (in bytes) for how large the
// response may grow. This is only supported for (forward and reverse) scans
// and limits the number of rows scanned (and returned). The target will only
// be overshot when the first result is larger than the target, unless
// response may grow. For forward and reverse scans, TargetBytes limits the
// number of rows scanned (and returned). The target will only be overshot
// when the first result is larger than the target, unless
// target_bytes_allow_empty is set. A suitable resume span will be returned.
//
// Resolve Intent and Resolve Intent Range:
// If set to a non-zero value, sets a target (in bytes) for how large the
// write batch from intent resolution may grow. For resolve intent and resolve
// intent range: TargetBytes limits the number of intents resolved. We will
// resolve intents until the number of bytes added to the write batch by
// intent resolution exceeds the TargetBytes limit. A suitable resume span
// will be returned.
//
// The semantics around overlapping requests, unordered requests, and
// supported requests from max_span_request_keys apply to the target_bytes
// option as well.
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,12 @@ type Writer interface {
// This method is temporary, to handle the transition from clusters where not
// all nodes understand local timestamps.
ShouldWriteLocalTimestamps(ctx context.Context) bool

// BufferedSize returns the size of the underlying buffered writes if the
// Writer implementation is buffered, and 0 if the Writer implementation is
// not buffered. Buffered writers are expected to always give a monotonically
// increasing size.
BufferedSize() int
}

// ReadWriter is the read/write interface to an engine's data.
Expand Down
Loading

0 comments on commit 5bf7569

Please sign in to comment.