Skip to content

Commit

Permalink
allocator: refactor StorePool usage to interface
Browse files Browse the repository at this point in the history
This change refactors the usage of `StorePool` in the allocator to a new
interface, `AllocatorStorePool`, in order to be able to utilize a store
pool with overriden liveness to properly evaluate decommission
pre-flight checks.

Part of cockroachdb#91570.

Release note: None
  • Loading branch information
AlexTalks committed Nov 15, 2022
1 parent 5a24e11 commit b7e8046
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 126 deletions.
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/allocation_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type AllocationOp interface {
trackPlanningMetrics()
// applyImpact updates the given storepool to reflect the result of
// applying this operation.
applyImpact(storepool *storepool.StorePool)
applyImpact(storepool storepool.AllocatorStorePool)
// lhBeingRemoved returns true when the leaseholder is will be removed if
// this operation succeeds, otherwise false.
lhBeingRemoved() bool
Expand All @@ -49,7 +49,7 @@ func (o AllocationTransferLeaseOp) lhBeingRemoved() bool {
return true
}

func (o AllocationTransferLeaseOp) applyImpact(storepool *storepool.StorePool) {
func (o AllocationTransferLeaseOp) applyImpact(storepool storepool.AllocatorStorePool) {
// TODO(kvoli): Currently the local storepool is updated directly in the
// lease transfer call, rather than in this function. Move the storepool
// tracking from rq.TransferLease to this function once #89771 is merged.
Expand Down Expand Up @@ -89,7 +89,7 @@ func (o AllocationChangeReplicasOp) lhBeingRemoved() bool {

// applyEstimatedImpact updates the given storepool to reflect the result
// of applying this operation.
func (o AllocationChangeReplicasOp) applyImpact(storepool *storepool.StorePool) {
func (o AllocationChangeReplicasOp) applyImpact(storepool storepool.AllocatorStorePool) {
for _, chg := range o.chgs {
storepool.UpdateLocalStoreAfterRebalance(chg.Target.StoreID, o.usage, chg.ChangeType)
}
Expand All @@ -109,16 +109,16 @@ type AllocationFinalizeAtomicReplicationOp struct{}

// TODO(kvoli): This always returns false, however it is possible that the LH
// may have been removed here.
func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false }
func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool *storepool.StorePool) {}
func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {}
func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false }
func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool storepool.AllocatorStorePool) {}
func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {}

// AllocationNoop represents no operation.
type AllocationNoop struct{}

func (o AllocationNoop) lhBeingRemoved() bool { return false }
func (o AllocationNoop) applyImpact(storepool *storepool.StorePool) {}
func (o AllocationNoop) trackPlanningMetrics() {}
func (o AllocationNoop) lhBeingRemoved() bool { return false }
func (o AllocationNoop) applyImpact(storepool storepool.AllocatorStorePool) {}
func (o AllocationNoop) trackPlanningMetrics() {}

// effectBuilder is a utility struct to track a list of effects, which may be
// used to construct a single effect function that in turn calls all tracked
Expand Down
34 changes: 19 additions & 15 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ type AllocatorMetrics struct {
// Allocator tries to spread replicas as evenly as possible across the stores
// in the cluster.
type Allocator struct {
StorePool *storepool.StorePool
st *cluster.Settings
StorePool storepool.AllocatorStorePool
nodeLatencyFn func(addr string) (time.Duration, bool)
// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
// wrapped inside a mutex, to avoid misuse.
Expand Down Expand Up @@ -509,20 +510,23 @@ func makeAllocatorMetrics() AllocatorMetrics {

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(
storePool *storepool.StorePool,
st *cluster.Settings,
storePool storepool.AllocatorStorePool,
nodeLatencyFn func(addr string) (time.Duration, bool),
knobs *allocator.TestingKnobs,
) Allocator {
var randSource rand.Source
// There are number of test cases that make a test store but don't add
// gossip or a store pool. So we can't rely on the existence of the
// store pool in those cases.
if storePool != nil && storePool.Deterministic {
if storePool != nil && storePool.IsDeterministic() {
randSource = rand.NewSource(777)
} else {

randSource = rand.NewSource(rand.Int63())
}
allocator := Allocator{
st: st,
StorePool: storePool,
nodeLatencyFn: nodeLatencyFn,
randGen: makeAllocatorRand(randSource),
Expand Down Expand Up @@ -931,7 +935,7 @@ func (a *Allocator) allocateTarget(
// as possible, and therefore any store that is good enough will be
// considered.
var selector CandidateSelector
if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" {
if replicaStatus == Alive || recoveryStoreSelector.Get(&a.st.SV) == "best" {
selector = a.NewBestCandidateSelector()
} else {
selector = a.NewGoodCandidateSelector()
Expand Down Expand Up @@ -1515,8 +1519,8 @@ func (a Allocator) RebalanceNonVoter(
func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions {
return &RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
deterministic: a.StorePool.Deterministic,
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
deterministic: a.StorePool.IsDeterministic(),
rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV),
}
}

Expand All @@ -1525,7 +1529,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
return &ScatterScorerOptions{
RangeCountScorerOptions: RangeCountScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
deterministic: a.StorePool.Deterministic,
deterministic: a.StorePool.IsDeterministic(),
rangeRebalanceThreshold: 0,
},
// We set jitter to be equal to the padding around replica-count rebalancing
Expand All @@ -1534,7 +1538,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
// made by the replicateQueue during normal course of operations. In other
// words, we don't want stores that are too far away from the mean to be
// affected by the jitter.
jitter: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
jitter: RangeRebalanceThreshold.Get(&a.st.SV),
}
}

Expand Down Expand Up @@ -1691,10 +1695,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
// there is a mixed version cluster, storeHealthNoAction is set instead.
func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.StorePool.St.SV))
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV))
return StoreHealthOptions{
EnforcementLevel: enforcementLevel,
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.StorePool.St.SV),
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV),
}
}

