Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: respect exhausted key limit during ranged intent resolution #47492

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,8 +1247,21 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// passed recursively to further divideAndSendBatchToRanges() calls.
if ba.MaxSpanRequestKeys > 0 {
if replyResults > ba.MaxSpanRequestKeys {
log.Fatalf(ctx, "received %d results, limit was %d",
replyResults, ba.MaxSpanRequestKeys)
// NOTE: v19.2 and below have a bug where MaxSpanRequestKeys
// is not respected by ResolveIntentRangeRequest once the
// limit has already been exhausted by the batch. This is
// mostly harmless (or at least, the damage has already been
// done by this point and resulted in a large Raft entry)
// and has been fixed in v20.1+, so don't bother hitting the
// assertion.
//
// TODO(nvanbenschoten): remove this hack in v20.2.
if _, ok := ba.GetArg(roachpb.ResolveIntentRange); ok {
replyResults = ba.MaxSpanRequestKeys
} else {
log.Fatalf(ctx, "received %d results, limit was %d",
replyResults, ba.MaxSpanRequestKeys)
}
}
ba.MaxSpanRequestKeys -= replyResults
// Exiting; any missing responses will be filled in via defer().
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func evaluateBatch(
// of results from the limit going forward. Exhausting the limit results
// in a limit of -1. This makes sure that we still execute the rest of
// the batch, but with limit-aware operations returning no data.
if limit := baHeader.MaxSpanRequestKeys; limit > 0 {
retResults := reply.Header().NumKeys
if limit, retResults := baHeader.MaxSpanRequestKeys, reply.Header().NumKeys; limit > 0 {
if retResults > limit {
index, retResults, limit := index, retResults, limit // don't alloc unless branch taken
err := errorutil.UnexpectedWithIssueErrorf(46652,
Expand All @@ -355,6 +354,14 @@ func evaluateBatch(
// mean "no limit").
baHeader.MaxSpanRequestKeys = -1
}
} else if limit < 0 {
if retResults > 0 {
index, retResults := index, retResults // don't alloc unless branch taken
log.Fatalf(ctx,
"received %d results, limit was exhausted (original limit: %d, batch=%s idx=%d)",
errors.Safe(retResults), errors.Safe(ba.Header.MaxSpanRequestKeys),
errors.Safe(ba.Summary()), errors.Safe(index))
}
}
// Same as for MaxSpanRequestKeys above, keep track of the limit and
// make sure to fall through to -1 instead of hitting zero (which
Expand Down
177 changes: 165 additions & 12 deletions pkg/kv/kvserver/replica_evaluate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestEvaluateBatch(t *testing.T) {
{
// Three scans that observe 3, 1, and 0 keys, respectively. An
// unreplicated lock should be acquired on each key that is scanned.
name: "scan with key locking",
name: "scans with key locking",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := scanArgsString("a", "d")
Expand All @@ -277,7 +277,7 @@ func TestEvaluateBatch(t *testing.T) {
},
{
// Ditto in reverse.
name: "reverse scan with key locking",
name: "reverse scans with key locking",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := revScanArgsString("a", "d")
Expand All @@ -300,7 +300,7 @@ func TestEvaluateBatch(t *testing.T) {
{
// Three scans that observe 3, 1, and 0 keys, respectively. No
// transaction set, so no locks should be acquired.
name: "scan with key locking without txn",
name: "scans with key locking without txn",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := scanArgsString("a", "d")
Expand All @@ -321,7 +321,7 @@ func TestEvaluateBatch(t *testing.T) {
},
{
// Ditto in reverse.
name: "reverse scan with key locking without txn",
name: "reverse scans with key locking without txn",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := revScanArgsString("a", "d")
Expand Down Expand Up @@ -379,6 +379,52 @@ func TestEvaluateBatch(t *testing.T) {
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{

// Scanning with key locking and a MaxSpanRequestKeys limit should
// acquire an unreplicated lock on each key returned and no locks on
// keys past the limit. One the batch's limit is exhausted, no more
// rows are scanner nor locks acquired.
name: "scans with key locking and MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := scanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a", "b", "c"}, nil)
verifyResumeSpans(t, r, "d-e", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "a", "b", "c")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Ditto in reverse.
name: "reverse scans with key locking and MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := revScanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d", "c", "b"}, nil)
verifyResumeSpans(t, r, "a-a\x00", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "d", "c", "b")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Scanning with key locking and a TargetBytes limit should acquire
// an unreplicated lock on each key returned and no locks on keys
Expand Down Expand Up @@ -417,6 +463,100 @@ func TestEvaluateBatch(t *testing.T) {
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Scanning with key locking and a TargetBytes limit should acquire
// an unreplicated lock on each key returned and no locks on keys
// past the limit. One the batch's limit is exhausted, no more rows
// are scanner nor locks acquired.
name: "scans with key locking and TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := scanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, nil)
verifyResumeSpans(t, r, "b-e", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "a")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Ditto in reverse.
name: "reverse scans with key locking and TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := revScanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d"}, nil)
verifyResumeSpans(t, r, "a-c\x00", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "d")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
//
// Test suite for ResolveIntentRange with and without limits.
//
{
// Three range intent resolutions that observe 3, 1, and 0 intent,
// respectively. All intents should be resolved.
name: "ranged intent resolution",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 1, 0)
verifyResumeSpans(t, r, "", "", "")
},
},
{
// Resolving intents with a giant limit should resolve everything.
name: "ranged intent resolution with giant MaxSpanRequestKeys",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
d.ba.MaxSpanRequestKeys = 100000
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 1, 0)
verifyResumeSpans(t, r, "", "", "")
},
},
{
// A batch limited to resolve only up to 3 keys should respect that
// limit. The limit is saturated by the first request in the batch.
name: "ranged intent resolution with MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 0, 0)
verifyResumeSpans(t, r, "c\x00-d", "e-f", "h-j")
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -474,10 +614,14 @@ type testCase struct {
}

func writeABCDEF(t *testing.T, d *data) {
writeABCDEFIntents(t, d, nil /* txn */)
}

func writeABCDEFIntents(t *testing.T, d *data, txn *roachpb.Transaction) {
for _, k := range []string{"a", "b", "c", "d", "e", "f"} {
require.NoError(t, storage.MVCCPut(
context.Background(), d.eng, nil /* ms */, roachpb.Key(k), d.ba.Timestamp,
roachpb.MakeValueFromString("value-"+k), nil /* txn */))
roachpb.MakeValueFromString("value-"+k), txn))
}
}

Expand Down Expand Up @@ -516,17 +660,26 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) {
}
}

func verifyNumKeys(t *testing.T, r resp, keysPerResp ...int) {
require.Nil(t, r.pErr)
require.NotNil(t, r.br)
require.Len(t, r.br.Responses, len(keysPerResp))
for i, keys := range keysPerResp {
actKeys := int(r.br.Responses[i].GetInner().Header().NumKeys)
require.Equal(t, keys, actKeys, "in response #%i", i+1)
}
}

func verifyResumeSpans(t *testing.T, r resp, resumeSpans ...string) {
for i, span := range resumeSpans {
if span == "" {
continue // don't check request
}
rs := r.br.Responses[i].GetInner().Header().ResumeSpan
var act string
if rs != nil {
act = fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey))
if span == "" {
require.Nil(t, rs)
} else {
require.NotNil(t, rs)
act := fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey))
require.Equal(t, span, act, "#%d", i+1)
}
require.Equal(t, span, act, "#%d", i+1)
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,16 @@ func queryIntentArgs(
}
}

