diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 7a3d982cf125..992bff96c772 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -298,6 +298,7 @@ go_test( "replica_rangefeed_test.go", "replica_rankings_test.go", "replica_sideload_test.go", + "replica_split_load_test.go", "replica_sst_snapshot_storage_test.go", "replica_test.go", "replica_tscache_test.go", diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index ed15b2b3789a..aa920d3f544e 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -402,9 +402,14 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var g *concurrency.Guard defer func() { // Handle load-based splitting, if necessary. - if pErr == nil { - spansRead, _, _ := r.collectSpansRead(ba, br) - r.recordBatchForLoadBasedSplitting(ctx, ba, spansRead) + if pErr == nil && br != nil { + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } else { + r.recordBatchForLoadBasedSplitting(ctx, ba, br) + } } // NB: wrapped to delay g evaluation to its value when returning. diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index ea42bbabbdde..3cc43ae28e39 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -13,7 +13,6 @@ package kvserver import ( "context" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -48,16 +47,89 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } +// getResponseBoundarySpan computes the union span of the true spans that were +// iterated over using the request span and the response's resumeSpan. +// +// Assumptions: +// 1. br != nil +// 2. len(ba.Requests) == len(br.Responses) +// Assumptions are checked in executeBatchWithConcurrencyRetries. +func getResponseBoundarySpan( + ba *roachpb.BatchRequest, br *roachpb.BatchResponse, +) (responseBoundarySpan roachpb.Span) { + addSpanToBoundary := func(span roachpb.Span) { + if !responseBoundarySpan.Valid() { + responseBoundarySpan = span + } else { + responseBoundarySpan = responseBoundarySpan.Combine(span) + } + } + for i, respUnion := range br.Responses { + reqHeader := ba.Requests[i].GetInner().Header() + resp := respUnion.GetInner() + resumeSpan := resp.Header().ResumeSpan + if resumeSpan == nil { + // Fully evaluated. + addSpanToBoundary(reqHeader.Span()) + continue + } + + switch resp.(type) { + case *roachpb.GetResponse: + // The request did not evaluate. Ignore it. + continue + case *roachpb.ScanResponse: + // Not reverse (->) + // Request: [key...............endKey) + // ResumeSpan: [key......endKey) + // True span: [key......key) + // + // Assumptions (not checked to minimize overhead): + // reqHeader.EndKey == resumeSpan.EndKey + // reqHeader.Key <= resumeSpan.Key. + if reqHeader.Key.Equal(resumeSpan.Key) { + // The request did not evaluate. Ignore it. + continue + } + addSpanToBoundary(roachpb.Span{ + Key: reqHeader.Key, + EndKey: resumeSpan.Key, + }) + case *roachpb.ReverseScanResponse: + // Reverse (<-) + // Request: [key...............endKey) + // ResumeSpan: [key......endKey) + // True span: [endKey...endKey) + // + // Assumptions (not checked to minimize overhead): + // reqHeader.Key == resumeSpan.Key + // resumeSpan.EndKey <= reqHeader.EndKey. + if reqHeader.EndKey.Equal(resumeSpan.EndKey) { + // The request did not evaluate. Ignore it. + continue + } + addSpanToBoundary(roachpb.Span{ + Key: resumeSpan.EndKey, + EndKey: reqHeader.EndKey, + }) + default: + // Consider it fully evaluated, which is safe. + addSpanToBoundary(reqHeader.Span()) + } + } + return +} + // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, ) { if !r.SplitByLoadEnabled() { return } shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span { - return spans.BoundarySpan(spanset.SpanGlobal) + return getResponseBoundarySpan(ba, br) }) if shouldInitSplit { r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) diff --git a/pkg/kv/kvserver/replica_split_load_test.go b/pkg/kv/kvserver/replica_split_load_test.go new file mode 100644 index 000000000000..3d04da3546b9 --- /dev/null +++ b/pkg/kv/kvserver/replica_split_load_test.go @@ -0,0 +1,297 @@ +/// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func roachpbKey(key uint32) roachpb.Key { + return keys.SystemSQLCodec.TablePrefix(key) +} + +func requestHeaderWithNilEndKey(key uint32) roachpb.RequestHeader { + return roachpb.RequestHeader{ + Key: roachpbKey(key), + } +} + +func requestHeader(key uint32, endKey uint32) roachpb.RequestHeader { + return roachpb.RequestHeader{ + Key: roachpbKey(key), + EndKey: roachpbKey(endKey), + } +} + +func responseHeaderWithNilResumeSpan() roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: nil, + } +} + +func responseHeaderWithNilEndKey(key uint32) roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: roachpbKey(key), + }, + } +} + +func responseHeader(key uint32, endKey uint32) roachpb.ResponseHeader { + return roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: roachpbKey(key), + EndKey: roachpbKey(endKey), + }, + } +} + +func requestUnionGet(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_Get{ + Get: &roachpb.GetRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionGet(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_Get{ + Get: &roachpb.GetResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_Scan{ + Scan: &roachpb.ScanRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_Scan{ + Scan: &roachpb.ScanResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionReverseScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionReverseScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func requestUnionDeleteRange(requestHeader roachpb.RequestHeader) roachpb.RequestUnion { + return roachpb.RequestUnion{ + Value: &roachpb.RequestUnion_DeleteRange{ + DeleteRange: &roachpb.DeleteRangeRequest{ + RequestHeader: requestHeader, + }, + }, + } +} + +func responseUnionDeleteRange(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion { + return roachpb.ResponseUnion{ + Value: &roachpb.ResponseUnion_DeleteRange{ + DeleteRange: &roachpb.DeleteRangeResponse{ + ResponseHeader: responseHeader, + }, + }, + } +} + +func TestGetResponseBoundarySpan(t *testing.T) { + defer leaktest.AfterTest(t)() + testCases := []struct { + ba *roachpb.BatchRequest + br *roachpb.BatchResponse + expectedResponseBoundarySpan roachpb.Span + }{ + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionGet(requestHeaderWithNilEndKey(100)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionGet(responseHeaderWithNilResumeSpan()), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeaderWithNilResumeSpan()), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(113, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(113), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionReverseScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionReverseScan(responseHeader(100, 879)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(879), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionDeleteRange(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionDeleteRange(responseHeader(113, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(100), + EndKey: roachpbKey(900), + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionGet(requestHeaderWithNilEndKey(100)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionGet(responseHeaderWithNilEndKey(100)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(100, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionReverseScan(requestHeader(100, 900)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionReverseScan(responseHeader(100, 900)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{}, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + requestUnionScan(requestHeader(500, 600)), + requestUnionReverseScan(requestHeader(475, 625)), + requestUnionGet(requestHeaderWithNilEndKey(480)), + requestUnionReverseScan(requestHeader(500, 510)), + requestUnionScan(requestHeader(700, 800)), + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + responseUnionScan(responseHeader(550, 600)), + responseUnionReverseScan(responseHeader(475, 525)), + responseUnionGet(responseHeaderWithNilResumeSpan()), + responseUnionReverseScan(responseHeaderWithNilResumeSpan()), + responseUnionScan(responseHeader(700, 800)), + }, + }, + expectedResponseBoundarySpan: roachpb.Span{ + Key: roachpbKey(480), + EndKey: roachpbKey(625), + }, + }, + } + for i, test := range testCases { + responseBoundarySpan := getResponseBoundarySpan(test.ba, test.br) + assert.Equal(t, test.expectedResponseBoundarySpan, responseBoundarySpan, "Expected response boundary span %s, got %s in test %d", + test.expectedResponseBoundarySpan, responseBoundarySpan, i) + } +}