119433: kvserver,roachpb: introduce max io overload score r=andrewbaptist a=kvoli

The allocator uses the IO threshold score to decide which replicas to
filter out for rebalancing and lease transfers. Previously, this score
was the last instantaneous value gossiped. As part of #118866, leases
will be shed upon encountering overload, in addition to the existing
behavior of blocking lease transfers. To avoid leases being shed rapidly
from one node to another, consider the maximum IO threshold score
instead.

Note that use of the maximum IO overload score is gated behind a cluster
version. Without the gate, an upgraded node could continually shed all of
its leases based on the maximum IO overload score, while prior-version
nodes transfer the leases back based on the instantaneous value.
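
As a rough, self-contained sketch (simplified stand-in types, not the actual `roachpb` or allocator definitions), the gate and the score selection compose roughly as in the diff below: the version check populates a `UseIOThresholdMax` flag, and the scorer reads either the windowed maximum or the instantaneous score accordingly.

```go
package main

import "fmt"

// Simplified stand-ins for roachpb.StoreCapacity and the allocator's
// IOOverloadOptions; names mirror the diff below, but the types are
// abridged for illustration only.
type storeCapacity struct {
	IOThreshold    float64 // last gossiped instantaneous score
	IOThresholdMax float64 // max score over the trailing window (populated on v24.1+ stores)
}

type ioOverloadOptions struct {
	// Set from the V24_1_GossipMaximumIOOverload cluster-version check;
	// pre-24.1 stores never populate IOThresholdMax, so the instantaneous
	// value is used until the upgrade is finalized.
	UseIOThresholdMax bool
}

// storeScore returns the score the allocator compares against thresholds.
func (o ioOverloadOptions) storeScore(c storeCapacity) float64 {
	if o.UseIOThresholdMax {
		return c.IOThresholdMax
	}
	return c.IOThreshold
}

func main() {
	c := storeCapacity{IOThreshold: 0.2, IOThresholdMax: 0.9}
	fmt.Println(ioOverloadOptions{UseIOThresholdMax: false}.storeScore(c)) // 0.2: pre-upgrade behavior
	fmt.Println(ioOverloadOptions{UseIOThresholdMax: true}.storeScore(c))  // 0.9: post-upgrade, windowed max
}
```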

---

This commit introduces `IOThresholdMax` as a field on `StoreCapacity`.
IOThresholdMax holds the store's maximum IO threshold over the last 5
minutes.
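
The tracking itself can be as simple as a trailing-window maximum over periodically recorded scores. The sketch below is a hypothetical illustration only; the diff adds a dependency on `pkg/util/slidingwindow`, whose actual API is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

type sample struct {
	t time.Time
	v float64
}

// maxTracker keeps the maximum value observed over a trailing window by
// retaining timestamped samples and pruning the ones that age out.
// Hypothetical stand-in for how IOThresholdMax could be maintained.
type maxTracker struct {
	window  time.Duration
	samples []sample
}

// record adds a score sample and drops samples older than the window.
func (m *maxTracker) record(now time.Time, v float64) {
	m.samples = append(m.samples, sample{t: now, v: v})
	cutoff := now.Add(-m.window)
	i := 0
	for i < len(m.samples) && m.samples[i].t.Before(cutoff) {
		i++
	}
	m.samples = m.samples[i:]
}

// max returns the largest score still inside the window.
func (m *maxTracker) max() float64 {
	var out float64
	for _, s := range m.samples {
		if s.v > out {
			out = s.v
		}
	}
	return out
}

func main() {
	tr := &maxTracker{window: 5 * time.Minute}
	now := time.Now()
	tr.record(now.Add(-6*time.Minute), 0.9) // ages out once newer samples arrive
	tr.record(now.Add(-2*time.Minute), 0.7)
	tr.record(now, 0.3)
	fmt.Println(tr.max()) // 0.7: the instantaneous score is 0.3, but the 5-minute max is 0.7
}
```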

Part of: #118866
Epic: none
Release note: None


120230: roachtest: add new prefixless backup fixtures r=dt a=msbutler

This patch adds two new backupFixture generators that create 400GB and 8TB non-revision-history, prefixless backups for use in online restore scale testing.

Epic: None

Release note: none

120434: c2c: prevent NPE in SHOW TENANT WITH REPLICATION STATUS r=stevendanna a=msbutler

If the user ran SHOW TENANT ... WITH REPLICATION STATUS before the stream ingestion job's protected timestamp (pts) was set, the node would crash on a nil pointer dereference. This patch fixes that bug.
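
A minimal sketch of the shape of the fix, with hypothetical type and field names (the actual CockroachDB code differs): guard the protected-timestamp pointer before dereferencing it.

```go
package main

import (
	"fmt"
	"time"
)

// replicationStatus is a hypothetical, simplified stand-in for the status
// struct backing SHOW TENANT ... WITH REPLICATION STATUS.
type replicationStatus struct {
	// protectedTimestamp stays nil until the stream ingestion job has
	// installed its protected timestamp (pts) record.
	protectedTimestamp *time.Time
}

// retainedTime reports the pts if it has been set. Previously the pointer
// was dereferenced unconditionally, panicking (and crashing the node) when
// the query raced ahead of the ingestion job.
func retainedTime(s *replicationStatus) (time.Time, bool) {
	if s == nil || s.protectedTimestamp == nil {
		return time.Time{}, false
	}
	return *s.protectedTimestamp, true
}

func main() {
	if _, ok := retainedTime(&replicationStatus{}); !ok {
		fmt.Println("replication pts not set yet") // surfaced as unset rather than a crash
	}
}
```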

Epic: None

Release note: none

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
3 people committed Mar 15, 2024
4 parents 8c8d633 + fe5e53a + f2b7263 + 78b3141 commit f2985c3
Showing 19 changed files with 232 additions and 85 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -337,4 +337,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez application
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000023.2-upgrading-to-1000024.1-step-018 set the active cluster version in the format '<major>.<minor>' application
version version 1000023.2-upgrading-to-1000024.1-step-020 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -291,6 +291,6 @@
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-018</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-020</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -296,6 +296,11 @@ const (
// database to be SURVIVE ZONE.
V24_1_SystemDatabaseSurvivability

// V24_1_GossipMaximumIOOverload is the version at which stores begin
// populating the store capacity field IOThresholdMax. The field shouldn't be
// used for allocator decisions before then.
V24_1_GossipMaximumIOOverload

numKeys
)

@@ -361,6 +366,7 @@ var versionTable = [numKeys]roachpb.Version{
V24_1_SessionBasedLeasingUpgradeDescriptor: {Major: 23, Minor: 2, Internal: 14},
V24_1_PebbleFormatSyntheticPrefixSuffix: {Major: 23, Minor: 2, Internal: 16},
V24_1_SystemDatabaseSurvivability: {Major: 23, Minor: 2, Internal: 18},
V24_1_GossipMaximumIOOverload: {Major: 23, Minor: 2, Internal: 20},
}

// Latest is always the highest version key. This is the maximum logical cluster
50 changes: 48 additions & 2 deletions pkg/cmd/roachtest/tests/backup_fixtures.go
@@ -118,7 +118,7 @@ type backupFixtureSpecs
}

func (bf *backupFixtureSpecs) initTestName() {
bf.testName = "backupFixture/" + bf.scheduledBackupSpecs.workload.String() + "/" + bf.scheduledBackupSpecs.cloud
bf.testName = fmt.Sprintf("backupFixture/%s/revision-history=%t/%s", bf.scheduledBackupSpecs.workload.String(), !bf.scheduledBackupSpecs.nonRevisionHistory, bf.scheduledBackupSpecs.cloud)
}

func makeBackupDriver(t test.Test, c cluster.Cluster, sp backupFixtureSpecs) backupDriver {
@@ -161,7 +161,7 @@ func (bd *backupDriver) prepareCluster(ctx context.Context) {
if !bd.sp.scheduledBackupSpecs.ignoreExistingBackups {
// This check allows the roachtest to fail fast, instead of when the
// scheduled backup cmd is issued.
require.False(bd.t, bd.checkForExistingBackupCollection(ctx))
require.False(bd.t, bd.checkForExistingBackupCollection(ctx), fmt.Sprintf("existing backup in collection %s", bd.sp.scheduledBackupSpecs.backupCollection()))
}
}

@@ -269,6 +269,52 @@ func registerBackupFixtures(r registry.Registry) {
skip: "only for fixture generation",
suites: registry.Suites(registry.Nightly),
},
{
// 400GB backup fixture, no revision history, with 48 incremental layers.
// This will be used by the online restore roachtests. During 24.2
// development, we can use it to enable OR of incremental backups.
hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}),
scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{
backupSpecs: backupSpecs{
version: fixtureFromMasterVersion,
nonRevisionHistory: true,
},
}),
timeout: 5 * time.Hour,
initWorkloadViaRestore: &restoreSpecs{
backup: backupSpecs{
version: fixtureFromMasterVersion,
numBackupsInChain: 48,
},
restoreUptoIncremental: 12,
},
skip: "only for fixture generation",
suites: registry.Suites(registry.Nightly),
},
{
// 8TB backup fixture, no revision history, with 48 incremental layers.
// This will be used by the online restore roachtests. During 24.2
// development, we can use it to enable OR of incremental backups.
hardware: makeHardwareSpecs(hardwareSpecs{nodes: 10, volumeSize: 1500, workloadNode: true}),
scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{
backupSpecs: backupSpecs{
version: fixtureFromMasterVersion,
nonRevisionHistory: true,
workload: tpceRestore{customers: 500000},
},
}),
timeout: 23 * time.Hour,
initWorkloadViaRestore: &restoreSpecs{
backup: backupSpecs{
version: "v23.1.11",
numBackupsInChain: 48,
nonRevisionHistory: true,
},
restoreUptoIncremental: 12,
},
skip: "only for fixture generation",
suites: registry.Suites(registry.Weekly),
},
{
// 15 GB backup fixture with 48 incremental layers. This is used by
// restore/tpce/15GB/aws/nodes=4/cpus=8. Runs weekly to catch any
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -212,6 +212,7 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/slidingwindow",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/allocator",
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -18,6 +18,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
@@ -2105,7 +2106,6 @@ func (a *Allocator) nonIOOverloadedLeaseTargets(
}

sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

for _, replDesc := range existingReplicas {
store, ok := sl.FindStoreByID(replDesc.StoreID)
@@ -2120,15 +2120,15 @@
// Instead, we create a buffer between the two to avoid leases moving back
// and forth.
if (replDesc.StoreID == leaseStoreID) &&
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)) {
(!ok || !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)) {
continue
}

// If the replica is not the leaseholder, don't include it as a candidate
// if it is filtered out similar to above, or the replica store doesn't
// pass the lease transfer IO overload check.
if replDesc.StoreID != leaseStoreID &&
(!ok || !ioOverloadOptions.transferLeaseToCheck(ctx, store, avgIOOverload)) {
(!ok || !ioOverloadOptions.transferLeaseToCheck(ctx, store, sl)) {
continue
}

@@ -2148,7 +2148,6 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
ioOverloadOptions IOOverloadOptions,
) bool {
sl, _, _ := storePool.GetStoreListFromIDs(replDescsToStoreIDs(existingReplicas), storepool.StoreFilterSuspect)
avgIOOverload := sl.CandidateIOOverloadScores.Mean

// Check the existing replicas for the leaseholder, if it doesn't meet the
// check return that the lease should be moved due to IO overload on the
@@ -2157,7 +2156,7 @@ func (a *Allocator) leaseholderShouldMoveDueToIOOverload(
// overloaded.
for _, replDesc := range existingReplicas {
if store, ok := sl.FindStoreByID(replDesc.StoreID); ok && replDesc.StoreID == leaseStoreID {
return !ioOverloadOptions.existingLeaseCheck(ctx, store, avgIOOverload)
return !ioOverloadOptions.existingLeaseCheck(ctx, store, sl)
}
}

@@ -2232,6 +2231,7 @@ func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
return IOOverloadOptions{
ReplicaEnforcementLevel: IOOverloadEnforcementLevel(ReplicaIOOverloadThresholdEnforcement.Get(&a.st.SV)),
LeaseEnforcementLevel: IOOverloadEnforcementLevel(LeaseIOOverloadThresholdEnforcement.Get(&a.st.SV)),
UseIOThresholdMax: a.st.Version.IsActive(context.Background(), clusterversion.V24_1_GossipMaximumIOOverload),
ReplicaIOOverloadThreshold: ReplicaIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadThreshold: LeaseIOOverloadThreshold.Get(&a.st.SV),
LeaseIOOverloadShedThreshold: LeaseIOOverloadShedThreshold.Get(&a.st.SV),
47 changes: 37 additions & 10 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -1111,7 +1111,7 @@ func rankedCandidateListForAllocation(
!options.getIOOverloadOptions().allocateReplicaToCheck(
ctx,
s,
candidateStores.CandidateIOOverloadScores.Mean,
candidateStores,
) {
continue
}
@@ -1761,7 +1761,7 @@ func rankedCandidateListForRebalancing(
s,
// We only wish to compare the IO overload to the
// comparable stores average and not the cluster.
comparable.candidateSL.CandidateIOOverloadScores.Mean,
comparable.candidateSL,
)
cand.balanceScore = options.balanceScore(comparable.candidateSL, s.Capacity)
cand.convergesScore = options.rebalanceToConvergesScore(comparable, s)
@@ -2381,6 +2381,11 @@ type IOOverloadOptions struct {
ReplicaEnforcementLevel IOOverloadEnforcementLevel
LeaseEnforcementLevel IOOverloadEnforcementLevel

// TODO(kvoli): Remove this max protection check after 25.1. In mixed version
// clusters, the max IO score is not populated on pre v24.1 nodes. Use the
// instantaneous value.
UseIOThresholdMax bool

ReplicaIOOverloadThreshold float64
LeaseIOOverloadThreshold float64
LeaseIOOverloadShedThreshold float64
@@ -2411,13 +2416,32 @@ func ioOverloadCheck(
return true, ""
}

func (o IOOverloadOptions) storeScore(store roachpb.StoreDescriptor) float64 {
var score float64
if o.UseIOThresholdMax {
score, _ = store.Capacity.IOThresholdMax.Score()
} else {
score, _ = store.Capacity.IOThreshold.Score()
}

return score
}

func (o IOOverloadOptions) storeListAvgScore(storeList storepool.StoreList) float64 {
if o.UseIOThresholdMax {
return storeList.CandidateMaxIOOverloadScores.Mean
}
return storeList.CandidateIOOverloadScores.Mean
}

// allocateReplicaToCheck returns true if the store IO overload does not exceed
// the cluster threshold and mean, or the enforcement level does not prevent
// replica allocation to IO overloaded stores.
func (o IOOverloadOptions) allocateReplicaToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.ReplicaIOOverloadThreshold, IOOverloadMeanThreshold,
@@ -2435,9 +2459,10 @@ func (o IOOverloadOptions) allocateReplicaToCheck(
// exceed the cluster threshold and mean, or the enforcement level does not
// prevent replica rebalancing to IO overloaded stores.
func (o IOOverloadOptions) rebalanceReplicaToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.ReplicaIOOverloadThreshold, IOOverloadMeanThreshold,
@@ -2454,9 +2479,10 @@ func (o IOOverloadOptions) rebalanceReplicaToCheck(
// the cluster threshold and mean, or the enforcement level does not prevent
// lease transfers to IO overloaded stores.
func (o IOOverloadOptions) transferLeaseToCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.LeaseIOOverloadThreshold, IOOverloadMeanThreshold,
@@ -2474,9 +2500,10 @@ func (o IOOverloadOptions) transferLeaseToCheck(
// the cluster threshold and mean, or the enforcement level does not prevent
// existing stores from holding leases whilst being IO overloaded.
func (o IOOverloadOptions) existingLeaseCheck(
ctx context.Context, store roachpb.StoreDescriptor, avg float64,
ctx context.Context, store roachpb.StoreDescriptor, storeList storepool.StoreList,
) bool {
score, _ := store.Capacity.IOThreshold.Score()
score := o.storeScore(store)
avg := o.storeListAvgScore(storeList)

if ok, reason := ioOverloadCheck(score, avg,
o.LeaseIOOverloadShedThreshold, IOOverloadMeanThreshold,