From ca76c28e00f735471864451b8bfe6ebc1df8d429 Mon Sep 17 00:00:00 2001
From: Kai Sun
Date: Mon, 7 Nov 2022 23:38:51 -0500
Subject: [PATCH] kvserver: Fix performance regression due to new call to collectSpansRead

When we incorporated the use of response data in the load-based splitter, we
called collectSpansRead, which is allocation-heavy and computationally
expensive, resulting in a performance regression.

To address this, the patch performs 3 optimizations:
1. Remove the call to collectSpansRead; instead, add a custom function to
   iterate over the batch of requests / responses and calculate the true
   spans.
2. Instead of constructing a *spanset.SpanSet and finding the union of spans
   (which uses O(batch_size) memory), we directly compute the union of spans
   while iterating over the batch, resulting in only O(1) memory used.
3. Lazily compute the union of true spans only when it is truly needed, i.e.
   we are under heavy load (e.g. >2500 QPS) and a load-based splitter has
   been initialized.

Release note: None
---
 pkg/kv/kvserver/BUILD.bazel                |   1 +
 pkg/kv/kvserver/replica_send.go            |  11 +-
 pkg/kv/kvserver/replica_split_load.go      |  78 +++++-
 pkg/kv/kvserver/replica_split_load_test.go | 297 +++++++++++++++++++++
 4 files changed, 381 insertions(+), 6 deletions(-)
 create mode 100644 pkg/kv/kvserver/replica_split_load_test.go

diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 159158a5e73e..2f1088f20589 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -293,6 +293,7 @@ go_test(
         "replica_rangefeed_test.go",
         "replica_rankings_test.go",
         "replica_sideload_test.go",
+        "replica_split_load_test.go",
         "replica_sst_snapshot_storage_test.go",
         "replica_test.go",
         "replica_tscache_test.go",
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 70796469ce40..5a90dee8eb06 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -401,9 +401,14 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	var g *concurrency.Guard
 	defer func() {
 		// Handle load-based splitting, if necessary.
-		if pErr == nil {
-			spansRead, _, _ := r.collectSpansRead(ba, br)
-			r.recordBatchForLoadBasedSplitting(ctx, ba, spansRead)
+		if pErr == nil && br != nil {
+			if len(ba.Requests) != len(br.Responses) {
+				log.KvDistribution.Errorf(ctx,
+					"Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
+					len(ba.Requests), len(br.Responses))
+			} else {
+				r.recordBatchForLoadBasedSplitting(ctx, ba, br)
+			}
 		}
 
 		// NB: wrapped to delay g evaluation to its value when returning.
diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go
index ea42bbabbdde..3cc43ae28e39 100644
--- a/pkg/kv/kvserver/replica_split_load.go
+++ b/pkg/kv/kvserver/replica_split_load.go
@@ -13,7 +13,6 @@ package kvserver
 import (
 	"context"
 
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -48,16 +47,89 @@ func (r *Replica) SplitByLoadEnabled() bool {
 		!r.store.TestingKnobs().DisableLoadBasedSplitting
 }
 
+// getResponseBoundarySpan computes the union span of the true spans that were
+// iterated over using the request span and the response's resumeSpan.
+//
+// Assumptions:
+// 1. br != nil
+// 2. len(ba.Requests) == len(br.Responses)
+// Assumptions are checked in executeBatchWithConcurrencyRetries.
+func getResponseBoundarySpan(
+	ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
+) (responseBoundarySpan roachpb.Span) {
+	addSpanToBoundary := func(span roachpb.Span) {
+		if !responseBoundarySpan.Valid() {
+			responseBoundarySpan = span
+		} else {
+			responseBoundarySpan = responseBoundarySpan.Combine(span)
+		}
+	}
+	for i, respUnion := range br.Responses {
+		reqHeader := ba.Requests[i].GetInner().Header()
+		resp := respUnion.GetInner()
+		resumeSpan := resp.Header().ResumeSpan
+		if resumeSpan == nil {
+			// Fully evaluated.
+			addSpanToBoundary(reqHeader.Span())
+			continue
+		}
+
+		switch resp.(type) {
+		case *roachpb.GetResponse:
+			// The request did not evaluate. Ignore it.
+			continue
+		case *roachpb.ScanResponse:
+			// Not reverse (->)
+			// Request:    [key...............endKey)
+			// ResumeSpan:          [key......endKey)
+			// True span:  [key......key)
+			//
+			// Assumptions (not checked to minimize overhead):
+			// reqHeader.EndKey == resumeSpan.EndKey
+			// reqHeader.Key <= resumeSpan.Key.
+			if reqHeader.Key.Equal(resumeSpan.Key) {
+				// The request did not evaluate. Ignore it.
+				continue
+			}
+			addSpanToBoundary(roachpb.Span{
+				Key:    reqHeader.Key,
+				EndKey: resumeSpan.Key,
+			})
+		case *roachpb.ReverseScanResponse:
+			// Reverse (<-)
+			// Request:    [key...............endKey)
+			// ResumeSpan: [key......endKey)
+			// True span:            [endKey...endKey)
+			//
+			// Assumptions (not checked to minimize overhead):
+			// reqHeader.Key == resumeSpan.Key
+			// resumeSpan.EndKey <= reqHeader.EndKey.
+			if reqHeader.EndKey.Equal(resumeSpan.EndKey) {
+				// The request did not evaluate. Ignore it.
+				continue
+			}
+			addSpanToBoundary(roachpb.Span{
+				Key:    resumeSpan.EndKey,
+				EndKey: reqHeader.EndKey,
+			})
+		default:
+			// Consider it fully evaluated, which is safe.
+			addSpanToBoundary(reqHeader.Span())
+		}
+	}
+	return
+}
+
 // recordBatchForLoadBasedSplitting records the batch's spans to be considered
 // for load based splitting.
 func (r *Replica) recordBatchForLoadBasedSplitting(
-	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
+	ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
 ) {
 	if !r.SplitByLoadEnabled() {
 		return
 	}
 	shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span {
-		return spans.BoundarySpan(spanset.SpanGlobal)
+		return getResponseBoundarySpan(ba, br)
 	})
 	if shouldInitSplit {
 		r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
diff --git a/pkg/kv/kvserver/replica_split_load_test.go b/pkg/kv/kvserver/replica_split_load_test.go
new file mode 100644
index 000000000000..3d04da3546b9
--- /dev/null
+++ b/pkg/kv/kvserver/replica_split_load_test.go
@@ -0,0 +1,297 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver
+
+import (
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/assert"
+)
+
+func roachpbKey(key uint32) roachpb.Key {
+	return keys.SystemSQLCodec.TablePrefix(key)
+}
+
+func requestHeaderWithNilEndKey(key uint32) roachpb.RequestHeader {
+	return roachpb.RequestHeader{
+		Key: roachpbKey(key),
+	}
+}
+
+func requestHeader(key uint32, endKey uint32) roachpb.RequestHeader {
+	return roachpb.RequestHeader{
+		Key:    roachpbKey(key),
+		EndKey: roachpbKey(endKey),
+	}
+}
+
+func responseHeaderWithNilResumeSpan() roachpb.ResponseHeader {
+	return roachpb.ResponseHeader{
+		ResumeSpan: nil,
+	}
+}
+
+func responseHeaderWithNilEndKey(key uint32) roachpb.ResponseHeader {
+	return roachpb.ResponseHeader{
+		ResumeSpan: &roachpb.Span{
+			Key: roachpbKey(key),
+		},
+	}
+}
+
+func responseHeader(key uint32, endKey uint32) roachpb.ResponseHeader {
+	return roachpb.ResponseHeader{
+		ResumeSpan: &roachpb.Span{
+			Key:    roachpbKey(key),
+			EndKey: roachpbKey(endKey),
+		},
+	}
+}
+
+func requestUnionGet(requestHeader roachpb.RequestHeader) roachpb.RequestUnion {
+	return roachpb.RequestUnion{
+		Value: &roachpb.RequestUnion_Get{
+			Get: &roachpb.GetRequest{
+				RequestHeader: requestHeader,
+			},
+		},
+	}
+}
+
+func responseUnionGet(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion {
+	return roachpb.ResponseUnion{
+		Value: &roachpb.ResponseUnion_Get{
+			Get: &roachpb.GetResponse{
+				ResponseHeader: responseHeader,
+			},
+		},
+	}
+}
+
+func requestUnionScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion {
+	return roachpb.RequestUnion{
+		Value: &roachpb.RequestUnion_Scan{
+			Scan: &roachpb.ScanRequest{
+				RequestHeader: requestHeader,
+			},
+		},
+	}
+}
+
+func responseUnionScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion {
+	return roachpb.ResponseUnion{
+		Value: &roachpb.ResponseUnion_Scan{
+			Scan: &roachpb.ScanResponse{
+				ResponseHeader: responseHeader,
+			},
+		},
+	}
+}
+
+func requestUnionReverseScan(requestHeader roachpb.RequestHeader) roachpb.RequestUnion {
+	return roachpb.RequestUnion{
+		Value: &roachpb.RequestUnion_ReverseScan{
+			ReverseScan: &roachpb.ReverseScanRequest{
+				RequestHeader: requestHeader,
+			},
+		},
+	}
+}
+
+func responseUnionReverseScan(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion {
+	return roachpb.ResponseUnion{
+		Value: &roachpb.ResponseUnion_ReverseScan{
+			ReverseScan: &roachpb.ReverseScanResponse{
+				ResponseHeader: responseHeader,
+			},
+		},
+	}
+}
+
+func requestUnionDeleteRange(requestHeader roachpb.RequestHeader) roachpb.RequestUnion {
+	return roachpb.RequestUnion{
+		Value: &roachpb.RequestUnion_DeleteRange{
+			DeleteRange: &roachpb.DeleteRangeRequest{
+				RequestHeader: requestHeader,
+			},
+		},
+	}
+}
+
+func responseUnionDeleteRange(responseHeader roachpb.ResponseHeader) roachpb.ResponseUnion {
+	return roachpb.ResponseUnion{
+		Value: &roachpb.ResponseUnion_DeleteRange{
+			DeleteRange: &roachpb.DeleteRangeResponse{
+				ResponseHeader: responseHeader,
+			},
+		},
+	}
+}
+
+func TestGetResponseBoundarySpan(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	testCases := []struct {
+		ba                           *roachpb.BatchRequest
+		br                           *roachpb.BatchResponse
+		expectedResponseBoundarySpan roachpb.Span
+	}{
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionGet(requestHeaderWithNilEndKey(100)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionGet(responseHeaderWithNilResumeSpan()),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key: roachpbKey(100),
+			},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionScan(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionScan(responseHeaderWithNilResumeSpan()),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key:    roachpbKey(100),
+				EndKey: roachpbKey(900),
+			},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionScan(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionScan(responseHeader(113, 900)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key:    roachpbKey(100),
+				EndKey: roachpbKey(113),
+			},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionReverseScan(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionReverseScan(responseHeader(100, 879)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key:    roachpbKey(879),
+				EndKey: roachpbKey(900),
+			},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionDeleteRange(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionDeleteRange(responseHeader(113, 900)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key:    roachpbKey(100),
+				EndKey: roachpbKey(900),
+			},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionGet(requestHeaderWithNilEndKey(100)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionGet(responseHeaderWithNilEndKey(100)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionScan(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionScan(responseHeader(100, 900)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionReverseScan(requestHeader(100, 900)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionReverseScan(responseHeader(100, 900)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{},
+		},
+		{
+			ba: &roachpb.BatchRequest{
+				Requests: []roachpb.RequestUnion{
+					requestUnionScan(requestHeader(500, 600)),
+					requestUnionReverseScan(requestHeader(475, 625)),
+					requestUnionGet(requestHeaderWithNilEndKey(480)),
+					requestUnionReverseScan(requestHeader(500, 510)),
+					requestUnionScan(requestHeader(700, 800)),
+				},
+			},
+			br: &roachpb.BatchResponse{
+				Responses: []roachpb.ResponseUnion{
+					responseUnionScan(responseHeader(550, 600)),
+					responseUnionReverseScan(responseHeader(475, 525)),
+					responseUnionGet(responseHeaderWithNilResumeSpan()),
+					responseUnionReverseScan(responseHeaderWithNilResumeSpan()),
+					responseUnionScan(responseHeader(700, 800)),
+				},
+			},
+			expectedResponseBoundarySpan: roachpb.Span{
+				Key:    roachpbKey(480),
+				EndKey: roachpbKey(625),
+			},
+		},
+	}
+	for i, test := range testCases {
+		responseBoundarySpan := getResponseBoundarySpan(test.ba, test.br)
+		assert.Equal(t, test.expectedResponseBoundarySpan, responseBoundarySpan, "Expected response boundary span %s, got %s in test %d",
+			test.expectedResponseBoundarySpan, responseBoundarySpan, i)
+	}
+}
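
For readers outside the CockroachDB tree, here is a minimal, self-contained Go
sketch of optimizations 2 and 3 above: the boundary span is folded together in
O(1) memory while iterating, and the closure that computes it is only evaluated
when the splitter is actually sampling. The span, combine, and record names
below are simplified stand-ins for illustration only, not the real roachpb.Span
or the load-based splitter's Record API.

package main

import (
	"bytes"
	"fmt"
)

// span is a simplified stand-in for roachpb.Span (illustration only).
type span struct {
	key, endKey []byte
}

// valid reports whether the span has been set yet.
func (s span) valid() bool { return len(s.key) != 0 }

// combine returns the smallest span covering both s and o: the O(1)-memory
// union that the patch maintains while iterating over the batch.
func (s span) combine(o span) span {
	if !s.valid() {
		return o
	}
	if bytes.Compare(o.key, s.key) < 0 {
		s.key = o.key
	}
	if bytes.Compare(o.endKey, s.endKey) > 0 {
		s.endKey = o.endKey
	}
	return s
}

// record stands in for the splitter's Record call: spanFn is only invoked
// when the splitter is actually sampling, so the union is computed lazily.
func record(sampling bool, numRequests int, spanFn func() span) {
	if !sampling {
		return // spanFn never runs: no iteration, no extra work.
	}
	b := spanFn()
	fmt.Printf("sampled %d requests covering [%s, %s)\n", numRequests, b.key, b.endKey)
}

func main() {
	// True spans derived from request spans and resume spans, as in the patch.
	trueSpans := []span{
		{key: []byte("b"), endKey: []byte("d")},
		{key: []byte("a"), endKey: []byte("c")},
	}
	record(true, len(trueSpans), func() span {
		var boundary span
		for _, s := range trueSpans {
			boundary = boundary.combine(s)
		}
		return boundary // [a, d)
	})
}

When sampling is false, record returns before ever calling spanFn, which
mirrors how optimization 3 skips the span-union work entirely unless the
splitter needs a sample.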