Skip to content

Commit

Permalink
allocator: use max io threshold score instead of instantaneous
Browse files Browse the repository at this point in the history
The allocator uses the IO threshold score to determine replicas to
filter for rebalancing and lease transfers. Previously, this score would
be the last instantaneous value gossiped. As part of #118866, leases
will be shed upon encountering overload, in addition to the existing
behavior of blocking lease transfers. To avoid leases shedding quickly
from one node to another, consider the maximum IO threshold score
instead.

Note that the use of the maximum IO overload score is protected by a
cluster version gate. Without the version gate, an upgraded node could
continually shed all of its leases based on the maximum IO overload
score, whilst prior-version nodes transfer the leases back based on the
instantaneous score.

Part of: #118866
Release note: None
  • Loading branch information
kvoli committed Mar 12, 2024
1 parent b3dec44 commit fe5e53a
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez application
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000023.2-upgrading-to-1000024.1-step-018 set the active cluster version in the format '<major>.<minor>' application
version version 1000023.2-upgrading-to-1000024.1-step-020 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,6 @@
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-018</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-020</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ const (
// database to be SURVIVE ZONE.
V24_1_SystemDatabaseSurvivability

// V24_1_GossipMaximumIOOverload is the version at which stores begin
// populating the store capacity field IOThresholdMax. The field shouldn't be
// used for allocator decisions before then.
V24_1_GossipMaximumIOOverload

numKeys
)

Expand Down Expand Up @@ -361,6 +366,7 @@ var versionTable = [numKeys]roachpb.Version{
V24_1_SessionBasedLeasingUpgradeDescriptor: {Major: 23, Minor: 2, Internal: 14},
V24_1_PebbleFormatSyntheticPrefixSuffix: {Major: 23, Minor: 2, Internal: 16},
V24_1_SystemDatabaseSurvivability: {Major: 23, Minor: 2, Internal: 18},
V24_1_GossipMaximumIOOverload: {Major: 23, Minor: 2, Internal: 20},
}

// Latest is always the highest version key. This is the maximum logical cluster
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/allocator",
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
Expand Down Expand Up @@ -2105,7 +2106,6 @@ func (a *Allocator) nonIOOverloadedLeaseTargets(
}

sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

for _, replDesc := range existingReplicas {
store, ok := sl.FindStoreByID(replDesc.StoreID)
Expand All @@ -2120,15 +2120,15 @@ func (a *Allocator) nonIOOverloadedLeaseTargets(
// Instead, we create a buffer between the two to avoid leases moving back
// and forth.
if (replDesc.StoreID == leaseStoreID) &&
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)) {
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)) {
continue
}

// If the replica is not the leaseholder, don't include it as a candidate
// if it is filtered out similar to above, or the replica store doesn't
// pass the lease transfer IO overload check.
if replDesc.StoreID != leaseStoreID &&
(!ok || !ioOverloadOptions.transferLeaseToCheck(ctx, store, avgIOOverload)) {
(!ok || !ioOverloadOptions.transferLeaseToCheck(ctx, store, sl)) {
continue
}

Expand All @@ -2148,7 +2148,6 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
ioOverloadOptions IOOverloadOptions,
) bool {
sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

// Check the existing replicas for the leaseholder, if it doesn't meet the
// check return that the lease should be moved due to IO overload on the
Expand All @@ -2157,7 +2156,7 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
// overloaded.
for _, replDesc := range existingReplicas {
if store, ok := sl.FindStoreByID(replDesc.StoreID); ok && replDesc.StoreID == leaseStoreID {
return !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)
return !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)
}
}

