diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index fa9c01bfa72f..c978f8dc7f2e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1247,8 +1247,21 @@ func (ds *DistSender) divideAndSendBatchToRanges( // passed recursively to further divideAndSendBatchToRanges() calls. if ba.MaxSpanRequestKeys > 0 { if replyResults > ba.MaxSpanRequestKeys { - log.Fatalf(ctx, "received %d results, limit was %d", - replyResults, ba.MaxSpanRequestKeys) + // NOTE: v19.2 and below have a bug where MaxSpanRequestKeys + // is not respected by ResolveIntentRangeRequest once the + // limit has already been exhausted by the batch. This is + // mostly harmless (or at least, the damage has already been + // done by this point and resulted in a large Raft entry) + // and has been fixed in v20.1+, so don't bother hitting the + // assertion. + // + // TODO(nvanbenschoten): remove this hack in v20.2. + if _, ok := ba.GetArg(roachpb.ResolveIntentRange); ok { + replyResults = ba.MaxSpanRequestKeys + } else { + log.Fatalf(ctx, "received %d results, limit was %d", + replyResults, ba.MaxSpanRequestKeys) + } } ba.MaxSpanRequestKeys -= replyResults // Exiting; any missing responses will be filled in via defer(). diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 0ec59b5f5ffb..6c28a0495ba7 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -334,8 +334,7 @@ func evaluateBatch( // of results from the limit going forward. Exhausting the limit results // in a limit of -1. This makes sure that we still execute the rest of // the batch, but with limit-aware operations returning no data. - if limit := baHeader.MaxSpanRequestKeys; limit > 0 { - retResults := reply.Header().NumKeys + if limit, retResults := baHeader.MaxSpanRequestKeys, reply.Header().NumKeys; limit > 0 { if retResults > limit { index, retResults, limit := index, retResults, limit // don't alloc unless branch taken err := errorutil.UnexpectedWithIssueErrorf(46652, @@ -355,6 +354,14 @@ func evaluateBatch( // mean "no limit"). baHeader.MaxSpanRequestKeys = -1 } + } else if limit < 0 { + if retResults > 0 { + index, retResults := index, retResults // don't alloc unless branch taken + log.Fatalf(ctx, + "received %d results, limit was exhausted (original limit: %d, batch=%s idx=%d)", + errors.Safe(retResults), errors.Safe(ba.Header.MaxSpanRequestKeys), + errors.Safe(ba.Summary()), errors.Safe(index)) + } } // Same as for MaxSpanRequestKeys above, keep track of the limit and // make sure to fall through to -1 instead of hitting zero (which diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 98d6430bd14f..d75fd13a5c33 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -508,6 +508,55 @@ func TestEvaluateBatch(t *testing.T) { verifyAcquiredLocks(t, r, lock.Replicated, []string(nil)...) }, }, + // + // Test suite for ResolveIntentRange with and without limits. + // + { + // Three range intent resolutions that observe 3, 1, and 0 intent, + // respectively. All intents should be resolved. + name: "ranged intent resolution", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 1, 0) + verifyResumeSpans(t, r, "", "", "") + }, + }, + { + // Resolving intents with a giant limit should resolve everything. + name: "ranged intent resolution with giant MaxSpanRequestKeys", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.MaxSpanRequestKeys = 100000 + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 1, 0) + verifyResumeSpans(t, r, "", "", "") + }, + }, + { + // A batch limited to resolve only up to 3 keys should respect that + // limit. The limit is saturated by the first request in the batch. + name: "ranged intent resolution with MaxSpanRequestKeys=3", + setup: func(t *testing.T, d *data) { + writeABCDEFIntents(t, d, &txn) + d.ba.Add(resolveIntentRangeArgsString("a", "d", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("e", "f", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.Add(resolveIntentRangeArgsString("h", "j", txn.TxnMeta, roachpb.COMMITTED)) + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyNumKeys(t, r, 3, 0, 0) + verifyResumeSpans(t, r, "c\x00-d", "e-f", "h-j") + }, + }, } for _, tc := range tcs { @@ -565,10 +614,14 @@ type testCase struct { } func writeABCDEF(t *testing.T, d *data) { + writeABCDEFIntents(t, d, nil /* txn */) +} + +func writeABCDEFIntents(t *testing.T, d *data, txn *roachpb.Transaction) { for _, k := range []string{"a", "b", "c", "d", "e", "f"} { require.NoError(t, storage.MVCCPut( context.Background(), d.eng, nil /* ms */, roachpb.Key(k), d.ba.Timestamp, - roachpb.MakeValueFromString("value-"+k), nil /* txn */)) + roachpb.MakeValueFromString("value-"+k), txn)) } } @@ -607,17 +660,26 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) { } } +func verifyNumKeys(t *testing.T, r resp, keysPerResp ...int) { + require.Nil(t, r.pErr) + require.NotNil(t, r.br) + require.Len(t, r.br.Responses, len(keysPerResp)) + for i, keys := range keysPerResp { + actKeys := int(r.br.Responses[i].GetInner().Header().NumKeys) + require.Equal(t, keys, actKeys, "in response #%i", i+1) + } +} + func verifyResumeSpans(t *testing.T, r resp, resumeSpans ...string) { for i, span := range resumeSpans { - if span == "" { - continue // don't check request - } rs := r.br.Responses[i].GetInner().Header().ResumeSpan - var act string - if rs != nil { - act = fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey)) + if span == "" { + require.Nil(t, rs) + } else { + require.NotNil(t, rs) + act := fmt.Sprintf("%s-%s", string(rs.Key), string(rs.EndKey)) + require.Equal(t, span, act, "#%d", i+1) } - require.Equal(t, span, act, "#%d", i+1) } } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 11af3e2ceb86..839cc8a8e3ca 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1756,6 +1756,16 @@ func queryIntentArgs( } } +func resolveIntentRangeArgsString( + s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus, +) *roachpb.ResolveIntentRangeRequest { + return &roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(s), EndKey: roachpb.Key(e)}, + IntentTxn: txn, + Status: status, + } +} + func internalMergeArgs(key []byte, value roachpb.Value) roachpb.MergeRequest { return roachpb.MergeRequest{ RequestHeader: roachpb.RequestHeader{ diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 2a56e02034da..c3d4742b715a 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -3050,8 +3050,12 @@ func MVCCResolveWriteIntentRange( // MVCCResolveWriteIntentRangeUsingIter commits or aborts (rolls back) // the range of write intents specified by start and end keys for a // given txn. ResolveWriteIntentRange will skip write intents of other -// txns. Returns the number of intents resolved and a resume span if -// the max keys limit was exceeded. A max of zero means unbounded. +// txns. +// +// Returns the number of intents resolved and a resume span if the max +// keys limit was exceeded. A max of zero means unbounded. A max of -1 +// means resolve nothing and return the entire intent span as the resume +// span. func MVCCResolveWriteIntentRangeUsingIter( ctx context.Context, rw ReadWriter, @@ -3060,6 +3064,11 @@ func MVCCResolveWriteIntentRangeUsingIter( intent roachpb.LockUpdate, max int64, ) (int64, *roachpb.Span, error) { + if max < 0 { + resumeSpan := intent.Span // don't inline or `intent` would escape to heap + return 0, &resumeSpan, nil + } + encKey := MakeMVCCMetadataKey(intent.Key) encEndKey := MakeMVCCMetadataKey(intent.EndKey) nextKey := encKey diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 75b14016d29e..3ddfd033be7a 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -378,7 +378,8 @@ var commands = map[string]cmd{ "txn_update": {typTxnUpdate, cmdTxnUpdate}, "resolve_intent": {typDataUpdate, cmdResolveIntent}, - "check_intent": {typReadOnly, cmdCheckIntent}, + // TODO(nvanbenschoten): test "resolve_intent_range". + "check_intent": {typReadOnly, cmdCheckIntent}, "clear_range": {typDataUpdate, cmdClearRange}, "cput": {typDataUpdate, cmdCPut},