From c6ecb4d6fb6e925776664babb2255602cc9d6f79 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 14 Apr 2020 16:04:16 -0400 Subject: [PATCH 1/2] kv: add test cases for multiple locking scans with limit --- pkg/kv/kvserver/replica_evaluate_test.go | 99 +++++++++++++++++++++++- 1 file changed, 95 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 68830ada3423..98d6430bd14f 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -255,7 +255,7 @@ func TestEvaluateBatch(t *testing.T) { { // Three scans that observe 3, 1, and 0 keys, respectively. An // unreplicated lock should be acquired on each key that is scanned. - name: "scan with key locking", + name: "scans with key locking", setup: func(t *testing.T, d *data) { writeABCDEF(t, d) scanAD := scanArgsString("a", "d") @@ -277,7 +277,7 @@ func TestEvaluateBatch(t *testing.T) { }, { // Ditto in reverse. - name: "reverse scan with key locking", + name: "reverse scans with key locking", setup: func(t *testing.T, d *data) { writeABCDEF(t, d) scanAD := revScanArgsString("a", "d") @@ -300,7 +300,7 @@ func TestEvaluateBatch(t *testing.T) { { // Three scans that observe 3, 1, and 0 keys, respectively. No // transaction set, so no locks should be acquired. - name: "scan with key locking without txn", + name: "scans with key locking without txn", setup: func(t *testing.T, d *data) { writeABCDEF(t, d) scanAD := scanArgsString("a", "d") @@ -321,7 +321,7 @@ func TestEvaluateBatch(t *testing.T) { }, { // Ditto in reverse. - name: "reverse scan with key locking without txn", + name: "reverse scans with key locking without txn", setup: func(t *testing.T, d *data) { writeABCDEF(t, d) scanAD := revScanArgsString("a", "d") @@ -379,6 +379,52 @@ func TestEvaluateBatch(t *testing.T) { verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) }, }, + { + + // Scanning with key locking and a MaxSpanRequestKeys limit should + // acquire an unreplicated lock on each key returned and no locks on + // keys past the limit. One the batch's limit is exhausted, no more + // rows are scanner nor locks acquired. + name: "scans with key locking and MaxSpanRequestKeys=3", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + scanAE := scanArgsString("a", "e") + scanAE.KeyLocking = lock.Exclusive + d.ba.Add(scanAE) + scanHJ := scanArgsString("h", "j") + scanHJ.KeyLocking = lock.Exclusive + d.ba.Add(scanHJ) + d.ba.Txn = &txn + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"a", "b", "c"}, nil) + verifyResumeSpans(t, r, "d-e", "h-j") + verifyAcquiredLocks(t, r, lock.Unreplicated, "a", "b", "c") + verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) + }, + }, + { + // Ditto in reverse. + name: "reverse scans with key locking and MaxSpanRequestKeys=3", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + scanAE := revScanArgsString("a", "e") + scanAE.KeyLocking = lock.Exclusive + d.ba.Add(scanAE) + scanHJ := scanArgsString("h", "j") + scanHJ.KeyLocking = lock.Exclusive + d.ba.Add(scanHJ) + d.ba.Txn = &txn + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"d", "c", "b"}, nil) + verifyResumeSpans(t, r, "a-a\x00", "h-j") + verifyAcquiredLocks(t, r, lock.Unreplicated, "d", "c", "b") + verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) + }, + }, { // Scanning with key locking and a TargetBytes limit should acquire // an unreplicated lock on each key returned and no locks on keys @@ -417,6 +463,51 @@ func TestEvaluateBatch(t *testing.T) { verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) }, }, + { + // Scanning with key locking and a TargetBytes limit should acquire + // an unreplicated lock on each key returned and no locks on keys + // past the limit. One the batch's limit is exhausted, no more rows + // are scanner nor locks acquired. + name: "scans with key locking and TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + scanAE := scanArgsString("a", "e") + scanAE.KeyLocking = lock.Exclusive + d.ba.Add(scanAE) + scanHJ := scanArgsString("h", "j") + scanHJ.KeyLocking = lock.Exclusive + d.ba.Add(scanHJ) + d.ba.Txn = &txn + d.ba.TargetBytes = 1 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"a"}, nil) + verifyResumeSpans(t, r, "b-e", "h-j") + verifyAcquiredLocks(t, r, lock.Unreplicated, "a") + verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) + }, + }, + { + // Ditto in reverse. + name: "reverse scans with key locking and TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + scanAE := revScanArgsString("a", "e") + scanAE.KeyLocking = lock.Exclusive + d.ba.Add(scanAE) + scanHJ := scanArgsString("h", "j") + scanHJ.KeyLocking = lock.Exclusive + d.ba.Add(scanHJ) + d.ba.Txn = &txn + d.ba.TargetBytes = 1 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"d"}, nil) + verifyResumeSpans(t, r, "a-c\x00", "h-j") + verifyAcquiredLocks(t, r, lock.Unreplicated, "d") + verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) + }, + }, } for _, tc := range tcs { From 1ab9eca77d37a9a9a5e734b5050f1060fbf20313 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 14 Apr 2020 16:48:49 -0400 Subject: [PATCH 2/2] kv: respect exhausted key limit during ranged intent resolution Fixes #47471. Fixes #40935. This commit fixes a long-standing bug where ranged intent resolution would not respect the MaxSpanRequestKeys set on a batch once the limit had already been exhausted by other requests in the same batch. Instead of treating the limit as exhausted, ranged intent resolution would consider the limit nonexistent (unbounded). This bug was triggering an assertion in DistSender. We became more likely to hit this issue in v20.1 because we started performing ranged intent resolution more often due to implicit SELECT FOR UPDATE. This commit fixes the bug in two ways: 1. it addresses the root cause, updating MVCCResolveWriteIntentRangeUsingIter to properly respect the limit placed on the request when it is exhauted. 2. it disables the assertion in DistSender when it detects that we are hitting this bug. This ensures that we don't hit the assertion in mixed version clusters (see #40935). By the time we're in DistSender, the damage is already done and has already potentially resulted in a large Raft entry. Maintaining the assertion doesn't do us any good. Release notes (bug fix): a bug that could could trigger an assertion with the text "received X results, limit was Y" has been fixed. The underlying bug was only performance related and could not cause user-visible correctness violations. Release justification: fixes a medium-priority bug in existing functionality. The bug could result in an assertion failure and a node crashing. Even though this was an old bug (present in many releases before v20.1), it became a lot easier to hit in v20.1 because we started performing ranged intent resolution more often due to implicit SELECT FOR UPDATE. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 17 +++++- pkg/kv/kvserver/replica_evaluate.go | 11 +++- pkg/kv/kvserver/replica_evaluate_test.go | 78 +++++++++++++++++++++--- pkg/kv/kvserver/replica_test.go | 10 +++ pkg/storage/mvcc.go | 13 +++- pkg/storage/mvcc_history_test.go | 3 +- 6 files changed, 117 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index fa9c01bfa72f..c978f8dc7f2e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1247,8 +1247,21 @@ func (ds *DistSender) divideAndSendBatchToRanges( // passed recursively to further divideAndSendBatchToRanges() calls. if ba.MaxSpanRequestKeys > 0 { if replyResults > ba.MaxSpanRequestKeys { - log.Fatalf(ctx, "received %d results, limit was %d", - replyResults, ba.MaxSpanRequestKeys) + // NOTE: v19.2 and below have a bug where MaxSpanRequestKeys + // is not respected by ResolveIntentRangeRequest once the + // limit has already been exhausted by the batch. This is + // mostly harmless (or at least, the damage has already been + // done by this point and resulted in a large Raft entry) + // and has been fixed in v20.1+, so don't bother hitting the + // assertion. + // + // TODO(nvanbenschoten): remove this hack in v20.2. + if _, ok := ba.GetArg(roachpb.ResolveIntentRange); ok { + replyResults = ba.MaxSpanRequestKeys + } else { + log.Fatalf(ctx, "received %d results, limit was %d", + replyResults, ba.MaxSpanRequestKeys) + } } ba.MaxSpanRequestKeys -= replyResults // Exiting; any missing responses will be filled in via defer(). diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 0ec59b5f5ffb..6c28a0495ba7 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -334,8 +334,7 @@ func evaluateBatch( // of results from the limit going forward. Exhausting the limit results // in a limit of -1. This makes sure that we still execute the rest of // the batch, but with limit-aware operations returning no data. - if limit := baHeader.MaxSpanRequestKeys; limit > 0 { - retResults := reply.Header().NumKeys + if limit, retResults := baHeader.MaxSpanRequestKeys, reply.Header().NumKeys; limit > 0 { if retResults > limit { index, retResults, limit := index, retResults, limit // don't alloc unless branch taken err := errorutil.UnexpectedWithIssueErrorf(46652, @@ -355,6 +354,14 @@ func evaluateBatch( // mean "no limit"). baHeader.MaxSpanRequestKeys = -1 } + } else if limit < 0 { + if retResults > 0 { + index, retResults := index, retResults // don't alloc unless branch taken + log.Fatalf(ctx, + "received %d results, limit was exhausted (original limit: %d, batch=%s idx=%d)", + errors.Safe(retResults), errors.Safe(ba.Header.MaxSpanRequestKeys), + errors.Safe(ba.Summary()), errors.Safe(index)) + } } // Same as for MaxSpanRequestKeys above, keep track of the limit and // make sure to fall through to -1 instead of hitting zero (which diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 98d6430bd14f..d75fd13a5c33 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -508,6 +508,55 @@ func TestEvaluateBatch(t *testing.T) { verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) }, }, + // + // Test suite for ResolveIntentRange with and without limits. + // + { + // Three range intent resolutions that observe 3, 1, and 0 intent, + // respectively. All intents should be resolved. + name: "ranged intent resolution", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 1, 0) + verifyResumeSpans(t, r, "", "", "") + }, + }, + { + // Resolving intents with a giant limit should resolve everything. + name: "ranged intent resolution with giant MaxSpanRequestKeys", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.MaxSpanRequestKeys = 100000 + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 1, 0) + verifyResumeSpans(t, r, "", "", "") + }, + }, + { + // A batch limited to resolve only up to 3 keys should respect that + // limit. The limit is saturated by the first request in the batch. + name: "ranged intent resolution with MaxSpanRequestKeys=3", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 0, 0) + verifyResumeSpans(t, r, "c\x00-d", "e-f", "h-j") + }, + }, } for _, tc := range tcs { @@ -565,10 +614,14 @@ type testCase struct { } func writeABCDEF(t *testing.T, d *data) { + writeABCDEFIntents(t, d, nil /* txn */) +} + +func writeABCDEFIntents(t *testing.T, d *data, txn *roachpb.Transaction) { for _, k := range []string{"a", "b", "c", "d", "e", "f"} { require.NoError(t, storage.MVCCPut( context.Background(), d.eng, nil /* ms */, roachpb.Key(k), d.ba.Timestamp, - roachpb.MakeValueFromString("value-"+k), nil /* txn */)) + roachpb.MakeValueFromString("value-"+k), txn)) } } @@ -607,17 +660,26 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) { } } +func verifyNumKeys(t *testing.T, r resp, keysPerResp ...int) { + require.Nil(t, r.pErr) + require.NotNil(t, r.br) + require.Len(t, r.br.Responses, len(keysPerResp)) + for i, keys := range keysPerResp { + actKeys := int(r.br.Responses[i].GetInner().Header().NumKeys) + require.Equal(t, keys, actKeys, "in response #%i", i+1) + } +} + func verifyResumeSpans(t *testing.T, r resp, resumeSpans ...string) { for i, span := range resumeSpans { - if span == "" { - continue // don't check request - } rs := r.br.Responses[i].GetInner().Header().ResumeSpan - var act string - if rs != nil { - act = fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey)) + if span == "" { + require.Nil(t, rs) + } else { + require.NotNil(t, rs) + act := fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey)) + require.Equal(t, span, act, "#%d", i+1) } - require.Equal(t, span, act, "#%d", i+1) } } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 11af3e2ceb86..839cc8a8e3ca 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1756,6 +1756,16 @@ func queryIntentArgs( } } +func resolveIntentRangeArgsString( + s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus, +) *roachpb.ResolveIntentRangeRequest { + return &roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(s), EndKey: roachpb.Key(e)}, + IntentTxn: txn, + Status: status, + } +} + func internalMergeArgs(key []byte, value roachpb.Value) roachpb.MergeRequest { return roachpb.MergeRequest{ RequestHeader: roachpb.RequestHeader{ diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 2a56e02034da..c3d4742b715a 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -3050,8 +3050,12 @@ func MVCCResolveWriteIntentRange( // MVCCResolveWriteIntentRangeUsingIter commits or aborts (rolls back) // the range of write intents specified by start and end keys for a // given txn. ResolveWriteIntentRange will skip write intents of other -// txns. Returns the number of intents resolved and a resume span if -// the max keys limit was exceeded. A max of zero means unbounded. +// txns. +// +// Returns the number of intents resolved and a resume span if the max +// keys limit was exceeded. A max of zero means unbounded. A max of -1 +// means resolve nothing and return the entire intent span as the resume +// span. func MVCCResolveWriteIntentRangeUsingIter( ctx context.Context, rw ReadWriter, @@ -3060,6 +3064,11 @@ func MVCCResolveWriteIntentRangeUsingIter( intent roachpb.LockUpdate, max int64, ) (int64, *roachpb.Span, error) { + if max < 0 { + resumeSpan := intent.Span // don't inline or `intent` would escape to heap + return 0, &resumeSpan, nil + } + encKey := MakeMVCCMetadataKey(intent.Key) encEndKey := MakeMVCCMetadataKey(intent.EndKey) nextKey := encKey diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 75b14016d29e..3ddfd033be7a 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -378,7 +378,8 @@ var commands = map[string]cmd{ "txn_update": {typTxnUpdate, cmdTxnUpdate}, "resolve_intent": {typDataUpdate, cmdResolveIntent}, - "check_intent": {typReadOnly, cmdCheckIntent}, + // TODO(nvanbenschoten): test "resolve_intent_range". + "check_intent": {typReadOnly, cmdCheckIntent}, "clear_range": {typDataUpdate, cmdClearRange}, "cput": {typDataUpdate, cmdCPut},