Skip to content

Commit

Permalink
Merge #47492
Browse files Browse the repository at this point in the history
47492: kv: respect exhausted key limit during ranged intent resolution r=nvanbenschoten a=nvanbenschoten

Fixes #47471.
Fixes #40935.

This commit fixes a long-standing bug where ranged intent resolution would not respect the MaxSpanRequestKeys set on a batch once the limit had already been exhausted by other requests in the same batch. Instead of treating the limit as exhausted, ranged intent resolution would consider the limit nonexistent (unbounded). This bug was triggering an assertion in DistSender. We became more likely to hit this issue in v20.1 because we started performing ranged intent resolution more often due to implicit SELECT FOR UPDATE.

This commit fixes the bug in two ways:
1. it addresses the root cause, updating MVCCResolveWriteIntentRangeUsingIter to properly respect the limit placed on the request when it is exhauted.
2. it disables the assertion in DistSender when it detects that we are hitting this bug. This ensures that we don't hit the assertion in mixed version clusters (see #40935). By the time we're in DistSender, the damage is already done and has already potentially resulted in a large Raft entry. Maintaining the assertion doesn't do us any good.

Release notes (bug fix): a bug that could could trigger an assertion with the text "received X results, limit was Y" has been fixed. The underlying bug was only performance related and could not cause user-visible correctness violations.

