diff --git a/pkg/kv/kvserver/replica_stats.go b/pkg/kv/kvserver/replica_stats.go index e4c9490204bf..20f100e6a74a 100644 --- a/pkg/kv/kvserver/replica_stats.go +++ b/pkg/kv/kvserver/replica_stats.go @@ -96,15 +96,36 @@ func newReplicaStatsRecord() *replicaStatsRecord { } } -func (rsr *replicaStatsRecord) merge(other *replicaStatsRecord) { - rsr.max = math.Max(rsr.max, other.max) - rsr.min = math.Min(rsr.min, other.min) - rsr.sum += other.sum - rsr.count += other.count - - for locality, count := range other.localityCounts { - rsr.localityCounts[locality] += count +// mergeReplicaStatsRecords combines two records and returns a new record with +// the merged data. When this is called with nil records, a nil record is +// returned; otherwise a new record instead. +func mergeReplicaStatsRecords(left, right *replicaStatsRecord) *replicaStatsRecord { + if left == nil && right == nil { + return nil } + if left == nil { + left = newReplicaStatsRecord() + } + if right == nil { + right = newReplicaStatsRecord() + } + + mergedStats := newReplicaStatsRecord() + + mergedStats.max = math.Max(left.max, right.max) + mergedStats.min = math.Min(left.min, right.min) + mergedStats.sum = left.sum + right.sum + mergedStats.count = left.count + right.count + + for locality, count := range left.localityCounts { + mergedStats.localityCounts[locality] += count + } + + for locality, count := range right.localityCounts { + mergedStats.localityCounts[locality] += count + } + + return mergedStats } func (rsr *replicaStatsRecord) split(other *replicaStatsRecord) { @@ -158,11 +179,7 @@ func (rs *replicaStats) mergeRequestCounts(other *replicaStats) { rsIdx := (rs.mu.idx + n - i) % n otherIdx := (other.mu.idx + n - i) % n - if other.mu.records[otherIdx] == nil { - continue - } - - rs.mu.records[rsIdx].merge(other.mu.records[otherIdx]) + rs.mu.records[rsIdx] = mergeReplicaStatsRecords(rs.mu.records[rsIdx], other.mu.records[otherIdx]) // Reset the stats on other. other.mu.records[otherIdx] = newReplicaStatsRecord() diff --git a/pkg/kv/kvserver/replica_stats_test.go b/pkg/kv/kvserver/replica_stats_test.go index 4d6a0eecf572..6bf612fced4d 100644 --- a/pkg/kv/kvserver/replica_stats_test.go +++ b/pkg/kv/kvserver/replica_stats_test.go @@ -376,7 +376,7 @@ func TestReplicaStatsDecaySmoothing(t *testing.T) { } } -func genTestingReplicaStats(windowedMultipliers [6]int, n, offset int) *replicaStats { +func genTestingReplicaStats(windowedMultipliers []int, n, offset int) *replicaStats { manual := hlc.NewManualClock(123) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) awsLocalities := map[roachpb.NodeID]string{ @@ -414,13 +414,13 @@ func TestReplicaStatsSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - windowedMultipliersInitial := [6]int{10, 20, 30, 40, 50, 60} - windowedMultipliersSplit := [6]int{5, 10, 15, 20, 25, 30} - nilMultipliers := [6]int{0, 0, 0, 0, 0, 0} + windowedMultipliersInitial := []int{10, 20, 30, 40, 50, 60} + windowedMultipliersSplit := []int{5, 10, 15, 20, 25, 30} + nilMultipliers := []int{} testCases := []struct { - expectedSplit [6]int - windowsInitial [6]int + expectedSplit []int + windowsInitial []int rotation int }{ { @@ -438,17 +438,35 @@ func TestReplicaStatsSplit(t *testing.T) { windowsInitial: windowedMultipliersInitial, rotation: 50, }, + { + expectedSplit: windowedMultipliersSplit[:0], + windowsInitial: windowedMultipliersInitial[:0], + rotation: 0, + }, + { + expectedSplit: windowedMultipliersSplit[:2], + windowsInitial: windowedMultipliersInitial[:2], + rotation: 0, + }, } for _, tc := range testCases { - initial := genTestingReplicaStats(tc.windowsInitial, 10, 0) - expected := genTestingReplicaStats(tc.expectedSplit, 10, 0) + initial := genTestingReplicaStats(tc.windowsInitial, 10, tc.rotation) + expected := genTestingReplicaStats(tc.expectedSplit, 10, tc.rotation) otherHalf := genTestingReplicaStats(nilMultipliers, 0, 0) + n := len(initial.mu.records) // Adjust the max/min/count, since the generated replica stats will have // max/min that is too high and missing half the counts. for i := range initial.mu.records { - idx := (initial.mu.idx + i) % 6 + idx := (initial.mu.idx + n - i) % n + + // We don't want to adjust the aggregates when the initial window + // is uninitialzed. + if initial.mu.records[idx] == nil { + continue + } + expected.mu.records[idx].count = initial.mu.records[idx].count / 2 expected.mu.records[idx].max = initial.mu.records[idx].max expected.mu.records[idx].min = initial.mu.records[idx].min @@ -457,7 +475,7 @@ func TestReplicaStatsSplit(t *testing.T) { initial.splitRequestCounts(otherHalf) for i := range expected.mu.records { - idxExpected, leftIdx, rightIdx := (expected.mu.idx+i)%6, (initial.mu.idx+i)%6, (otherHalf.mu.idx+i)%6 + idxExpected, leftIdx, rightIdx := (expected.mu.idx+n-i)%n, (initial.mu.idx+n-i)%n, (otherHalf.mu.idx+n-i)%n assert.Equal(t, expected.mu.records[idxExpected], initial.mu.records[leftIdx]) assert.Equal(t, expected.mu.records[idxExpected], otherHalf.mu.records[rightIdx]) } @@ -473,14 +491,14 @@ func TestReplicaStatsMerge(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - windowedMultipliers1 := [6]int{1, 2, 3, 4, 5, 6} - windowedMultipliers10 := [6]int{10, 20, 30, 40, 50, 60} - expectedMultipliers := [6]int{11, 22, 33, 44, 55, 66} + windowedMultipliers1 := []int{1, 2, 3, 4, 5, 6} + windowedMultipliers10 := []int{10, 20, 30, 40, 50, 60} + expectedMultipliers := []int{11, 22, 33, 44, 55, 66} testCases := []struct { - windowsA [6]int - windowsB [6]int - windowsExp [6]int + windowsA []int + windowsB []int + windowsExp []int rotateA int rotateB int }{ @@ -505,26 +523,67 @@ func TestReplicaStatsMerge(t *testing.T) { rotateA: 50, rotateB: 109, }, + // Ensure that with uninitialzed entries in replica stats, the result + // adjusts correctly. Here we expect the sliding windows to be + // merged from the most recent window (tail) backwards. + { + windowsA: []int{}, + windowsB: []int{}, + windowsExp: []int{}, + rotateA: 0, + rotateB: 0, + }, + { + windowsA: []int{1, 1}, + windowsB: []int{1, 1, 1, 1, 1, 1}, + windowsExp: []int{1, 1, 1, 1, 2, 2}, + rotateA: 0, + rotateB: 0, + }, + { + windowsA: []int{1, 1, 1, 1, 1, 1}, + windowsB: []int{1, 1}, + windowsExp: []int{1, 1, 1, 1, 2, 2}, + rotateA: 0, + rotateB: 0, + }, } for _, tc := range testCases { - rsA := genTestingReplicaStats(tc.windowsA, 10, 0) - rsB := genTestingReplicaStats(tc.windowsB, 10, 0) - expectedRs := genTestingReplicaStats(tc.windowsExp, 10, 0) + rsA := genTestingReplicaStats(tc.windowsA, 10, tc.rotateA) + rsB := genTestingReplicaStats(tc.windowsB, 10, tc.rotateB) + expectedRs := genTestingReplicaStats(tc.windowsExp, 10, tc.rotateA) + n := len(expectedRs.mu.records) // Adjust the max/min/count, since the generated replica stats will have // max/min that is too high and missing half the counts. for i := range expectedRs.mu.records { - idxExpected, idxA, idxB := (expectedRs.mu.idx+i)%6, (rsA.mu.idx+i)%6, (rsB.mu.idx+i)%6 - expectedRs.mu.records[idxExpected].count = rsA.mu.records[idxA].count + rsB.mu.records[idxB].count - expectedRs.mu.records[idxExpected].max = math.Max(rsA.mu.records[idxA].max, rsB.mu.records[idxB].max) - expectedRs.mu.records[idxExpected].min = math.Min(rsA.mu.records[idxA].min, rsB.mu.records[idxB].min) + + idxExpected, idxA, idxB := (expectedRs.mu.idx+n-i)%n, (rsA.mu.idx+n-i)%n, (rsB.mu.idx+n-i)%n + if expectedRs.mu.records[idxExpected] == nil { + continue + } + + expectedRs.mu.records[idxExpected].count = 0 + expectedRs.mu.records[idxExpected].max = -math.MaxFloat64 + expectedRs.mu.records[idxExpected].min = math.MaxFloat64 + + if rsA.mu.records[idxA] != nil { + expectedRs.mu.records[idxExpected].count = rsA.mu.records[idxA].count + expectedRs.mu.records[idxExpected].max = rsA.mu.records[idxA].max + expectedRs.mu.records[idxExpected].min = rsA.mu.records[idxA].min + } + if rsB.mu.records[idxB] != nil { + expectedRs.mu.records[idxExpected].count += rsB.mu.records[idxB].count + expectedRs.mu.records[idxExpected].max = math.Max(expectedRs.mu.records[idxExpected].max, rsB.mu.records[idxB].max) + expectedRs.mu.records[idxExpected].min = math.Min(expectedRs.mu.records[idxExpected].min, rsB.mu.records[idxB].min) + } } rsA.mergeRequestCounts(rsB) for i := range expectedRs.mu.records { - idxExpected, idxA := (expectedRs.mu.idx+i)%6, (rsA.mu.idx+i)%6 - assert.Equal(t, expectedRs.mu.records[idxExpected], rsA.mu.records[idxA]) + idxExpected, idxA := (expectedRs.mu.idx+n-i)%n, (rsA.mu.idx+n-i)%n + assert.Equal(t, expectedRs.mu.records[idxExpected], rsA.mu.records[idxA], "expected idx: %d, merged idx %d", idxExpected, idxA) } } } @@ -542,7 +601,7 @@ func TestReplicaStatsRecordAggregate(t *testing.T) { } n := 100 - rs := genTestingReplicaStats([6]int{1, 0, 0, 0, 0, 0}, n, 0) + rs := genTestingReplicaStats([]int{1, 0, 0, 0, 0, 0}, n, 0) expectedSum := float64(n*(n+1)) / 2.0