allocator: replace read amp with io thresh
We previously checked stores' L0-sublevels to exclude IO overloaded
stores from being allocation targets (cockroachdb#78608). This commit replaces that signal
with the normalized IO overload score, which also factors in the L0 file count.
We started gossiping this value as of cockroachdb#83720. We continue
gossiping L0-sublevels for mixed-version compatibility; we can stop doing this
in 23.2.
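
For context, a minimal sketch of the normalization described above, assuming the
IOThreshold fields gossiped as of cockroachdb#83720 (this is an illustration, not
the admissionpb implementation): each L0 signal is divided by its configured
threshold and the larger ratio wins, so a score of 1.0 means the store is at its
overload limit.

package main

import "fmt"

// ioThreshold mirrors the gossiped L0 signals and their thresholds.
type ioThreshold struct {
	l0NumSubLevels, l0NumSubLevelsThreshold int64
	l0NumFiles, l0NumFilesThreshold         int64
}

// score returns the normalized IO overload score: the worse of the two
// threshold-relative ratios.
func (t ioThreshold) score() float64 {
	sublevels := float64(t.l0NumSubLevels) / float64(t.l0NumSubLevelsThreshold)
	files := float64(t.l0NumFiles) / float64(t.l0NumFilesThreshold)
	if sublevels > files {
		return sublevels
	}
	return files
}

func main() {
	// 10 sub-levels against a threshold of 20 dominates 10 files against 1000.
	t := ioThreshold{l0NumSubLevels: 10, l0NumSubLevelsThreshold: 20, l0NumFiles: 10, l0NumFilesThreshold: 1000}
	fmt.Printf("io overload score: %.2f\n", t.score()) // 0.50
}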

Resolves: cockroachdb#85084

Release note (ops change): We've deprecated two cluster settings:
- kv.allocator.l0_sublevels_threshold
- kv.allocator.l0_sublevels_threshold_enforce
These settings were used to control rebalancing and up-replication behavior in
the face of IO-overloaded stores; they have now been replaced by other internal
mechanisms.
kvoli committed Feb 22, 2023
1 parent 2431b4b commit 4b11002
Showing 11 changed files with 438 additions and 391 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2085,16 +2085,16 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
}

// StoreHealthOptions returns the store health options, currently only
// considering the threshold for L0 sub-levels. This threshold is not
// considering the threshold for io overload. This threshold is not
// considered in allocation or rebalancing decisions (excluding candidate
// stores as targets) when enforcementLevel is set to storeHealthNoAction or
// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
// there is a mixed version cluster, storeHealthNoAction is set instead.
func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV))
enforcementLevel := IOOverloadEnforcementLevel(IOOverloadThresholdEnforcement.Get(&a.st.SV))
return StoreHealthOptions{
EnforcementLevel: enforcementLevel,
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV),
IOOverloadThreshold: IOOverloadThreshold.Get(&a.st.SV),
}
}
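
As a rough illustration of the enforcement semantics described in the comment
above, the sketch below shows how an enforcement level could gate whether an
IO-overloaded store is excluded as a target. IOOverloadThresholdNoAction appears
elsewhere in this diff; the other constant names and the helper itself are
assumed for illustration, not the allocator's actual API.

package allocatorsketch

type IOOverloadEnforcementLevel int

const (
	IOOverloadThresholdNoAction IOOverloadEnforcementLevel = iota
	IOOverloadThresholdLogOnly          // assumed name
	IOOverloadThresholdBlockRebalanceTo // assumed name
)

// excludeAsTarget reports whether a store with the given normalized IO
// overload score should be skipped as an allocation or rebalance target.
func excludeAsTarget(score, threshold float64, level IOOverloadEnforcementLevel) bool {
	if score < threshold {
		return false // Below the threshold, stores are never excluded for IO overload.
	}
	// Overloaded: only an enforcing level actually excludes the store;
	// no-action and log-only leave targeting unchanged.
	return level != IOOverloadThresholdNoAction && level != IOOverloadThresholdLogOnly
}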

300 changes: 153 additions & 147 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go
@@ -52,7 +52,7 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) {
defer log.Scope(t).Close(t)

testCases := []struct {
valid, invalid, full, readAmpHigh int
valid, invalid, full, ioOverloaded int
}{
{0, 0, 0, 0},
{1, 0, 0, 0},
@@ -80,16 +80,16 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) {
for i := 0; i < tc.full; i++ {
cl = append(cl, candidate{fullDisk: true})
}
for i := 0; i < tc.readAmpHigh; i++ {
cl = append(cl, candidate{highReadAmp: true})
for i := 0; i < tc.ioOverloaded; i++ {
cl = append(cl, candidate{ioOverloaded: true})
}
sort.Sort(sort.Reverse(byScore(cl)))

valid := cl.onlyValidAndHealthyDisk()
if a, e := len(valid), tc.valid; a != e {
t.Errorf("expected %d valid, actual %d", e, a)
}
if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.readAmpHigh; a != e {
if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.ioOverloaded; a != e {
t.Errorf("expected %d invalid, actual %d", e, a)
}
})
@@ -1096,7 +1096,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

options := &RangeCountScorerOptions{StoreHealthOptions: StoreHealthOptions{EnforcementLevel: StoreHealthNoAction}}
options := &RangeCountScorerOptions{StoreHealthOptions: StoreHealthOptions{EnforcementLevel: IOOverloadThresholdNoAction}}
newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor {
return roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(id),
169 changes: 82 additions & 87 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Large diffs are not rendered by default.

39 changes: 24 additions & 15 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -558,26 +558,34 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string {
// storeGossipUpdate is the Gossip callback used to keep the StorePool up to date.
func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
var storeDesc roachpb.StoreDescriptor
// We keep copies of the capacity and storeID to pass into the
// capacityChanged callback.
var oldCapacity, curCapacity roachpb.StoreCapacity
var storeID roachpb.StoreID

if err := content.GetProto(&storeDesc); err != nil {
ctx := sp.AnnotateCtx(context.TODO())
log.Errorf(ctx, "%v", err)
return
}
storeID = storeDesc.StoreID
curCapacity = storeDesc.Capacity

sp.storeDescriptorUpdate(storeDesc)
}

// storeDescriptorUpdate takes a store descriptor and updates the corresponding
// details for the store in the storepool.
func (sp *StorePool) storeDescriptorUpdate(storeDesc roachpb.StoreDescriptor) {
// We keep copies of the capacity and storeID to pass into the
// capacityChanged callback.
var oldCapacity roachpb.StoreCapacity
storeID := storeDesc.StoreID
curCapacity := storeDesc.Capacity

now := sp.clock.PhysicalTime()

sp.DetailsMu.Lock()
detail := sp.GetStoreDetailLocked(storeID)
if detail.Desc != nil {
oldCapacity = detail.Desc.Capacity
}
detail.Desc = &storeDesc
detail.LastUpdatedTime = sp.clock.PhysicalTime()
detail.LastUpdatedTime = now
sp.DetailsMu.Unlock()

sp.localitiesMu.Lock()
@@ -749,8 +757,7 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer(
}
}

// newStoreDetail makes a new StoreDetail struct. It sets index to be -1 to
// ensure that it will be processed by a queue immediately.
// newStoreDetail makes a new StoreDetail struct.
func newStoreDetail() *StoreDetail {
return &StoreDetail{}
}
@@ -1063,9 +1070,9 @@ type StoreList struct {
// eligible to be rebalance targets.
candidateWritesPerSecond Stat

// candidateWritesPerSecond tracks L0 sub-level stats for Stores that are
// eligible to be rebalance targets.
CandidateL0Sublevels Stat
// CandidateIOOverloadScores tracks the IO overload stats for Stores that are
// eligible to be rebalance candidates.
CandidateIOOverloadScores Stat
}

// MakeStoreList constructs a new store list based on the passed in descriptors.
@@ -1080,8 +1087,9 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList {
sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes))
sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond)
sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond)
sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels))
sl.CandidateCPU.update(desc.Capacity.CPUPerSecond)
score, _ := desc.Capacity.IOThreshold.Score()
sl.CandidateIOOverloadScores.update(score)
}
return sl
}
@@ -1102,12 +1110,13 @@ func (sl StoreList) String() string {
fmt.Fprintf(&buf, " <no candidates>")
}
for _, desc := range sl.Stores {
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n",
ioScore, _ := desc.Capacity.IOThreshold.Score()
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s io-overload=%.2f\n",
desc.StoreID, desc.Capacity.RangeCount,
desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes),
desc.Capacity.QueriesPerSecond,
humanizeutil.Duration(time.Duration(int64(desc.Capacity.CPUPerSecond))),
desc.Capacity.L0Sublevels,
ioScore,
)
}
return buf.String()
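
For illustration only (values made up, exact formatting may differ), a store
line in this listing now ends with the normalized score rather than a raw
sub-level count, roughly:

  1: ranges=5 leases=1 disk-usage=30 B queries-per-second=100.00 store-cpu-per-second=1ms io-overload=0.25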
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/metrics.go
@@ -2607,9 +2607,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// L0SublevelsMax. This is not exported as a metric.
sm.l0SublevelsTracker.swag = slidingwindow.NewMaxSwag(
timeutil.Now(),
allocatorimpl.L0SublevelInterval,
// 5 sliding windows, by the default interval (2 mins) will track the
// maximum for up to 10 minutes. Selected experimentally.
// Use 5 sliding windows, so the retention period is divided by 5 to
// calculate the interval of the sliding window buckets.
allocatorimpl.L0SublevelTrackerRetention/5,
5,
)
}
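
To make the bucketing comment above concrete, here is a small self-contained
sketch of a 5-bucket windowed maximum; it only illustrates the idea and is not
the slidingwindow package's implementation.

package main

import (
	"fmt"
	"time"
)

// windowedMax splits a retention period into 5 rotating buckets, each holding
// the maximum value seen during its interval. Querying returns the maximum
// across all buckets, i.e. an approximate max over the retention period.
type windowedMax struct {
	interval  time.Duration
	buckets   [5]float64
	bucketEnd time.Time // end time of the newest bucket
}

func newWindowedMax(now time.Time, retention time.Duration) *windowedMax {
	interval := retention / 5
	return &windowedMax{interval: interval, bucketEnd: now.Add(interval)}
}

func (w *windowedMax) record(now time.Time, v float64) {
	for now.After(w.bucketEnd) {
		// The newest bucket's interval has passed: shift everything left,
		// dropping the oldest bucket, and start a fresh newest bucket.
		copy(w.buckets[:], w.buckets[1:])
		w.buckets[len(w.buckets)-1] = 0
		w.bucketEnd = w.bucketEnd.Add(w.interval)
	}
	if v > w.buckets[len(w.buckets)-1] {
		w.buckets[len(w.buckets)-1] = v
	}
}

func (w *windowedMax) query() float64 {
	var max float64
	for _, v := range w.buckets {
		if v > max {
			max = v
		}
	}
	return max
}

func main() {
	now := time.Now()
	w := newWindowedMax(now, 10*time.Minute) // 5 buckets of 2 minutes each
	w.record(now, 4)
	w.record(now.Add(time.Minute), 9)
	fmt.Println(w.query()) // 9: the largest value still inside the window
}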
@@ -2693,17 +2693,16 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.DiskStalled.Update(m.DiskStallCount)
sm.SharedStorageBytesRead.Update(m.SharedStorageReadBytes)
sm.SharedStorageBytesWritten.Update(m.SharedStorageWriteBytes)

sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels))
sm.RdbL0NumFiles.Update(m.Levels[0].NumFiles)
sm.RdbL0BytesFlushed.Update(int64(m.Levels[0].BytesFlushed))
// Update the maximum number of L0 sub-levels seen.
sm.l0SublevelsTracker.Lock()
sm.l0SublevelsTracker.swag.Record(timeutil.Now(), float64(m.Levels[0].Sublevels))
curMax, _ := sm.l0SublevelsTracker.swag.Query(timeutil.Now())
sm.l0SublevelsTracker.Unlock()
syncutil.StoreFloat64(&sm.l0SublevelsWindowedMax, curMax)

sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels))
sm.RdbL0NumFiles.Update(m.Levels[0].NumFiles)
sm.RdbL0BytesFlushed.Update(int64(m.Levels[0].BytesFlushed))
for level, stats := range m.Levels {
sm.RdbBytesIngested[level].Update(int64(stats.BytesIngested))
sm.RdbLevelSize[level].Update(stats.Size)
23 changes: 17 additions & 6 deletions pkg/kv/kvserver/store_pool_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -60,7 +61,12 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
LogicalBytes: 30,
QueriesPerSecond: 100,
WritesPerSecond: 30,
L0Sublevels: 4,
IOThreshold: admissionpb.IOThreshold{
L0NumSubLevels: 5,
L0NumSubLevelsThreshold: 20,
L0NumFiles: 5,
L0NumFilesThreshold: 1000,
},
},
},
{
@@ -74,7 +80,12 @@
LogicalBytes: 25,
QueriesPerSecond: 50,
WritesPerSecond: 25,
L0Sublevels: 8,
IOThreshold: admissionpb.IOThreshold{
L0NumSubLevels: 10,
L0NumSubLevelsThreshold: 20,
L0NumFiles: 10,
L0NumFilesThreshold: 1000,
},
},
},
}
@@ -130,8 +141,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
if expectedWPS := 30 + WPS; desc.Capacity.WritesPerSecond != expectedWPS {
t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond)
}
if expectedL0Sublevels := int64(4); desc.Capacity.L0Sublevels != expectedL0Sublevels {
t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels)
if expectedNumL0Sublevels := int64(5); desc.Capacity.IOThreshold.L0NumSubLevels != expectedNumL0Sublevels {
t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedNumL0Sublevels, desc.Capacity.IOThreshold.L0NumFiles)
}

sp.UpdateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeUsageInfo, roachpb.REMOVE_VOTER)
@@ -151,8 +162,8 @@
if expectedWPS := 25 - WPS; desc.Capacity.WritesPerSecond != expectedWPS {
t.Errorf("expected WritesPerSecond %f, but got %f", expectedWPS, desc.Capacity.WritesPerSecond)
}
if expectedL0Sublevels := int64(8); desc.Capacity.L0Sublevels != expectedL0Sublevels {
t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedL0Sublevels, desc.Capacity.L0Sublevels)
if expectedNumL0Sublevels := int64(10); desc.Capacity.IOThreshold.L0NumSubLevels != expectedNumL0Sublevels {
t.Errorf("expected L0 Sub-Levels %d, but got %d", expectedNumL0Sublevels, desc.Capacity.IOThreshold.L0NumFiles)
}

sp.UpdateLocalStoresAfterLeaseTransfer(roachpb.StoreID(1), roachpb.StoreID(2), rangeUsageInfo)
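
For reference, under the max-of-ratios normalization sketched earlier (an
assumption about how the gossiped IOThreshold is scored, not something this
test asserts), the two fixtures above would score roughly
max(5/20, 5/1000) = 0.25 for store 1 and max(10/20, 10/1000) = 0.50 for store 2;
the sub-level ratio dominates the file-count ratio in both cases.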