Release justification: fixes a medium-priority bug in existing functionality. The bug could result in an assertion failure and a node crashing. Even though this was an old bug (present in many releases before v20.1), it became a lot easier to hit in v20.1 because we started performing ranged intent resolution more often due to implicit SELECT FOR UPDATE.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 14, 2020
2 parents 9898c64 + 1ab9eca commit 18b0247
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 19 deletions.
17 changes: 15 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,8 +1247,21 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// passed recursively to further divideAndSendBatchToRanges() calls.
if ba.MaxSpanRequestKeys > 0 {
if replyResults > ba.MaxSpanRequestKeys {
log.Fatalf(ctx, "received %d results, limit was %d",
replyResults, ba.MaxSpanRequestKeys)
// NOTE: v19.2 and below have a bug where MaxSpanRequestKeys
// is not respected by ResolveIntentRangeRequest once the
// limit has already been exhausted by the batch. This is
// mostly harmless (or at least, the damage has already been
// done by this point and resulted in a large Raft entry)
// and has been fixed in v20.1+, so don't bother hitting the
// assertion.
//
// TODO(nvanbenschoten): remove this hack in v20.2.
if _, ok := ba.GetArg(roachpb.ResolveIntentRange); ok {
replyResults = ba.MaxSpanRequestKeys
} else {
log.Fatalf(ctx, "received %d results, limit was %d",
replyResults, ba.MaxSpanRequestKeys)
}
}
ba.MaxSpanRequestKeys -= replyResults
// Exiting; any missing responses will be filled in via defer().
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func evaluateBatch(
// of results from the limit going forward. Exhausting the limit results
// in a limit of -1. This makes sure that we still execute the rest of
// the batch, but with limit-aware operations returning no data.
if limit := baHeader.MaxSpanRequestKeys; limit > 0 {
retResults := reply.Header().NumKeys
if limit, retResults := baHeader.MaxSpanRequestKeys, reply.Header().NumKeys; limit > 0 {
if retResults > limit {
index, retResults, limit := index, retResults, limit // don't alloc unless branch taken
err := errorutil.UnexpectedWithIssueErrorf(46652,
Expand All @@ -355,6 +354,14 @@ func evaluateBatch(
// mean "no limit").
baHeader.MaxSpanRequestKeys = -1
}
} else if limit < 0 {
if retResults > 0 {
index, retResults := index, retResults // don't alloc unless branch taken
log.Fatalf(ctx,
"received %d results, limit was exhausted (original limit: %d, batch=%s idx=%d)",
errors.Safe(retResults), errors.Safe(ba.Header.MaxSpanRequestKeys),
errors.Safe(ba.Summary()), errors.Safe(index))
}
}
// Same as for MaxSpanRequestKeys above, keep track of the limit and
// make sure to fall through to -1 instead of hitting zero (which
Expand Down
177 changes: 165 additions & 12 deletions pkg/kv/kvserver/replica_evaluate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestEvaluateBatch(t *testing.T) {
{
// Three scans that observe 3, 1, and 0 keys, respectively. An
// unreplicated lock should be acquired on each key that is scanned.
name: "scan with key locking",
name: "scans with key locking",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := scanArgsString("a", "d")
Expand All @@ -277,7 +277,7 @@ func TestEvaluateBatch(t *testing.T) {
},
{
// Ditto in reverse.
name: "reverse scan with key locking",
name: "reverse scans with key locking",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := revScanArgsString("a", "d")
Expand All @@ -300,7 +300,7 @@ func TestEvaluateBatch(t *testing.T) {
{
// Three scans that observe 3, 1, and 0 keys, respectively. No
// transaction set, so no locks should be acquired.
name: "scan with key locking without txn",
name: "scans with key locking without txn",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := scanArgsString("a", "d")
Expand All @@ -321,7 +321,7 @@ func TestEvaluateBatch(t *testing.T) {
},
{
// Ditto in reverse.
name: "reverse scan with key locking without txn",
name: "reverse scans with key locking without txn",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAD := revScanArgsString("a", "d")
Expand Down Expand Up @@ -379,6 +379,52 @@ func TestEvaluateBatch(t *testing.T) {
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{

// Scanning with key locking and a MaxSpanRequestKeys limit should
// acquire an unreplicated lock on each key returned and no locks on
// keys past the limit. One the batch's limit is exhausted, no more
// rows are scanner nor locks acquired.
name: "scans with key locking and MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := scanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a", "b", "c"}, nil)
verifyResumeSpans(t, r, "d-e", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "a", "b", "c")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Ditto in reverse.
name: "reverse scans with key locking and MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := revScanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d", "c", "b"}, nil)
verifyResumeSpans(t, r, "a-a\x00", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "d", "c", "b")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Scanning with key locking and a TargetBytes limit should acquire
// an unreplicated lock on each key returned and no locks on keys
Expand Down Expand Up @@ -417,6 +463,100 @@ func TestEvaluateBatch(t *testing.T) {
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Scanning with key locking and a TargetBytes limit should acquire
// an unreplicated lock on each key returned and no locks on keys
// past the limit. One the batch's limit is exhausted, no more rows
// are scanner nor locks acquired.
name: "scans with key locking and TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := scanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, nil)
verifyResumeSpans(t, r, "b-e", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "a")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
{
// Ditto in reverse.
name: "reverse scans with key locking and TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
scanAE := revScanArgsString("a", "e")
scanAE.KeyLocking = lock.Exclusive
d.ba.Add(scanAE)
scanHJ := scanArgsString("h", "j")
scanHJ.KeyLocking = lock.Exclusive
d.ba.Add(scanHJ)
d.ba.Txn = &txn
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d"}, nil)
verifyResumeSpans(t, r, "a-c\x00", "h-j")
verifyAcquiredLocks(t, r, lock.Unreplicated, "d")
verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...)
},
},
//
// Test suite for ResolveIntentRange with and without limits.
//
{
// Three range intent resolutions that observe 3, 1, and 0 intent,
// respectively. All intents should be resolved.
name: "ranged intent resolution",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 1, 0)
verifyResumeSpans(t, r, "", "", "")
},
},
{
// Resolving intents with a giant limit should resolve everything.
name: "ranged intent resolution with giant MaxSpanRequestKeys",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
d.ba.MaxSpanRequestKeys = 100000
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 1, 0)
verifyResumeSpans(t, r, "", "", "")
},
},
{
// A batch limited to resolve only up to 3 keys should respect that
// limit. The limit is saturated by the first request in the batch.
name: "ranged intent resolution with MaxSpanRequestKeys=3",
setup: func(t *testing.T, d *data) {
writeABCDEFIntents(t, d, &txn)
d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED))
d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED))
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyNumKeys(t, r, 3, 0, 0)
verifyResumeSpans(t, r, "c\x00-d", "e-f", "h-j")
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -474,10 +614,14 @@ type testCase struct {
}

func writeABCDEF(t *testing.T, d *data) {
writeABCDEFIntents(t, d, nil /* txn */)
}

func writeABCDEFIntents(t *testing.T, d *data, txn *roachpb.Transaction) {
for _, k := range []string{"a", "b", "c", "d", "e", "f"} {
require.NoError(t, storage.MVCCPut(
context.Background(), d.eng, nil /* ms */, roachpb.Key(k), d.ba.Timestamp,
roachpb.MakeValueFromString("value-"+k), nil /* txn */))
roachpb.MakeValueFromString("value-"+k), txn))
}
}

