From c6456e4c57bb88b971093448bec68268009963ff Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 10 Feb 2022 20:26:25 +0000 Subject: [PATCH 1/2] kvserver: update replica_send call flow docs This patch updates the function naming in the call flow docs to reflect the current method handler for batch requests containing at least one write request, from `executeReadWriteBatch` to `executeWriteBatch`. Release note: None --- pkg/kv/kvserver/replica_send.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 5ace75fb2f21..48522d0c5a2f 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -74,7 +74,7 @@ var optimisticEvalLimitedScans = settings.RegisterBoolSetting( // │ (handles leases and txn conflicts) │ // │ │ // ▼ │ -// executeReadWriteBatch │ +// executeWriteBatch │ // │ │ // ▼ ▼ // evalAndPropose (turns the BatchRequest executeReadOnlyBatch From 27bfca4b9f14cbe74880553a61f1b07993606689 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 3 Feb 2022 23:23:51 +0000 Subject: [PATCH 2/2] kvserver: account for AddSSTableRequests in QPS Previously, Queries-Per-Second (QPS) was calculated uniformly per `BatchRequest` as 1. This patch introduces variable QPS calculation for `AddSSTableRequest`, which uses an order of magnitude more resources than other request types. This patch introduces the `kv.replica_stats.addsst_request_size_factor` cluster setting, which is used to attribute QPS to `AddSSTableRequest` sizes. The calculation is done as QPS = 1 + size(AddSSTableRequest) / factor. When `kv.replica_stats.addsst_request_size_factor` is less than 1, or no `AddSSTableRequest` exists within a `BatchRequest`, then QPS = 1, matching the previous behavior. resolves #73731 Release note (performance improvement): Introduced the `kv.replica_stats.addsst_request_size_factor` cluster setting. This setting is used to tune Queries-Per-Second (QPS) sensitivity to large imports. When set to a value less than 1, the setting is disabled and every `BatchRequest` is accounted for as 1 QPS. Otherwise, the size of any AddSSTableRequest contributes to QPS in inverse relation to this setting's magnitude. By default, this setting is configured to a conservative 50,000; every 50 kilobytes will be accounted for as an additional 1 QPS.
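For illustration, the accounting rule above can be sketched as a tiny standalone Go program. The `batchQPS` helper below is hypothetical and exists only to make the formula concrete; the actual implementation is `getBatchRequestQPS` in `replica_send.go`, shown in the diff that follows.

	package main

	import "fmt"

	// batchQPS mirrors the rule QPS = 1 + size(AddSSTableRequest)/factor,
	// falling back to the uniform cost of 1 when the factor is less than 1
	// or the batch carries no AddSSTableRequest data.
	func batchQPS(addSSTBytes, factor int64) float64 {
		if factor < 1 || addSSTBytes == 0 {
			return 1
		}
		return 1 + float64(addSSTBytes)/float64(factor)
	}

	func main() {
		// A ~1 MB AddSSTableRequest under the default factor of 50,000 is
		// accounted for as roughly 21 QPS rather than 1.
		fmt.Println(batchQPS(1_000_000, 50_000)) // 21
		// A factor of 0 disables size-based accounting entirely.
		fmt.Println(batchQPS(1_000_000, 0)) // 1
	}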
--- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/allocator_test.go | 14 +-- .../batcheval/cmd_add_sstable_test.go | 20 ++-- pkg/kv/kvserver/replica_metrics.go | 4 +- pkg/kv/kvserver/replica_rankings_test.go | 111 ++++++++++++++++++ pkg/kv/kvserver/replica_send.go | 33 +++++- pkg/kv/kvserver/replica_stats.go | 56 ++++++--- pkg/kv/kvserver/replica_stats_test.go | 20 ++-- pkg/kv/kvserver/store_pool_test.go | 2 +- 11 files changed, 215 insertions(+), 48 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index c665a1c7ccb4..474bb5136a69 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -33,6 +33,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled +kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholder's QPS; 0 means all requests are treated as cost 1 kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions kv.transaction.max_refresh_spans_bytes integer 256000 maximum number of bytes used to track refresh spans in serializable transactions diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 4f1e1b6f9ad7..223e3a602a86 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -41,6 +41,7 @@ kv.range_split.load_qps_thresholdinteger2500the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabledbooleanfalseif set, rangefeed registration is enabled kv.replica_circuit_breaker.slow_replication_thresholdduration0sduration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) +kv.replica_stats.addsst_request_size_factorinteger50000the divisor that is applied to addsstable request sizes, then recorded in a leaseholder's QPS; 0 means all requests are treated as cost 1 kv.replication_reports.intervalduration1m0sthe frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) kv.snapshot_rebalance.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 15ff5dda957c..fd93fec81209 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -379,6 +379,7 @@ go_test( "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/testutils/sstutil", "//pkg/testutils/testcluster", "//pkg/ts", "//pkg/ts/tspb", diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 
553aeffc5041..941cf9168528 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -5110,17 +5110,17 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { // the unknown node 99 in evenlyBalanced to verify that requests from // unknown localities don't affect the algorithm. evenlyBalanced := newReplicaStats(clock, localityFn) - evenlyBalanced.record(1) - evenlyBalanced.record(2) - evenlyBalanced.record(3) + evenlyBalanced.recordCount(1, 1) + evenlyBalanced.recordCount(1, 2) + evenlyBalanced.recordCount(1, 3) imbalanced1 := newReplicaStats(clock, localityFn) imbalanced2 := newReplicaStats(clock, localityFn) imbalanced3 := newReplicaStats(clock, localityFn) for i := 0; i < 100*int(MinLeaseTransferStatsDuration.Seconds()); i++ { - evenlyBalanced.record(99) - imbalanced1.record(1) - imbalanced2.record(2) - imbalanced3.record(3) + evenlyBalanced.recordCount(1, 99) + imbalanced1.recordCount(1, 1) + imbalanced2.recordCount(1, 2) + imbalanced3.recordCount(1, 3) } manual.Increment(int64(MinLeaseTransferStatsDuration)) diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 33b9b316528c..cc276544dc3d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -623,16 +623,16 @@ func TestEvalAddSSTable(t *testing.T) { expectStatsEst: true, }, /* Disabled due to nondeterminism under metamorphic tests. SSTTimestamp will - * shortly be removed anyway. - "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { - atReqTS: 8, - data: []sstutil.KV{{"a", 6, "a6"}}, - sst: []sstutil.KV{{"a", 7, "a7"}}, - sstTimestamp: 8, - expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, - expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, - expectStatsEst: true, - },*/ + * shortly be removed anyway. + "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { + atReqTS: 8, + data: []sstutil.KV{{"a", 6, "a6"}}, + sst: []sstutil.KV{{"a", 7, "a7"}}, + sstTimestamp: 8, + expect: []sstutil.KV{{"a", 7, "a7"}, {"a", 6, "a6"}}, + expectErrRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, + expectStatsEst: true, + },*/ } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { for name, tc := range testcases { diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index fd7db9b5095a..7844ae8f79f9 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -249,8 +249,8 @@ func calcBehindCount( // A "Query" is a BatchRequest (regardless of its contents) arriving at the // leaseholder with a gateway node set in the header (i.e. excluding requests // that weren't sent through a DistSender, which in practice should be -// practically none). Also return the amount of time over which the stat was -// accumulated. +// practically none). See Replica.getBatchRequestQPS() for how this is +// accounted for. 
func (r *Replica) QueriesPerSecond() (float64, time.Duration) { return r.leaseholderStats.avgQPS() } diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 548e2e91f1ed..271b88c5eee8 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -11,13 +11,20 @@ package kvserver import ( + "context" + "fmt" "math/rand" "reflect" "testing" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sstutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" ) func TestReplicaRankings(t *testing.T) { @@ -73,3 +80,107 @@ func TestReplicaRankings(t *testing.T) { } } } + +// TestAddSSTQPSStat verifies that AddSSTableRequests present in a +// BatchRequest are accounted for differently when a divisor is set. +func TestAddSSTQPSStat(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + db := ts.DB() + conn := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(conn) + + scratchKey := tc.ScratchRange(t) + nextKey := scratchKey.Next() + + // Construct an SST with 200 keys that will be reused with different divisors. + sstKeys := make([]sstutil.KV, 200) + for i := range sstKeys { + sstKeys[i] = sstutil.KV{ + KeyString: nextKey.String(), + WallTimestamp: 1, + ValueString: "value", + } + nextKey = nextKey.Next() + } + sst, start, end := sstutil.MakeSST(t, sstKeys) + requestSize := float64(len(sst)) + + sstReq := &roachpb.AddSSTableRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + Data: sst, + MVCCStats: sstutil.ComputeStats(t, sst), + } + + get := &roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: start}, + } + + addSSTBA := roachpb.BatchRequest{} + nonSSTBA := roachpb.BatchRequest{} + addSSTBA.Add(sstReq) + nonSSTBA.Add(get) + + // When the factor is set to 0, it is disabled and we expect a uniform 1 QPS. + // In all other cases, we expect 1 + the size of the AddSSTableRequest + // divided by the factor. If no AddSSTableRequest exists within the + // request, its cost should be 1, regardless of the factor. + testCases := []struct { + addsstRequestFactor int + expectedQPS float64 + ba roachpb.BatchRequest + }{ + {0, 1, addSSTBA}, + {100, 1, nonSSTBA}, + {10, 1 + requestSize/10, addSSTBA}, + {20, 1 + requestSize/20, addSSTBA}, + {40, 1 + requestSize/40, addSSTBA}, + {100, 1 + requestSize/100, addSSTBA}, + } + + // Send an AddSSTableRequest once to create the key range. + _, pErr := db.NonTransactionalSender().Send(ctx, addSSTBA) + require.Nil(t, pErr) + + store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID()) + require.NoError(t, err) + + repl := store.LookupReplica(roachpb.RKey(start)) + require.NotNil(t, repl) + + // Disable the consistency checker so that interleaved consistency-check + // requests don't artificially inflate QPS. 
+ sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`) + + for _, testCase := range testCases { + sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor)) + + // Reset the request counts to 0 before sending to clear previous requests. + repl.leaseholderStats.resetRequestCounts() + + _, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba) + require.Nil(t, pErr) + + repl.leaseholderStats.mu.Lock() + queriesAfter, _ := repl.leaseholderStats.sumQueriesLocked() + repl.leaseholderStats.mu.Unlock() + + // If queries are correctly recorded, we should see the query count + // increase by the expected QPS. However, it is possible to get a + // slightly higher number due to interleaving requests. To avoid a + // flaky test, we assert that QPS is at least as high as expected, + // and no more than 4 above the expected QPS. If this test is + // flaky, increase the delta to account for background activity + // interleaving with measurements. + require.GreaterOrEqual(t, queriesAfter, testCase.expectedQPS) + require.InDelta(t, queriesAfter, testCase.expectedQPS, 4) + } +} diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 48522d0c5a2f..b094e78db398 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -122,8 +122,9 @@ func (r *Replica) sendWithoutRangeID( ctx context.Context, ba *roachpb.BatchRequest, ) (_ *roachpb.BatchResponse, rErr *roachpb.Error) { var br *roachpb.BatchResponse + if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { - r.leaseholderStats.record(ba.Header.GatewayNodeID) + r.leaseholderStats.recordCount(r.getBatchRequestQPS(ctx, ba), ba.Header.GatewayNodeID) } // Add the range log tag. @@ -966,6 +967,36 @@ func (r *Replica) executeAdminBatch( return br, nil } +// getBatchRequestQPS calculates the cost estimate of a BatchRequest. The +// estimate is returned as Queries Per Second (QPS), representing the abstract +// resource cost associated with this request. BatchRequests are counted as +// 1 QPS, unless an AddSSTableRequest exists, in which case the sum of all +// AddSSTableRequests' data sizes is divided by a factor and added to the QPS. +// This special treatment of QPS accounts for the mismatch between +// AddSSTableRequest and other requests in terms of resource use. +func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchRequest) float64 { + var count float64 = 1 + + // For divisors less than 1, use the default treatment of QPS. + requestFact := AddSSTableRequestSizeFactor.Get(&r.store.cfg.Settings.SV) + if requestFact < 1 { + return count + } + + var addSSTSize float64 = 0 + for _, req := range ba.Requests { + switch t := req.GetInner().(type) { + case *roachpb.AddSSTableRequest: + addSSTSize += float64(len(t.Data)) + default: + continue + } + } + + count += addSSTSize / float64(requestFact) + return count +} + // checkBatchRequest verifies BatchRequest validity requirements. In particular, // the batch must have an assigned timestamp, and either all requests must be // read-only, or none. 
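As context for the wiring above: `sendWithoutRangeID` now passes the computed cost to `leaseholderStats.recordCount`, keyed by the gateway node. Below is a simplified sketch of that per-locality bookkeeping, using assumed stand-in types only; the real `replicaStats` (see `replica_stats.go` in the next diff) additionally maintains rotating time windows under a mutex.

	package main

	// perLocalityCounts maps a locality string to an accumulated cost.
	type perLocalityCounts map[string]float64

	// simpleStats is a stand-in for replicaStats, reduced to the parts
	// relevant here: a locality oracle and the per-locality counts.
	type simpleStats struct {
		getNodeLocality func(nodeID int32) string
		counts          perLocalityCounts
	}

	// recordCount attributes a request's cost (1, or 1 + SST bytes/factor)
	// to the locality of the gateway node that sent it.
	func (s *simpleStats) recordCount(count float64, nodeID int32) {
		var locality string
		if s.getNodeLocality != nil {
			locality = s.getNodeLocality(nodeID)
		}
		s.counts[locality] += count
	}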
diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go index 51d09555eed0..a60c70a28a1f 100644 --- a/pkg/kv/kvserver/replica_stats.go +++ b/pkg/kv/kvserver/replica_stats.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -31,6 +32,24 @@ const ( MinStatsDuration = 5 * time.Second ) +// AddSSTableRequestSizeFactor wraps +// "kv.replica_stats.addsst_request_size_factor". When this setting is set to +// 0, all batch requests are treated uniformly as 1 QPS. When this setting is +// greater than or equal to 1, AddSSTable requests, when present within a +// batch request, will add additional QPS. The additional QPS is the size of +// the SSTable data divided by this factor. Thus, the magnitude of this factor +// is inversely related to QPS sensitivity to AddSSTableRequests. +var AddSSTableRequestSizeFactor = settings.RegisterIntSetting( + settings.TenantWritable, + "kv.replica_stats.addsst_request_size_factor", + "the divisor that is applied to addsstable request sizes, then recorded in a leaseholder's QPS; 0 means all requests are treated as cost 1", + // The default value of 50,000 was chosen following manual testing that is + // discussed in pull request #76252. Every additional 50,000 AddSSTable bytes + // will increase the accounted QPS by 1. Typically AddSSTableRequests are + // ~1 MB in size, accounted as ~20 QPS. + 50000, +).WithPublic() + type localityOracle func(roachpb.NodeID) string // perLocalityCounts maps from the string representation of a locality to count. @@ -105,10 +124,6 @@ func (rs *replicaStats) splitRequestCounts(other *replicaStats) { } } -func (rs *replicaStats) record(nodeID roachpb.NodeID) { - rs.recordCount(1, nodeID) -} - func (rs *replicaStats) recordCount(count float64, nodeID roachpb.NodeID) { var locality string if rs.getNodeLocality != nil { @@ -179,6 +194,25 @@ func (rs *replicaStats) perLocalityDecayingQPS() (perLocalityCounts, time.Durati return counts, now.Sub(rs.mu.lastReset) } +// sumQueriesLocked returns the sum of all queries currently recorded. +// Calling this method requires holding a lock on mu. +func (rs *replicaStats) sumQueriesLocked() (float64, int) { + var sum float64 + var windowsUsed int + for i := range rs.mu.requests { + // We have to add len(rs.mu.requests) to the numerator to avoid getting a + // negative result from the modulus operation when rs.mu.idx is small. + requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests) + if cur := rs.mu.requests[requestsIdx]; cur != nil { + windowsUsed++ + for _, v := range cur { + sum += v + } + } + } + return sum, windowsUsed +} + // avgQPS returns the average requests-per-second and the amount of time // over which the stat was accumulated. Note that these averages are exact, // not exponentially decayed (there isn't a ton of justification for going @@ -196,19 +230,7 @@ func (rs *replicaStats) avgQPS() (float64, time.Duration) { rs.maybeRotateLocked(now) // First accumulate the counts, then divide by the total number of seconds. - var sum float64 - var windowsUsed int - for i := range rs.mu.requests { - // We have to add len(rs.mu.requests) to the numerator to avoid getting a - // negative result from the modulus operation when rs.mu.idx is small. 
- requestsIdx := (rs.mu.idx + len(rs.mu.requests) - i) % len(rs.mu.requests) - if cur := rs.mu.requests[requestsIdx]; cur != nil { - windowsUsed++ - for _, v := range cur { - sum += v - } - } - } + sum, windowsUsed := rs.sumQueriesLocked() if windowsUsed <= 0 { return 0, 0 } diff --git a/pkg/kv/kvserver/replica_stats_test.go b/pkg/kv/kvserver/replica_stats_test.go index 364680d23a9a..d529a86d7784 100644 --- a/pkg/kv/kvserver/replica_stats_test.go +++ b/pkg/kv/kvserver/replica_stats_test.go @@ -174,7 +174,7 @@ func TestReplicaStats(t *testing.T) { return tc.localities[nodeID] }) for _, req := range tc.reqs { - rs.record(req) + rs.recordCount(1, req) } manual.Increment(int64(time.Second)) if actual, _ := rs.perLocalityDecayingQPS(); !floatMapsEqual(tc.expected, actual) { @@ -200,7 +200,7 @@ func TestReplicaStats(t *testing.T) { t.Errorf("%d: avgQPS() got %f, want %f", i, actual, expectedAvgQPS) } rs.resetRequestCounts() - if actual, _ := rs.perLocalityDecayingQPS(); len(actual) != 0 { + if actual, _ := rs.sumQueriesLocked(); actual != 0 { t.Errorf("%d: unexpected non-empty QPS averages after resetting: %+v", i, actual) } } @@ -240,7 +240,7 @@ func TestReplicaStatsDecay(t *testing.T) { { for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} { - rs.record(req) + rs.recordCount(1, req) } counts := perLocalityCounts{ awsLocalities[1]: 2, @@ -285,11 +285,11 @@ func TestReplicaStatsDecay(t *testing.T) { { for _, req := range []roachpb.NodeID{1, 1, 2, 2, 3} { - rs.record(req) + rs.recordCount(1, req) } manual.Increment(int64(replStatsRotateInterval)) for _, req := range []roachpb.NodeID{2, 2, 3, 3, 3} { - rs.record(req) + rs.recordCount(1, req) } durationDivisor := time.Duration(float64(replStatsRotateInterval) * decayFactor).Seconds() expected := perLocalityCounts{ @@ -322,11 +322,11 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) { rs := newReplicaStats(clock, func(nodeID roachpb.NodeID) string { return awsLocalities[nodeID] }) - rs.record(1) - rs.record(1) - rs.record(2) - rs.record(2) - rs.record(3) + rs.recordCount(1, 1) + rs.recordCount(1, 1) + rs.recordCount(1, 2) + rs.recordCount(1, 2) + rs.recordCount(1, 3) expected := perLocalityCounts{ awsLocalities[1]: 2, awsLocalities[2]: 2, diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 78354ab253d4..70ebdd307c9d 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -547,7 +547,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { replica.mu.Unlock() rs := newReplicaStats(clock, nil) for _, store := range stores { - rs.record(store.Node.NodeID) + rs.recordCount(1, store.Node.NodeID) } manual.Increment(int64(MinStatsDuration + time.Second)) replica.leaseholderStats = rs
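To close, here is a standalone sketch of the ring-buffer walk that `sumQueriesLocked` performs, under assumed names and a simplified window type (the real code indexes `rs.mu.requests` the same way). Adding `len(requests)` before taking the modulus keeps the index non-negative when `idx - i` would underflow, since Go's `%` operator preserves the sign of the dividend.

	package main

	// sumWindows sums every value recorded across a ring buffer of
	// per-locality count maps, newest to oldest, and reports how many
	// windows actually held data.
	func sumWindows(requests []map[string]float64, idx int) (sum float64, windowsUsed int) {
		for i := range requests {
			// Walk backwards from the current index without going negative.
			j := (idx + len(requests) - i) % len(requests)
			if cur := requests[j]; cur != nil {
				windowsUsed++
				for _, v := range cur {
					sum += v
				}
			}
		}
		return sum, windowsUsed
	}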