Expand Down Expand Up @@ -1862,8 +1866,8 @@ func (a *Allocator) TransferLeaseTarget(
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.st.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.st.SV),
},
)

Expand Down Expand Up @@ -2066,7 +2070,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
// stats and locality information to base our decision on.
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
!EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
!EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
replicaLocalities := a.StorePool.GetLocalitiesByNode(existing)
Expand Down Expand Up @@ -2128,7 +2132,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
if !ok {
continue
}
addr, err := a.StorePool.Gossip.GetNodeIDAddress(repl.NodeID)
addr, err := a.StorePool.GossipNodeIDAddress(repl.NodeID)
if err != nil {
log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err)
continue
Expand All @@ -2140,7 +2144,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(

remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID])
replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore(
ctx, a.StorePool.St, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean)
ctx, a.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean)
if replScore > bestReplScore {
bestReplScore = replScore
bestRepl = repl
Expand Down
46 changes: 26 additions & 20 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,15 +529,16 @@ func mockStorePool(
for _, storeID := range suspectedStoreIDs {
liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE
detail := storePool.GetStoreDetailLocked(storeID)
detail.LastAvailable = storePool.Clock.Now().GoTime()
detail.LastUnavailable = storePool.Clock.Now().GoTime()
detail.LastAvailable = storePool.Clock().Now().GoTime()
detail.LastUnavailable = storePool.Clock().Now().GoTime()
detail.Desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
}
}

// Set the node liveness function using the set we constructed.
// TODO(sarkesian): This override needs to be fixed to stop exporting this field.
storePool.NodeLivenessFn =
func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus {
if status, ok := liveNodeSet[nodeID]; ok {
Expand Down Expand Up @@ -689,7 +690,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
sg.GossipStores(test.stores, t)

// Enable read disk health checking in candidate exclusion.
l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement))
l0SublevelsThresholdEnforce.Override(ctx, &a.st.SV, int64(test.enforcement))

// Allocate a voter where all replicas are alive (e.g. up-replicating a valid range).
add, _, err := a.AllocateVoter(
Expand Down Expand Up @@ -1326,8 +1327,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */)
defer stopper.Stop(ctx)

st := a.StorePool.St
cluster := tc.cluster(st)
cluster := tc.cluster(a.st)

// It doesn't make sense to test sets of stores containing fewer than 4
// stores, because 4 stores is the minimum number of stores needed to
Expand Down Expand Up @@ -2029,11 +2029,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(ctx)
Expand Down Expand Up @@ -2413,11 +2414,12 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
Expand Down Expand Up @@ -2479,11 +2481,12 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
Expand Down Expand Up @@ -2512,7 +2515,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
)
require.Equal(t, expected, result)
}
timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&storePool.St.SV)
timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&a.st.SV)
// Based on capacity node 1 is desirable.
assertShouldTransferLease(true)
// Flip node 1 to unavailable, there should be no lease transfer now.
Expand Down Expand Up @@ -3394,7 +3397,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints)

a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool {
storePool := a.StorePool.(*storepool.StorePool)
storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool {
for _, s := range tc.excluded {
if s == storeID {
return false
Expand Down Expand Up @@ -5262,7 +5266,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
Expand Down Expand Up @@ -5407,7 +5412,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {

for _, c := range testCases {
t.Run("", func(t *testing.T) {
a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) {
return c.latency[addr], true
}, nil)
target := a.TransferLeaseTarget(
Expand Down Expand Up @@ -6979,11 +6984,12 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {

var numNodes int
ctx := context.Background()
stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(sp, func(string) (time.Duration, bool) {
a := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, true
}, nil)

Expand Down Expand Up @@ -7089,7 +7095,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

a := MakeAllocator(nil, nil, nil)
a := MakeAllocator(nil, nil, nil, nil)
action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil)
if action != AllocatorNoop {
t.Errorf("expected AllocatorNoop, but got %v", action)
Expand Down Expand Up @@ -7730,7 +7736,7 @@ func TestAllocatorFullDisks(t *testing.T) {
mockNodeLiveness.NodeLivenessFunc,
false, /* deterministic */
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

Expand Down Expand Up @@ -8176,7 +8182,7 @@ func exampleRebalancing(
storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc,
/* deterministic */ true,
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package allocatorimpl

import (
"context"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
Expand All @@ -36,11 +37,12 @@ func CreateTestAllocator(
func CreateTestAllocatorWithKnobs(
ctx context.Context, numNodes int, deterministic bool, knobs *allocator.TestingKnobs,
) (*stop.Stopper, *gossip.Gossip, *storepool.StorePool, Allocator, *timeutil.ManualTime) {
stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx,
st := cluster.MakeTestingClusterSettings()
stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, st,
storepool.TestTimeUntilStoreDeadOff, deterministic,
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, knobs)
return stopper, g, storePool, a, manual
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/rpc",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
Expand All @@ -41,6 +42,7 @@ go_test(
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/testutils/gossiputil",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
Loading

0 comments on commit b7e8046

Please sign in to comment.