Skip to content

Commit

Permalink
kvserver: refactor replica load
Browse files Browse the repository at this point in the history
This patch consolidates accesses to replica load stats via the replica
load struct. Previously, `ReplicaLoad` was a layer on top of multiple
`ReplicaStats`, yet multiple parts of the code would reach through
`ReplicaLoad` to access `ReplicaStats` directly; this patch removes
these accesses. The `replica_load.go` file is also moved into a new
`kv/kvserver/load` package to enforce separation.

part of #87187

Release note: None
  • Loading branch information
kvoli committed Jan 4, 2023
1 parent 2c3d75f commit 352d2ef
Show file tree
Hide file tree
Showing 26 changed files with 370 additions and 336 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
/pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs
/pkg/kv/kvserver/liveness/ @cockroachdb/kv-prs
/pkg/kv/kvserver/load/ @cockroachdb/kv-prs
/pkg/kv/kvserver/logstore/ @cockroachdb/repl-prs
/pkg/kv/kvserver/loqrecovery/ @cockroachdb/repl-prs
/pkg/kv/kvserver/multiqueue/ @cockroachdb/kv-prs
Expand Down
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
"//pkg/kv/kvserver/liveness:liveness",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/load:load",
"//pkg/kv/kvserver/logstore:logstore",
"//pkg/kv/kvserver/logstore:logstore_test",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb",
Expand Down Expand Up @@ -2555,6 +2556,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvstorage:get_x_data",
"//pkg/kv/kvserver/liveness:get_x_data",
"//pkg/kv/kvserver/liveness/livenesspb:get_x_data",
"//pkg/kv/kvserver/load:get_x_data",
"//pkg/kv/kvserver/logstore:get_x_data",
"//pkg/kv/kvserver/loqrecovery:get_x_data",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/load",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security",
Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/tenant_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -68,15 +69,15 @@ func TestTenantRangeQPSStat(t *testing.T) {
require.NoError(t, err)
repl, err := store.GetReplica(roachpb.RangeID(rangeID))
require.NoError(t, err)

qpsBefore, durationBefore := repl.QueriesPerSecond()
queriesBefore := qpsBefore * durationBefore.Seconds()
// NB: We call directly into the load tracking struct, in order to avoid
// flakes due to timing differences affecting the result.
loadStats := repl.GetLoadStatsForTesting()
qpsBefore := loadStats.TestingGetSum(load.Queries)
for i := 0; i < 110; i++ {
r.Exec(t, `SELECT k FROM foo.qps_test`)
}
qpsAfter, durationAfter := repl.QueriesPerSecond()
queriesAfter := qpsAfter * durationAfter.Seconds()
queriesIncrease := int(queriesAfter - queriesBefore)
qpsAfter := loadStats.TestingGetSum(load.Queries)
queriesIncrease := int(qpsAfter - qpsBefore)
// If queries are correctly recorded, we should see increase in query count by
// 110. As it is possible due to rounding and conversion from QPS to query count
// to get a slightly higher or lower number - we expect the increase to be at
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ go_library(
"replica_gc_queue.go",
"replica_gossip.go",
"replica_init.go",
"replica_load.go",
"replica_metrics.go",
"replica_placeholder.go",
"replica_proposal.go",
Expand Down Expand Up @@ -138,6 +137,7 @@ go_library(
"//pkg/kv/kvserver/kvstorage",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/load",
"//pkg/kv/kvserver/logstore",
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
Expand Down Expand Up @@ -360,6 +360,7 @@ go_test(
"//pkg/kv/kvserver/kvstorage",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/load",
"//pkg/kv/kvserver/logstore",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
repl.mu.state.Stats = &enginepb.MVCCStats{}
repl.mu.Unlock()

repl.loadStats = NewReplicaLoad(clock, nil)
repl.loadStats = load.NewReplicaLoad(clock, nil)

var rangeUsageInfo allocator.RangeUsageInfo

Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/load/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "load",
srcs = [
"record_replica_load.go",
"replica_load.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/replicastats",
"//pkg/roachpb",
"//pkg/util/hlc",
],
)

get_x_data(name = "get_x_data")
54 changes: 54 additions & 0 deletions pkg/kv/kvserver/load/record_replica_load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package load

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// RecordBatchRequests records val batch requests against the Queries load
// stat, attributed to the gateway node identified by nodeID. Queries is the
// only stat for which the node ID is forwarded; every other Record* method
// passes 0.
func (rl *ReplicaLoad) RecordBatchRequests(val float64, nodeID roachpb.NodeID) {
rl.record(Queries, val, nodeID)
}

// RecordRequests records val against the Requests load stat. No gateway node
// is attributed: the node ID is always passed as 0.
func (rl *ReplicaLoad) RecordRequests(val float64) {
rl.record(Requests, val, 0 /* nodeID */)
}

// RecordWriteKeys records val against the WriteKeys load stat. No gateway node
// is attributed: the node ID is always passed as 0.
func (rl *ReplicaLoad) RecordWriteKeys(val float64) {
rl.record(WriteKeys, val, 0 /* nodeID */)
}

// RecordReadKeys records val against the ReadKeys load stat. No gateway node
// is attributed: the node ID is always passed as 0.
func (rl *ReplicaLoad) RecordReadKeys(val float64) {
rl.record(ReadKeys, val, 0 /* nodeID */)
}

// RecordWriteBytes records val against the WriteBytes load stat. No gateway
// node is attributed: the node ID is always passed as 0.
func (rl *ReplicaLoad) RecordWriteBytes(val float64) {
rl.record(WriteBytes, val, 0 /* nodeID */)
}

// RecordReadBytes records val against the ReadBytes load stat. No gateway node
// is attributed: the node ID is always passed as 0.
func (rl *ReplicaLoad) RecordReadBytes(val float64) {
rl.record(ReadBytes, val, 0 /* nodeID */)
}

// RecordRaftCPUNanos records val against the RaftCPUNanos load stat. No
// gateway node is attributed: the node ID is always passed as 0. Stats()
// reports this stat summed with ReqCPUNanos as CPUNanosPerSecond.
func (rl *ReplicaLoad) RecordRaftCPUNanos(val float64) {
rl.record(RaftCPUNanos, val, 0 /* nodeID */)
}

// RecordReqCPUNanos records val against the ReqCPUNanos load stat. No gateway
// node is attributed: the node ID is always passed as 0. Stats() reports this
// stat summed with RaftCPUNanos as CPUNanosPerSecond.
func (rl *ReplicaLoad) RecordReqCPUNanos(val float64) {
rl.record(ReqCPUNanos, val, 0 /* nodeID */)
}
175 changes: 175 additions & 0 deletions pkg/kv/kvserver/load/replica_load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package load

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// TODO(kvoli): Recording per-second stats on every replica costs unnecessary
// memory. Only a counter should be maintained per replica along with a low
// cost ewma if necessary. A central, fixed memory cost structure should track
// finer grained moving averages of interesting replicas.

// LoadStat represents an ordinal position in replica load for a specific type
// of load statistic. The ordinal is used directly as an index into
// ReplicaLoad.stats, so values must stay dense and start at zero.
type LoadStat int

const (
	// Queries is the batch request count, recorded per gateway node.
	Queries LoadStat = iota
	// Requests is the count of requests received.
	Requests
	// WriteKeys is the count of keys written.
	WriteKeys
	// ReadKeys is the count of keys read.
	ReadKeys
	// WriteBytes is the count of bytes written.
	WriteBytes
	// ReadBytes is the count of bytes read.
	ReadBytes
	// RaftCPUNanos is the raft CPU time, in nanoseconds.
	RaftCPUNanos
	// ReqCPUNanos is the request CPU time, in nanoseconds.
	ReqCPUNanos

	// numLoadStats is the number of LoadStat ordinals declared above. It is
	// derived from iota rather than hard-coded so that it cannot drift out of
	// sync when a stat is added or removed; new stats must be declared before
	// this line. The explicit `= iota` (without a type) keeps the constant
	// untyped, so it remains usable both as an array length and in
	// comparisons against plain ints.
	numLoadStats = iota
)

// ReplicaLoadStats contains per-second average statistics for load upon a
// replica. All fields will return 0 until after replicastats.MinStatsDuration
// has passed, despite any values recorded. This is done to avoid reporting
// erroneous stats that can arise when the sample time is small.
type ReplicaLoadStats struct {
// QueriesPerSecond is the replica's average QPS if it is the current leaseholder. If
// it isn't, this will return 0 because the replica does not know about the
// reads that the leaseholder is serving.
//
// A "Query" is a BatchRequest (regardless of its contents) arriving at the
// leaseholder with a gateway node set in the header (i.e. excluding
// requests that weren't sent through a DistSender, which in practice
// should be close to none). See Replica.getBatchRequestQPS() for how
// this is accounted for.
QueriesPerSecond float64
// RequestsPerSecond is the replica's average requests received per second. A
// batch request may have one to many requests.
RequestsPerSecond float64
// WriteKeysPerSecond is the replica's average keys written per second. A
// "Write" is a mutation applied by Raft as measured by
// engine.RocksDBBatchCount(writeBatch). This corresponds roughly to the number
// of keys mutated by a write. For example, writing 12 intents would count as 24
// writes (12 for the metadata, 12 for the versions). A DeleteRange that
// ultimately only removes one key counts as one (or two if it's transactional).
WriteKeysPerSecond float64
// ReadKeysPerSecond is the replica's average keys read per second. A "Read"
// is a key access during evaluation of a batch request. This includes both
// follower and leaseholder reads.
ReadKeysPerSecond float64
// WriteBytesPerSecond is the replica's average bytes written per second. A
// "Write" is as described in WriteKeysPerSecond.
WriteBytesPerSecond float64
// ReadBytesPerSecond is the replica's average bytes read per second. A "Read" is as
// described in ReadKeysPerSecond.
ReadBytesPerSecond float64
// CPUNanosPerSecond is the range's time spent on-processor averaged per second.
// ReplicaLoad.Stats() computes it as the sum of the RaftCPUNanos and
// ReqCPUNanos rates.
CPUNanosPerSecond float64
}

// ReplicaLoad tracks a sliding window of throughput on a replica. It holds one
// replicastats.ReplicaStats tracker per LoadStat.
type ReplicaLoad struct {
// stats is indexed by LoadStat ordinal. NewReplicaLoad populates every
// slot, so entries are non-nil for instances built through it.
stats [numLoadStats]*replicastats.ReplicaStats
}

// NewReplicaLoad builds a ReplicaLoad holding one ReplicaStats tracker per
// LoadStat, each constructed with clock. The result may be used to track the
// request throughput of a replica.
//
// Only the Queries tracker receives getNodeLocality. Per-locality request
// counts are only used by follow-the-workload lease transfers, and keeping
// more than one bucket for client requests increases the memory footprint
// O(localities). All other trackers are created without a locality oracle.
func NewReplicaLoad(clock *hlc.Clock, getNodeLocality replicastats.LocalityOracle) *ReplicaLoad {
	var trackers [numLoadStats]*replicastats.ReplicaStats

	for idx := range trackers {
		// The zero-value oracle is nil, i.e. no locality tracking.
		var oracle replicastats.LocalityOracle
		if idx == int(Queries) {
			oracle = getNodeLocality
		}
		trackers[idx] = replicastats.NewReplicaStats(clock, oracle)
	}

	return &ReplicaLoad{stats: trackers}
}

// record forwards val and nodeID to the ReplicaStats tracker for stat. stat is
// used directly as an index into rl.stats, so it must be one of the declared
// LoadStat ordinals.
func (rl *ReplicaLoad) record(stat LoadStat, val float64, nodeID roachpb.NodeID) {
rl.stats[stat].RecordCount(val, nodeID)
}

// Split divides the load tracked by rl between rl and other, evenly, by
// calling SplitRequestCounts on each of rl's trackers with the tracker of the
// same LoadStat in other.
func (rl *ReplicaLoad) Split(other *ReplicaLoad) {
	for idx, tracker := range rl.stats[:] {
		tracker.SplitRequestCounts(other.stats[idx])
	}
}

// Merge folds the load tracked by other into rl by calling MergeRequestCounts
// on each of rl's trackers with the tracker of the same LoadStat in other.
func (rl *ReplicaLoad) Merge(other *ReplicaLoad) {
	for idx := 0; idx < len(rl.stats); idx++ {
		mine, theirs := rl.stats[idx], other.stats[idx]
		mine.MergeRequestCounts(theirs)
	}
}

// Reset clears all recorded history by resetting the request counts of every
// tracker.
func (rl *ReplicaLoad) Reset() {
	for _, tracker := range rl.stats[:] {
		tracker.ResetRequestCounts()
	}
}

// get returns the average per-second rate for the LoadStat with ordinal stat,
// or 0 while the tracker has been gathering statistics for less than
// replicastats.MinStatsDuration.
func (rl *ReplicaLoad) get(stat LoadStat) float64 {
	rate, window := rl.stats[stat].AverageRatePerSecond()
	if window < replicastats.MinStatsDuration {
		// Too little history for the rate to be meaningful.
		return 0
	}
	return rate
}

// Stats returns a summary of the current per-second rates of replica load.
// Each field is read through get, so it is 0 until the corresponding tracker
// has at least replicastats.MinStatsDuration of history.
func (rl *ReplicaLoad) Stats() ReplicaLoadStats {
	var summary ReplicaLoadStats

	summary.QueriesPerSecond = rl.get(Queries)
	summary.RequestsPerSecond = rl.get(Requests)
	summary.WriteKeysPerSecond = rl.get(WriteKeys)
	summary.ReadKeysPerSecond = rl.get(ReadKeys)
	summary.WriteBytesPerSecond = rl.get(WriteBytes)
	summary.ReadBytesPerSecond = rl.get(ReadBytes)

	// CPU usage is reported as a single figure: raft plus request time.
	raftCPU := rl.get(RaftCPUNanos)
	reqCPU := rl.get(ReqCPUNanos)
	summary.CPUNanosPerSecond = raftCPU + reqCPU

	return summary
}

// RequestLocalityInfo returns the summary of client localities for requests
// made to this replica. It is taken from the Queries tracker, which is the
// only tracker NewReplicaLoad creates with a locality oracle.
func (rl *ReplicaLoad) RequestLocalityInfo() *replicastats.RatedSummary {
return rl.stats[Queries].SnapshotRatedSummary()
}

// TestingGetSum returns the sum of recorded values for the LoadStat with
// ordinal stat. The sum is used in testing in place of any averaging in order
// to assert precisely on the values that have been recorded.
//
// NOTE(review): the "Locked" suffix on SumLocked suggests the caller is
// expected to hold the tracker's lock, and none is acquired here; confirm
// this is safe against concurrent recording.
func (rl *ReplicaLoad) TestingGetSum(stat LoadStat) float64 {
// The second return value of SumLocked is not needed here.
val, _ := rl.stats[stat].SumLocked()
return val
}

// TestingSetStat sets the value for the LoadStat with ordinal stat to be the
// value given, via SetMeanRateForTesting on the underlying tracker. The value
// is intended to be returned in all future Stats() calls unless overridden by
// another call to TestingSetStat.
//
// NOTE(review): Stats() still reports 0 for a tracker whose reported duration
// is below replicastats.MinStatsDuration; presumably SetMeanRateForTesting
// accounts for that, confirm against its implementation.
func (rl *ReplicaLoad) TestingSetStat(stat LoadStat, val float64) {
rl.stats[stat].SetMeanRateForTesting(val)
}
Loading

0 comments on commit 352d2ef

Please sign in to comment.