allocator: refactor StorePool usage to interface #91461

Merged · 1 commit · Nov 17, 2022
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/allocation_op.go
@@ -27,7 +27,7 @@ type AllocationOp interface {
trackPlanningMetrics()
// applyImpact updates the given storepool to reflect the result of
// applying this operation.
applyImpact(storepool *storepool.StorePool)
applyImpact(storepool storepool.AllocatorStorePool)
// lhBeingRemoved returns true when the leaseholder will be removed if
// this operation succeeds, otherwise false.
lhBeingRemoved() bool
@@ -49,7 +49,7 @@ func (o AllocationTransferLeaseOp) lhBeingRemoved() bool {
return true
}

func (o AllocationTransferLeaseOp) applyImpact(storepool *storepool.StorePool) {
func (o AllocationTransferLeaseOp) applyImpact(storepool storepool.AllocatorStorePool) {
// TODO(kvoli): Currently the local storepool is updated directly in the
// lease transfer call, rather than in this function. Move the storepool
// tracking from rq.TransferLease to this function once #89771 is merged.
@@ -89,7 +89,7 @@ func (o AllocationChangeReplicasOp) lhBeingRemoved() bool {

// applyImpact updates the given storepool to reflect the result
// of applying this operation.
func (o AllocationChangeReplicasOp) applyImpact(storepool *storepool.StorePool) {
func (o AllocationChangeReplicasOp) applyImpact(storepool storepool.AllocatorStorePool) {
for _, chg := range o.chgs {
storepool.UpdateLocalStoreAfterRebalance(chg.Target.StoreID, o.usage, chg.ChangeType)
}
@@ -109,16 +109,16 @@ type AllocationFinalizeAtomicReplicationOp struct{}

// TODO(kvoli): This always returns false, however it is possible that the LH
// may have been removed here.
func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false }
func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool *storepool.StorePool) {}
func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {}
func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false }
func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool storepool.AllocatorStorePool) {}
func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {}

// AllocationNoop represents no operation.
type AllocationNoop struct{}

func (o AllocationNoop) lhBeingRemoved() bool { return false }
func (o AllocationNoop) applyImpact(storepool *storepool.StorePool) {}
func (o AllocationNoop) trackPlanningMetrics() {}
func (o AllocationNoop) lhBeingRemoved() bool { return false }
func (o AllocationNoop) applyImpact(storepool storepool.AllocatorStorePool) {}
func (o AllocationNoop) trackPlanningMetrics() {}

// effectBuilder is a utility struct to track a list of effects, which may be
// used to construct a single effect function that in turn calls all tracked
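The new parameter type throughout this file is the `storepool.AllocatorStorePool` interface rather than the concrete `*storepool.StorePool`. The interface definition itself is not part of the hunks shown here; the following is a rough sketch of what it plausibly exposes, reconstructed only from the call sites visible in this diff (`Clock`, `IsDeterministic`, `GossipNodeIDAddress`, `GetLocalitiesByNode`, `UpdateLocalStoreAfterRebalance`). Method names come from the diff; the exact signatures, return types, and the full method set are assumptions, not the PR's actual definition, and the referenced types (`hlc`, `roachpb`, `util`, `allocator`) are the repo's existing packages.

```go
// Hypothetical sketch (not the PR's actual definition) of methods that
// AllocatorStorePool must expose, judging by the call sites in this change.
type AllocatorStorePool interface {
	// Clock returns the pool's clock; tests use it to stamp
	// detail.LastAvailable / detail.LastUnavailable.
	Clock() *hlc.Clock

	// IsDeterministic replaces direct reads of the Deterministic field.
	IsDeterministic() bool

	// GossipNodeIDAddress replaces StorePool.Gossip.GetNodeIDAddress.
	GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error)

	// GetLocalitiesByNode returns the locality of each replica's node.
	GetLocalitiesByNode(replicas []roachpb.ReplicaDescriptor) map[roachpb.NodeID]roachpb.Locality

	// UpdateLocalStoreAfterRebalance adjusts the local view of a store's load
	// after a replica change is applied, as done in applyImpact above.
	UpdateLocalStoreAfterRebalance(
		storeID roachpb.StoreID,
		rangeUsageInfo allocator.RangeUsageInfo,
		changeType roachpb.ReplicaChangeType,
	)
}

// The concrete pool is expected to keep satisfying the interface.
var _ AllocatorStorePool = (*StorePool)(nil)
```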
34 changes: 19 additions & 15 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -477,7 +477,8 @@ type AllocatorMetrics struct {
// Allocator tries to spread replicas as evenly as possible across the stores
// in the cluster.
type Allocator struct {
StorePool *storepool.StorePool
st *cluster.Settings
StorePool storepool.AllocatorStorePool
nodeLatencyFn func(addr string) (time.Duration, bool)
// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
// wrapped inside a mutex, to avoid misuse.
@@ -509,20 +510,23 @@ func makeAllocatorMetrics() AllocatorMetrics {

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(
storePool *storepool.StorePool,
st *cluster.Settings,
storePool storepool.AllocatorStorePool,
nodeLatencyFn func(addr string) (time.Duration, bool),
knobs *allocator.TestingKnobs,
) Allocator {
var randSource rand.Source
// There are a number of test cases that make a test store but don't add
// gossip or a store pool. So we can't rely on the existence of the
// store pool in those cases.
if storePool != nil && storePool.Deterministic {
if storePool != nil && storePool.IsDeterministic() {
randSource = rand.NewSource(777)
} else {

randSource = rand.NewSource(rand.Int63())
}
allocator := Allocator{
st: st,
StorePool: storePool,
nodeLatencyFn: nodeLatencyFn,
randGen: makeAllocatorRand(randSource),
@@ -931,7 +935,7 @@ func (a *Allocator) allocateTarget(
// as possible, and therefore any store that is good enough will be
// considered.
var selector CandidateSelector
if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" {
if replicaStatus == Alive || recoveryStoreSelector.Get(&a.st.SV) == "best" {
selector = a.NewBestCandidateSelector()
} else {
selector = a.NewGoodCandidateSelector()
@@ -1515,8 +1519,8 @@ func (a Allocator) RebalanceNonVoter(
func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions {
return &RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
deterministic: a.StorePool.Deterministic,
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
deterministic: a.StorePool.IsDeterministic(),
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV),
}
}

@@ -1525,7 +1529,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
return &ScatterScorerOptions{
RangeCountScorerOptions: RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
deterministic: a.StorePool.Deterministic,
deterministic: a.StorePool.IsDeterministic(),
rangeRebalanceThreshold: 0,
},
// We set jitter to be equal to the padding around replica-count rebalancing
Expand All @@ -1534,7 +1538,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
// made by the replicateQueue during normal course of operations. In other
// words, we don't want stores that are too far away from the mean to be
// affected by the jitter.
jitter: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
jitter: RangeRebalanceThreshold.Get(&a.st.SV),
}
}

@@ -1691,10 +1695,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
// there is a mixed version cluster, storeHealthNoAction is set instead.
func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.StorePool.St.SV))
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV))
return StoreHealthOptions{
EnforcementLevel: enforcementLevel,
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.StorePool.St.SV),
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV),
}
}

@@ -1862,8 +1866,8 @@ func (a *Allocator) TransferLeaseTarget(
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.st.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.st.SV),
},
)

@@ -2066,7 +2070,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
// stats and locality information to base our decision on.
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
!EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
!EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
replicaLocalities := a.StorePool.GetLocalitiesByNode(existing)
@@ -2128,7 +2132,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
if !ok {
continue
}
addr, err := a.StorePool.Gossip.GetNodeIDAddress(repl.NodeID)
addr, err := a.StorePool.GossipNodeIDAddress(repl.NodeID)
if err != nil {
log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err)
continue
@@ -2140,7 +2144,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(

remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID])
replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore(
ctx, a.StorePool.St, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean)
ctx, a.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean)
if replScore > bestReplScore {
bestReplScore = replScore
bestRepl = repl
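For callers, the `MakeAllocator` change is mechanical: cluster settings are now passed in explicitly instead of being read through the concrete pool's `St` field, and the pool argument is accepted as the interface. A minimal before/after sketch of a call site, assuming `storePool`, `nodeLatencyFn`, and `knobs` are already in scope (the test updates below show the same pattern):

```go
// Before: settings were reached through the concrete pool (a.StorePool.St.SV).
//   a := MakeAllocator(storePool, nodeLatencyFn, knobs)

// After: settings travel with the allocator itself and are read via a.st.SV.
st := cluster.MakeTestingClusterSettings()
a := MakeAllocator(st, storePool, nodeLatencyFn, knobs)
```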
46 changes: 26 additions & 20 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -529,15 +529,16 @@ func mockStorePool(
for _, storeID := range suspectedStoreIDs {
liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE
detail := storePool.GetStoreDetailLocked(storeID)
detail.LastAvailable = storePool.Clock.Now().GoTime()
detail.LastUnavailable = storePool.Clock.Now().GoTime()
detail.LastAvailable = storePool.Clock().Now().GoTime()
detail.LastUnavailable = storePool.Clock().Now().GoTime()
detail.Desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
}
}

// Set the node liveness function using the set we constructed.
// TODO(sarkesian): This override needs to be fixed to stop exporting this field.
storePool.NodeLivenessFn =
func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus {
if status, ok := liveNodeSet[nodeID]; ok {
@@ -689,7 +690,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
sg.GossipStores(test.stores, t)

// Enable read disk health checking in candidate exclusion.
l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement))
l0SublevelsThresholdEnforce.Override(ctx, &a.st.SV, int64(test.enforcement))

// Allocate a voter where all replicas are alive (e.g. up-replicating a valid range).
add, _, err := a.AllocateVoter(
@@ -1326,8 +1327,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */)
defer stopper.Stop(ctx)

st := a.StorePool.St
cluster := tc.cluster(st)
cluster := tc.cluster(a.st)

// It doesn't make sense to test sets of stores containing fewer than 4
// stores, because 4 stores is the minimum number of stores needed to
@@ -2029,11 +2029,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(ctx)
@@ -2413,11 +2414,12 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -2479,11 +2481,12 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -2512,7 +2515,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
)
require.Equal(t, expected, result)
}
timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&storePool.St.SV)
timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&a.st.SV)
// Based on capacity node 1 is desirable.
assertShouldTransferLease(true)
// Flip node 1 to unavailable, there should be no lease transfer now.
@@ -3394,7 +3397,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints)

a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool {
storePool := a.StorePool.(*storepool.StorePool)
storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool {
for _, s := range tc.excluded {
if s == storeID {
return false
@@ -5262,7 +5266,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
@@ -5407,7 +5412,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {

for _, c := range testCases {
t.Run("", func(t *testing.T) {
a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) {
return c.latency[addr], true
}, nil)
target := a.TransferLeaseTarget(
@@ -6979,11 +6984,12 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {

var numNodes int
ctx := context.Background()
stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(sp, func(string) (time.Duration, bool) {
a := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, true
}, nil)

@@ -7089,7 +7095,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

a := MakeAllocator(nil, nil, nil)
a := MakeAllocator(nil, nil, nil, nil)
action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil)
if action != AllocatorNoop {
t.Errorf("expected AllocatorNoop, but got %v", action)
@@ -7730,7 +7736,7 @@ func TestAllocatorFullDisks(t *testing.T) {
mockNodeLiveness.NodeLivenessFunc,
false, /* deterministic */
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

@@ -8176,7 +8182,7 @@ func exampleRebalancing(
storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc,
/* deterministic */ true,
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

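One side effect of holding the pool as an interface shows up in TestAllocateCandidatesExcludeNonReadyNodes above: test-only hooks that live on the concrete type are no longer reachable through `a.StorePool`, so the test type-asserts back to `*storepool.StorePool` first. A sketch of that pattern, with the hook name taken from the diff and the exclusion logic purely illustrative:

```go
// a.StorePool is now an interface value; recover the concrete pool to reach
// test-only override hooks.
concretePool, ok := a.StorePool.(*storepool.StorePool)
if !ok {
	t.Fatal("test expects a concrete *storepool.StorePool")
}
concretePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(
	_ context.Context, storeID roachpb.StoreID,
) bool {
	// Hypothetical exclusion check; the real test consults tc.excluded.
	return storeID != 3
}
```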
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -36,11 +37,12 @@ func CreateTestAllocator(
func CreateTestAllocatorWithKnobs(
ctx context.Context, numNodes int, deterministic bool, knobs *allocator.TestingKnobs,
) (*stop.Stopper, *gossip.Gossip, *storepool.StorePool, Allocator, *timeutil.ManualTime) {
stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, deterministic,
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, knobs)
return stopper, g, storePool, a, manual
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/rpc",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
@@ -40,6 +41,7 @@ go_test(
deps = [
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/gossiputil",
"//pkg/util/hlc",
"//pkg/util/leaktest",