kvserver: rework memory allocation in replicastats
This patch removes some unused fields from the replica stats object. It
also allocates all of the memory a replica stats object needs up front,
for better cache locality and less GC overhead.

resolves cockroachdb#85112

Release justification: low risk; lowers the memory footprint to help avoid OOMs.
Release note: None
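
In outline, the change replaces nil-able window slots, reallocated on every rotation, with records that are all allocated once at construction and afterwards only reset in place; an active flag takes over the job of the old nil check. A minimal standalone sketch of the pattern, with simplified hypothetical names rather than the actual kvserver types:

package main

import "fmt"

// record stands in for replicaStatsRecord; the active flag replaces the
// old "slot == nil" convention.
type record struct {
	sum    float64
	active bool
}

// stats stands in for ReplicaStats' ring of windowed records.
type stats struct {
	records [6]*record
	idx     int
}

func newStats() *stats {
	s := &stats{}
	// Allocate every slot once, up front; rotate below never allocates.
	for i := range s.records {
		s.records[i] = &record{}
	}
	s.records[s.idx].active = true
	return s
}

// rotate advances to the next window, zeroing its slot in place rather
// than dropping it for the GC and allocating a fresh record.
func (s *stats) rotate() {
	s.idx = (s.idx + 1) % len(s.records)
	*s.records[s.idx] = record{}
	s.records[s.idx].active = true
}

func main() {
	s := newStats()
	s.records[s.idx].sum += 5
	s.rotate() // reuses the next slot in place; no allocation
	fmt.Println(s.idx, s.records[s.idx].active) // 1 true
}

Because every slot is long-lived and reused, steady-state rotation produces no garbage, and the records stay in one stable region of the heap, which is the cache-locality and GC win the message above refers to.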
kvoli committed Aug 23, 2022
1 parent 9b52585 commit 8b03aea
Showing 2 changed files with 48 additions and 93 deletions.
97 changes: 47 additions & 50 deletions pkg/kv/kvserver/replicastats/replica_stats.go
@@ -86,57 +86,45 @@ type ReplicaStats struct {
 
 type replicaStatsRecord struct {
 	localityCounts PerLocalityCounts
-	sum, max, min  float64
-	count          int64
+	sum    float64
+	active bool
 }
 
 func newReplicaStatsRecord() *replicaStatsRecord {
-	return &replicaStatsRecord{
-		localityCounts: make(PerLocalityCounts),
-		max:            -math.MaxFloat64,
-		min:            math.MaxFloat64,
-	}
+	return &replicaStatsRecord{localityCounts: make(PerLocalityCounts)}
 }
 
+func (rsr *replicaStatsRecord) reset() {
+	rsr.localityCounts = make(PerLocalityCounts)
+	rsr.active = false
+	rsr.sum = 0
+}
+
-// mergeReplicaStatsRecords combines two records and returns a new record with
-// the merged data. When this is called with nil records, a nil record is
-// returned; otherwise a new record instead.
-func mergeReplicaStatsRecords(left, right *replicaStatsRecord) *replicaStatsRecord {
-	if left == nil && right == nil {
-		return nil
-	}
-	if left == nil {
-		left = newReplicaStatsRecord()
-	}
-	if right == nil {
-		right = newReplicaStatsRecord()
-	}
-
-	mergedStats := newReplicaStatsRecord()
-
-	mergedStats.max = math.Max(left.max, right.max)
-	mergedStats.min = math.Min(left.min, right.min)
-	mergedStats.sum = left.sum + right.sum
-	mergedStats.count = left.count + right.count
-
-	for locality, count := range left.localityCounts {
-		mergedStats.localityCounts[locality] += count
-	}
-
-	for locality, count := range right.localityCounts {
-		mergedStats.localityCounts[locality] += count
-	}
-
-	return mergedStats
-}
+// mergeReplicaStatsRecords merges the data of other into the receiver,
+// resetting any inactive record before it is used.
+func (rsr *replicaStatsRecord) mergeReplicaStatsRecords(other *replicaStatsRecord) {
+	if !rsr.active && !other.active {
+		return
+	}
+	if !rsr.active {
+		rsr.reset()
+	}
+	if !other.active {
+		other.reset()
+	}
+
+	rsr.sum += other.sum
+	rsr.active = true
+
+	for locality, count := range other.localityCounts {
+		rsr.localityCounts[locality] += count
+	}
+}
 
 func (rsr *replicaStatsRecord) split(other *replicaStatsRecord) {
-	other.max = rsr.max
-	other.min = rsr.min
-
-	rsr.count = rsr.count / 2
-	other.count = rsr.count
-
 	rsr.sum = rsr.sum / 2.0
 	other.sum = rsr.sum

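To make the new merge semantics concrete, a hedged usage sketch (values are illustrative, and it assumes PerLocalityCounts is a map from locality string to float64 count, as the surrounding code suggests):

// Illustrative only: merging an active record into an inactive one
// carries the data across and re-activates the receiver.
dst := newReplicaStatsRecord() // inactive, zero-valued
src := newReplicaStatsRecord()
src.active = true
src.sum = 10
src.localityCounts["region=us-east1"] = 10

dst.mergeReplicaStatsRecords(src)
// dst.active == true, dst.sum == 10, and the locality count carried
// over; had both records been inactive, the call would be a no-op.
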
@@ -154,7 +142,12 @@ func NewReplicaStats(clock *hlc.Clock, getNodeLocality LocalityOracle) *ReplicaStats {
 	}
 
 	rs.Mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow())
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	for i := range rs.Mu.records {
+		rs.Mu.records[i] = newReplicaStatsRecord()
+	}
+	// Set the first record to active. All other records will be initially
+	// inactive.
+	rs.Mu.records[rs.Mu.idx].active = true
 	rs.Mu.lastReset = rs.Mu.lastRotate
 	return rs
 }
@@ -178,14 +171,12 @@ func (rs *ReplicaStats) MergeRequestCounts(other *ReplicaStats) {
 	n := len(rs.Mu.records)
 
 	for i := range other.Mu.records {
-
 		rsIdx := (rs.Mu.idx + n - i) % n
 		otherIdx := (other.Mu.idx + n - i) % n
 
-		rs.Mu.records[rsIdx] = mergeReplicaStatsRecords(rs.Mu.records[rsIdx], other.Mu.records[otherIdx])
-
-		// Reset the stats on other.
-		other.Mu.records[otherIdx] = newReplicaStatsRecord()
+		rs.Mu.records[rsIdx].mergeReplicaStatsRecords(other.Mu.records[otherIdx])
+		// Reset the other record.
+		other.Mu.records[otherIdx].reset()
 	}
 
 	// Update the last rotate time to be the lesser of the two, so that a
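
The (idx + n - i) % n arithmetic pairs same-age windows by walking both ring buffers backwards from their current positions in lockstep. A standalone sketch with made-up positions (the values of n, rsIdx, and otherIdx are illustrative):

package main

import "fmt"

func main() {
	// Hypothetical positions: 6 windows, receiver currently at index 4,
	// the other ring at index 1.
	n, rsIdx, otherIdx := 6, 4, 1
	for i := 0; i < n; i++ {
		// Adding n first keeps the dividend non-negative: Go's % keeps
		// the sign of the dividend, so (1-2)%6 == -1, not 5.
		fmt.Printf("i=%d merges rs[%d] with other[%d]\n",
			i, (rsIdx+n-i)%n, (otherIdx+n-i)%n)
	}
}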
@@ -212,11 +203,15 @@ func (rs *ReplicaStats) SplitRequestCounts(other *ReplicaStats) {
 	other.Mu.lastReset = rs.Mu.lastReset
 
 	for i := range rs.Mu.records {
-		if rs.Mu.records[i] == nil {
-			other.Mu.records[i] = nil
+		// When this record isn't active, mark the other record inactive
+		// as well.
+		if !rs.Mu.records[i].active {
+			other.Mu.records[i].reset()
+			other.Mu.records[i].active = false
 			continue
 		}
-		other.Mu.records[i] = newReplicaStatsRecord()
+		other.Mu.records[i].active = true
 		rs.Mu.records[i].split(other.Mu.records[i])
 	}
 }
@@ -236,10 +231,7 @@ func (rs *ReplicaStats) RecordCount(count float64, nodeID roachpb.NodeID) {
 
 	record := rs.Mu.records[rs.Mu.idx]
 	record.sum += count
-	record.max = math.Max(record.max, count)
-	record.min = math.Min(record.min, count)
 	record.localityCounts[locality] += count
-	record.count++
 }
 
 func (rs *ReplicaStats) maybeRotateLocked(now time.Time) {
@@ -251,7 +243,9 @@ func (rs *ReplicaStats) maybeRotateLocked(now time.Time) {
 
 func (rs *ReplicaStats) rotateLocked() {
 	rs.Mu.idx = (rs.Mu.idx + 1) % len(rs.Mu.records)
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	// Reset the record at the next idx and set it to active.
+	rs.Mu.records[rs.Mu.idx].reset()
+	rs.Mu.records[rs.Mu.idx].active = true
 }
 
 // PerLocalityDecayingRate returns the per-locality counts-per-second and the
@@ -277,7 +271,7 @@ func (rs *ReplicaStats) PerLocalityDecayingRate() (PerLocalityCounts, time.Duration) {
 		// We have to add len(rs.Mu.records) to the numerator to avoid getting a
 		// negative result from the modulus operation when rs.Mu.idx is small.
 		requestsIdx := (rs.Mu.idx + len(rs.Mu.records) - i) % len(rs.Mu.records)
-		if cur := rs.Mu.records[requestsIdx]; cur != nil {
+		if cur := rs.Mu.records[requestsIdx]; cur.active {
 			decay := math.Pow(decayFactor, float64(i)+fractionOfRotation)
 			if i == 0 {
 				duration += time.Duration(float64(timeSinceRotate) * decay)
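
The decay term weights window i (0 being the current window) by decayFactor^(i + fractionOfRotation), where fractionOfRotation is how far the current rotation interval has elapsed, so each older window counts geometrically less. A standalone sketch of the weighting; the 0.8 decay factor and the 25% elapsed fraction are assumptions for illustration, not values taken from this file:

package main

import (
	"fmt"
	"math"
)

func main() {
	const decayFactor = 0.8    // assumed for illustration
	fractionOfRotation := 0.25 // assume 25% of the current window elapsed

	for i := 0; i < 6; i++ {
		decay := math.Pow(decayFactor, float64(i)+fractionOfRotation)
		fmt.Printf("window %d weight: %.3f\n", i, decay)
	}
}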
@@ -307,7 +301,7 @@ func (rs *ReplicaStats) SumLocked() (float64, int) {
 		// We have to add len(rs.Mu.records) to the numerator to avoid getting a
 		// negative result from the modulus operation when rs.Mu.idx is small.
 		requestsIdx := (rs.Mu.idx + len(rs.Mu.records) - i) % len(rs.Mu.records)
-		if cur := rs.Mu.records[requestsIdx]; cur != nil {
+		if cur := rs.Mu.records[requestsIdx]; cur.active {
 			windowsUsed++
 			sum += cur.sum
 		}
@@ -348,10 +342,13 @@ func (rs *ReplicaStats) ResetRequestCounts() {
 	rs.Mu.Lock()
 	defer rs.Mu.Unlock()
 
+	// Reset the individual records and set their state to inactive.
 	for i := range rs.Mu.records {
-		rs.Mu.records[i] = nil
+		rs.Mu.records[i].reset()
+		rs.Mu.records[i].active = false
 	}
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	// Update the current idx record to be active.
+	rs.Mu.records[rs.Mu.idx].active = true
 	rs.Mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow())
 	rs.Mu.lastReset = rs.Mu.lastRotate
 }
44 changes: 1 addition & 43 deletions pkg/kv/kvserver/replicastats/replica_stats_test.go
@@ -457,22 +457,6 @@ func TestReplicaStatsSplit(t *testing.T) {
 	otherHalf := genTestingReplicaStats(nilMultipliers, 0, 0)
 	n := len(initial.Mu.records)
 
-	// Adjust the max/min/count, since the generated replica stats will have
-	// max/min that is too high and missing half the counts.
-	for i := range initial.Mu.records {
-		idx := (initial.Mu.idx + n - i) % n
-
-		// We don't want to adjust the aggregates when the initial window
-		// is uninitialized.
-		if initial.Mu.records[idx] == nil {
-			continue
-		}
-
-		expected.Mu.records[idx].count = initial.Mu.records[idx].count / 2
-		expected.Mu.records[idx].max = initial.Mu.records[idx].max
-		expected.Mu.records[idx].min = initial.Mu.records[idx].min
-	}
-
 	initial.SplitRequestCounts(otherHalf)
 
 	for i := range expected.Mu.records {
@@ -556,30 +540,6 @@ func TestReplicaStatsMerge(t *testing.T) {
 	expectedRs := genTestingReplicaStats(tc.windowsExp, 10, tc.rotateA)
 	n := len(expectedRs.Mu.records)
 
-	// Adjust the max/min/count, since the generated replica stats will have
-	// max/min that is too high and missing half the counts.
-	for i := range expectedRs.Mu.records {
-
-		idxExpected, idxA, idxB := (expectedRs.Mu.idx+n-i)%n, (rsA.Mu.idx+n-i)%n, (rsB.Mu.idx+n-i)%n
-		if expectedRs.Mu.records[idxExpected] == nil {
-			continue
-		}
-
-		expectedRs.Mu.records[idxExpected].count = 0
-		expectedRs.Mu.records[idxExpected].max = -math.MaxFloat64
-		expectedRs.Mu.records[idxExpected].min = math.MaxFloat64
-
-		if rsA.Mu.records[idxA] != nil {
-			expectedRs.Mu.records[idxExpected].count = rsA.Mu.records[idxA].count
-			expectedRs.Mu.records[idxExpected].max = rsA.Mu.records[idxA].max
-			expectedRs.Mu.records[idxExpected].min = rsA.Mu.records[idxA].min
-		}
-		if rsB.Mu.records[idxB] != nil {
-			expectedRs.Mu.records[idxExpected].count += rsB.Mu.records[idxB].count
-			expectedRs.Mu.records[idxExpected].max = math.Max(expectedRs.Mu.records[idxExpected].max, rsB.Mu.records[idxB].max)
-			expectedRs.Mu.records[idxExpected].min = math.Min(expectedRs.Mu.records[idxExpected].min, rsB.Mu.records[idxB].min)
-		}
-	}
 	rsA.MergeRequestCounts(rsB)
 
 	for i := range expectedRs.Mu.records {
@@ -620,9 +580,7 @@ func TestReplicaStatsRecordAggregate(t *testing.T) {
 	expectedStatsRecord := &replicaStatsRecord{
 		localityCounts: expectedLocalityCounts,
 		sum:            expectedSum * 6,
-		max:            float64(n * 3),
-		min:            1,
-		count:          int64(n * 3),
+		active:         true,
 	}
 
 	require.Equal(t, expectedStatsRecord, rs.Mu.records[rs.Mu.idx])
