kvserver: Fix performance regression due to new call to collectSpansRead
When we incorporated response data into the load-based splitter, we
called collectSpansRead, which is allocation-heavy and computationally
expensive, resulting in a performance regression.

To address this, this patch performs 3 optimizations:
1. Remove the call to collectSpansRead; instead, add a custom function
that iterates over the batch of requests / responses and calculates the
true spans.
2. Instead of constructing a *spanset.SpanSet and finding the union of
spans (which uses O(batch_size) memory), directly compute the union of
spans while iterating over the batch, using only O(1) memory (see the
sketch below).
3. Lazily compute the union of true spans only when it is truly needed,
i.e. when we are under heavy load (e.g. >2500 QPS) and a load-based
splitter has been initialized.

Release note: None
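
For illustration, here is a minimal standalone sketch of optimizations 2 and 3:
fold each request's true span into a single running boundary span (O(1) memory)
and hide that work behind a closure so it only runs when a sample is actually
wanted. The names and types below are simplified stand-ins, not the actual
kvserver code; the real implementation is in the replica_split_load.go diff
further down.

package main

import (
	"bytes"
	"fmt"
)

// span is a half-open key interval [Key, EndKey), standing in for roachpb.Span.
type span struct {
	Key, EndKey []byte
}

// combine returns the smallest span covering both a and b. Folding every true
// span through combine keeps memory at O(1), instead of accumulating an
// O(batch_size) *spanset.SpanSet as the old collectSpansRead path did.
func combine(a, b span) span {
	out := a
	if bytes.Compare(b.Key, out.Key) < 0 {
		out.Key = b.Key
	}
	if bytes.Compare(b.EndKey, out.EndKey) > 0 {
		out.EndKey = b.EndKey
	}
	return out
}

// record mimics the lazy evaluation: the union is wrapped in a closure and is
// only computed when a sample is actually requested (heavy load, splitter
// initialized).
func record(wantSample bool, spanFn func() span) {
	if !wantSample {
		return // the union is never computed on this path
	}
	s := spanFn()
	fmt.Printf("sampled boundary: [%s, %s)\n", s.Key, s.EndKey)
}

func main() {
	trueSpans := []span{
		{Key: []byte("c"), EndKey: []byte("f")},
		{Key: []byte("a"), EndKey: []byte("b")},
	}
	// Prints "sampled boundary: [a, f)".
	record(true, func() span {
		boundary := trueSpans[0]
		for _, s := range trueSpans[1:] {
			boundary = combine(boundary, s)
		}
		return boundary
	})
}

The closure plays the role of the func() roachpb.Span handed to
r.loadBasedSplitter.Record in the diff below: on the common, unsampled path it
is never invoked, so the boundary span is never computed.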
KaiSun314 authored and kvoli committed Jun 7, 2023
1 parent d2db70a commit 8d4c5e1
Showing 4 changed files with 381 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -298,6 +298,7 @@ go_test(
         "replica_rangefeed_test.go",
         "replica_rankings_test.go",
         "replica_sideload_test.go",
+        "replica_split_load_test.go",
         "replica_sst_snapshot_storage_test.go",
         "replica_test.go",
         "replica_tscache_test.go",
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/replica_send.go
@@ -402,9 +402,14 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	var g *concurrency.Guard
 	defer func() {
 		// Handle load-based splitting, if necessary.
-		if pErr == nil {
-			spansRead, _, _ := r.collectSpansRead(ba, br)
-			r.recordBatchForLoadBasedSplitting(ctx, ba, spansRead)
+		if pErr == nil && br != nil {
+			if len(ba.Requests) != len(br.Responses) {
+				log.KvDistribution.Errorf(ctx,
+					"Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
+					len(ba.Requests), len(br.Responses))
+			} else {
+				r.recordBatchForLoadBasedSplitting(ctx, ba, br)
+			}
 		}
 
 		// NB: wrapped to delay g evaluation to its value when returning.
78 changes: 75 additions & 3 deletions pkg/kv/kvserver/replica_split_load.go
@@ -13,7 +13,6 @@ package kvserver
 import (
 	"context"
 
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -48,16 +47,89 @@ func (r *Replica) SplitByLoadEnabled() bool {
 		!r.store.TestingKnobs().DisableLoadBasedSplitting
 }
 
+// getResponseBoundarySpan computes the union span of the true spans that were
+// iterated over using the request span and the response's resumeSpan.
+//
+// Assumptions:
+// 1. br != nil
+// 2. len(ba.Requests) == len(br.Responses)
+// Assumptions are checked in executeBatchWithConcurrencyRetries.
+func getResponseBoundarySpan(
+	ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
+) (responseBoundarySpan roachpb.Span) {
+	addSpanToBoundary := func(span roachpb.Span) {
+		if !responseBoundarySpan.Valid() {
+			responseBoundarySpan = span
+		} else {
+			responseBoundarySpan = responseBoundarySpan.Combine(span)
+		}
+	}
+	for i, respUnion := range br.Responses {
+		reqHeader := ba.Requests[i].GetInner().Header()
+		resp := respUnion.GetInner()
+		resumeSpan := resp.Header().ResumeSpan
+		if resumeSpan == nil {
+			// Fully evaluated.
+			addSpanToBoundary(reqHeader.Span())
+			continue
+		}
+
+		switch resp.(type) {
+		case *roachpb.GetResponse:
+			// The request did not evaluate. Ignore it.
+			continue
+		case *roachpb.ScanResponse:
+			// Not reverse (->)
+			// Request:    [key...............endKey)
+			// ResumeSpan:          [key......endKey)
+			// True span:  [key......key)
+			//
+			// Assumptions (not checked to minimize overhead):
+			// reqHeader.EndKey == resumeSpan.EndKey
+			// reqHeader.Key <= resumeSpan.Key.
+			if reqHeader.Key.Equal(resumeSpan.Key) {
+				// The request did not evaluate. Ignore it.
+				continue
+			}
+			addSpanToBoundary(roachpb.Span{
+				Key:    reqHeader.Key,
+				EndKey: resumeSpan.Key,
+			})
+		case *roachpb.ReverseScanResponse:
+			// Reverse (<-)
+			// Request:    [key...............endKey)
+			// ResumeSpan: [key......endKey)
+			// True span:           [endKey...endKey)
+			//
+			// Assumptions (not checked to minimize overhead):
+			// reqHeader.Key == resumeSpan.Key
+			// resumeSpan.EndKey <= reqHeader.EndKey.
+			if reqHeader.EndKey.Equal(resumeSpan.EndKey) {
+				// The request did not evaluate. Ignore it.
+				continue
+			}
+			addSpanToBoundary(roachpb.Span{
+				Key:    resumeSpan.EndKey,
+				EndKey: reqHeader.EndKey,
+			})
+		default:
+			// Consider it fully evaluated, which is safe.
+			addSpanToBoundary(reqHeader.Span())
+		}
+	}
+	return
+}
+
 // recordBatchForLoadBasedSplitting records the batch's spans to be considered
 // for load based splitting.
 func (r *Replica) recordBatchForLoadBasedSplitting(
-	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
+	ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
 ) {
 	if !r.SplitByLoadEnabled() {
 		return
 	}
 	shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span {
-		return spans.BoundarySpan(spanset.SpanGlobal)
+		return getResponseBoundarySpan(ba, br)
 	})
 	if shouldInitSplit {
 		r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
297 changes: 297 additions & 0 deletions pkg/kv/kvserver/replica_split_load_test.go
(diff not loaded)
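
The commit also adds a test file for this logic (replica_split_load_test.go),
whose diff is not rendered above. Below is a rough, hypothetical sketch of how
such a test might exercise getResponseBoundarySpan; it is not the actual file,
and it assumes the usual roachpb literal constructors and that
BatchRequest.Add / BatchResponse.Add append requests and responses in order.

package kvserver

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/stretchr/testify/require"
)

func TestGetResponseBoundarySpanSketch(t *testing.T) {
	ba := &roachpb.BatchRequest{}
	br := &roachpb.BatchResponse{}

	// A forward scan over [a, f) that stopped early: ResumeSpan [c, f)
	// means only [a, c) was actually iterated.
	ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{
		Key: roachpb.Key("a"), EndKey: roachpb.Key("f"),
	}})
	br.Add(&roachpb.ScanResponse{ResponseHeader: roachpb.ResponseHeader{
		ResumeSpan: &roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("f")},
	}})

	// A reverse scan over [m, q) that stopped early: ResumeSpan [m, o)
	// means only [o, q) was actually iterated.
	ba.Add(&roachpb.ReverseScanRequest{RequestHeader: roachpb.RequestHeader{
		Key: roachpb.Key("m"), EndKey: roachpb.Key("q"),
	}})
	br.Add(&roachpb.ReverseScanResponse{ResponseHeader: roachpb.ResponseHeader{
		ResumeSpan: &roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")},
	}})

	// The boundary is the union of the true spans [a, c) and [o, q).
	require.Equal(t,
		roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("q")},
		getResponseBoundarySpan(ba, br))
}

Requests that evaluated fully (nil ResumeSpan) simply contribute their request
span to the union, as the default and resumeSpan == nil branches above show.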
