kvserver: rework memory allocation in replicastats
This patch removes some unused fields from the replica stats object.
It also allocates all the memory needed for a replica stats object
upfront, for better cache locality and less GC overhead.

resolves cockroachdb#85112

Release justification: low risk, lowers memory footprint to avoid OOMs.
Release note: None
kvoli committed Aug 25, 2022
1 parent 54bc65f commit 82c0639
Showing 2 changed files with 87 additions and 101 deletions.
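Boiled down, the change replaces allocate-on-rotation with a fixed ring of six preallocated windows that are recycled in place. A minimal, self-contained Go sketch of that pattern (the six-window size and the reset/activate/rotate shape mirror the diff below; all names and details here are illustrative, not the CockroachDB code):

package main

import "fmt"

// window is one fixed slot: allocated once, then recycled in place rather
// than thrown away and reallocated on every rotation.
type window struct {
	counts map[string]float64 // locality -> count, reused across rotations
	sum    float64
	active bool
}

// reset zeroes the payload but deliberately keeps the map's memory.
func (w *window) reset() {
	w.sum = 0
	for k := range w.counts {
		w.counts[k] = 0
	}
}

// stats holds a ring of windows, mirroring records [6]*replicaStatsRecord.
type stats struct {
	idx     int
	windows [6]*window
}

// newStats allocates every window up front; rotate below never allocates.
func newStats() *stats {
	s := &stats{}
	for i := range s.windows {
		s.windows[i] = &window{counts: make(map[string]float64)}
	}
	s.windows[0].active = true
	return s
}

// rotate advances to the next slot and recycles it in place.
func (s *stats) rotate() {
	s.idx = (s.idx + 1) % len(s.windows)
	s.windows[s.idx].reset()
	s.windows[s.idx].active = true
}

func main() {
	s := newStats()
	s.windows[s.idx].sum += 42
	s.rotate()
	fmt.Println(s.idx, s.windows[0].sum) // 1 42
}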
134 changes: 79 additions & 55 deletions pkg/kv/kvserver/replicastats/replica_stats.go
@@ -74,7 +74,9 @@ type ReplicaStats struct {
 	// http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
 	Mu struct {
 		syncutil.Mutex
-		idx        int
+		idx int
+		// the window records are initialized once, then each window is reused
+		// internally by flipping an active field and clearing the fields.
 		records    [6]*replicaStatsRecord
 		lastRotate time.Time
 		lastReset  time.Time
@@ -86,57 +88,71 @@ type ReplicaStats struct {
 
 type replicaStatsRecord struct {
 	localityCounts PerLocalityCounts
-	sum, max, min  float64
-	count          int64
+	sum            float64
+	active         bool
 }
 
-func newReplicaStatsRecord() *replicaStatsRecord {
-	return &replicaStatsRecord{
-		localityCounts: make(PerLocalityCounts),
-		max:            -math.MaxFloat64,
-		min:            math.MaxFloat64,
-	}
-}
+func (rsr *replicaStatsRecord) reset() {
+	rsr.sum = 0
+
+	// Reuse the existing map to avoid heap allocations. Additionally, instead
+	// of deleting the locality entries, it is likely that the same entries
+	// will occur again in the next use of this window. Zero out any existing
+	// values to avoid churning memory. We could also delete elements that
+	// haven't been seen in a while, however it makes no difference as the
+	// memory wouldn't be released (https://github.com/golang/go/issues/20135).
+	// To protect against a memory leak, where many unused localities
+	// accumulate, create a new map if the number of unused localities was
+	// greater than 1.
+	unusedLocalities := 0
+	for _, v := range rsr.localityCounts {
+		if v == 0 {
+			unusedLocalities++
+		}
+		if unusedLocalities > 1 {
+			rsr.localityCounts = make(PerLocalityCounts)
+			return
+		}
+	}
+
+	for k := range rsr.localityCounts {
+		rsr.localityCounts[k] = 0
+	}
+}
 
-// mergeReplicaStatsRecords combines two records and returns a new record with
-// the merged data. When this is called with nil records, a nil record is
-// returned; otherwise a new record instead.
-func mergeReplicaStatsRecords(left, right *replicaStatsRecord) *replicaStatsRecord {
-	if left == nil && right == nil {
-		return nil
-	}
-	if left == nil {
-		left = newReplicaStatsRecord()
-	}
-	if right == nil {
-		right = newReplicaStatsRecord()
-	}
-
-	mergedStats := newReplicaStatsRecord()
-
-	mergedStats.max = math.Max(left.max, right.max)
-	mergedStats.min = math.Min(left.min, right.min)
-	mergedStats.sum = left.sum + right.sum
-	mergedStats.count = left.count + right.count
-
-	for locality, count := range left.localityCounts {
-		mergedStats.localityCounts[locality] += count
-	}
-
-	for locality, count := range right.localityCounts {
-		mergedStats.localityCounts[locality] += count
-	}
-
-	return mergedStats
-}
+// activate sets the active indicator on the record. It assumes that the record
+// is empty and does not clear any fields.
+func (rsr *replicaStatsRecord) activate() {
+	rsr.active = true
+}
+
+// deactivate sets the active indicator on a record to false. It clears the
+// data, emptying the record.
+func (rsr *replicaStatsRecord) deactivate() {
+	rsr.reset()
+	rsr.active = false
+}
+
+// mergeReplicaStatsRecords combines the record passed in with the receiver.
+func (rsr *replicaStatsRecord) mergeReplicaStatsRecords(other *replicaStatsRecord) {
+	// If the other record is inactive, then there is nothing to merge into the
+	// resulting record. Otherwise, regardless of whether the resulting record
+	// was originally active or inactive, we should merge the stats in and
+	// ensure it is set to active.
+	if !other.active {
+		return
+	}
+
+	rsr.sum += other.sum
+	rsr.activate()
+
+	for locality, count := range other.localityCounts {
+		rsr.localityCounts[locality] += count
+	}
+}
 
 func (rsr *replicaStatsRecord) split(other *replicaStatsRecord) {
-	other.max = rsr.max
-	other.min = rsr.min
-
-	rsr.count = rsr.count / 2
-	other.count = rsr.count
-
 	rsr.sum = rsr.sum / 2.0
 	other.sum = rsr.sum
 
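The reset comment above leans on a Go property worth spelling out: deleting keys from a map releases neither its buckets nor their memory back to the runtime (golang/go#20135). A freestanding sketch of the trade-off the code makes, using only the standard library:

package main

import "fmt"

func main() {
	counts := map[string]float64{"us-east": 12, "us-west": 7, "eu-west": 3}

	// Zeroing values keeps the keys and, more importantly, the map's
	// internal buckets, so the next window records into warm memory
	// without allocating.
	for k := range counts {
		counts[k] = 0
	}
	fmt.Println(len(counts)) // 3: keys survive a zeroing pass

	// delete() would drop the keys but still not return the bucket
	// memory to the runtime (golang/go#20135). The only way to shed
	// space held by dead localities is a fresh map, which is why
	// reset() reallocates once unused localities accumulate.
	counts = make(map[string]float64)
	fmt.Println(len(counts)) // 0
}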
@@ -154,7 +170,12 @@ func NewReplicaStats(clock *hlc.Clock, getNodeLocality LocalityOracle) *ReplicaStats {
 	}
 
 	rs.Mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow())
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	for i := range rs.Mu.records {
+		rs.Mu.records[i] = &replicaStatsRecord{localityCounts: make(PerLocalityCounts)}
+	}
+	// Set the first record to active. All other records will be initially
+	// inactive and empty.
+	rs.Mu.records[rs.Mu.idx].activate()
 	rs.Mu.lastReset = rs.Mu.lastRotate
 	return rs
 }
@@ -178,14 +199,13 @@ func (rs *ReplicaStats) MergeRequestCounts(other *ReplicaStats) {
 	n := len(rs.Mu.records)
 
 	for i := range other.Mu.records {
-
 		rsIdx := (rs.Mu.idx + n - i) % n
 		otherIdx := (other.Mu.idx + n - i) % n
 
-		rs.Mu.records[rsIdx] = mergeReplicaStatsRecords(rs.Mu.records[rsIdx], other.Mu.records[otherIdx])
-
-		// Reset the stats on other.
-		other.Mu.records[otherIdx] = newReplicaStatsRecord()
+		rs.Mu.records[rsIdx].mergeReplicaStatsRecords(other.Mu.records[otherIdx])
+		// We merged the other record's counts into rs; deactivate the other
+		// record to avoid double counting with repeated calls to merge.
+		other.Mu.records[otherIdx].deactivate()
 	}
 
 	// Update the last rotate time to be the lesser of the two, so that a
@@ -212,11 +232,14 @@ func (rs *ReplicaStats) SplitRequestCounts(other *ReplicaStats) {
 	other.Mu.lastReset = rs.Mu.lastReset
 
 	for i := range rs.Mu.records {
-		if rs.Mu.records[i] == nil {
-			other.Mu.records[i] = nil
+		// When the lhs isn't active, set the rhs to inactive as well.
+		if !rs.Mu.records[i].active {
+			other.Mu.records[i].deactivate()
 			continue
 		}
-		other.Mu.records[i] = newReplicaStatsRecord()
+		// Otherwise, activate the rhs record and split the request count between
+		// the two.
+		other.Mu.records[i].activate()
 		rs.Mu.records[i].split(other.Mu.records[i])
 	}
 }
@@ -236,10 +259,7 @@ func (rs *ReplicaStats) RecordCount(count float64, nodeID roachpb.NodeID) {
 
 	record := rs.Mu.records[rs.Mu.idx]
 	record.sum += count
-	record.max = math.Max(record.max, count)
-	record.min = math.Min(record.min, count)
 	record.localityCounts[locality] += count
-	record.count++
 }
 
 func (rs *ReplicaStats) maybeRotateLocked(now time.Time) {
@@ -251,7 +271,9 @@ func (rs *ReplicaStats) maybeRotateLocked(now time.Time) {
 
 func (rs *ReplicaStats) rotateLocked() {
 	rs.Mu.idx = (rs.Mu.idx + 1) % len(rs.Mu.records)
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	// Reset the next idx and set the record to active.
+	rs.Mu.records[rs.Mu.idx].reset()
+	rs.Mu.records[rs.Mu.idx].activate()
 }
 
 // PerLocalityDecayingRate returns the per-locality counts-per-second and the
@@ -277,7 +299,7 @@ func (rs *ReplicaStats) PerLocalityDecayingRate() (PerLocalityCounts, time.Duration) {
 		// We have to add len(rs.mu.requests) to the numerator to avoid getting a
 		// negative result from the modulus operation when rs.mu.idx is small.
 		requestsIdx := (rs.Mu.idx + len(rs.Mu.records) - i) % len(rs.Mu.records)
-		if cur := rs.Mu.records[requestsIdx]; cur != nil {
+		if cur := rs.Mu.records[requestsIdx]; cur.active {
 			decay := math.Pow(decayFactor, float64(i)+fractionOfRotation)
 			if i == 0 {
 				duration += time.Duration(float64(timeSinceRotate) * decay)
@@ -307,7 +329,7 @@ func (rs *ReplicaStats) SumLocked() (float64, int) {
 		// We have to add len(rs.mu.requests) to the numerator to avoid getting a
 		// negative result from the modulus operation when rs.mu.idx is small.
 		requestsIdx := (rs.Mu.idx + len(rs.Mu.records) - i) % len(rs.Mu.records)
-		if cur := rs.Mu.records[requestsIdx]; cur != nil {
+		if cur := rs.Mu.records[requestsIdx]; cur.active {
 			windowsUsed++
 			sum += cur.sum
 		}
@@ -348,10 +370,12 @@ func (rs *ReplicaStats) ResetRequestCounts() {
 	rs.Mu.Lock()
 	defer rs.Mu.Unlock()
 
+	// Reset the individual records and set their state to inactive.
 	for i := range rs.Mu.records {
-		rs.Mu.records[i] = nil
+		rs.Mu.records[i].deactivate()
 	}
-	rs.Mu.records[rs.Mu.idx] = newReplicaStatsRecord()
+	// Update the current idx record to be active.
+	rs.Mu.records[rs.Mu.idx].activate()
 	rs.Mu.lastRotate = timeutil.Unix(0, rs.clock.PhysicalNow())
 	rs.Mu.lastReset = rs.Mu.lastRotate
 }
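For intuition about the loops guarded by cur.active above: both PerLocalityDecayingRate and SumLocked walk the ring newest-to-oldest and weight window i by decayFactor^i. A standalone sketch of that aggregation; the 0.8 decay factor here is an assumed value for illustration, not taken from this diff:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Per-window sums, newest first: windows[0] is the current window.
	windows := []float64{10, 20, 0, 5, 0, 40}
	const decayFactor = 0.8 // assumed for illustration

	// Window i contributes windows[i] * decayFactor^i, so older windows
	// count geometrically less toward the aggregate.
	var sum float64
	for i, w := range windows {
		sum += w * math.Pow(decayFactor, float64(i))
	}
	fmt.Printf("decayed sum: %.2f\n", sum) // decayed sum: 41.67
}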
54 changes: 8 additions & 46 deletions pkg/kv/kvserver/replicastats/replica_stats_test.go
@@ -278,10 +278,14 @@ func TestReplicaStatsDecay(t *testing.T) {
 		}
 	}
 
+	// All of the recorded values should have been rotated out, zeroing out
+	// all the windows. Assert that no entries in the locality count map
+	// have any value other than zero. The keys are not cleared as they are
+	// likely to appear again.
 	manual.Advance(replStatsRotateInterval)
-	expected := make(PerLocalityCounts)
-	if actual, _ := rs.PerLocalityDecayingRate(); !reflect.DeepEqual(expected, actual) {
-		t.Errorf("incorrect per-locality request counts: %s", pretty.Diff(expected, actual))
+	actualCounts, _ := rs.PerLocalityDecayingRate()
+	for _, v := range actualCounts {
+		require.Zero(t, v)
 	}
 	rs.ResetRequestCounts()
 }
@@ -457,22 +461,6 @@ func TestReplicaStatsSplit(t *testing.T) {
 	otherHalf := genTestingReplicaStats(nilMultipliers, 0, 0)
 	n := len(initial.Mu.records)
 
-	// Adjust the max/min/count, since the generated replica stats will have
-	// max/min that is too high and missing half the counts.
-	for i := range initial.Mu.records {
-		idx := (initial.Mu.idx + n - i) % n
-
-		// We don't want to adjust the aggregates when the initial window
-		// is uninitialzed.
-		if initial.Mu.records[idx] == nil {
-			continue
-		}
-
-		expected.Mu.records[idx].count = initial.Mu.records[idx].count / 2
-		expected.Mu.records[idx].max = initial.Mu.records[idx].max
-		expected.Mu.records[idx].min = initial.Mu.records[idx].min
-	}
-
 	initial.SplitRequestCounts(otherHalf)
 
 	for i := range expected.Mu.records {
@@ -556,30 +544,6 @@ func TestReplicaStatsMerge(t *testing.T) {
 			expectedRs := genTestingReplicaStats(tc.windowsExp, 10, tc.rotateA)
 			n := len(expectedRs.Mu.records)
 
-			// Adjust the max/min/count, since the generated replica stats will have
-			// max/min that is too high and missing half the counts.
-			for i := range expectedRs.Mu.records {
-
-				idxExpected, idxA, idxB := (expectedRs.Mu.idx+n-i)%n, (rsA.Mu.idx+n-i)%n, (rsB.Mu.idx+n-i)%n
-				if expectedRs.Mu.records[idxExpected] == nil {
-					continue
-				}
-
-				expectedRs.Mu.records[idxExpected].count = 0
-				expectedRs.Mu.records[idxExpected].max = -math.MaxFloat64
-				expectedRs.Mu.records[idxExpected].min = math.MaxFloat64
-
-				if rsA.Mu.records[idxA] != nil {
-					expectedRs.Mu.records[idxExpected].count = rsA.Mu.records[idxA].count
-					expectedRs.Mu.records[idxExpected].max = rsA.Mu.records[idxA].max
-					expectedRs.Mu.records[idxExpected].min = rsA.Mu.records[idxA].min
-				}
-				if rsB.Mu.records[idxB] != nil {
-					expectedRs.Mu.records[idxExpected].count += rsB.Mu.records[idxB].count
-					expectedRs.Mu.records[idxExpected].max = math.Max(expectedRs.Mu.records[idxExpected].max, rsB.Mu.records[idxB].max)
-					expectedRs.Mu.records[idxExpected].min = math.Min(expectedRs.Mu.records[idxExpected].min, rsB.Mu.records[idxB].min)
-				}
-			}
 			rsA.MergeRequestCounts(rsB)
 
 			for i := range expectedRs.Mu.records {
@@ -620,9 +584,7 @@ func TestReplicaStatsRecordAggregate(t *testing.T) {
 	expectedStatsRecord := &replicaStatsRecord{
 		localityCounts: expectedLocalityCounts,
 		sum:            expectedSum * 6,
-		max:            float64(n * 3),
-		min:            1,
-		count:          int64(n * 3),
+		active:         true,
 	}
 
 	require.Equal(t, expectedStatsRecord, rs.Mu.records[rs.Mu.idx])