Skip to content

Commit

Permalink
kvserver: avoid nil deref on replica_stats merge
Browse files Browse the repository at this point in the history
A bug existed where when replica stats were merged with only paritally
initialized windows, merging would result in a nil ptr dereference. This
patch updates the logic of replicaStatsMerge to avoid nil dereferences
and appropriately handle uninitialized window records.

resolves: #80072

Release note: None
  • Loading branch information
kvoli committed Apr 16, 2022
1 parent 771432d commit 32c6124
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 39 deletions.
43 changes: 30 additions & 13 deletions pkg/kv/kvserver/replica_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,36 @@ func newReplicaStatsRecord() *replicaStatsRecord {
}
}

func (rsr *replicaStatsRecord) merge(other *replicaStatsRecord) {
rsr.max = math.Max(rsr.max, other.max)
rsr.min = math.Min(rsr.min, other.min)
rsr.sum += other.sum
rsr.count += other.count

for locality, count := range other.localityCounts {
rsr.localityCounts[locality] += count
// mergeReplicaStatsRecords combines two records and returns a new record with
// the merged data. When this is called with nil records, a nil record is
// returned; otherwise a new record instead.
func mergeReplicaStatsRecords(left, right *replicaStatsRecord) *replicaStatsRecord {
if left == nil && right == nil {
return nil
}
if left == nil {
left = newReplicaStatsRecord()
}
if right == nil {
right = newReplicaStatsRecord()
}

mergedStats := newReplicaStatsRecord()

mergedStats.max = math.Max(left.max, right.max)
mergedStats.min = math.Min(left.min, right.min)
mergedStats.sum = left.sum + right.sum
mergedStats.count = left.count + right.count

for locality, count := range left.localityCounts {
mergedStats.localityCounts[locality] += count
}

for locality, count := range right.localityCounts {
mergedStats.localityCounts[locality] += count
}

return mergedStats
}

func (rsr *replicaStatsRecord) split(other *replicaStatsRecord) {
Expand Down Expand Up @@ -158,11 +179,7 @@ func (rs *replicaStats) mergeRequestCounts(other *replicaStats) {
rsIdx := (rs.mu.idx + n - i) % n
otherIdx := (other.mu.idx + n - i) % n

if other.mu.records[otherIdx] == nil {
continue
}

rs.mu.records[rsIdx].merge(other.mu.records[otherIdx])
rs.mu.records[rsIdx] = mergeReplicaStatsRecords(rs.mu.records[rsIdx], other.mu.records[otherIdx])

// Reset the stats on other.
other.mu.records[otherIdx] = newReplicaStatsRecord()
Expand Down
111 changes: 85 additions & 26 deletions pkg/kv/kvserver/replica_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) {
}
}

func genTestingReplicaStats(windowedMultipliers [6]int, n, offset int) *replicaStats {
func genTestingReplicaStats(windowedMultipliers []int, n, offset int) *replicaStats {
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
awsLocalities := map[roachpb.NodeID]string{
Expand Down Expand Up @@ -414,13 +414,13 @@ func TestReplicaStatsSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

windowedMultipliersInitial := [6]int{10, 20, 30, 40, 50, 60}
windowedMultipliersSplit := [6]int{5, 10, 15, 20, 25, 30}
nilMultipliers := [6]int{0, 0, 0, 0, 0, 0}
windowedMultipliersInitial := []int{10, 20, 30, 40, 50, 60}
windowedMultipliersSplit := []int{5, 10, 15, 20, 25, 30}
nilMultipliers := []int{}

testCases := []struct {
expectedSplit [6]int
windowsInitial [6]int
expectedSplit []int
windowsInitial []int
rotation int
}{
{
Expand All @@ -438,17 +438,35 @@ func TestReplicaStatsSplit(t *testing.T) {
windowsInitial: windowedMultipliersInitial,
rotation: 50,
},
{
expectedSplit: windowedMultipliersSplit[:0],
windowsInitial: windowedMultipliersInitial[:0],
rotation: 0,
},
{
expectedSplit: windowedMultipliersSplit[:2],
windowsInitial: windowedMultipliersInitial[:2],
rotation: 0,
},
}

for _, tc := range testCases {
initial := genTestingReplicaStats(tc.windowsInitial, 10, 0)
expected := genTestingReplicaStats(tc.expectedSplit, 10, 0)
initial := genTestingReplicaStats(tc.windowsInitial, 10, tc.rotation)
expected := genTestingReplicaStats(tc.expectedSplit, 10, tc.rotation)
otherHalf := genTestingReplicaStats(nilMultipliers, 0, 0)
n := len(initial.mu.records)

// Adjust the max/min/count, since the generated replica stats will have
// max/min that is too high and missing half the counts.
for i := range initial.mu.records {
idx := (initial.mu.idx + i) % 6
idx := (initial.mu.idx + n - i) % n

// We don't want to adjust the aggregates when the initial window
// is uninitialzed.
if initial.mu.records[idx] == nil {
continue
}

expected.mu.records[idx].count = initial.mu.records[idx].count / 2
expected.mu.records[idx].max = initial.mu.records[idx].max
expected.mu.records[idx].min = initial.mu.records[idx].min
Expand All @@ -457,7 +475,7 @@ func TestReplicaStatsSplit(t *testing.T) {
initial.splitRequestCounts(otherHalf)

for i := range expected.mu.records {
idxExpected, leftIdx, rightIdx := (expected.mu.idx+i)%6, (initial.mu.idx+i)%6, (otherHalf.mu.idx+i)%6
idxExpected, leftIdx, rightIdx := (expected.mu.idx+n-i)%n, (initial.mu.idx+n-i)%n, (otherHalf.mu.idx+n-i)%n
assert.Equal(t, expected.mu.records[idxExpected], initial.mu.records[leftIdx])
assert.Equal(t, expected.mu.records[idxExpected], otherHalf.mu.records[rightIdx])
}
Expand All @@ -473,14 +491,14 @@ func TestReplicaStatsMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

windowedMultipliers1 := [6]int{1, 2, 3, 4, 5, 6}
windowedMultipliers10 := [6]int{10, 20, 30, 40, 50, 60}
expectedMultipliers := [6]int{11, 22, 33, 44, 55, 66}
windowedMultipliers1 := []int{1, 2, 3, 4, 5, 6}
windowedMultipliers10 := []int{10, 20, 30, 40, 50, 60}
expectedMultipliers := []int{11, 22, 33, 44, 55, 66}

testCases := []struct {
windowsA [6]int
windowsB [6]int
windowsExp [6]int
windowsA []int
windowsB []int
windowsExp []int
rotateA int
rotateB int
}{
Expand All @@ -505,26 +523,67 @@ func TestReplicaStatsMerge(t *testing.T) {
rotateA: 50,
rotateB: 109,
},
// Ensure that with uninitialzed entries in replica stats, the result
// adjusts correctly. Here we expect the sliding windows to be
// merged from most the recent window (tail) backwards.
{
windowsA: []int{},
windowsB: []int{},
windowsExp: []int{},
rotateA: 0,
rotateB: 0,
},
{
windowsA: []int{1, 1},
windowsB: []int{1, 1, 1, 1, 1, 1},
windowsExp: []int{1, 1, 1, 1, 2, 2},
rotateA: 0,
rotateB: 0,
},
{
windowsA: []int{1, 1, 1, 1, 1, 1},
windowsB: []int{1, 1},
windowsExp: []int{1, 1, 1, 1, 2, 2},
rotateA: 0,
rotateB: 0,
},
}

for _, tc := range testCases {
rsA := genTestingReplicaStats(tc.windowsA, 10, 0)
rsB := genTestingReplicaStats(tc.windowsB, 10, 0)
expectedRs := genTestingReplicaStats(tc.windowsExp, 10, 0)
rsA := genTestingReplicaStats(tc.windowsA, 10, tc.rotateA)
rsB := genTestingReplicaStats(tc.windowsB, 10, tc.rotateB)
expectedRs := genTestingReplicaStats(tc.windowsExp, 10, tc.rotateA)
n := len(expectedRs.mu.records)

// Adjust the max/min/count, since the generated replica stats will have
// max/min that is too high and missing half the counts.
for i := range expectedRs.mu.records {
idxExpected, idxA, idxB := (expectedRs.mu.idx+i)%6, (rsA.mu.idx+i)%6, (rsB.mu.idx+i)%6
expectedRs.mu.records[idxExpected].count = rsA.mu.records[idxA].count + rsB.mu.records[idxB].count
expectedRs.mu.records[idxExpected].max = math.Max(rsA.mu.records[idxA].max, rsB.mu.records[idxB].max)
expectedRs.mu.records[idxExpected].min = math.Min(rsA.mu.records[idxA].min, rsB.mu.records[idxB].min)

idxExpected, idxA, idxB := (expectedRs.mu.idx+n-i)%n, (rsA.mu.idx+n-i)%n, (rsB.mu.idx+n-i)%n
if expectedRs.mu.records[idxExpected] == nil {
continue
}

expectedRs.mu.records[idxExpected].count = 0
expectedRs.mu.records[idxExpected].max = -math.MaxFloat64
expectedRs.mu.records[idxExpected].min = math.MaxFloat64

if rsA.mu.records[idxA] != nil {
expectedRs.mu.records[idxExpected].count = rsA.mu.records[idxA].count
expectedRs.mu.records[idxExpected].max = rsA.mu.records[idxA].max
expectedRs.mu.records[idxExpected].min = rsA.mu.records[idxA].min
}
if rsB.mu.records[idxB] != nil {
expectedRs.mu.records[idxExpected].count += rsB.mu.records[idxB].count
expectedRs.mu.records[idxExpected].max = math.Max(expectedRs.mu.records[idxExpected].max, rsB.mu.records[idxB].max)
expectedRs.mu.records[idxExpected].min = math.Min(expectedRs.mu.records[idxExpected].min, rsB.mu.records[idxB].min)
}
}
rsA.mergeRequestCounts(rsB)

for i := range expectedRs.mu.records {
idxExpected, idxA := (expectedRs.mu.idx+i)%6, (rsA.mu.idx+i)%6
assert.Equal(t, expectedRs.mu.records[idxExpected], rsA.mu.records[idxA])
idxExpected, idxA := (expectedRs.mu.idx+n-i)%n, (rsA.mu.idx+n-i)%n
assert.Equal(t, expectedRs.mu.records[idxExpected], rsA.mu.records[idxA], "expected idx: %d, merged idx %d", idxExpected, idxA)
}
}
}
Expand All @@ -542,7 +601,7 @@ func TestReplicaStatsRecordAggregate(t *testing.T) {
}

n := 100
rs := genTestingReplicaStats([6]int{1, 0, 0, 0, 0, 0}, n, 0)
rs := genTestingReplicaStats([]int{1, 0, 0, 0, 0, 0}, n, 0)

expectedSum := float64(n*(n+1)) / 2.0

Expand Down

0 comments on commit 32c6124

Please sign in to comment.