Skip to content

Commit

Permalink
kvserver: use response data in the load-based splitter
Browse files Browse the repository at this point in the history
We investigated why running YCSB Workload E results in a single hot
range and we observed that range queries of the form
SELECT * FROM table WHERE pkey >= A LIMIT B will result in all request
spans having the same end key - similar to [A, range_end) - rather than
end keys that take into account the specified LIMIT. Since the majority
of request spans have the same end key, the load splitter algorithm
cannot find a split key without too many contained and balance between
left and right requests. A proposed solution is to use the response span
rather than the request span, since the response span is more accurate
in reflecting the keys that this request truly iterated over. We utilize
the request span as well as the response's resume span to derive the key
span that this request truly iterated over. Using response data (resume
span) rather than just the request span in the load-based splitter
(experimentally) allows the load-based splitter to find a split key
under range query workloads (YCSB Workload E, KV workload with spans).

Release note (ops change): We use response data rather than just the
request span in the load-based splitter to pass more accurate data
about the keys iterated over to the load splitter to find a suitable
split key, enabling the load splitter to find a split key under heavy
range query workloads.
  • Loading branch information
KaiSun314 committed Oct 6, 2022
1 parent 2a1abc8 commit 4d11791
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ go_test(
"replica_raft_truncation_test.go",
"replica_rangefeed_test.go",
"replica_rankings_test.go",
"replica_send_test.go",
"replica_sideload_test.go",
"replica_sst_snapshot_storage_test.go",
"replica_test.go",
Expand Down
73 changes: 67 additions & 6 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,66 @@ type batchExecutionFn func(
var _ batchExecutionFn = (*Replica).executeWriteBatch
var _ batchExecutionFn = (*Replica).executeReadOnlyBatch

func getTrueSpans(
ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
) *spanset.SpanSet {
if br != nil {
if len(ba.Requests) != len(br.Responses) {
log.KvDistribution.Errorf(ctx,
"Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
len(ba.Requests), len(br.Responses))
} else {
spans := spanset.New()
for i, union := range br.Responses {
baInner := ba.Requests[i].GetInner()
var isReverse bool
switch baInner.(type) {
case *roachpb.ReverseScanRequest:
isReverse = true
}
resumeSpan := union.GetInner().Header().ResumeSpan
if resumeSpan == nil {
spans.AddNonMVCC(spanset.SpanAccess(0), baInner.Header().Span())
} else if !isReverse {
// Not reverse (->)
// Request: [key...............endKey)
// ResumeSpan: [key......endKey)
// True span: [key......key)
//
// Assumptions:
// resumeSpan.EndKey == baInner.Header().EndKey
// baInner.Header().Key <= resumeKey
// We don't check these assumptions because comparing keys could
// add potentially significant overhead.
resumeKey := resumeSpan.Key
spans.AddNonMVCC(spanset.SpanAccess(0), roachpb.Span{
Key: baInner.Header().Key,
EndKey: resumeKey,
})
} else {
// Reverse (<-)
// Request: [key...............endKey)
// ResumeSpan: [key......endKey)
// True span: [endKey...endKey)
//
// Assumptions:
// resumeSpan.Key == baInner.Header().Key
// resumeEndKey <= baInner.Header().EndKey
// We don't check these assumptions because comparing keys could
// add potentially significant overhead.
resumeEndKey := resumeSpan.EndKey
spans.AddNonMVCC(spanset.SpanAccess(0), roachpb.Span{
Key: resumeEndKey,
EndKey: baInner.Header().EndKey,
})
}
}
return spans
}
}
return nil
}

// executeBatchWithConcurrencyRetries is the entry point for client (non-admin)
// requests that execute against the range's state. The method coordinates the
// execution of requests that may require multiple retries due to interactions
Expand All @@ -400,6 +460,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
var requestEvalKind concurrency.RequestEvalKind
var g *concurrency.Guard
defer func() {
// Handle load-based splitting, if necessary.
spans := getTrueSpans(ctx, ba, br)
if spans != nil {
r.recordBatchForLoadBasedSplitting(ctx, ba, spans)
}

// NB: wrapped to delay g evaluation to its value when returning.
if g != nil {
r.concMgr.FinishReq(g)
Expand All @@ -411,7 +477,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
// commands and wait even if the circuit breaker is tripped.
pp = poison.Policy_Wait
}
for first := true; ; first = false {
for {
// Exit loop if context has been canceled or timed out.
if err := ctx.Err(); err != nil {
return nil, nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send"))
Expand All @@ -431,11 +497,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
}
}

// Handle load-based splitting, if necessary.
if first {
r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
}

// Acquire latches to prevent overlapping requests from executing until
// this request completes. After latching, wait on any conflicting locks
// to ensure that the request has full isolation during evaluation. This
Expand Down
171 changes: 171 additions & 0 deletions pkg/kv/kvserver/replica_send_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)

func TestReplicaGetTrueSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
testCases := []struct {
ba *roachpb.BatchRequest
br *roachpb.BatchResponse
expectedTrueSpans []roachpb.Span
}{
{
ba: &roachpb.BatchRequest{
Requests: []roachpb.RequestUnion{
{
Value: &roachpb.RequestUnion_Get{
Get: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: roachpb.KeyMin,
},
},
},
},
{
Value: &roachpb.RequestUnion_Scan{
Scan: &roachpb.ScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
},
},
},
{
Value: &roachpb.RequestUnion_Scan{
Scan: &roachpb.ScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
},
},
},
{
Value: &roachpb.RequestUnion_ReverseScan{
ReverseScan: &roachpb.ReverseScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
},
},
},
},
},
br: &roachpb.BatchResponse{
Responses: []roachpb.ResponseUnion{
{
Value: &roachpb.ResponseUnion_Get{
Get: &roachpb.GetResponse{
ResponseHeader: roachpb.ResponseHeader{
ResumeSpan: nil,
},
},
},
},
{
Value: &roachpb.ResponseUnion_Scan{
Scan: &roachpb.ScanResponse{
ResponseHeader: roachpb.ResponseHeader{
ResumeSpan: nil,
},
},
},
},
{
Value: &roachpb.ResponseUnion_Scan{
Scan: &roachpb.ScanResponse{
ResponseHeader: roachpb.ResponseHeader{
ResumeSpan: &roachpb.Span{
Key: keys.SystemSQLCodec.TablePrefix(113),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
},
},
},
},
{
Value: &roachpb.ResponseUnion_ReverseScan{
ReverseScan: &roachpb.ReverseScanResponse{
ResponseHeader: roachpb.ResponseHeader{
ResumeSpan: &roachpb.Span{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(979),
},
},
},
},
},
},
},
expectedTrueSpans: []roachpb.Span{
{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: roachpb.KeyMin,
},
{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
{
Key: keys.SystemSQLCodec.TablePrefix(100),
EndKey: keys.SystemSQLCodec.TablePrefix(113),
},
{
Key: keys.SystemSQLCodec.TablePrefix(979),
EndKey: keys.SystemSQLCodec.TablePrefix(1000),
},
},
},
{
ba: &roachpb.BatchRequest{
Requests: []roachpb.RequestUnion{
{},
{},
},
},
br: &roachpb.BatchResponse{
Responses: []roachpb.ResponseUnion{
{},
},
},
expectedTrueSpans: nil,
},
{
ba: &roachpb.BatchRequest{
Requests: []roachpb.RequestUnion{},
},
br: nil,
expectedTrueSpans: nil,
},
}
for i, test := range testCases {
trueSpanSet := getTrueSpans(context.Background(), test.ba, test.br)
var trueSpans []roachpb.Span
trueSpanSet.Iterate(func(spanAccess spanset.SpanAccess, spanScope spanset.SpanScope, span spanset.Span) {
trueSpans = append(trueSpans, span.Span)
})
assert.Equal(t, test.expectedTrueSpans, trueSpans, "True spans not equal in test %d", i)
}
}

0 comments on commit 4d11791

Please sign in to comment.