Expand Down Expand Up @@ -516,17 +660,26 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) {
}
}

func verifyNumKeys(t *testing.T, r resp, keysPerResp ...int) {
require.Nil(t, r.pErr)
require.NotNil(t, r.br)
require.Len(t, r.br.Responses, len(keysPerResp))
for i, keys := range keysPerResp {
actKeys := int(r.br.Responses[i].GetInner().Header().NumKeys)
require.Equal(t, keys, actKeys, "in response #%i", i+1)
}
}

func verifyResumeSpans(t *testing.T, r resp, resumeSpans ...string) {
for i, span := range resumeSpans {
if span == "" {
continue // don't check request
}
rs := r.br.Responses[i].GetInner().Header().ResumeSpan
var act string
if rs != nil {
act = fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey))
if span == "" {
require.Nil(t, rs)
} else {
require.NotNil(t, rs)
act := fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey))
require.Equal(t, span, act, "#%d", i+1)
}
require.Equal(t, span, act, "#%d", i+1)
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,16 @@ func queryIntentArgs(
}
}

func resolveIntentRangeArgsString(
s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
) *roachpb.ResolveIntentRangeRequest {
return &roachpb.ResolveIntentRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(s), EndKey: roachpb.Key(e)},
IntentTxn: txn,
Status: status,
}
}

func internalMergeArgs(key []byte, value roachpb.Value) roachpb.MergeRequest {
return roachpb.MergeRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,8 +3050,12 @@ func MVCCResolveWriteIntentRange(
// MVCCResolveWriteIntentRangeUsingIter commits or aborts (rolls back)
// the range of write intents specified by start and end keys for a
// given txn. ResolveWriteIntentRange will skip write intents of other
// txns. Returns the number of intents resolved and a resume span if
// the max keys limit was exceeded. A max of zero means unbounded.
// txns.
//
// Returns the number of intents resolved and a resume span if the max
// keys limit was exceeded. A max of zero means unbounded. A max of -1
// means resolve nothing and return the entire intent span as the resume
// span.
func MVCCResolveWriteIntentRangeUsingIter(
ctx context.Context,
rw ReadWriter,
Expand All @@ -3060,6 +3064,11 @@ func MVCCResolveWriteIntentRangeUsingIter(
intent roachpb.LockUpdate,
max int64,
) (int64, *roachpb.Span, error) {
if max < 0 {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
return 0, &resumeSpan, nil
}

encKey := MakeMVCCMetadataKey(intent.Key)
encEndKey := MakeMVCCMetadataKey(intent.EndKey)
nextKey := encKey
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ var commands = map[string]cmd{
"txn_update": {typTxnUpdate, cmdTxnUpdate},

"resolve_intent": {typDataUpdate, cmdResolveIntent},
"check_intent": {typReadOnly, cmdCheckIntent},
// TODO(nvanbenschoten): test "resolve_intent_range".
"check_intent": {typReadOnly, cmdCheckIntent},

"clear_range": {typDataUpdate, cmdClearRange},
"cput": {typDataUpdate, cmdCPut},
Expand Down

0 comments on commit 18b0247

Please sign in to comment.