From 09d7d5ee5cfad20269c375ffa93b94e75c7ac47b Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 4 Oct 2022 15:48:56 -0400 Subject: [PATCH] kvserver: use response data in the load-based splitter We investigated why running YCSB Workload E results in a single hot range and we observed that range queries of the form SELECT * FROM table WHERE pkey >= A LIMIT B will result in all request spans having the same end key - similar to [A, range_end) - rather than end keys that take into account the specified LIMIT. Since the majority of request spans have the same end key, the load splitter algorithm cannot find a split key without too many contained and balance between left and right requests. A proposed solution is to use the response span rather than the request span, since the response span is more accurate in reflecting the keys that this request truly iterated over. We utilize the request span as well as the response's resume span to derive the key span that this request truly iterated over. Using response data (resume span) rather than just the request span in the load-based splitter (experimentally) allows the load-based splitter to find a split key under range query workloads (YCSB Workload E, KV workload with spans). Release note (ops change): We use response data rather than just the request span in the load-based splitter to pass more accurate data about the keys iterated over to the load splitter to find a suitable split key, enabling the load splitter to find a split key under heavy range query workloads. --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/replica_send.go | 73 +++++++++++- pkg/kv/kvserver/replica_send_test.go | 171 +++++++++++++++++++++++++++ 3 files changed, 239 insertions(+), 6 deletions(-) create mode 100644 pkg/kv/kvserver/replica_send_test.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 37adbb41f394..c9a8295c1943 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -293,6 +293,7 @@ go_test( "replica_raft_truncation_test.go", "replica_rangefeed_test.go", "replica_rankings_test.go", + "replica_send_test.go", "replica_sideload_test.go", "replica_sst_snapshot_storage_test.go", "replica_test.go", diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 395c2e8a1a94..0a8b1cc71284 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -378,6 +378,66 @@ type batchExecutionFn func( var _ batchExecutionFn = (*Replica).executeWriteBatch var _ batchExecutionFn = (*Replica).executeReadOnlyBatch +func getTrueSpans( + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, +) *spanset.SpanSet { + if br != nil { + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } else { + spans := spanset.New() + for i, union := range br.Responses { + baInner := ba.Requests[i].GetInner() + var isReverse bool + switch baInner.(type) { + case *roachpb.ReverseScanRequest: + isReverse = true + } + resumeSpan := union.GetInner().Header().ResumeSpan + if resumeSpan == nil { + spans.AddNonMVCC(spanset.SpanAccess(0), baInner.Header().Span()) + } else if !isReverse { + // Not reverse (->) + // Request: [key...............endKey] + // ResumeSpan: [key......endKey] + // True span: [key......key] + // + // Assumptions: + // resumeSpan.EndKey == baInner.Header().EndKey + // baInner.Header().Key <= resumeKey + // We don't check these assumptions because comparing keys could + // add potentially significant overhead. + resumeKey := resumeSpan.Key + spans.AddNonMVCC(spanset.SpanAccess(0), roachpb.Span{ + Key: baInner.Header().Key, + EndKey: resumeKey, + }) + } else { + // Reverse (<-) + // Request: [key...............endKey] + // ResumeSpan: [key......endKey] + // True span: [endKey...endKey] + // + // Assumptions: + // resumeSpan.Key == baInner.Header().Key + // resumeEndKey <= baInner.Header().EndKey + // We don't check these assumptions because comparing keys could + // add potentially significant overhead. + resumeEndKey := resumeSpan.EndKey + spans.AddNonMVCC(spanset.SpanAccess(0), roachpb.Span{ + Key: resumeEndKey, + EndKey: baInner.Header().EndKey, + }) + } + } + return spans + } + } + return nil +} + // executeBatchWithConcurrencyRetries is the entry point for client (non-admin) // requests that execute against the range's state. The method coordinates the // execution of requests that may require multiple retries due to interactions @@ -400,6 +460,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { + // Handle load-based splitting, if necessary. + spans := getTrueSpans(ctx, ba, br) + if spans != nil { + r.recordBatchForLoadBasedSplitting(ctx, ba, spans) + } + // NB: wrapped to delay g evaluation to its value when returning. if g != nil { r.concMgr.FinishReq(g) @@ -411,7 +477,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( // commands and wait even if the circuit breaker is tripped. pp = poison.Policy_Wait } - for first := true; ; first = false { + for { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { return nil, nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) @@ -431,11 +497,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries( } } - // Handle load-based splitting, if necessary. - if first { - r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) - } - // Acquire latches to prevent overlapping requests from executing until // this request completes. After latching, wait on any conflicting locks // to ensure that the request has full isolation during evaluation. This diff --git a/pkg/kv/kvserver/replica_send_test.go b/pkg/kv/kvserver/replica_send_test.go new file mode 100644 index 000000000000..b3339e94dd62 --- /dev/null +++ b/pkg/kv/kvserver/replica_send_test.go @@ -0,0 +1,171 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func TestReplicaGetTrueSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + testCases := []struct { + ba *roachpb.BatchRequest + br *roachpb.BatchResponse + expectedTrueSpans []roachpb.Span + }{ + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + { + Value: &roachpb.RequestUnion_Get{ + Get: &roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: roachpb.KeyMin, + }, + }, + }, + }, + { + Value: &roachpb.RequestUnion_Scan{ + Scan: &roachpb.ScanRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + }, + }, + }, + { + Value: &roachpb.RequestUnion_Scan{ + Scan: &roachpb.ScanRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + }, + }, + }, + { + Value: &roachpb.RequestUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + }, + }, + }, + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + { + Value: &roachpb.ResponseUnion_Get{ + Get: &roachpb.GetResponse{ + ResponseHeader: roachpb.ResponseHeader{ + ResumeSpan: nil, + }, + }, + }, + }, + { + Value: &roachpb.ResponseUnion_Scan{ + Scan: &roachpb.ScanResponse{ + ResponseHeader: roachpb.ResponseHeader{ + ResumeSpan: nil, + }, + }, + }, + }, + { + Value: &roachpb.ResponseUnion_Scan{ + Scan: &roachpb.ScanResponse{ + ResponseHeader: roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(113), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + }, + }, + }, + }, + { + Value: &roachpb.ResponseUnion_ReverseScan{ + ReverseScan: &roachpb.ReverseScanResponse{ + ResponseHeader: roachpb.ResponseHeader{ + ResumeSpan: &roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(979), + }, + }, + }, + }, + }, + }, + }, + expectedTrueSpans: []roachpb.Span{ + { + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: roachpb.KeyMin, + }, + { + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + { + Key: keys.SystemSQLCodec.TablePrefix(100), + EndKey: keys.SystemSQLCodec.TablePrefix(113), + }, + { + Key: keys.SystemSQLCodec.TablePrefix(979), + EndKey: keys.SystemSQLCodec.TablePrefix(1000), + }, + }, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{ + {}, + {}, + }, + }, + br: &roachpb.BatchResponse{ + Responses: []roachpb.ResponseUnion{ + {}, + }, + }, + expectedTrueSpans: nil, + }, + { + ba: &roachpb.BatchRequest{ + Requests: []roachpb.RequestUnion{}, + }, + br: nil, + expectedTrueSpans: nil, + }, + } + for i, test := range testCases { + trueSpanSet := getTrueSpans(context.Background(), test.ba, test.br) + var trueSpans []roachpb.Span + trueSpanSet.Iterate(func(spanAccess spanset.SpanAccess, spanScope spanset.SpanScope, span spanset.Span) { + trueSpans = append(trueSpans, span.Span) + }) + assert.Equal(t, test.expectedTrueSpans, trueSpans, "True spans not equal in test %d", i) + } +}