Expand Down Expand Up @@ -2232,6 +2231,7 @@ func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
return IOOverloadOptions{
ReplicaEnforcementLevel: IOOverloadEnforcementLevel(ReplicaIOOverloadThresholdEnforcement.Get(&a.st.SV)),
LeaseEnforcementLevel: IOOverloadEnforcementLevel(LeaseIOOverloadThresholdEnforcement.Get(&a.st.SV)),
UseIOThresholdMax: a.st.Version.IsActive(context.Background(), clusterversion.V24_1_GossipMaximumIOOverload),
ReplicaIOOverloadThreshold: ReplicaIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadThreshold: LeaseIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadShedThreshold: LeaseIOOverloadShedThreshold.Get(&a.st.SV),
Expand Down
47 changes: 37 additions & 10 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ func rankedCandidateListForAllocation(
!options.getIOOverloadOptions().allocateReplicaToCheck(
ctx,
s,
candidateStores.CandidateIOOverloadScores.Mean,
candidateStores,
) {
continue
}
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func rankedCandidateListForRebalancing(
s,
// We only wish to compare the IO overload to the
// comparable stores average and not the cluster.
comparable.candidateSL.CandidateIOOverloadScores.Mean,
comparable.candidateSL,
)
cand.balanceScore = options.balanceScore(comparable.candidateSL, s.Capacity)
cand.convergesScore = options.rebalanceToConvergesScore(comparable, s)
Expand Down Expand Up @@ -2381,6 +2381,11 @@ type IOOverloadOptions struct {
ReplicaEnforcementLevel IOOverloadEnforcementLevel
LeaseEnforcementLevel IOOverloadEnforcementLevel

// TODO(kvoli): Remove this max protection check after 25.1. In mixed version
// clusters, the max IO score is not populated on pre v24.1 nodes, so the
// instantaneous value is used whenever this field is false.
UseIOThresholdMax bool

ReplicaIOOverloadThreshold float64
LeaseIOOverloadThreshold float64
LeaseIOOverloadShedThreshold float64
Expand Down Expand Up @@ -2411,13 +2416,32 @@ func ioOverloadCheck(
return true, ""
}

// storeScore returns the IO overload score used to evaluate the given store.
// When UseIOThresholdMax is set, the score is taken from the store capacity's
// IOThresholdMax; otherwise it is taken from the instantaneous IOThreshold.
// The second value returned by Score() is intentionally discarded: only the
// numeric score is consumed by the overload checks.
func (o IOOverloadOptions) storeScore(store roachpb.StoreDescriptor) float64 {
	if o.UseIOThresholdMax {
		maxScore, _ := store.Capacity.IOThresholdMax.Score()
		return maxScore
	}

	instantScore, _ := store.Capacity.IOThreshold.Score()
	return instantScore
}

// storeListAvgScore returns the mean IO overload score of the given store
// list. The mean of CandidateMaxIOOverloadScores is returned when
// UseIOThresholdMax is set; otherwise the mean of CandidateIOOverloadScores
// is returned.
func (o IOOverloadOptions) storeListAvgScore(storeList storepool.StoreList) float64 {
	mean := storeList.CandidateIOOverloadScores.Mean
	if o.UseIOThresholdMax {
		mean = storeList.CandidateMaxIOOverloadScores.Mean
	}
	return mean
}

// allocateReplicaToCheck returns true if the store IO overload does not exceed
// the cluster threshold and mean, or the enforcement level does not prevent
// replica allocation to IO overloaded stores.
func (o IOOverloadOptions) allocateReplicaToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.ReplicaIOOverloadThreshold, IOOverloadMeanThreshold,
Expand All @@ -2435,9 +2459,10 @@ func (o IOOverloadOptions) allocateReplicaToCheck(
// exceed the cluster threshold and mean, or the enforcement level does not
// prevent replica rebalancing to IO overloaded stores.
func (o IOOverloadOptions) rebalanceReplicaToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.ReplicaIOOverloadThreshold, IOOverloadMeanThreshold,
Expand All @@ -2454,9 +2479,10 @@ func (o IOOverloadOptions) rebalanceReplicaToCheck(
// the cluster threshold and mean, or the enforcement level does not prevent
// lease transfers to IO overloaded stores.
func (o IOOverloadOptions) transferLeaseToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.LeaseIOOverloadThreshold, IOOverloadMeanThreshold,
Expand All @@ -2474,9 +2500,10 @@ func (o IOOverloadOptions) transferLeaseToCheck(
// the cluster threshold and mean, or the enforcement level does not prevent
// existing stores from holding leases whilst being IO overloaded.
func (o IOOverloadOptions) existingLeaseCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.LeaseIOOverloadShedThreshold, IOOverloadMeanThreshold,
Expand Down
44 changes: 24 additions & 20 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,86 +349,86 @@ var oneStoreHighIOOverload = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1800, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1800, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 5)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 5)},
},
{
StoreID: 4,
Node: roachpb.NodeDescriptor{NodeID: 4},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 5)},
},
}

var allStoresHighIOOverload = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
},
}

var allStoresHighIOOverloadSkewed = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 1)},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 50)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 50)},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 55)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 55)},
},
}

var threeStoresHighIOOverloadAscRangeCount = []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 100, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 100, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 400, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 400, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1600, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1600, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold + 10)},
},
{
StoreID: 4,
Node: roachpb.NodeDescriptor{NodeID: 4},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 6400, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 10)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 6400, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 10)},
},
{
StoreID: 5,
Node: roachpb.NodeDescriptor{NodeID: 5},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 25000, IOThreshold: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 10)},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 25000, IOThresholdMax: TestingIOThresholdWithScore(DefaultReplicaIOOverloadThreshold - 10)},
},
}

Expand Down Expand Up @@ -2081,8 +2081,8 @@ func TestAllocatorTransferLeaseTargetIOOverloadCheck(t *testing.T) {
StoreID: roachpb.StoreID(i + 1),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)},
Capacity: roachpb.StoreCapacity{
LeaseCount: int32(tc.leaseCounts[i]),
IOThreshold: TestingIOThresholdWithScore(tc.IOScores[i]),
LeaseCount: int32(tc.leaseCounts[i]),
IOThresholdMax: TestingIOThresholdWithScore(tc.IOScores[i]),
},
}
}
Expand Down Expand Up @@ -2912,8 +2912,8 @@ func TestAllocatorShouldTransferLeaseIOOverload(t *testing.T) {
StoreID: roachpb.StoreID(i + 1),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)},
Capacity: roachpb.StoreCapacity{
LeaseCount: int32(tc.leaseCounts[i]),
IOThreshold: TestingIOThresholdWithScore(tc.IOScores[i]),
LeaseCount: int32(tc.leaseCounts[i]),
IOThresholdMax: TestingIOThresholdWithScore(tc.IOScores[i]),
},
}
}
Expand Down Expand Up @@ -4698,7 +4698,11 @@ func TestAllocatorRebalanceIOOverloadCheck(t *testing.T) {
sg.GossipStores(test.stores, t)
// Enable read disk health checking in candidate exclusion.
options := a.ScorerOptions(ctx)
options.IOOverloadOptions = IOOverloadOptions{ReplicaEnforcementLevel: test.enforcement, ReplicaIOOverloadThreshold: 1}
options.IOOverloadOptions = IOOverloadOptions{
ReplicaEnforcementLevel: test.enforcement,
ReplicaIOOverloadThreshold: 1,
UseIOThresholdMax: true,
}
add, remove, _, ok := a.RebalanceVoter(
ctx,
sp,
Expand Down
Loading

0 comments on commit fe5e53a

Please sign in to comment.