func resolveIntentRangeArgsString(
s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
) *roachpb.ResolveIntentRangeRequest {
return &roachpb.ResolveIntentRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(s), EndKey: roachpb.Key(e)},
IntentTxn: txn,
Status: status,
}
}

func internalMergeArgs(key []byte, value roachpb.Value) roachpb.MergeRequest {
return roachpb.MergeRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,8 +3050,12 @@ func MVCCResolveWriteIntentRange(
// MVCCResolveWriteIntentRangeUsingIter commits or aborts (rolls back)
// the range of write intents specified by start and end keys for a
// given txn. ResolveWriteIntentRange will skip write intents of other
// txns. Returns the number of intents resolved and a resume span if
// the max keys limit was exceeded. A max of zero means unbounded.
// txns.
//
// Returns the number of intents resolved and a resume span if the max
// keys limit was exceeded. A max of zero means unbounded. A max of -1
// means resolve nothing and return the entire intent span as the resume
// span.
func MVCCResolveWriteIntentRangeUsingIter(
ctx context.Context,
rw ReadWriter,
Expand All @@ -3060,6 +3064,11 @@ func MVCCResolveWriteIntentRangeUsingIter(
intent roachpb.LockUpdate,
max int64,
) (int64, *roachpb.Span, error) {
if max < 0 {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
return 0, &resumeSpan, nil
}

encKey := MakeMVCCMetadataKey(intent.Key)
encEndKey := MakeMVCCMetadataKey(intent.EndKey)
nextKey := encKey
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ var commands = map[string]cmd{
"txn_update": {typTxnUpdate, cmdTxnUpdate},

"resolve_intent": {typDataUpdate, cmdResolveIntent},
"check_intent": {typReadOnly, cmdCheckIntent},
// TODO(nvanbenschoten): test "resolve_intent_range".
"check_intent": {typReadOnly, cmdCheckIntent},

"clear_range": {typDataUpdate, cmdClearRange},
"cput": {typDataUpdate, cmdCPut},
Expand Down