Skip to content

Commit

Permalink
Merge #86249 #88458
Browse files Browse the repository at this point in the history
86249: asim: simulate store rebalancer r=lidorcarmel a=kvoli

This PR refactors the store rebalancer and adjacent components, without introducing behavioral changes. The aim is to separate and moduralize it's respective parts, for simulaton. A simulator store rebalancer is then added, which uses these refactored parts.

resolves #82631

Release justification: low risk, no behavior changes with the purpose of testing the existing rebalancing system.

Release note: None

88458: admission: fix some code comments r=irfansharif a=irfansharif

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
3 people committed Sep 22, 2022
3 parents 30ba7c3 + 25e869d + ecbfb98 commit fab3305
Show file tree
Hide file tree
Showing 48 changed files with 3,995 additions and 1,107 deletions.
15 changes: 15 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ ALL_TESTS = [
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer_test",
"//pkg/kv/kvserver/asim/workload:workload_test",
"//pkg/kv/kvserver/asim:asim_test",
"//pkg/kv/kvserver/batcheval/result:result_test",
Expand Down Expand Up @@ -1099,8 +1103,16 @@ GO_TARGETS = [
"//pkg/kv/kvserver/apply:apply",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/op:op",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer_test",
"//pkg/kv/kvserver/asim/workload:workload",
"//pkg/kv/kvserver/asim/workload:workload_test",
"//pkg/kv/kvserver/asim:asim",
Expand Down Expand Up @@ -2403,7 +2415,10 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
"//pkg/kv/kvserver/asim/config:get_x_data",
"//pkg/kv/kvserver/asim/op:get_x_data",
"//pkg/kv/kvserver/asim/queue:get_x_data",
"//pkg/kv/kvserver/asim/state:get_x_data",
"//pkg/kv/kvserver/asim/storerebalancer:get_x_data",
"//pkg/kv/kvserver/asim/workload:get_x_data",
"//pkg/kv/kvserver/batcheval:get_x_data",
"//pkg/kv/kvserver/batcheval/result:get_x_data",
Expand Down
23 changes: 13 additions & 10 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ func (a *Allocator) TransferLeaseTarget(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
stats *replicastats.ReplicaStats,
statSummary *replicastats.RatedSummary,
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
Expand Down Expand Up @@ -1746,7 +1746,7 @@ func (a *Allocator) TransferLeaseTarget(
// falls back to `leaseCountConvergence`. Rationalize this or refactor this
// logic to be more clear.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, source, existing, stats, nil, candidateLeasesMean,
ctx, source, existing, statSummary, nil, candidateLeasesMean,
)
if !excludeLeaseRepl {
switch transferDec {
Expand Down Expand Up @@ -1813,7 +1813,7 @@ func (a *Allocator) TransferLeaseTarget(
return candidates[a.randGen.Intn(len(candidates))]

case allocator.QPSConvergence:
leaseReplQPS, _ := stats.AverageRatePerSecond()
leaseReplQPS := statSummary.QPS
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
Expand Down Expand Up @@ -1953,7 +1953,7 @@ func (a *Allocator) ShouldTransferLease(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
stats *replicastats.ReplicaStats,
statSummary *replicastats.RatedSummary,
) bool {
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
return true
Expand Down Expand Up @@ -1984,7 +1984,7 @@ func (a *Allocator) ShouldTransferLease(
ctx,
source,
existing,
stats,
statSummary,
nil,
sl.CandidateLeases.Mean,
)
Expand Down Expand Up @@ -2014,10 +2014,10 @@ func (a Allocator) FollowTheWorkloadPrefersLocal(
source roachpb.StoreDescriptor,
candidate roachpb.StoreID,
existing []roachpb.ReplicaDescriptor,
stats *replicastats.ReplicaStats,
statSummary *replicastats.RatedSummary,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, stats, adjustments, sl.CandidateLeases.Mean)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean)
if decision == decideWithoutStats {
return false
}
Expand All @@ -2035,13 +2035,15 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
ctx context.Context,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
stats *replicastats.ReplicaStats,
statSummary *replicastats.RatedSummary,
rebalanceAdjustments map[roachpb.StoreID]float64,
candidateLeasesMean float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
if stats == nil || !enableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
!enableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
replicaLocalities := a.StorePool.GetLocalitiesByNode(existing)
Expand All @@ -2051,7 +2053,8 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
}
}

qpsStats, qpsStatsDur := stats.PerLocalityDecayingRate()
qpsStats := statSummary.LocalityCounts
qpsStatsDur := statSummary.Duration

// If we haven't yet accumulated enough data, avoid transferring for now,
// unless we've been explicitly asked otherwise. Do not fall back to the
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5414,7 +5414,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
replicationFactor: 3,
storeID: c.leaseholder,
},
c.stats,
c.stats.SnapshotRatedSummary(),
false,
allocator.TransferLeaseOptions{
ExcludeLeaseRepl: c.excludeLeaseRepl,
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/asim/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ go_library(
"asim.go",
"metrics_tracker.go",
"pacer.go",
"replicate_queue.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/op",
"//pkg/kv/kvserver/asim/queue",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/storerebalancer",
"//pkg/kv/kvserver/asim/workload",
"//pkg/roachpb",
"//pkg/util/encoding/csv",
Expand All @@ -30,15 +30,13 @@ go_test(
"asim_test.go",
"metrics_tracker_test.go",
"pacer_test.go",
"replicate_queue_test.go",
],
args = ["-test.timeout=295s"],
embed = [":asim"],
deps = [
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/workload",
"//pkg/roachpb",
"//pkg/testutils/skip",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
Expand Down
Loading

0 comments on commit fab3305

Please sign in to comment.