diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 37adbb41f394..c9a8295c1943 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -293,6 +293,7 @@ go_test(
         "replica_raft_truncation_test.go",
         "replica_rangefeed_test.go",
         "replica_rankings_test.go",
+        "replica_send_test.go",
         "replica_sideload_test.go",
         "replica_sst_snapshot_storage_test.go",
         "replica_test.go",
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 395c2e8a1a94..0a8b1cc71284 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -378,6 +378,72 @@ type batchExecutionFn func(
 var _ batchExecutionFn = (*Replica).executeWriteBatch
 var _ batchExecutionFn = (*Replica).executeReadOnlyBatch
 
+// getTrueSpans returns the spans that the requests in ba actually operated
+// on, using the ResumeSpans reported in br. A request whose response carries
+// no ResumeSpan contributes its whole span; otherwise the part of the span
+// covered by the ResumeSpan is left out.
+//
+// It returns nil when br is nil, or when ba and br do not hold the same number
+// of requests and responses (the mismatch is also logged as an error).
+func getTrueSpans(
+	ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
+) *spanset.SpanSet {
+	if br == nil {
+		return nil
+	}
+	if len(ba.Requests) != len(br.Responses) {
+		log.KvDistribution.Errorf(ctx,
+			"Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
+			len(ba.Requests), len(br.Responses))
+		return nil
+	}
+	// NOTE(review): SpanAccess(0) is presumably spanset.SpanReadOnly; confirm
+	// and switch to the named constant.
+	access := spanset.SpanAccess(0)
+	spans := spanset.New()
+	for i, union := range br.Responses {
+		baInner := ba.Requests[i].GetInner()
+		resumeSpan := union.GetInner().Header().ResumeSpan
+		if resumeSpan == nil {
+			// Nothing was left unprocessed: record the requested span as is.
+			spans.AddNonMVCC(access, baInner.Header().Span())
+			continue
+		}
+		if _, isReverse := baInner.(*roachpb.ReverseScanRequest); isReverse {
+			// Reverse (<-)
+			// Request:    [key...............endKey]
+			// ResumeSpan: [key......endKey]
+			// True span:           [endKey...endKey]
+			//
+			// Assumptions:
+			// resumeSpan.Key == baInner.Header().Key
+			// resumeSpan.EndKey <= baInner.Header().EndKey
+			// We don't check these assumptions because comparing keys could
+			// add potentially significant overhead.
+			spans.AddNonMVCC(access, roachpb.Span{
+				Key:    resumeSpan.EndKey,
+				EndKey: baInner.Header().EndKey,
+			})
+			continue
+		}
+		// Not reverse (->)
+		// Request:    [key...............endKey]
+		// ResumeSpan:          [key......endKey]
+		// True span:  [key......key]
+		//
+		// Assumptions:
+		// resumeSpan.EndKey == baInner.Header().EndKey
+		// baInner.Header().Key <= resumeSpan.Key
+		// We don't check these assumptions because comparing keys could
+		// add potentially significant overhead.
+		spans.AddNonMVCC(access, roachpb.Span{
+			Key:    baInner.Header().Key,
+			EndKey: resumeSpan.Key,
+		})
+	}
+	return spans
+}
+
 // executeBatchWithConcurrencyRetries is the entry point for client (non-admin)
 // requests that execute against the range's state. The method coordinates the
 // execution of requests that may require multiple retries due to interactions
@@ -400,6 +466,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	var requestEvalKind concurrency.RequestEvalKind
 	var g *concurrency.Guard
 	defer func() {
+		// Handle load-based splitting, if necessary.
+		spans := getTrueSpans(ctx, ba, br)
+		if spans != nil {
+			r.recordBatchForLoadBasedSplitting(ctx, ba, spans)
+		}
+
 		// NB: wrapped to delay g evaluation to its value when returning.
 		if g != nil {
 			r.concMgr.FinishReq(g)
@@ -411,7 +483,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 		// commands and wait even if the circuit breaker is tripped.
 		pp = poison.Policy_Wait
 	}
-	for first := true; ; first = false {
+	for {
 		// Exit loop if context has been canceled or timed out.
 		if err := ctx.Err(); err != nil {
 			return nil, nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send"))
@@ -431,11 +503,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 			}
 		}
 
-		// Handle load-based splitting, if necessary.
-		if first {
-			r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
-		}
-
 		// Acquire latches to prevent overlapping requests from executing until
 		// this request completes. After latching, wait on any conflicting locks
 		// to ensure that the request has full isolation during evaluation. This
diff --git a/pkg/kv/kvserver/replica_send_test.go b/pkg/kv/kvserver/replica_send_test.go
new file mode 100644
index 000000000000..b3339e94dd62
--- /dev/null
+++ b/pkg/kv/kvserver/replica_send_test.go
@@ -0,0 +1,111 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestReplicaGetTrueSpans verifies the span set returned by getTrueSpans for
+// three batches: one mixing fully and partially processed requests, one whose
+// response count differs from its request count, and one with a nil response.
+func TestReplicaGetTrueSpans(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Table prefix keys shared by the requests, resume spans and expectations.
+	key100 := keys.SystemSQLCodec.TablePrefix(100)
+	key113 := keys.SystemSQLCodec.TablePrefix(113)
+	key979 := keys.SystemSQLCodec.TablePrefix(979)
+	key1000 := keys.SystemSQLCodec.TablePrefix(1000)
+
+	// scanHeader is the [key100, key1000) header used by every scan below.
+	scanHeader := roachpb.RequestHeader{Key: key100, EndKey: key1000}
+	// resumeAt builds a response header carrying the given resume span.
+	resumeAt := func(span *roachpb.Span) roachpb.ResponseHeader {
+		return roachpb.ResponseHeader{ResumeSpan: span}
+	}
+
+	// A get and a scan that completed, followed by a forward and a reverse
+	// scan that each stopped early and returned a resume span.
+	mixedRequests := []roachpb.RequestUnion{
+		{Value: &roachpb.RequestUnion_Get{Get: &roachpb.GetRequest{
+			RequestHeader: roachpb.RequestHeader{Key: key100, EndKey: roachpb.KeyMin},
+		}}},
+		{Value: &roachpb.RequestUnion_Scan{Scan: &roachpb.ScanRequest{
+			RequestHeader: scanHeader,
+		}}},
+		{Value: &roachpb.RequestUnion_Scan{Scan: &roachpb.ScanRequest{
+			RequestHeader: scanHeader,
+		}}},
+		{Value: &roachpb.RequestUnion_ReverseScan{ReverseScan: &roachpb.ReverseScanRequest{
+			RequestHeader: scanHeader,
+		}}},
+	}
+	mixedResponses := []roachpb.ResponseUnion{
+		{Value: &roachpb.ResponseUnion_Get{Get: &roachpb.GetResponse{
+			ResponseHeader: resumeAt(nil),
+		}}},
+		{Value: &roachpb.ResponseUnion_Scan{Scan: &roachpb.ScanResponse{
+			ResponseHeader: resumeAt(nil),
+		}}},
+		{Value: &roachpb.ResponseUnion_Scan{Scan: &roachpb.ScanResponse{
+			ResponseHeader: resumeAt(&roachpb.Span{Key: key113, EndKey: key1000}),
+		}}},
+		{Value: &roachpb.ResponseUnion_ReverseScan{ReverseScan: &roachpb.ReverseScanResponse{
+			ResponseHeader: resumeAt(&roachpb.Span{Key: key100, EndKey: key979}),
+		}}},
+	}
+
+	testCases := []struct {
+		ba                *roachpb.BatchRequest
+		br                *roachpb.BatchResponse
+		expectedTrueSpans []roachpb.Span
+	}{
+		{
+			ba: &roachpb.BatchRequest{Requests: mixedRequests},
+			br: &roachpb.BatchResponse{Responses: mixedResponses},
+			expectedTrueSpans: []roachpb.Span{
+				{Key: key100, EndKey: roachpb.KeyMin},
+				{Key: key100, EndKey: key1000},
+				{Key: key100, EndKey: key113},
+				{Key: key979, EndKey: key1000},
+			},
+		},
+		{
+			// Two requests but a single response: no spans are produced.
+			ba:                &roachpb.BatchRequest{Requests: []roachpb.RequestUnion{{}, {}}},
+			br:                &roachpb.BatchResponse{Responses: []roachpb.ResponseUnion{{}}},
+			expectedTrueSpans: nil,
+		},
+		{
+			// No response at all: no spans are produced.
+			ba:                &roachpb.BatchRequest{Requests: []roachpb.RequestUnion{}},
+			br:                nil,
+			expectedTrueSpans: nil,
+		},
+	}
+
+	for idx, tc := range testCases {
+		var got []roachpb.Span
+		collect := func(_ spanset.SpanAccess, _ spanset.SpanScope, sp spanset.Span) {
+			got = append(got, sp.Span)
+		}
+		getTrueSpans(context.Background(), tc.ba, tc.br).Iterate(collect)
+		assert.Equal(t, tc.expectedTrueSpans, got, "True spans not equal in test %d", idx)
+	}
+}