From b7e8046c6f2a1de1c8069a0f21f03aca2b30692f Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Mon, 7 Nov 2022 23:20:32 -0500 Subject: [PATCH] allocator: refactor StorePool usage to interface This change refactors the usage of `StorePool` in the allocator to a new interface, `AllocatorStorePool`, in order to be able to utilize a store pool with overriden liveness to properly evaluate decommission pre-flight checks. Part of #91570. Release note: None --- pkg/kv/kvserver/allocation_op.go | 18 +- .../allocator/allocatorimpl/allocator.go | 34 +-- .../allocator/allocatorimpl/allocator_test.go | 46 +++-- .../allocator/allocatorimpl/test_helpers.go | 6 +- .../kvserver/allocator/storepool/BUILD.bazel | 2 + .../allocator/storepool/store_pool.go | 193 ++++++++++++++---- .../allocator/storepool/store_pool_test.go | 42 ++-- .../allocator/storepool/test_helpers.go | 2 +- pkg/kv/kvserver/allocator_impl_test.go | 7 +- pkg/kv/kvserver/asim/state/impl.go | 1 + pkg/kv/kvserver/store.go | 8 +- pkg/kv/kvserver/store_pool_test.go | 17 +- pkg/kv/kvserver/store_rebalancer.go | 6 +- pkg/kv/kvserver/store_rebalancer_test.go | 10 +- 14 files changed, 266 insertions(+), 126 deletions(-) diff --git a/pkg/kv/kvserver/allocation_op.go b/pkg/kv/kvserver/allocation_op.go index a6f91e14ca5d..9ef3587a35c5 100644 --- a/pkg/kv/kvserver/allocation_op.go +++ b/pkg/kv/kvserver/allocation_op.go @@ -27,7 +27,7 @@ type AllocationOp interface { trackPlanningMetrics() // applyImpact updates the given storepool to reflect the result of // applying this operation. - applyImpact(storepool *storepool.StorePool) + applyImpact(storepool storepool.AllocatorStorePool) // lhBeingRemoved returns true when the leaseholder is will be removed if // this operation succeeds, otherwise false. lhBeingRemoved() bool @@ -49,7 +49,7 @@ func (o AllocationTransferLeaseOp) lhBeingRemoved() bool { return true } -func (o AllocationTransferLeaseOp) applyImpact(storepool *storepool.StorePool) { +func (o AllocationTransferLeaseOp) applyImpact(storepool storepool.AllocatorStorePool) { // TODO(kvoli): Currently the local storepool is updated directly in the // lease transfer call, rather than in this function. Move the storepool // tracking from rq.TransferLease to this function once #89771 is merged. @@ -89,7 +89,7 @@ func (o AllocationChangeReplicasOp) lhBeingRemoved() bool { // applyEstimatedImpact updates the given storepool to reflect the result // of applying this operation. -func (o AllocationChangeReplicasOp) applyImpact(storepool *storepool.StorePool) { +func (o AllocationChangeReplicasOp) applyImpact(storepool storepool.AllocatorStorePool) { for _, chg := range o.chgs { storepool.UpdateLocalStoreAfterRebalance(chg.Target.StoreID, o.usage, chg.ChangeType) } @@ -109,16 +109,16 @@ type AllocationFinalizeAtomicReplicationOp struct{} // TODO(kvoli): This always returns false, however it is possible that the LH // may have been removed here. -func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false } -func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool *storepool.StorePool) {} -func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {} +func (o AllocationFinalizeAtomicReplicationOp) lhBeingRemoved() bool { return false } +func (o AllocationFinalizeAtomicReplicationOp) applyImpact(storepool storepool.AllocatorStorePool) {} +func (o AllocationFinalizeAtomicReplicationOp) trackPlanningMetrics() {} // AllocationNoop represents no operation. type AllocationNoop struct{} -func (o AllocationNoop) lhBeingRemoved() bool { return false } -func (o AllocationNoop) applyImpact(storepool *storepool.StorePool) {} -func (o AllocationNoop) trackPlanningMetrics() {} +func (o AllocationNoop) lhBeingRemoved() bool { return false } +func (o AllocationNoop) applyImpact(storepool storepool.AllocatorStorePool) {} +func (o AllocationNoop) trackPlanningMetrics() {} // effectBuilder is a utility struct to track a list of effects, which may be // used to construct a single effect function that in turn calls all tracked diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index bd61ed4c9cf7..ddd6881f3f53 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -477,7 +477,8 @@ type AllocatorMetrics struct { // Allocator tries to spread replicas as evenly as possible across the stores // in the cluster. type Allocator struct { - StorePool *storepool.StorePool + st *cluster.Settings + StorePool storepool.AllocatorStorePool nodeLatencyFn func(addr string) (time.Duration, bool) // TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source // wrapped inside a mutex, to avoid misuse. @@ -509,7 +510,8 @@ func makeAllocatorMetrics() AllocatorMetrics { // MakeAllocator creates a new allocator using the specified StorePool. func MakeAllocator( - storePool *storepool.StorePool, + st *cluster.Settings, + storePool storepool.AllocatorStorePool, nodeLatencyFn func(addr string) (time.Duration, bool), knobs *allocator.TestingKnobs, ) Allocator { @@ -517,12 +519,14 @@ func MakeAllocator( // There are number of test cases that make a test store but don't add // gossip or a store pool. So we can't rely on the existence of the // store pool in those cases. - if storePool != nil && storePool.Deterministic { + if storePool != nil && storePool.IsDeterministic() { randSource = rand.NewSource(777) } else { + randSource = rand.NewSource(rand.Int63()) } allocator := Allocator{ + st: st, StorePool: storePool, nodeLatencyFn: nodeLatencyFn, randGen: makeAllocatorRand(randSource), @@ -931,7 +935,7 @@ func (a *Allocator) allocateTarget( // as possible, and therefore any store that is good enough will be // considered. var selector CandidateSelector - if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" { + if replicaStatus == Alive || recoveryStoreSelector.Get(&a.st.SV) == "best" { selector = a.NewBestCandidateSelector() } else { selector = a.NewGoodCandidateSelector() @@ -1515,8 +1519,8 @@ func (a Allocator) RebalanceNonVoter( func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions { return &RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.Deterministic, - rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV), + deterministic: a.StorePool.IsDeterministic(), + rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV), } } @@ -1525,7 +1529,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO return &ScatterScorerOptions{ RangeCountScorerOptions: RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.Deterministic, + deterministic: a.StorePool.IsDeterministic(), rangeRebalanceThreshold: 0, }, // We set jitter to be equal to the padding around replica-count rebalancing @@ -1534,7 +1538,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO // made by the replicateQueue during normal course of operations. In other // words, we don't want stores that are too far away from the mean to be // affected by the jitter. - jitter: RangeRebalanceThreshold.Get(&a.StorePool.St.SV), + jitter: RangeRebalanceThreshold.Get(&a.st.SV), } } @@ -1691,10 +1695,10 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( // storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When // there is a mixed version cluster, storeHealthNoAction is set instead. func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions { - enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.StorePool.St.SV)) + enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV)) return StoreHealthOptions{ EnforcementLevel: enforcementLevel, - L0SublevelThreshold: l0SublevelsThreshold.Get(&a.StorePool.St.SV), + L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV), } } @@ -1862,8 +1866,8 @@ func (a *Allocator) TransferLeaseTarget( storeDescMap, &QPSScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV), - MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV), + QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.st.SV), + MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.st.SV), }, ) @@ -2066,7 +2070,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( // stats and locality information to base our decision on. if statSummary == nil || statSummary.LocalityCounts == nil || - !EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) { + !EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) { return decideWithoutStats, roachpb.ReplicaDescriptor{} } replicaLocalities := a.StorePool.GetLocalitiesByNode(existing) @@ -2128,7 +2132,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( if !ok { continue } - addr, err := a.StorePool.Gossip.GetNodeIDAddress(repl.NodeID) + addr, err := a.StorePool.GossipNodeIDAddress(repl.NodeID) if err != nil { log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err) continue @@ -2140,7 +2144,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID]) replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore( - ctx, a.StorePool.St, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean) + ctx, a.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, candidateLeasesMean) if replScore > bestReplScore { bestReplScore = replScore bestRepl = repl diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 31576ed24cd1..df8207b30d2a 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -529,8 +529,8 @@ func mockStorePool( for _, storeID := range suspectedStoreIDs { liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE detail := storePool.GetStoreDetailLocked(storeID) - detail.LastAvailable = storePool.Clock.Now().GoTime() - detail.LastUnavailable = storePool.Clock.Now().GoTime() + detail.LastAvailable = storePool.Clock().Now().GoTime() + detail.LastUnavailable = storePool.Clock().Now().GoTime() detail.Desc = &roachpb.StoreDescriptor{ StoreID: storeID, Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, @@ -538,6 +538,7 @@ func mockStorePool( } // Set the node liveness function using the set we constructed. + // TODO(sarkesian): This override needs to be fixed to stop exporting this field. storePool.NodeLivenessFn = func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus { if status, ok := liveNodeSet[nodeID]; ok { @@ -689,7 +690,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { sg.GossipStores(test.stores, t) // Enable read disk health checking in candidate exclusion. - l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement)) + l0SublevelsThresholdEnforce.Override(ctx, &a.st.SV, int64(test.enforcement)) // Allocate a voter where all replicas are alive (e.g. up-replicating a valid range). add, _, err := a.AllocateVoter( @@ -1326,8 +1327,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */) defer stopper.Stop(ctx) - st := a.StorePool.St - cluster := tc.cluster(st) + cluster := tc.cluster(a.st) // It doesn't make sense to test sets of stores containing fewer than 4 // stores, because 4 stores is the minimum number of stores needed to @@ -2029,11 +2029,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(ctx) @@ -2413,11 +2414,12 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2479,11 +2481,12 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, nil) defer stopper.Stop(context.Background()) @@ -2512,7 +2515,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { ) require.Equal(t, expected, result) } - timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&storePool.St.SV) + timeAfterStoreSuspect := storepool.TimeAfterStoreSuspect.Get(&a.st.SV) // Based on capacity node 1 is desirable. assertShouldTransferLease(true) // Flip node 1 to unavailable, there should be no lease transfer now. @@ -3394,7 +3397,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints) - a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool { + storePool := a.StorePool.(*storepool.StorePool) + storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool { for _, s := range tc.excluded { if s == storeID { return false @@ -5262,7 +5266,8 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_LIVE) @@ -5407,7 +5412,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { - a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) { return c.latency[addr], true }, nil) target := a.TransferLeaseTarget( @@ -6979,11 +6984,12 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { var numNodes int ctx := context.Background() - stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(sp, func(string) (time.Duration, bool) { + a := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, true }, nil) @@ -7089,7 +7095,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - a := MakeAllocator(nil, nil, nil) + a := MakeAllocator(nil, nil, nil, nil) action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) @@ -7730,7 +7736,7 @@ func TestAllocatorFullDisks(t *testing.T) { mockNodeLiveness.NodeLivenessFunc, false, /* deterministic */ ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, false }, nil) @@ -8176,7 +8182,7 @@ func exampleRebalancing( storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc, /* deterministic */ true, ) - alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { + alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) { return 0, false }, nil) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go index 7f8e765ac8f9..8d9c88e84ee5 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go @@ -12,6 +12,7 @@ package allocatorimpl import ( "context" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "time" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -36,11 +37,12 @@ func CreateTestAllocator( func CreateTestAllocatorWithKnobs( ctx context.Context, numNodes int, deterministic bool, knobs *allocator.TestingKnobs, ) (*stop.Stopper, *gossip.Gossip, *storepool.StorePool, Allocator, *timeutil.ManualTime) { - stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDeadOff, deterministic, func() int { return numNodes }, livenesspb.NodeLivenessStatus_LIVE) - a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) { return 0, true }, knobs) return stopper, g, storePool, a, manual diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 8dedef999c49..cf76d060eeed 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/rpc", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", @@ -41,6 +42,7 @@ go_test( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", "//pkg/testutils/gossiputil", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index d6d7266cb41b..64da8c5d5d7c 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -330,24 +331,100 @@ type localityWithString struct { str string } +// AllocatorStorePool provides an interface for use by the allocator to a list +// of all known stores in the cluster and information on their health. +type AllocatorStorePool interface { + // ClusterNodeCount returns the number of nodes that are possible allocation + // targets. + // See comment on StorePool.ClusterNodeCount(). + ClusterNodeCount() int + + // IsDeterministic returns true iff the pool is configured to be deterministic. + IsDeterministic() bool + + // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is + // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate + // to receive a replica. + // See comment on StorePool.IsStoreReadyForRoutineReplicaTransfer(..). + IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, targetStoreID roachpb.StoreID) bool + + // Clock returns the store pool's clock. + // TODO(sarkesian): If possible, this should be removed. + Clock() *hlc.Clock + + // DecommissioningReplicas selects the replicas on decommissioning + // node/stores from the provided list. + DecommissioningReplicas(repls []roachpb.ReplicaDescriptor) []roachpb.ReplicaDescriptor + + // GetLocalitiesByNode returns the localities for the provided replicas by NodeID. + // See comment on StorePool.GetLocalitiesByNode(..). + GetLocalitiesByNode(replicas []roachpb.ReplicaDescriptor) map[roachpb.NodeID]roachpb.Locality + + // GetLocalitiesByStore returns the localities for the provided replicas by StoreID. + // See comment on StorePool.GetLocalitiesByStore(..). + GetLocalitiesByStore(replicas []roachpb.ReplicaDescriptor) map[roachpb.StoreID]roachpb.Locality + + // GetStores returns information on all the stores with descriptor in the pool. + // See comment on StorePool.GetStores(). + GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor + + // GetStoreDescriptor returns the latest store descriptor for the given + // storeID. + GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) + + // GetStoreList returns a storeList of active stores based on a filter. + // See comment on StorePool.GetStoreList(..). + GetStoreList(filter StoreFilter) (StoreList, int, ThrottledStoreReasons) + + // GetStoreListFromIDs is the same function as GetStoreList but only returns stores + // from the subset of passed in store IDs. + GetStoreListFromIDs( + storeIDs roachpb.StoreIDSlice, + filter StoreFilter, + ) (StoreList, int, ThrottledStoreReasons) + + // GossipNodeIDAddress looks up the RPC address for the given node via gossip. + GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) + + // LiveAndDeadReplicas divides the provided repls slice into two slices: the + // first for live replicas, and the second for dead replicas. + // See comment on StorePool.LiveAndDeadReplicas(..). + LiveAndDeadReplicas( + repls []roachpb.ReplicaDescriptor, + includeSuspectAndDrainingStores bool, + ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) + + // UpdateLocalStoreAfterRebalance is used to update the local copy of the + // target store immediately after a replica addition or removal. + UpdateLocalStoreAfterRebalance( + storeID roachpb.StoreID, + rangeUsageInfo allocator.RangeUsageInfo, + changeType roachpb.ReplicaChangeType, + ) + + // UpdateLocalStoresAfterLeaseTransfer is used to update the local copies of the + // involved store descriptors immediately after a lease transfer. + UpdateLocalStoresAfterLeaseTransfer(from roachpb.StoreID, to roachpb.StoreID, rangeQPS float64) +} + // StorePool maintains a list of all known stores in the cluster and // information on their health. -// -// TODO(irfansharif): Mediate access through a thin interface. type StorePool struct { log.AmbientContext - St *cluster.Settings // TODO(irfansharif): Shouldn't need to be exported. + st *cluster.Settings - Clock *hlc.Clock - Gossip *gossip.Gossip // TODO(irfansharif): Shouldn't need to be exported. + clock *hlc.Clock + gossip *gossip.Gossip nodeCountFn NodeCountFunc NodeLivenessFn NodeLivenessFunc startTime time.Time - Deterministic bool + deterministic bool + // We use separate mutexes for storeDetails and nodeLocalities because the // nodeLocalities map is used in the critical code path of Replica.Send() // and we'd rather not block that on something less important accessing // storeDetails. + // NB: Exported for use in tests and allocator simulator. DetailsMu struct { syncutil.RWMutex StoreDetails map[roachpb.StoreID]*StoreDetail @@ -357,22 +434,15 @@ type StorePool struct { nodeLocalities map[roachpb.NodeID]localityWithString } - // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is - // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate - // to receive a replica. This is defined as a closure reference here instead + // OverrideIsStoreReadyForRoutineReplicaTransferFn, if set, is used in + // IsStoreReadyForRoutineReplicaTransfer. This is defined as a closure reference here instead // of a regular method so it can be overridden in tests. - // - // NB: What this method aims to capture is distinct from "dead" nodes. Nodes - // are classified as "dead" if they haven't successfully heartbeat their - // liveness record in the last `server.time_until_store_dead` seconds. - // - // Functionally, the distinction is that we simply avoid transferring replicas - // to "non-ready" nodes (i.e. nodes that _currently_ have a non-live - // `NodeLivenessStatus`), whereas we _actively move replicas off of "dead" - // nodes_. - IsStoreReadyForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool + // TODO(sarkesian): Consider moving to a TestingKnobs struct. + OverrideIsStoreReadyForRoutineReplicaTransferFn func(context.Context, roachpb.StoreID) bool } +var _ AllocatorStorePool = &StorePool{} + // NewStorePool creates a StorePool and registers the store updating callback // with gossip. func NewStorePool( @@ -386,15 +456,14 @@ func NewStorePool( ) *StorePool { sp := &StorePool{ AmbientContext: ambient, - St: st, - Clock: clock, - Gossip: g, + st: st, + clock: clock, + gossip: g, nodeCountFn: nodeCountFn, NodeLivenessFn: nodeLivenessFn, startTime: clock.PhysicalTime(), - Deterministic: deterministic, + deterministic: deterministic, } - sp.IsStoreReadyForRoutineReplicaTransfer = sp.isStoreReadyForRoutineReplicaTransferInternal sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail) sp.localitiesMu.nodeLocalities = make(map[roachpb.NodeID]localityWithString) @@ -418,9 +487,9 @@ func (sp *StorePool) String() string { sort.Sort(ids) var buf bytes.Buffer - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, id := range ids { detail := sp.DetailsMu.StoreDetails[id] @@ -454,7 +523,7 @@ func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) { sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(storeDesc.StoreID) detail.Desc = &storeDesc - detail.LastUpdatedTime = sp.Clock.PhysicalTime() + detail.LastUpdatedTime = sp.clock.PhysicalTime() sp.DetailsMu.Unlock() sp.localitiesMu.Lock() @@ -590,9 +659,9 @@ func (sp *StorePool) DecommissioningReplicas( // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) @@ -611,6 +680,16 @@ func (sp *StorePool) ClusterNodeCount() int { return sp.nodeCountFn() } +// Clock returns the store pool's clock. +func (sp *StorePool) Clock() *hlc.Clock { + return sp.clock +} + +// IsDeterministic returns true iff the pool is configured to be deterministic. +func (sp *StorePool) IsDeterministic() bool { + return sp.deterministic +} + // IsDead determines if a store is dead. It will return an error if the store is // not found in the store pool or the status is unknown. If the store is not dead, // it returns the time to death. @@ -624,8 +703,8 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) deadAsOf := sd.LastUpdatedTime.Add(timeUntilStoreDead) if now.After(deadAsOf) { @@ -681,9 +760,9 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) return sd.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect), nil } @@ -705,9 +784,9 @@ func (sp *StorePool) LiveAndDeadReplicas( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) @@ -902,7 +981,7 @@ func (sp *StorePool) GetStoreListFromIDs( func (sp *StorePool) getStoreListFromIDsLocked( storeIDs roachpb.StoreIDSlice, filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) { - if sp.Deterministic { + if sp.deterministic { sort.Sort(storeIDs) } else { shuffle.Shuffle(storeIDs) @@ -912,9 +991,9 @@ func (sp *StorePool) getStoreListFromIDsLocked( var throttled ThrottledStoreReasons var storeDescriptors []roachpb.StoreDescriptor - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, storeID := range storeIDs { detail, ok := sp.DetailsMu.StoreDetails[storeID] @@ -975,8 +1054,8 @@ func (sp *StorePool) Throttle(reason ThrottleReason, why string, storeID roachpb // configured timeout period has passed. switch reason { case ThrottleFailed: - timeout := FailedReservationsTimeout.Get(&sp.St.SV) - detail.ThrottledUntil = sp.Clock.PhysicalTime().Add(timeout) + timeout := FailedReservationsTimeout.Get(&sp.st.SV) + detail.ThrottledUntil = sp.clock.PhysicalTime().Add(timeout) if log.V(2) { ctx := sp.AnnotateCtx(context.TODO()) log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s", @@ -1030,6 +1109,11 @@ func (sp *StorePool) GetLocalitiesByNode( return localities } +// GossipNodeIDAddress looks up the RPC address for the given node via gossip. +func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) { + return sp.gossip.GetNodeIDAddress(nodeID) +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { @@ -1042,6 +1126,25 @@ func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { return locality.str } +// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is +// live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate +// to receive a replica. +// +// NB: What this method aims to capture is distinct from "dead" nodes. Nodes +// are classified as "dead" if they haven't successfully heartbeat their +// liveness record in the last `server.time_until_store_dead` seconds. +// +// Functionally, the distinction is that we simply avoid transferring replicas +// to "non-ready" nodes (i.e. nodes that _currently_ have a non-live +// `NodeLivenessStatus`), whereas we _actively move replicas off of "dead" +// nodes_. +func (sp *StorePool) IsStoreReadyForRoutineReplicaTransfer(ctx context.Context, targetStoreID roachpb.StoreID) bool { + if sp.OverrideIsStoreReadyForRoutineReplicaTransferFn != nil { + return sp.OverrideIsStoreReadyForRoutineReplicaTransferFn(ctx, targetStoreID) + } + return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID) +} + func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( ctx context.Context, targetStoreID roachpb.StoreID, ) bool { diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index 2dababee9d9f..bdea163c45d1 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -52,7 +53,8 @@ func TestStorePoolGossipUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 0 }, /* NodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -120,8 +122,9 @@ func TestStorePoolGetStoreList(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + st := cluster.MakeTestingClusterSettings() // We're going to manually mark stores dead in this test. - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -203,10 +206,10 @@ func TestStorePoolGetStoreList(t *testing.T) { mnl.SetNodeStatus(deadStore.Node.NodeID, livenesspb.NodeLivenessStatus_DEAD) sp.DetailsMu.Lock() // Set declinedStore as throttled. - sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.Clock.Now().GoTime().Add(time.Hour) + sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().GoTime().Add(time.Hour) // Set suspectedStore as suspected. - sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = sp.Clock.Now().GoTime() - sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.Clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = sp.clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now().GoTime() sp.DetailsMu.Unlock() // No filter or limited set of store IDs. @@ -417,7 +420,8 @@ func TestStorePoolGetStoreDetails(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -439,7 +443,8 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -544,7 +549,8 @@ func TestStorePoolDefaultState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, _, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, _, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -574,7 +580,8 @@ func TestStorePoolThrottle(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -583,7 +590,7 @@ func TestStorePoolThrottle(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(uniqueStore, t) - expected := sp.Clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.St.SV)) + expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV)) sp.Throttle(ThrottleFailed, "", 1) sp.DetailsMu.Lock() @@ -599,7 +606,8 @@ func TestStorePoolSuspected(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -609,9 +617,9 @@ func TestStorePoolSuspected(t *testing.T) { sg.GossipStores(uniqueStore, t) store := uniqueStore[0] - now := sp.Clock.Now().GoTime() - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.St.SV) - timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.St.SV) + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) // See state transition diagram in storeDetail.status() for a visual // representation of what this test asserts. @@ -665,7 +673,8 @@ func TestGetLocalities(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, _ := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -746,7 +755,8 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - stopper, g, _, sp, mnl := CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) diff --git a/pkg/kv/kvserver/allocator/storepool/test_helpers.go b/pkg/kv/kvserver/allocator/storepool/test_helpers.go index 805434aeb172..4effe9240eff 100644 --- a/pkg/kv/kvserver/allocator/storepool/test_helpers.go +++ b/pkg/kv/kvserver/allocator/storepool/test_helpers.go @@ -71,6 +71,7 @@ func (m *MockNodeLiveness) NodeLivenessFunc( // tests. Stopper must be stopped by the caller. func CreateTestStorePool( ctx context.Context, + st *cluster.Settings, timeUntilStoreDeadValue time.Duration, deterministic bool, nodeCount NodeCountFunc, @@ -79,7 +80,6 @@ func CreateTestStorePool( stopper := stop.NewStopper() mc := timeutil.NewManualTime(timeutil.Unix(0, 123)) clock := hlc.NewClock(mc, time.Nanosecond /* maxOffset */) - st := cluster.MakeTestingClusterSettings() ambientCtx := log.MakeTestingAmbientContext(stopper.Tracer()) rpcContext := rpc.NewContext(ctx, rpc.ContextOptions{ diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index ba454a8bcf01..3f2a90729e84 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -269,13 +269,14 @@ func TestAllocatorThrottled(t *testing.T) { // Finally, set that store to be throttled and ensure we don't send the // replica to purgatory. - a.StorePool.DetailsMu.Lock() - storeDetail, ok := a.StorePool.DetailsMu.StoreDetails[singleStore[0].StoreID] + storePool := a.StorePool.(*storepool.StorePool) + storePool.DetailsMu.Lock() + storeDetail, ok := storePool.DetailsMu.StoreDetails[singleStore[0].StoreID] if !ok { t.Fatalf("store:%d was not found in the store pool", singleStore[0].StoreID) } storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) - a.StorePool.DetailsMu.Unlock() + storePool.DetailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { t.Fatalf("expected a non purgatory error, got: %+v", err) diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index b0db9f026bcd..cf68277f0ff5 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -729,6 +729,7 @@ func (s *state) NodeCountFn() storepool.NodeCountFunc { // populates the storepool with the current state. func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { return allocatorimpl.MakeAllocator( + s.stores[storeID].storepool.st, s.stores[storeID].storepool, func(addr string) (time.Duration, bool) { return 0, true }, &allocator.TestingKnobs{ diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 333978381605..3524e6e40722 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1219,10 +1219,14 @@ func NewStore( ioThresholds: &iot, } s.ioThreshold.t = &admissionpb.IOThreshold{} + var allocatorStorePool storepool.AllocatorStorePool + if cfg.StorePool != nil { + allocatorStorePool = cfg.StorePool + } if cfg.RPCContext != nil { - s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs) + s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs) } else { - s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) { + s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, func(string) (time.Duration, bool) { return 0, false }, cfg.TestingKnobs.AllocatorKnobs) } diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 4d84d3de22b9..7267468361ba 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -19,12 +19,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -36,7 +38,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { clock := hlc.NewClock(manual, time.Nanosecond /* maxOffset */) ctx := context.Background() // We're going to manually mark stores dead in this test. - stopper, g, _, sp, _ := storepool.CreateTestStorePool(ctx, + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, _ := storepool.CreateTestStorePool(ctx, st, storepool.TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -164,7 +167,9 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 123)), time.Nanosecond /* maxOffset */) - stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx, + cfg := TestStoreConfig(clock) + var stopper *stop.Stopper + stopper, _, _, cfg.StorePool, _ = storepool.CreateTestStorePool(ctx, cfg.Settings, storepool.TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ livenesspb.NodeLivenessStatus_DEAD) @@ -174,7 +179,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)} eng := storage.NewDefaultInMemForTesting() stopper.AddCloser(eng) - cfg := TestStoreConfig(clock) + cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer) store := NewStore(ctx, cfg, eng, &node) // Fake an ident because this test doesn't want to start the store @@ -203,11 +208,11 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { // Update StorePool, which should be a no-op. storeID := roachpb.StoreID(1) - if _, ok := sp.GetStoreDescriptor(storeID); ok { + if _, ok := cfg.StorePool.GetStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor not gossiped, should not be found") } - sp.UpdateLocalStoreAfterRebalance(storeID, rangeUsageInfo, roachpb.ADD_VOTER) - if _, ok := sp.GetStoreDescriptor(storeID); ok { + cfg.StorePool.UpdateLocalStoreAfterRebalance(storeID, rangeUsageInfo, roachpb.ADD_VOTER) + if _, ok := cfg.StorePool.GetStoreDescriptor(storeID); ok { t.Fatalf("StoreDescriptor still not gossiped, should not be found") } } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 5bcddc21bcb8..013a4d4af2bb 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -258,7 +258,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.QPSScorerOptions { return &allocatorimpl.QPSScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), - Deterministic: sr.allocator.StorePool.Deterministic, + Deterministic: sr.allocator.StorePool.IsDeterministic(), QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&sr.st.SV), MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&sr.st.SV), } @@ -616,7 +616,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, rctx *RebalanceContext, ) (CandidateReplica, roachpb.ReplicaDescriptor, []CandidateReplica) { var considerForRebalance []CandidateReplica - now := sr.allocator.StorePool.Clock.NowAsClockTimestamp() + now := sr.allocator.StorePool.Clock().NowAsClockTimestamp() for { if len(rctx.hottestRanges) == 0 { return nil, roachpb.ReplicaDescriptor{}, considerForRebalance @@ -728,7 +728,7 @@ type rangeRebalanceContext struct { func (sr *StoreRebalancer) chooseRangeToRebalance( ctx context.Context, rctx *RebalanceContext, ) (candidateReplica CandidateReplica, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { - now := sr.allocator.StorePool.Clock.NowAsClockTimestamp() + now := sr.allocator.StorePool.Clock().NowAsClockTimestamp() if len(rctx.rebalanceCandidates) == 0 && len(rctx.hottestRanges) >= 0 { // NB: In practice, the rebalanceCandidates will be populated with // hottest ranges by the preceeding function call, rebalance leases. diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index a03456fc8184..444c152bc01d 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -766,7 +767,8 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { return TestingRaftStatusFn(r) } - a.StorePool.IsStoreReadyForRoutineReplicaTransfer = func(_ context.Context, this roachpb.StoreID) bool { + storePool := a.StorePool.(*storepool.StorePool) + storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, this roachpb.StoreID) bool { for _, deadStore := range deadStores { // NodeID match StoreIDs here, so this comparison is valid. if deadStore.StoreID == this { @@ -1185,7 +1187,7 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { localDesc := *noLocalityStores[len(noLocalityStores)-1] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) cfg.DefaultSpanConfig.NumVoters = 1 cfg.DefaultSpanConfig.NumReplicas = 1 s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) @@ -1421,7 +1423,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} @@ -1601,7 +1603,7 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool + cfg.StorePool = a.StorePool.(*storepool.StorePool) s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(test.stores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}