diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 52b1f4e792f2..7efdecc37aaa 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -226,6 +226,7 @@ /pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs /pkg/kv/kvserver/liveness/ @cockroachdb/kv-prs +/pkg/kv/kvserver/load/ @cockroachdb/kv-prs /pkg/kv/kvserver/logstore/ @cockroachdb/repl-prs /pkg/kv/kvserver/loqrecovery/ @cockroachdb/repl-prs /pkg/kv/kvserver/multiqueue/ @cockroachdb/kv-prs diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4039f828d9c7..2c7330d3b6d7 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1197,6 +1197,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/liveness/livenesspb:livenesspb", "//pkg/kv/kvserver/liveness:liveness", "//pkg/kv/kvserver/liveness:liveness_test", + "//pkg/kv/kvserver/load:load", "//pkg/kv/kvserver/logstore:logstore", "//pkg/kv/kvserver/logstore:logstore_test", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb", @@ -2555,6 +2556,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvstorage:get_x_data", "//pkg/kv/kvserver/liveness:get_x_data", "//pkg/kv/kvserver/liveness/livenesspb:get_x_data", + "//pkg/kv/kvserver/load:get_x_data", "//pkg/kv/kvserver/logstore:get_x_data", "//pkg/kv/kvserver/loqrecovery:get_x_data", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:get_x_data", diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index 99b39c431c5b..0f0f3d289624 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -70,6 +70,7 @@ go_test( "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", + "//pkg/kv/kvserver/load", "//pkg/roachpb", "//pkg/rpc", "//pkg/security", diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go index a909fef6bdf8..cad2090abf8e 100644 --- a/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -68,15 +69,15 @@ func TestTenantRangeQPSStat(t *testing.T) { require.NoError(t, err) repl, err := store.GetReplica(roachpb.RangeID(rangeID)) require.NoError(t, err) - - qpsBefore, durationBefore := repl.QueriesPerSecond() - queriesBefore := qpsBefore * durationBefore.Seconds() + // NB: We call directly into the load tracking struct in order to avoid + // flakes due to timing differences affecting the result. + loadStats := repl.GetLoadStatsForTesting() + qpsBefore := loadStats.TestingGetSum(load.Queries) for i := 0; i < 110; i++ { r.Exec(t, `SELECT k FROM foo.qps_test`) } - qpsAfter, durationAfter := repl.QueriesPerSecond() - queriesAfter := qpsAfter * durationAfter.Seconds() - queriesIncrease := int(queriesAfter - queriesBefore) + qpsAfter := loadStats.TestingGetSum(load.Queries) + queriesIncrease := int(qpsAfter - qpsBefore) // If queries are correctly recorded, we should see an increase in query count by // 110.
As it is possible due to rounding and conversion from QPS to query count // to get a slightly higher or lower number, we expect the increase to be at diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 60272eafe14f..10528735f73d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -51,7 +51,6 @@ go_library( "replica_gc_queue.go", "replica_gossip.go", "replica_init.go", - "replica_load.go", "replica_metrics.go", "replica_placeholder.go", "replica_proposal.go", @@ -138,6 +137,7 @@ go_library( "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/kv/kvserver/load", "//pkg/kv/kvserver/logstore", "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", @@ -360,6 +360,7 @@ go_test( "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/kv/kvserver/load", "//pkg/kv/kvserver/logstore", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index cead053b10a2..1549865b8b06 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -161,7 +162,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { repl.mu.state.Stats = &enginepb.MVCCStats{} repl.mu.Unlock() - repl.loadStats = NewReplicaLoad(clock, nil) + repl.loadStats = load.NewReplicaLoad(clock, nil) var rangeUsageInfo allocator.RangeUsageInfo diff --git a/pkg/kv/kvserver/load/BUILD.bazel b/pkg/kv/kvserver/load/BUILD.bazel new file mode 100644 index 000000000000..684e34d916c4 --- /dev/null +++ b/pkg/kv/kvserver/load/BUILD.bazel @@ -0,0 +1,19 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "load", + srcs = [ + "record_replica_load.go", + "replica_load.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/replicastats", + "//pkg/roachpb", + "//pkg/util/hlc", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/load/record_replica_load.go b/pkg/kv/kvserver/load/record_replica_load.go new file mode 100644 index 000000000000..01e395513649 --- /dev/null +++ b/pkg/kv/kvserver/load/record_replica_load.go @@ -0,0 +1,54 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package load + +import "github.com/cockroachdb/cockroach/pkg/roachpb" + +// RecordBatchRequests records the given number of batch requests at the +// current time against the gateway nodeID.
+func (rl *ReplicaLoad) RecordBatchRequests(val float64, nodeID roachpb.NodeID) { + rl.record(Queries, val, nodeID) +} + +// RecordRequests records the value given for requests. +func (rl *ReplicaLoad) RecordRequests(val float64) { + rl.record(Requests, val, 0 /* nodeID */) +} + +// RecordWriteKeys records the value given for write keys. +func (rl *ReplicaLoad) RecordWriteKeys(val float64) { + rl.record(WriteKeys, val, 0 /* nodeID */) +} + +// RecordReadKeys records the value given for read keys. +func (rl *ReplicaLoad) RecordReadKeys(val float64) { + rl.record(ReadKeys, val, 0 /* nodeID */) +} + +// RecordWriteBytes records the value given for write bytes. +func (rl *ReplicaLoad) RecordWriteBytes(val float64) { + rl.record(WriteBytes, val, 0 /* nodeID */) +} + +// RecordReadBytes records the value given for read bytes. +func (rl *ReplicaLoad) RecordReadBytes(val float64) { + rl.record(ReadBytes, val, 0 /* nodeID */) +} + +// RecordRaftCPUNanos records the value given for raft cpu nanos. +func (rl *ReplicaLoad) RecordRaftCPUNanos(val float64) { + rl.record(RaftCPUNanos, val, 0 /* nodeID */) +} + +// RecordReqCPUNanos records the value given for request cpu nanos. +func (rl *ReplicaLoad) RecordReqCPUNanos(val float64) { + rl.record(ReqCPUNanos, val, 0 /* nodeID */) +} diff --git a/pkg/kv/kvserver/load/replica_load.go b/pkg/kv/kvserver/load/replica_load.go new file mode 100644 index 000000000000..e960afe4c709 --- /dev/null +++ b/pkg/kv/kvserver/load/replica_load.go @@ -0,0 +1,175 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package load + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// TODO(kvoli): Recording per-second stats on every replica costs unnecessary +// memory. Only a counter should be maintained per replica, along with a +// low-cost EWMA if necessary. A central, fixed-memory-cost structure should track +// finer-grained moving averages of interesting replicas. + +// LoadStat represents an ordinal position in replica load for a specific type +// of load statistic. +type LoadStat int + +const ( + Queries LoadStat = iota + Requests + WriteKeys + ReadKeys + WriteBytes + ReadBytes + RaftCPUNanos + ReqCPUNanos + + numLoadStats = 8 +) + +// ReplicaLoadStats contains per-second average statistics for load upon a +// replica. All fields will return 0 until after replicastats.MinStatsDuration +// has passed, regardless of any values recorded. This is done to avoid reporting +// erroneous stats that can arise when the sample time is small. +type ReplicaLoadStats struct { + // QueriesPerSecond is the replica's average QPS if it is the current leaseholder. If + // it isn't, this will return 0 because the replica does not know about the + // reads that the leaseholder is serving. + // + // A "Query" is a BatchRequest (regardless of its contents) arriving at the + // leaseholder with a gateway node set in the header (i.e. excluding + // requests that weren't sent through a DistSender, which in practice + // should be practically none). See Replica.getBatchRequestQPS() for how + // this is accounted for.
+ QueriesPerSecond float64 + // RequestsPerSecond is the replica's average requests received per second. A + // batch request may have one to many requests. + RequestsPerSecond float64 + // WriteKeysPerSecond is the replica's average keys written per second. A + // "Write" is a mutation applied by Raft as measured by + // engine.RocksDBBatchCount(writeBatch). This corresponds roughly to the number + // of keys mutated by a write. For example, writing 12 intents would count as 24 + // writes (12 for the metadata, 12 for the versions). A DeleteRange that + // ultimately only removes one key counts as one (or two if it's transactional). + WriteKeysPerSecond float64 + // ReadKeysPerSecond is the replica's average keys read per second. A "Read" + // is a key access during evaluation of a batch request. This includes both + // follower and leaseholder reads. + ReadKeysPerSecond float64 + // WriteBytesPerSecond is the replica's average bytes written per second. A + // "Write" is as described in WriteKeysPerSecond. + WriteBytesPerSecond float64 + // ReadBytesPerSecond is the replica's average bytes read per second. A "Read" is as + // described in ReadKeysPerSecond. + ReadBytesPerSecond float64 + // CPUNanosPerSecond is the range's time spent on-processor averaged per second. + CPUNanosPerSecond float64 +} + +// ReplicaLoad tracks a sliding window of throughput on a replica. +type ReplicaLoad struct { + stats [numLoadStats]*replicastats.ReplicaStats +} + +// NewReplicaLoad returns a new ReplicaLoad, which may be used to track the +// request throughput of a replica. +func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad { + stats := [numLoadStats]*replicastats.ReplicaStats{} + + // NB: We only wish to record the locality of a request for QPS, as only + // follow-the-workload lease transfers use this per-locality + // request count. Maintaining more than one bucket for client requests + // increases the memory footprint O(localities). + stats[Queries] = replicastats.NewReplicaStats(clock, getNodeLocality) + + // For all other stats, we don't include a locality oracle. + for i := 1; i < numLoadStats; i++ { + stats[i] = replicastats.NewReplicaStats(clock, nil) + } + + return &ReplicaLoad{ + stats: stats, + } +} + +func (rl *ReplicaLoad) record(stat LoadStat, val float64, nodeID roachpb.NodeID) { + rl.stats[stat].RecordCount(val, nodeID) +} + +// Split will distribute the load in the calling struct evenly between itself +// and other. +func (rl *ReplicaLoad) Split(other *ReplicaLoad) { + for i := range rl.stats { + rl.stats[i].SplitRequestCounts(other.stats[i]) + } +} + +// Merge will combine the tracked load in other into the calling struct. +func (rl *ReplicaLoad) Merge(other *ReplicaLoad) { + for i := range rl.stats { + rl.stats[i].MergeRequestCounts(other.stats[i]) + } +} + +// Reset will clear all recorded history. +func (rl *ReplicaLoad) Reset() { + for i := range rl.stats { + rl.stats[i].ResetRequestCounts() + } +} + +// get returns the current value for the LoadStat with ordinal stat. +func (rl *ReplicaLoad) get(stat LoadStat) float64 { + var ret float64 + // Only return the value if the statistics have been gathered for longer + // than the minimum duration. + if val, dur := rl.stats[stat].AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { + ret = val + } + return ret +} + +// Stats returns a current stat summary of replica load.
+func (rl *ReplicaLoad) Stats() ReplicaLoadStats { + return ReplicaLoadStats{ + QueriesPerSecond: rl.get(Queries), + RequestsPerSecond: rl.get(Requests), + WriteKeysPerSecond: rl.get(WriteKeys), + ReadKeysPerSecond: rl.get(ReadKeys), + WriteBytesPerSecond: rl.get(WriteBytes), + ReadBytesPerSecond: rl.get(ReadBytes), + CPUNanosPerSecond: rl.get(RaftCPUNanos) + rl.get(ReqCPUNanos), + } +} + +// RequestLocalityInfo returns the summary of client localities for requests +// made to this replica. +func (rl *ReplicaLoad) RequestLocalityInfo() *replicastats.RatedSummary { + return rl.stats[Queries].SnapshotRatedSummary() +} + +// TestingGetSum returns the sum of recorded values for the LoadStat with +// ordinal stat. The sum is used in testing in place of any averaging in order +// to assert precisely on the values that have been recorded. +func (rl *ReplicaLoad) TestingGetSum(stat LoadStat) float64 { + val, _ := rl.stats[stat].SumLocked() + return val +} + +// TestingSetStat sets the value for the LoadStat with ordinal stat to be the +// value given. This value will then be returned in all future Stats() calls +// unless overridden by another call to TestingSetStat. +func (rl *ReplicaLoad) TestingSetStat(stat LoadStat, val float64) { + rl.stats[stat].SetMeanRateForTesting(val) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 075eaada5f70..1b8421a31468 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" @@ -201,7 +202,7 @@ type Replica struct { // Multiple types of throughput are accounted for. The localities of // requests are tracked in addition to the aggregate, in order to // inform load-based lease and replica rebalancing decisions. - loadStats *ReplicaLoad + loadStats *load.ReplicaLoad // Held in read mode during read-only commands. Held in exclusive mode to // prevent read-only commands from executing. Acquired before the embedded @@ -2089,7 +2090,7 @@ func init() { // requests. func (r *Replica) MeasureReqCPUNanos(start time.Duration) { r.measureNanosRunning(start, func(dur float64) { - r.loadStats.reqCPUNanos.RecordCount(dur, 0 /* nodeID */) + r.loadStats.RecordReqCPUNanos(dur) }) } @@ -2097,7 +2098,7 @@ func (r *Replica) MeasureReqCPUNanos(start time.Duration) { // raft work. func (r *Replica) MeasureRaftCPUNanos(start time.Duration) { r.measureNanosRunning(start, func(dur float64) { - r.loadStats.raftCPUNanos.RecordCount(dur, 0 /* nodeID */) + r.loadStats.RecordRaftCPUNanos(dur) }) } @@ -2110,6 +2111,12 @@ func (r *Replica) measureNanosRunning(start time.Duration, f func(float64)) { f(float64(dur)) } +// GetLoadStatsForTesting is for use only by tests to read the Replicas' load +// tracker state. +func (r *Replica) GetLoadStatsForTesting() *load.ReplicaLoad { + return r.loadStats +} + +// ReadProtectedTimestampsForTesting is for use only by tests to read and update // the Replicas' cached protected timestamp state.
func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err error) { diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 60b33fd96427..383fa0936c3f 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -604,11 +604,8 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { deltaStats.Subtract(prevStats) r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats) - // Record the write activity, passing a 0 nodeID because replica.writeStats - // intentionally doesn't track the origin of the writes. - // - // This also records the number of keys ingested via AddSST. - b.r.loadStats.writeKeys.RecordCount(float64(b.ab.numMutations), 0) + // Record the number of keys written to the replica. + b.r.loadStats.RecordWriteKeys(float64(b.ab.numMutations)) now := timeutil.Now() if needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) { diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 3414033afb70..d1c59716cd0a 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -112,7 +113,7 @@ func newUnloadedReplica( r.leaseHistory = newLeaseHistory() } if store.cfg.StorePool != nil { - r.loadStats = NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) + r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) } // Init rangeStr with the range ID. diff --git a/pkg/kv/kvserver/replica_load.go b/pkg/kv/kvserver/replica_load.go deleted file mode 100644 index b01927229392..000000000000 --- a/pkg/kv/kvserver/replica_load.go +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver - -import ( - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" - "github.com/cockroachdb/cockroach/pkg/util/hlc" -) - -// TODO(kvoli): This file, along with repicastats/* requires refactoring to -// avoid consuming unnecessary memory. There are individual locks and locality -// tracking structs on all replicastats objects that are only needed on one. -// This is tracked in #87187. - -// ReplicaLoad tracks a sliding window of throughput on a replica. By default, -// there are 6, 5 minute sliding windows. 
-type ReplicaLoad struct { - batchRequests *replicastats.ReplicaStats - requests *replicastats.ReplicaStats - writeKeys *replicastats.ReplicaStats - readKeys *replicastats.ReplicaStats - writeBytes *replicastats.ReplicaStats - readBytes *replicastats.ReplicaStats - raftCPUNanos *replicastats.ReplicaStats - reqCPUNanos *replicastats.ReplicaStats -} - -// NewReplicaLoad returns a new ReplicaLoad, which may be used to track the -// request throughput of a replica. -func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad { - // NB: We only wish to record the locality of a request for QPS, where it - // as only follow-the-workload lease transfers use this per-locality - // request count. Maintaining more than one bucket for client requests - // increases the memory footprint O(localities). - return &ReplicaLoad{ - batchRequests: replicastats.NewReplicaStats(clock, getNodeLocality), - requests: replicastats.NewReplicaStats(clock, nil), - writeKeys: replicastats.NewReplicaStats(clock, nil), - readKeys: replicastats.NewReplicaStats(clock, nil), - writeBytes: replicastats.NewReplicaStats(clock, nil), - readBytes: replicastats.NewReplicaStats(clock, nil), - raftCPUNanos: replicastats.NewReplicaStats(clock, nil), - reqCPUNanos: replicastats.NewReplicaStats(clock, nil), - } -} - -// split will distribute the load in the calling struct, evenly between itself -// and other. -func (rl *ReplicaLoad) split(other *ReplicaLoad) { - rl.batchRequests.SplitRequestCounts(other.batchRequests) - rl.requests.SplitRequestCounts(other.requests) - rl.writeKeys.SplitRequestCounts(other.writeKeys) - rl.readKeys.SplitRequestCounts(other.readKeys) - rl.writeBytes.SplitRequestCounts(other.writeBytes) - rl.readBytes.SplitRequestCounts(other.readBytes) - rl.raftCPUNanos.SplitRequestCounts(other.raftCPUNanos) - rl.reqCPUNanos.SplitRequestCounts(other.reqCPUNanos) -} - -// merge will combine the tracked load in other, into the calling struct. -func (rl *ReplicaLoad) merge(other *ReplicaLoad) { - rl.batchRequests.MergeRequestCounts(other.batchRequests) - rl.requests.MergeRequestCounts(other.requests) - rl.writeKeys.MergeRequestCounts(other.writeKeys) - rl.readKeys.MergeRequestCounts(other.readKeys) - rl.writeBytes.MergeRequestCounts(other.writeBytes) - rl.readBytes.MergeRequestCounts(other.readBytes) - rl.raftCPUNanos.MergeRequestCounts(other.raftCPUNanos) - rl.reqCPUNanos.MergeRequestCounts(other.reqCPUNanos) -} - -// reset will clear all recorded history. 
-func (rl *ReplicaLoad) reset() { - rl.batchRequests.ResetRequestCounts() - rl.requests.ResetRequestCounts() - rl.writeKeys.ResetRequestCounts() - rl.readKeys.ResetRequestCounts() - rl.writeBytes.ResetRequestCounts() - rl.readBytes.ResetRequestCounts() - rl.raftCPUNanos.ResetRequestCounts() - rl.reqCPUNanos.ResetRequestCounts() -} diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 71afe5303349..1de0acc97de7 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -13,13 +13,13 @@ package kvserver import ( "context" "math" - "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "go.etcd.io/raft/v3" @@ -278,65 +278,9 @@ func calcBehindCount( return behindCount } -// QueriesPerSecond returns the range's average QPS if it is the current -// leaseholder. If it isn't, this will return 0 because the replica does not -// know about the reads that the leaseholder is serving. -// -// A "Query" is a BatchRequest (regardless of its contents) arriving at the -// leaseholder with a gateway node set in the header (i.e. excluding requests -// that weren't sent through a DistSender, which in practice should be -// practically none). See Replica.getBatchRequestQPS() for how this is -// accounted for. -func (r *Replica) QueriesPerSecond() (float64, time.Duration) { - return r.loadStats.batchRequests.AverageRatePerSecond() -} - -// RequestsPerSecond returns the range's average requests received per second. -// A batch request may have one to many requests. -func (r *Replica) RequestsPerSecond() float64 { - rps, _ := r.loadStats.requests.AverageRatePerSecond() - return rps -} - -// WritesPerSecond returns the range's average keys written per second. A -// "Write" is a mutation applied by Raft as measured by -// engine.RocksDBBatchCount(writeBatch). This corresponds roughly to the number -// of keys mutated by a write. For example, writing 12 intents would count as 24 -// writes (12 for the metadata, 12 for the versions). A DeleteRange that -// ultimately only removes one key counts as one (or two if it's transactional). -func (r *Replica) WritesPerSecond() float64 { - wps, _ := r.loadStats.writeKeys.AverageRatePerSecond() - return wps -} - -// ReadsPerSecond returns the range's average keys read per second. A "Read" is -// a key access during evaluation of a batch request. This includes both -// follower and leaseholder reads. -func (r *Replica) ReadsPerSecond() float64 { - rps, _ := r.loadStats.readKeys.AverageRatePerSecond() - return rps -} - -// WriteBytesPerSecond returns the range's average bytes written per second. A "Write" is -// as described in WritesPerSecond. -func (r *Replica) WriteBytesPerSecond() float64 { - wbps, _ := r.loadStats.writeBytes.AverageRatePerSecond() - return wbps -} - -// ReadBytesPerSecond returns the range's average bytes read per second. A "Read" is -// as described in ReadsPerSecond. 
-func (r *Replica) ReadBytesPerSecond() float64 { - rbps, _ := r.loadStats.readBytes.AverageRatePerSecond() - return rbps -} - -// CPUNanosPerSecond tracks the time this replica spent on-processor averaged -// per second. -func (r *Replica) CPUNanosPerSecond() float64 { - raftCPUNanos, _ := r.loadStats.raftCPUNanos.AverageRatePerSecond() - reqCPUNanos, _ := r.loadStats.reqCPUNanos.AverageRatePerSecond() - return raftCPUNanos + reqCPUNanos +// LoadStats returns the load statistics for the replica. +func (r *Replica) LoadStats() load.ReplicaLoadStats { + return r.loadStats.Stats() } func (r *Replica) needsSplitBySizeRLocked() bool { diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 1e3fdf6f89c2..bf59882acbaf 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -326,7 +326,7 @@ func (r *Replica) leasePostApplyLocked( // Reset the request counts used to make lease placement decisions and // load-based splitting/merging decisions whenever starting a new lease. if r.loadStats != nil { - r.loadStats.reset() + r.loadStats.Reset() } r.loadBasedSplitter.Reset(r.Clock().PhysicalTime()) } @@ -403,7 +403,7 @@ func (r *Replica) leasePostApplyLocked( r.store.maybeGossipOnCapacityChange(ctx, leaseRemoveEvent) } if r.loadStats != nil { - r.loadStats.reset() + r.loadStats.Reset() } } diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 6ff037124a26..3d3f8f5fef7c 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -19,7 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" + aload "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -49,7 +50,7 @@ func TestReplicaRankings(t *testing.T) { } for _, tc := range testCases { - acc := NewReplicaAccumulator(load.Queries) + acc := NewReplicaAccumulator(aload.Queries) // Randomize the order of the inputs each time the test is run. want := make([]float64, len(tc.replicasByQPS)) @@ -164,15 +165,12 @@ func TestAddSSTQPSStat(t *testing.T) { sqlDB.Exec(t, fmt.Sprintf(`SET CLUSTER setting kv.replica_stats.addsst_request_size_factor = %d`, testCase.addsstRequestFactor)) // Reset the request counts to 0 before sending to clear previous requests. - repl.loadStats.reset() + repl.loadStats.Reset() _, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba) require.Nil(t, pErr) - repl.loadStats.batchRequests.Mu.Lock() - queriesAfter, _ := repl.loadStats.batchRequests.SumLocked() - repl.loadStats.batchRequests.Mu.Unlock() - + queriesAfter := repl.loadStats.TestingGetSum(load.Queries) // If queries are correctly recorded, we should see an increase in query // count by the expected QPS. However, it is possible to get a // slightly higher number due to interleaving requests.
To avoid a @@ -198,11 +196,6 @@ func assertGreaterThanInDelta(t *testing.T, expected float64, actual float64, de require.InDelta(t, expected, actual, delta) } -func headVal(f func() (float64, int)) float64 { - ret, _ := f() - return ret -} - func TestWriteLoadStatsAccounting(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -259,27 +252,13 @@ func TestWriteLoadStatsAccounting(t *testing.T) { // should succeed soon, if it fails on the first. testutils.SucceedsSoon(t, func() error { // Reset the request counts to 0 before sending to clear previous requests. - repl.loadStats.reset() - - repl.loadStats.requests.Mu.Lock() - repl.loadStats.writeKeys.Mu.Lock() - repl.loadStats.readKeys.Mu.Lock() - repl.loadStats.writeBytes.Mu.Lock() - repl.loadStats.readBytes.Mu.Lock() - repl.loadStats.batchRequests.Mu.Lock() - - requestsBefore := headVal(repl.loadStats.requests.SumLocked) - writesBefore := headVal(repl.loadStats.writeKeys.SumLocked) - readsBefore := headVal(repl.loadStats.readKeys.SumLocked) - readBytesBefore := headVal(repl.loadStats.readBytes.SumLocked) - writeBytesBefore := headVal(repl.loadStats.writeBytes.SumLocked) - - repl.loadStats.requests.Mu.Unlock() - repl.loadStats.writeKeys.Mu.Unlock() - repl.loadStats.readKeys.Mu.Unlock() - repl.loadStats.writeBytes.Mu.Unlock() - repl.loadStats.readBytes.Mu.Unlock() - repl.loadStats.batchRequests.Mu.Unlock() + repl.loadStats.Reset() + + requestsBefore := repl.loadStats.TestingGetSum(load.Requests) + writesBefore := repl.loadStats.TestingGetSum(load.WriteKeys) + readsBefore := repl.loadStats.TestingGetSum(load.ReadKeys) + readBytesBefore := repl.loadStats.TestingGetSum(load.ReadBytes) + writeBytesBefore := repl.loadStats.TestingGetSum(load.WriteBytes) for i := 0; i < testCase.writes; i++ { _, pErr := db.Inc(ctx, scratchKey, 1) @@ -291,25 +270,11 @@ func TestWriteLoadStatsAccounting(t *testing.T) { require.Equal(t, 0.0, writeBytesBefore) require.Equal(t, 0.0, readBytesBefore) - repl.loadStats.requests.Mu.Lock() - repl.loadStats.writeKeys.Mu.Lock() - repl.loadStats.readKeys.Mu.Lock() - repl.loadStats.writeBytes.Mu.Lock() - repl.loadStats.readBytes.Mu.Lock() - repl.loadStats.batchRequests.Mu.Lock() - - requestsAfter := headVal(repl.loadStats.requests.SumLocked) - writesAfter := headVal(repl.loadStats.writeKeys.SumLocked) - readsAfter := headVal(repl.loadStats.readKeys.SumLocked) - readBytesAfter := headVal(repl.loadStats.readBytes.SumLocked) - writeBytesAfter := headVal(repl.loadStats.writeBytes.SumLocked) - - repl.loadStats.requests.Mu.Unlock() - repl.loadStats.writeKeys.Mu.Unlock() - repl.loadStats.readKeys.Mu.Unlock() - repl.loadStats.writeBytes.Mu.Unlock() - repl.loadStats.readBytes.Mu.Unlock() - repl.loadStats.batchRequests.Mu.Unlock() + requestsAfter := repl.loadStats.TestingGetSum(load.Requests) + writesAfter := repl.loadStats.TestingGetSum(load.WriteKeys) + readsAfter := repl.loadStats.TestingGetSum(load.ReadKeys) + readBytesAfter := repl.loadStats.TestingGetSum(load.ReadBytes) + writeBytesAfter := repl.loadStats.TestingGetSum(load.WriteBytes) assertGreaterThanInDelta(t, testCase.expectedRQPS, requestsAfter, epsilonAllowed) assertGreaterThanInDelta(t, testCase.expectedWPS, writesAfter, epsilonAllowed) @@ -419,27 +384,13 @@ func TestReadLoadMetricAccounting(t *testing.T) { testutils.SucceedsSoon(t, func() error { // Reset the request counts to 0 before sending to clear previous requests.
- repl.loadStats.reset() - - repl.loadStats.requests.Mu.Lock() - repl.loadStats.writeKeys.Mu.Lock() - repl.loadStats.readKeys.Mu.Lock() - repl.loadStats.writeBytes.Mu.Lock() - repl.loadStats.readBytes.Mu.Lock() - repl.loadStats.batchRequests.Mu.Lock() - - requestsBefore := headVal(repl.loadStats.requests.SumLocked) - writesBefore := headVal(repl.loadStats.writeKeys.SumLocked) - readsBefore := headVal(repl.loadStats.readKeys.SumLocked) - readBytesBefore := headVal(repl.loadStats.readBytes.SumLocked) - writeBytesBefore := headVal(repl.loadStats.writeBytes.SumLocked) - - repl.loadStats.requests.Mu.Unlock() - repl.loadStats.writeKeys.Mu.Unlock() - repl.loadStats.readKeys.Mu.Unlock() - repl.loadStats.writeBytes.Mu.Unlock() - repl.loadStats.readBytes.Mu.Unlock() - repl.loadStats.batchRequests.Mu.Unlock() + repl.loadStats.Reset() + + requestsBefore := repl.loadStats.TestingGetSum(load.Requests) + writesBefore := repl.loadStats.TestingGetSum(load.WriteKeys) + readsBefore := repl.loadStats.TestingGetSum(load.ReadKeys) + readBytesBefore := repl.loadStats.TestingGetSum(load.ReadBytes) + writeBytesBefore := repl.loadStats.TestingGetSum(load.WriteBytes) _, pErr = db.NonTransactionalSender().Send(ctx, testCase.ba) require.Nil(t, pErr) @@ -450,25 +401,11 @@ func TestReadLoadMetricAccounting(t *testing.T) { require.Equal(t, 0.0, writeBytesBefore) require.Equal(t, 0.0, readBytesBefore) - repl.loadStats.requests.Mu.Lock() - repl.loadStats.writeKeys.Mu.Lock() - repl.loadStats.readKeys.Mu.Lock() - repl.loadStats.writeBytes.Mu.Lock() - repl.loadStats.readBytes.Mu.Lock() - repl.loadStats.batchRequests.Mu.Lock() - - requestsAfter := headVal(repl.loadStats.requests.SumLocked) - writesAfter := headVal(repl.loadStats.writeKeys.SumLocked) - readsAfter := headVal(repl.loadStats.readKeys.SumLocked) - readBytesAfter := headVal(repl.loadStats.readBytes.SumLocked) - writeBytesAfter := headVal(repl.loadStats.writeBytes.SumLocked) - - repl.loadStats.requests.Mu.Unlock() - repl.loadStats.writeKeys.Mu.Unlock() - repl.loadStats.readKeys.Mu.Unlock() - repl.loadStats.writeBytes.Mu.Unlock() - repl.loadStats.readBytes.Mu.Unlock() - repl.loadStats.batchRequests.Mu.Unlock() + requestsAfter := repl.loadStats.TestingGetSum(load.Requests) + writesAfter := repl.loadStats.TestingGetSum(load.WriteKeys) + readsAfter := repl.loadStats.TestingGetSum(load.ReadKeys) + readBytesAfter := repl.loadStats.TestingGetSum(load.ReadBytes) + writeBytesAfter := repl.loadStats.TestingGetSum(load.WriteBytes) assertGreaterThanInDelta(t, testCase.expectedRQPS, requestsAfter, epsilonAllowed) assertGreaterThanInDelta(t, testCase.expectedWPS, writesAfter, epsilonAllowed) diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 6c5a242a7cf4..b7a46e4c9301 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -215,8 +215,8 @@ func (r *Replica) executeReadOnlyBatch( log.VErrEventf(ctx, 3, "%v", pErr.String()) } else { keysRead, bytesRead := getBatchResponseReadStats(br) - r.loadStats.readKeys.RecordCount(keysRead, 0) - r.loadStats.readBytes.RecordCount(bytesRead, 0) + r.loadStats.RecordReadKeys(keysRead) + r.loadStats.RecordReadBytes(bytesRead) log.Event(ctx, "read completed") } return br, nil, nil, pErr diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index cfca99142c06..e0017ea86cbf 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -1023,8 +1023,8 @@ func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchR // 
calculation. adjustedQPS := r.getBatchRequestQPS(ctx, ba) - r.loadStats.batchRequests.RecordCount(adjustedQPS, ba.Header.GatewayNodeID) - r.loadStats.requests.RecordCount(float64(len(ba.Requests)), ba.Header.GatewayNodeID) + r.loadStats.RecordBatchRequests(adjustedQPS, ba.Header.GatewayNodeID) + r.loadStats.RecordRequests(float64(len(ba.Requests))) } // getBatchRequestQPS calculates the cost estimation of a BatchRequest. The @@ -1065,10 +1065,7 @@ func (r *Replica) recordRequestWriteBytes(writeBytes *kvadmission.StoreWriteByte } // TODO(kvoli): Consider recording the ingested bytes (AddSST) separately // to the write bytes. - r.loadStats.writeBytes.RecordCount( - float64(writeBytes.WriteBytes+writeBytes.IngestedBytes), - 0, - ) + r.loadStats.RecordWriteBytes(float64(writeBytes.WriteBytes + writeBytes.IngestedBytes)) } // checkBatchRequest verifies BatchRequest validity requirements. In particular, diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 0a10981125c9..588cf6e5cb23 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -2190,22 +2189,15 @@ func rangeRaftProgress(raftStatus *raft.Status, replicas []roachpb.ReplicaDescri // usage information matches. This is dubious in some cases but often // reasonable when we only consider the leaseholder.
func RangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo { - info := allocator.RangeUsageInfo{ - LogicalBytes: repl.GetMVCCStats().Total(), - } - - localitySummary := repl.loadStats.batchRequests.SnapshotRatedSummary() - info.RequestLocality = &allocator.RangeRequestLocalityInfo{ - Counts: localitySummary.LocalityCounts, - Duration: localitySummary.Duration, - } - - if localitySummary.Duration >= replicastats.MinStatsDuration { - info.QueriesPerSecond = localitySummary.QPS - } - - if writesPerSecond, dur := repl.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - info.WritesPerSecond = writesPerSecond + loadStats := repl.LoadStats() + localityInfo := repl.loadStats.RequestLocalityInfo() + return allocator.RangeUsageInfo{ + LogicalBytes: repl.GetMVCCStats().Total(), + QueriesPerSecond: loadStats.QueriesPerSecond, + WritesPerSecond: loadStats.WriteKeysPerSecond, + RequestLocality: &allocator.RangeRequestLocalityInfo{ + Counts: localityInfo.LocalityCounts, + Duration: localityInfo.Duration, + }, } - return info } diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 3c6dbd04e9c6..04b2bed8faa8 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -220,8 +220,9 @@ func (sq *splitQueue) processAttempt( now := timeutil.Now() if splitByLoadKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now); splitByLoadKey != nil { - batchHandledQPS, _ := r.QueriesPerSecond() - raftAppliedQPS := r.WritesPerSecond() + loadStats := r.loadStats.Stats() + batchHandledQPS := loadStats.QueriesPerSecond + raftAppliedQPS := loadStats.WriteKeysPerSecond splitQPS := r.loadBasedSplitter.LastQPS(ctx, now) reason := fmt.Sprintf( "load at key %s (%.2f splitQPS, %.2f batches/sec, %.2f raft mutations/sec)", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3d21ac2538f0..141f8d31c3a4 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -49,7 +49,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery" @@ -2924,6 +2923,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // incorrectly low the first time or two it gets gossiped when a store // starts? We can't easily have a countdown as its value changes like for // leases/replicas. + // TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles? 
totalQueriesPerSecond += usage.QueriesPerSecond totalWritesPerSecond += usage.WritesPerSecond writesPerReplica = append(writesPerReplica, usage.WritesPerSecond) @@ -3140,25 +3140,14 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { pausedFollowerCount += metrics.PausedFollowerCount slowRaftProposalCount += metrics.SlowRaftProposalCount behindCount += metrics.BehindCount - if qps, dur := rep.loadStats.batchRequests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageQueriesPerSecond += qps - } - if rqps, dur := rep.loadStats.requests.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageRequestsPerSecond += rqps - } - if wps, dur := rep.loadStats.writeKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageWritesPerSecond += wps - } - if rps, dur := rep.loadStats.readKeys.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageReadsPerSecond += rps - } - if rbps, dur := rep.loadStats.readBytes.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageReadBytesPerSecond += rbps - } - if wbps, dur := rep.loadStats.writeBytes.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { - averageWriteBytesPerSecond += wbps - } - averageCPUNanosPerSecond += rep.CPUNanosPerSecond() + loadStats := rep.loadStats.Stats() + averageQueriesPerSecond += loadStats.QueriesPerSecond + averageRequestsPerSecond += loadStats.RequestsPerSecond + averageWritesPerSecond += loadStats.WriteKeysPerSecond + averageReadsPerSecond += loadStats.ReadKeysPerSecond + averageReadBytesPerSecond += loadStats.ReadBytesPerSecond + averageWriteBytesPerSecond += loadStats.WriteBytesPerSecond + averageCPUNanosPerSecond += loadStats.CPUNanosPerSecond locks += metrics.LockTableMetrics.Locks totalLockHoldDurationNanos += metrics.LockTableMetrics.TotalLockHoldDurationNanos @@ -3423,13 +3412,14 @@ func (s *Store) HottestReplicasByTenant(tenantID roachpb.TenantID) []HotReplicaI func mapToHotReplicasInfo(repls []CandidateReplica) []HotReplicaInfo { hotRepls := make([]HotReplicaInfo, len(repls)) for i := range repls { + loadStats := repls[i].Repl().LoadStats() hotRepls[i].Desc = repls[i].Desc() - hotRepls[i].QPS = repls[i].RangeUsageInfo().QueriesPerSecond - hotRepls[i].RequestsPerSecond = repls[i].Repl().RequestsPerSecond() - hotRepls[i].WriteKeysPerSecond = repls[i].Repl().WritesPerSecond() - hotRepls[i].ReadKeysPerSecond = repls[i].Repl().ReadsPerSecond() - hotRepls[i].WriteBytesPerSecond = repls[i].Repl().WriteBytesPerSecond() - hotRepls[i].ReadBytesPerSecond = repls[i].Repl().ReadBytesPerSecond() + hotRepls[i].QPS = loadStats.QueriesPerSecond + hotRepls[i].RequestsPerSecond = loadStats.RequestsPerSecond + hotRepls[i].WriteKeysPerSecond = loadStats.WriteKeysPerSecond + hotRepls[i].ReadKeysPerSecond = loadStats.ReadKeysPerSecond + hotRepls[i].WriteBytesPerSecond = loadStats.WriteBytesPerSecond + hotRepls[i].ReadBytesPerSecond = loadStats.ReadBytesPerSecond } return hotRepls } diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go index 142ebb54697e..14e2f12f3967 100644 --- a/pkg/kv/kvserver/store_merge.go +++ b/pkg/kv/kvserver/store_merge.go @@ -145,7 +145,7 @@ func (s *Store) MergeRange( return err } - leftRepl.loadStats.merge(rightRepl.loadStats) + leftRepl.loadStats.Merge(rightRepl.loadStats) // Clear the concurrency manager's lock and txn wait-queues to redirect the // queued transactions to the left-hand replica, if necessary. 
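
Taken together, the kvserver call sites above reduce to a small surface: construct the tracker with NewReplicaLoad, feed it through the Record* methods, and redistribute or clear it on range lifecycle events with Split, Merge, and Reset. Below is a minimal sketch of that lifecycle using only the API introduced in this diff; the package name and the function itself are hypothetical, for illustration only.

package example // hypothetical; not part of this change

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// replicaLoadLifecycle sketches how a caller exercises the new API. The clock
// is taken as a parameter to avoid depending on version-specific hlc
// constructors.
func replicaLoadLifecycle(clock *hlc.Clock, gateway roachpb.NodeID) load.ReplicaLoadStats {
	// A nil locality oracle is fine here: per NewReplicaLoad, only the
	// Queries stat keeps per-locality counts.
	rl := load.NewReplicaLoad(clock, nil /* getNodeLocality */)

	// Mirrors Replica.recordBatchRequestLoad: one batch counted against the
	// gateway node, plus its constituent requests.
	rl.RecordBatchRequests(1, gateway)
	rl.RecordRequests(3)

	// Range lifecycle events redistribute the counters wholesale, as in
	// Store.SplitRange and Store.MergeRange above.
	rhs := load.NewReplicaLoad(clock, nil /* getNodeLocality */)
	rl.Split(rhs) // halve the tracked load between lhs and rhs
	rl.Merge(rhs) // fold the rhs load back into the lhs

	// All ReplicaLoadStats fields read as 0 until
	// replicastats.MinStatsDuration has elapsed.
	return rl.Stats()
}
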
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index adff749ff4a8..c19bb7d981b8 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -86,16 +87,17 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { ValBytes: 4, } replica.mu.Unlock() - replica.loadStats = NewReplicaLoad(clock, nil) + replica.loadStats = load.NewReplicaLoad(clock, nil) for _, store := range stores { - replica.loadStats.batchRequests.RecordCount(1, store.Node.NodeID) - replica.loadStats.writeKeys.RecordCount(1, store.Node.NodeID) + replica.loadStats.RecordBatchRequests(1, store.Node.NodeID) + replica.loadStats.RecordWriteKeys(1) } manual.Advance(replicastats.MinStatsDuration + time.Second) rangeUsageInfo := RangeUsageInfoForRepl(&replica) - QPS, _ := replica.loadStats.batchRequests.AverageRatePerSecond() - WPS, _ := replica.loadStats.writeKeys.AverageRatePerSecond() + stats := replica.LoadStats() + QPS := stats.QueriesPerSecond + WPS := stats.WriteKeysPerSecond sp.UpdateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeUsageInfo, roachpb.ADD_VOTER) desc, ok := sp.GetStoreDescriptor(roachpb.StoreID(1)) @@ -225,7 +227,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { if err != nil { t.Fatalf("make replica error : %+v", err) } - replica.loadStats = NewReplicaLoad(store.Clock(), nil) + replica.loadStats = load.NewReplicaLoad(store.Clock(), nil) rangeUsageInfo := RangeUsageInfoForRepl(replica) diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index fa6d64c09709..b4b6a4594890 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" + rload "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -465,11 +466,9 @@ func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange) { Type: roachpb.NON_VOTER, }) } - // TODO(a-robinson): The below three lines won't be needed once the old - // rangeInfo code is ripped out of the allocator. 
repl.mu.state.Stats = &enginepb.MVCCStats{} - repl.loadStats = NewReplicaLoad(s.Clock(), nil) - repl.loadStats.batchRequests.SetMeanRateForTesting(r.qps) + repl.loadStats = rload.NewReplicaLoad(s.Clock(), nil) + repl.loadStats.TestingSetStat(rload.Queries, r.qps) acc.AddReplica(candidateReplica{ Replica: repl, diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index b8c80b8f8059..9a7a312e7608 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -14,6 +14,7 @@ import ( "bytes" "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -332,15 +333,15 @@ func (s *Store) SplitRange( if rightReplOrNil == nil { // There is no rhs replica, so instead halve the load of the lhs // replica. - throwawayRightStats := NewReplicaLoad(s.Clock(), nil) - leftRepl.loadStats.split(throwawayRightStats) + throwawayRightStats := load.NewReplicaLoad(s.Clock(), nil) + leftRepl.loadStats.Split(throwawayRightStats) } else { rightRepl := rightReplOrNil // Split the replica load of the lhs evenly (50:50) with the rhs. NB: // this ignores the split point and makes a simplifying // assumption that the distribution across all tracked load stats is // identical. - leftRepl.loadStats.split(rightRepl.loadStats) + leftRepl.loadStats.Split(rightRepl.loadStats) if err := s.addReplicaInternalLocked(rightRepl); err != nil { return errors.Wrapf(err, "unable to add replica %v", rightRepl) } diff --git a/pkg/server/status.go b/pkg/server/status.go index 1610bc94adba..0d0c29277afe 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -2055,7 +2055,8 @@ func (s *systemStatusServer) rangesHelper( WaitingWriters: lm.WaitingWriters, }) } - qps, _ := rep.QueriesPerSecond() + + loadStats := rep.LoadStats() locality := serverpb.Locality{} for _, tier := range rep.GetNodeLocality().Tiers { locality.Tiers = append(locality.Tiers, serverpb.Tier{ @@ -2071,12 +2072,12 @@ func (s *systemStatusServer) rangesHelper( SourceStoreID: storeID, LeaseHistory: leaseHistory, Stats: serverpb.RangeStatistics{ - QueriesPerSecond: qps, - RequestsPerSecond: rep.RequestsPerSecond(), - WritesPerSecond: rep.WritesPerSecond(), - ReadsPerSecond: rep.ReadsPerSecond(), - WriteBytesPerSecond: rep.WriteBytesPerSecond(), - ReadBytesPerSecond: rep.ReadBytesPerSecond(), + QueriesPerSecond: loadStats.QueriesPerSecond, + RequestsPerSecond: loadStats.RequestsPerSecond, + WritesPerSecond: loadStats.WriteKeysPerSecond, + ReadsPerSecond: loadStats.ReadKeysPerSecond, + WriteBytesPerSecond: loadStats.WriteBytesPerSecond, + ReadBytesPerSecond: loadStats.ReadBytesPerSecond, }, Problems: serverpb.RangeProblems{ Unavailable: metrics.Unavailable,
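
The test refactors throughout this diff replace hand-rolled locking around SumLocked with ReplicaLoad.TestingGetSum. A hypothetical helper distilling that before/after pattern (assertLoadDelta is illustrative and not part of the change; it assumes only the load package above plus testify, which the tests already use):

package example // hypothetical; not part of this change

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
	"github.com/stretchr/testify/require"
)

// assertLoadDelta distills the before/after pattern used by the refactored
// tests: read the raw recorded sum (not the windowed per-second rate) around
// a workload, then assert on the delta within a tolerance. Working on sums
// avoids flakes from timing-dependent averaging.
func assertLoadDelta(
	t *testing.T, rl *load.ReplicaLoad, stat load.LoadStat, run func(), want, epsilon float64,
) {
	before := rl.TestingGetSum(stat)
	run()
	after := rl.TestingGetSum(stat)
	require.InDelta(t, want, after-before, epsilon)
}
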