diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index 7ff9616a2f0a..8eef88c09952 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -479,7 +479,6 @@ type AllocatorMetrics struct {
 type Allocator struct {
 	st            *cluster.Settings
 	deterministic bool
-	StorePool     storepool.AllocatorStorePool
 	nodeLatencyFn func(addr string) (time.Duration, bool)
 	// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
 	// wrapped inside a mutex, to avoid misuse.
@@ -509,28 +508,26 @@ func makeAllocatorMetrics() AllocatorMetrics {
 	}
 }
 
-// MakeAllocator creates a new allocator using the specified StorePool.
+// MakeAllocator creates a new allocator.
+// The deterministic flag indicates that this allocator is intended to be used
+// with a deterministic store pool.
+//
+// In test cases where the store pool is nil, deterministic should be false.
 func MakeAllocator(
 	st *cluster.Settings,
-	storePool storepool.AllocatorStorePool,
+	deterministic bool,
 	nodeLatencyFn func(addr string) (time.Duration, bool),
 	knobs *allocator.TestingKnobs,
 ) Allocator {
 	var randSource rand.Source
-	var deterministic bool
-	// There are number of test cases that make a test store but don't add
-	// gossip or a store pool. So we can't rely on the existence of the
-	// store pool in those cases.
-	if storePool != nil && storePool.IsDeterministic() {
+	if deterministic {
 		randSource = rand.NewSource(777)
-		deterministic = true
 	} else {
 		randSource = rand.NewSource(rand.Int63())
 	}
 	allocator := Allocator{
 		st:            st,
 		deterministic: deterministic,
-		StorePool:     storePool,
 		nodeLatencyFn: nodeLatencyFn,
 		randGen:       makeAllocatorRand(randSource),
 		Metrics:       makeAllocatorMetrics(),
@@ -2349,7 +2346,9 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence(
 // PreferredLeaseholders returns a slice of replica descriptors corresponding to
 // replicas that meet lease preferences (among the `existing` replicas).
 func (a Allocator) PreferredLeaseholders(
-	storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor,
+	storePool storepool.AllocatorStorePool,
+	conf roachpb.SpanConfig,
+	existing []roachpb.ReplicaDescriptor,
 ) []roachpb.ReplicaDescriptor {
 	// Go one preference at a time. As soon as we've found replicas that match a
 	// preference, we don't need to look at the later preferences, because
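The signature change above is the heart of the patch: MakeAllocator no longer sniffs determinism out of a store pool that it then retains, so ownership of the pool stays with the caller. A minimal sketch of the resulting caller-side wiring — not part of this diff, and the helper name is invented; the calls it makes are only the ones shown above:

package example // illustrative sketch only, not from the patch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// buildAllocator mirrors what callers do after this change: the store pool
// stays with the caller, which derives the determinism flag from it. With a
// nil pool (as in some unit tests), deterministic stays false.
func buildAllocator(
	st *cluster.Settings,
	sp storepool.AllocatorStorePool,
	nodeLatencyFn func(addr string) (time.Duration, bool),
) allocatorimpl.Allocator {
	deterministic := sp != nil && sp.IsDeterministic()
	return allocatorimpl.MakeAllocator(st, deterministic, nodeLatencyFn, nil /* knobs */)
}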
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
index 1a2ef399fc18..92a56c50f623 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -553,12 +553,12 @@ func TestAllocatorSimpleRetrieval(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t)
 	result, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		simpleSpanConfig,
 		nil /* existingVoters */, nil, /* existingNonVoters */
 		Dead,
@@ -576,11 +576,11 @@ func TestAllocatorNoAvailableDisks(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, _, _, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
+	stopper, _, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	result, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		simpleSpanConfig,
 		nil /* existingVoters */, nil, /* existingNonVoters */
 		Dead,
@@ -686,7 +686,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
 
 	for i, test := range tests {
 		t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) {
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 			defer stopper.Stop(ctx)
 			sg := gossiputil.NewStoreGossiper(g)
 			sg.GossipStores(test.stores, t)
@@ -697,7 +697,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
 			// Allocate a voter where all replicas are alive (e.g. up-replicating a valid range).
 			add, _, err := a.AllocateVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				test.conf,
 				nil,
 				nil,
@@ -712,7 +712,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
 			// Allocate a voter where we have a dead (or decommissioning) replica.
 			add, _, err = a.AllocateVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				test.conf,
 				nil,
 				nil,
@@ -738,12 +738,12 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(multiDCStores, t)
 	result1, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		multiDCConfigSSD,
 		nil /* existingVoters */, nil, /* existingNonVoters */
 		Dead,
@@ -753,7 +753,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	}
 	result2, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		multiDCConfigSSD,
 		[]roachpb.ReplicaDescriptor{{
 			NodeID:  result1.NodeID,
@@ -772,7 +772,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
 	// Verify that no result is forthcoming if we already have a replica.
 	result3, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		multiDCConfigSSD,
 		[]roachpb.ReplicaDescriptor{
 			{
@@ -796,12 +796,12 @@ func TestAllocatorExistingReplica(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t)
 	result, _, err := a.AllocateVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		roachpb.SpanConfig{
 			NumReplicas: 0,
 			Constraints: []roachpb.ConstraintsConjunction{
@@ -920,7 +920,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
@@ -975,7 +975,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 	for _, tc := range testCases {
 		{
 			result, _, err := a.AllocateVoter(
-				ctx, a.StorePool, emptySpanConfig(), tc.existing, nil,
+				ctx, sp, emptySpanConfig(), tc.existing, nil,
 				Dead,
 			)
 			if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a {
@@ -990,7 +990,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 			var rangeUsageInfo allocator.RangeUsageInfo
 			target, _, details, ok := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				emptySpanConfig(),
 				nil,
 				tc.existing,
@@ -1054,7 +1054,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	storeGossiper := gossiputil.NewStoreGossiper(g)
 	storeGossiper.GossipStores(stores, t)
@@ -1065,7 +1065,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) {
 	for i := 1; i < 40; i++ {
 		add, remove, _, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			ranges[i].InternalReplicas,
@@ -1107,7 +1107,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) {
 	for i := 1; i < 40; i++ {
 		_, _, _, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			ranges[i].InternalReplicas,
@@ -1170,7 +1170,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
@@ -1180,7 +1180,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		target, _, _, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			[]roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}},
@@ -1200,13 +1200,13 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) {
 		}
 	}
 
-	sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+	sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 	eqClass := equivalenceClass{
 		candidateSL: sl,
 	}
 	// Verify shouldRebalanceBasedOnThresholds results.
 	for i, store := range stores {
-		desc, ok := a.StorePool.GetStoreDescriptor(store.StoreID)
+		desc, ok := sp.GetStoreDescriptor(store.StoreID)
 		if !ok {
 			t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID)
 		}
@@ -1284,7 +1284,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) {
 			var rangeUsageInfo allocator.RangeUsageInfo
 			target, _, _, ok := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				emptySpanConfig(),
 				nil,
 				c.existing,
@@ -1390,7 +1390,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 			// Deterministic is required when stressing as test case 8 may rebalance
 			// to different configurations.
 			ctx := context.Background()
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, true /* deterministic */)
 			defer stopper.Stop(ctx)
 
 			cluster := tc.cluster(a.st)
@@ -1418,7 +1418,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 			// Ensure gossiped store descriptor changes have propagated.
 			testutils.SucceedsSoon(t, func() error {
-				sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+				sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 				for j, s := range sl.Stores {
 					if a, e := s.Capacity.RangeCount, cluster[j].rangeCount; a != e {
 						return errors.Errorf("range count for %d = %d != expected %d", j, a, e)
@@ -1426,13 +1426,13 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 				}
 				return nil
 			})
-			sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+			sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 			eqClass := equivalenceClass{
 				candidateSL: sl,
 			}
 			// Verify shouldRebalanceBasedOnThresholds returns the expected value.
 			for j, store := range stores {
-				desc, ok := a.StorePool.GetStoreDescriptor(store.StoreID)
+				desc, ok := sp.GetStoreDescriptor(store.StoreID)
 				if !ok {
 					t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID)
 				}
@@ -1556,7 +1556,7 @@ func TestAllocatorRebalanceByQPS(t *testing.T) {
 	for _, subtest := range tests {
 		ctx := context.Background()
-		stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+		stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 		defer stopper.Stop(ctx)
 		gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t)
 		var rangeUsageInfo allocator.RangeUsageInfo
@@ -1567,7 +1567,7 @@ func TestAllocatorRebalanceByQPS(t *testing.T) {
 		}
 		add, remove, _, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			[]roachpb.ReplicaDescriptor{{StoreID: subtest.testStores[0].StoreID}},
@@ -1581,8 +1581,8 @@ func TestAllocatorRebalanceByQPS(t *testing.T) {
 			require.Equal(t, subtest.expectedAddTarget, add.StoreID)
 			require.Equal(t, subtest.expectedRemoveTarget, remove.StoreID)
 			// Verify shouldRebalanceBasedOnThresholds results.
-			if desc, descOk := a.StorePool.GetStoreDescriptor(remove.StoreID); descOk {
-				sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+			if desc, descOk := sp.GetStoreDescriptor(remove.StoreID); descOk {
+				sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 				eqClass := equivalenceClass{
 					existing:    desc,
 					candidateSL: sl,
@@ -1672,7 +1672,7 @@ func TestAllocatorRemoveBasedOnQPS(t *testing.T) {
 	for _, subtest := range tests {
 		ctx := context.Background()
-		stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+		stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 		defer stopper.Stop(ctx)
 		gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t)
 		options := &QPSScorerOptions{
@@ -1681,7 +1681,7 @@ func TestAllocatorRemoveBasedOnQPS(t *testing.T) {
 		}
 		remove, _, err := a.RemoveVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			subtest.existingRepls,
 			subtest.existingRepls,
@@ -1725,7 +1725,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
@@ -1735,7 +1735,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		result, _, _, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			[]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}},
@@ -1751,11 +1751,11 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 	// Verify shouldRebalanceBasedOnThresholds results.
 	for i, store := range stores {
-		desc, ok := a.StorePool.GetStoreDescriptor(store.StoreID)
+		desc, ok := sp.GetStoreDescriptor(store.StoreID)
 		if !ok {
 			t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID)
 		}
-		sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+		sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 		eqClass := equivalenceClass{
 			existing:    desc,
 			candidateSL: sl,
@@ -1820,7 +1820,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 3 stores where the lease count for each store is equal to 10x the store
@@ -1865,7 +1865,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -1897,7 +1897,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
 		{StoreID: 4, NodeID: 4, ReplicaID: 4},
 	}
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 4 stores where the lease count for each store is equal to 10x the store
@@ -1982,7 +1982,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				emptySpanConfig(),
 				c.existing,
 				repl,
@@ -2004,7 +2004,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 6 stores with the following setup
@@ -2074,7 +2074,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				context.Background(),
-				a.StorePool,
+				sp,
 				c.conf,
 				c.existing,
 				&mockRepl{
@@ -2106,7 +2106,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 		storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
 		func() int { return 10 }, /* nodeCount */
 		livenesspb.NodeLivenessStatus_LIVE)
-	a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
+	a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, true
 	}, nil)
 	defer stopper.Stop(ctx)
@@ -2186,7 +2186,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				storePool,
 				c.conf,
 				c.existing,
 				&mockRepl{
@@ -2212,7 +2212,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// Set up 8 stores -- 2 in each of the first 2 localities, and 4 in the third.
@@ -2327,7 +2327,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		result, _, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			tc.existing,
@@ -2400,7 +2400,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		result, _, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			tc.existing,
@@ -2431,7 +2431,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 4 stores where the lease count for each store is equal to 10x the store
@@ -2470,7 +2470,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			result := a.ShouldTransferLease(
 				ctx,
-				a.StorePool,
+				sp,
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -2495,7 +2495,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
 		storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
 		func() int { return 10 }, /* nodeCount */
 		livenesspb.NodeLivenessStatus_LIVE)
-	a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
+	a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, true
 	}, nil)
 	defer stopper.Stop(context.Background())
@@ -2538,7 +2538,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
 		t.Run("", func(t *testing.T) {
 			result := a.ShouldTransferLease(
 				ctx,
-				a.StorePool,
+				storePool,
 				emptySpanConfig(),
 				c.existing,
 				&mockRepl{
@@ -2563,7 +2563,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
 		storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
 		func() int { return 10 }, /* nodeCount */
 		livenesspb.NodeLivenessStatus_LIVE)
-	a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
+	a := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, true
 	}, nil)
 	defer stopper.Stop(context.Background())
@@ -2585,7 +2585,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
 		t.Helper()
 		result := a.ShouldTransferLease(
 			ctx,
-			a.StorePool,
+			storePool,
 			emptySpanConfig(),
 			replicas(1, 2, 3),
 			&mockRepl{storeID: 2, replicationFactor: 3},
@@ -2611,7 +2611,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 4 stores with distinct localities, store attributes, and node attributes
@@ -2726,7 +2726,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 			conf := roachpb.SpanConfig{LeasePreferences: c.preferences}
 			result := a.ShouldTransferLease(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2741,7 +2741,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 			}
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2760,7 +2760,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 			}
 			target = a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2785,7 +2785,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// 6 stores, 2 in each of 3 distinct localities.
@@ -2853,7 +2853,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 			conf := roachpb.SpanConfig{LeasePreferences: c.preferences}
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2873,7 +2873,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 
 			target = a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				c.existing,
 				&mockRepl{
@@ -2905,7 +2905,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
@@ -2954,7 +2954,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) {
 	for _, c := range testCases {
 		targetVoter, details, err := a.RemoveVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			c.existingVoters, /* voterCandidates */
 			c.existingVoters,
@@ -2974,7 +2974,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) {
 		// diversity score calculations, we would fail here.
 		targetVoter, _, err = a.RemoveVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			c.existingVoters,
 			c.existingVoters,
@@ -2988,7 +2988,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) {
 
 		targetNonVoter, _, err := a.RemoveNonVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			c.existingNonVoters, /* nonVoterCandidates */
 			c.existingVoters,
@@ -3051,7 +3051,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) {
 	for i, test := range testCases {
 		t.Run(fmt.Sprintf("%d:%s", i+1, test.name), func(t *testing.T) {
 			ctx := context.Background()
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 			defer stopper.Stop(ctx)
 			sg := gossiputil.NewStoreGossiper(g)
 			sg.GossipStores(test.stores, t)
@@ -3059,7 +3059,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) {
 			// Allocate the voting replica first, before the non-voter. This is the
 			// order in which we'd expect the allocator to repair a given range. See
 			// TestAllocatorComputeAction.
-			voterTarget, _, err := a.AllocateVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+			voterTarget, _, err := a.AllocateVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
 			if test.shouldVoterAllocFail {
 				require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget)
 			} else {
@@ -3068,7 +3068,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) {
 				test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...)
 			}
 
-			nonVoterTarget, _, err := a.AllocateNonVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+			nonVoterTarget, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
 			if test.shouldNonVoterAllocFail {
 				require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget)
 			} else {
@@ -3084,7 +3084,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
@@ -3142,7 +3142,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) {
 				StoreID: storeID,
 			}
 		}
-		targetStore, details, err := a.AllocateVoter(ctx, a.StorePool, emptySpanConfig(), existingRepls, nil, Dead)
+		targetStore, details, err := a.AllocateVoter(ctx, sp, emptySpanConfig(), existingRepls, nil, Dead)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -3164,7 +3164,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	stores := []*roachpb.StoreDescriptor{
@@ -3263,7 +3263,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		target, _, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			nil,
 			existingRepls,
@@ -3438,11 +3438,11 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(stores, t)
-	sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+	sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 
 	testCases := []struct {
 		existing roachpb.StoreID
@@ -3477,13 +3477,12 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
 		}
 		// No constraints.
 		conf := roachpb.SpanConfig{}
-		analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
+		analyzed := constraint.AnalyzeConstraints(sp, existingRepls, conf.NumReplicas, conf.Constraints)
 		allocationConstraintsChecker := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints)
 		removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
 		rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints)
 
-		storePool := a.StorePool.(*storepool.StorePool)
-		storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool {
+		sp.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, storeID roachpb.StoreID) bool {
 			for _, s := range tc.excluded {
 				if s == storeID {
 					return false
@@ -3499,8 +3498,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
 			allocationConstraintsChecker,
 			existingRepls,
 			nil,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
-			a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
+			sp.GetLocalitiesByStore(existingRepls),
+			sp.IsStoreReadyForRoutineReplicaTransfer,
			false, /* allowMultipleReplsPerNode */
 			a.ScorerOptions(ctx),
 			VoterTarget,
@@ -3520,8 +3519,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
 			rebalanceConstraintsChecker,
 			existingRepls,
 			nil,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
-			a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
+			sp.GetLocalitiesByStore(existingRepls),
+			sp.IsStoreReadyForRoutineReplicaTransfer,
 			a.ScorerOptions(ctx),
 			a.Metrics,
 		)
@@ -3605,12 +3604,12 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) {
 	for i, test := range testCases {
 		t.Run(fmt.Sprintf("%d:%s", i+1, test.name), func(t *testing.T) {
 			ctx := context.Background()
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 			defer stopper.Stop(ctx)
 			sg := gossiputil.NewStoreGossiper(g)
 			sg.GossipStores(test.stores, t)
 
-			result, _, err := a.AllocateNonVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+			result, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
 			if test.shouldFail {
 				require.Error(t, err)
 				require.Regexp(t, test.expError, err)
@@ -3627,11 +3626,11 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
-	sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+	sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 
 	// Given a set of existing replicas for a range, rank which of the remaining
 	// stores from multiDiversityDCStores would be the best addition to the range
@@ -3832,7 +3831,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 			}
 		}
 		conf := roachpb.SpanConfig{Constraints: tc.constraints}
-		analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
+		analyzed := constraint.AnalyzeConstraints(sp, existingRepls, conf.NumReplicas, conf.Constraints)
 		checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints)
 
 		candidates := rankedCandidateListForAllocation(
@@ -3841,7 +3840,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 			checkFn,
 			existingRepls,
 			nil,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
+			sp.GetLocalitiesByStore(existingRepls),
 			func(context.Context, roachpb.StoreID) bool { return true },
 			false, /* allowMultipleReplsPerNode */
 			a.ScorerOptions(ctx),
@@ -3874,7 +3873,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
@@ -4054,7 +4053,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 	}
 
 	for testIdx, tc := range testCases {
-		sl, _, _ := a.StorePool.GetStoreListFromIDs(tc.existing, storepool.StoreFilterNone)
+		sl, _, _ := sp.GetStoreListFromIDs(tc.existing, storepool.StoreFilterNone)
 		existingRepls := make([]roachpb.ReplicaDescriptor, len(tc.existing))
 		for i, storeID := range tc.existing {
 			existingRepls[i] = roachpb.ReplicaDescriptor{
@@ -4062,14 +4061,14 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 				StoreID: storeID,
 			}
 		}
-		analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, 0 /* numReplicas */, tc.constraints)
+		analyzed := constraint.AnalyzeConstraints(sp, existingRepls, 0 /* numReplicas */, tc.constraints)
 
 		// Check behavior in a span config where `voter_constraints` are empty.
 		checkFn := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
 		candidates := candidateListForRemoval(ctx,
 			sl,
 			checkFn,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
+			sp.GetLocalitiesByStore(existingRepls),
 			a.ScorerOptions(ctx))
 		if !expectedStoreIDsMatch(tc.expected, candidates.worst()) {
 			t.Errorf("%d (with `constraints`): expected candidateListForRemoval(%v)"+
@@ -4083,7 +4082,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 		candidates = candidateListForRemoval(ctx,
 			sl,
 			checkFn,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
+			sp.GetLocalitiesByStore(existingRepls),
 			a.ScorerOptions(ctx))
 		if !expectedStoreIDsMatch(tc.expected, candidates.worst()) {
 			t.Errorf("%d (with `voter_constraints`): expected candidateListForRemoval(%v)"+
@@ -4224,14 +4223,14 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) {
 	for i, test := range tests {
 		t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) {
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 			defer stopper.Stop(ctx)
 			sg := gossiputil.NewStoreGossiper(g)
 			sg.GossipStores(test.stores, t)
 			// Enable read disk health checking in candidate exclusion.
 			add, remove, _, ok := a.RebalanceNonVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				test.conf,
 				nil,
 				test.existingVoters,
@@ -4356,7 +4355,7 @@ func TestAllocatorRebalanceReadAmpCheck(t *testing.T) {
 	for i, test := range tests {
 		t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) {
-			stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
+			stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
 			defer stopper.Stop(ctx)
 			sg := gossiputil.NewStoreGossiper(g)
 			sg.GossipStores(test.stores, t)
@@ -4365,7 +4364,7 @@ func TestAllocatorRebalanceReadAmpCheck(t *testing.T) {
 			options.StoreHealthOptions = StoreHealthOptions{EnforcementLevel: test.enforcement, L0SublevelThreshold: 20}
 			add, remove, _, ok := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				test.conf,
 				nil,
 				test.existingVoters,
@@ -4397,7 +4396,7 @@ func TestVotersCanRebalanceToNonVoterStores(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
@@ -4423,7 +4422,7 @@ func TestVotersCanRebalanceToNonVoterStores(t *testing.T) {
 	existingVoters := replicas(3, 4)
 	add, remove, _, ok := a.RebalanceVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		conf,
 		nil,
 		existingVoters,
@@ -4451,7 +4450,7 @@ func TestNonVotersCannotRebalanceToVoterStores(t *testing.T) {
 		context.Background(), tracing.NewTracer(), "test",
 	)
 
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 2, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 2, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
@@ -4483,7 +4482,7 @@ func TestNonVotersCannotRebalanceToVoterStores(t *testing.T) {
 	var rangeUsageInfo allocator.RangeUsageInfo
 	add, remove, _, ok := a.RebalanceNonVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		emptySpanConfig(),
 		nil,
 		existingVoters,
@@ -4511,11 +4510,11 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(multiDiversityDCStores, t)
-	sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+	sl, _, _ := sp.GetStoreList(storepool.StoreFilterThrottled)
 
 	// Given a set of existing replicas for a range, rank which of the remaining
 	// stores would be best to remove if we had to remove one purely on the basis
@@ -5272,7 +5271,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 			Constraints: tc.constraints,
 			NumReplicas: tc.numReplicas,
 		}
-		analyzed := constraint.AnalyzeConstraints(a.StorePool, existingRepls, conf.NumReplicas, conf.Constraints)
+		analyzed := constraint.AnalyzeConstraints(sp, existingRepls, conf.NumReplicas, conf.Constraints)
 		removalConstraintsChecker := voterConstraintsCheckerForRemoval(
 			analyzed,
 			constraint.EmptyAnalyzedConstraints,
@@ -5289,7 +5288,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 			rebalanceConstraintsChecker,
 			existingRepls,
 			nil,
-			a.StorePool.GetLocalitiesByStore(existingRepls),
+			sp.GetLocalitiesByStore(existingRepls),
 			func(context.Context, roachpb.StoreID) bool { return true },
 			a.ScorerOptions(ctx),
 			a.Metrics,
@@ -5317,7 +5316,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 			// the final rebalance choice.
 			target, _, details, ok := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				conf,
 				nil,
 				existingRepls,
@@ -5497,12 +5496,12 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
 	for _, c := range testCases {
 		t.Run("", func(t *testing.T) {
-			a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) {
+			a := MakeAllocator(st, true /* deterministic */, func(addr string) (time.Duration, bool) {
 				return c.latency[addr], true
 			}, nil)
 			target := a.TransferLeaseTarget(
 				ctx,
-				a.StorePool,
+				storePool,
 				emptySpanConfig(),
 				existing,
 				&mockRepl{
@@ -5699,7 +5698,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	sg := gossiputil.NewStoreGossiper(g)
 	sg.GossipStores(stores, t)
@@ -5708,7 +5707,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		targetRepl, _, err := a.RemoveVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			emptySpanConfig(),
 			replicas,
 			replicas,
@@ -6407,7 +6406,7 @@ func TestAllocatorComputeAction(t *testing.T) {
 	lastPriority := float64(999999999)
 	for i, tcase := range testCases {
-		action, priority := a.ComputeAction(ctx, a.StorePool, tcase.conf, &tcase.desc)
+		action, priority := a.ComputeAction(ctx, sp, tcase.conf, &tcase.desc)
 		if tcase.expectedAction != action {
 			t.Errorf("Test case %d expected action %q, got action %q", i, allocatorActionNames[tcase.expectedAction], allocatorActionNames[action])
@@ -6504,7 +6503,7 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) {
 	for i, tcase := range testCases {
 		mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil, nil)
-		action, _ := a.ComputeAction(ctx, a.StorePool, conf, &tcase.desc)
+		action, _ := a.ComputeAction(ctx, sp, conf, &tcase.desc)
 		if tcase.expectedAction != action {
 			t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
 		}
@@ -6679,7 +6678,7 @@ func TestAllocatorComputeActionSuspect(t *testing.T) {
 	for i, tcase := range testCases {
 		mockStorePool(sp, tcase.live, nil, nil, nil, nil, tcase.suspect)
-		action, _ := a.ComputeAction(ctx, a.StorePool, conf, &tcase.desc)
+		action, _ := a.ComputeAction(ctx, sp, conf, &tcase.desc)
 		if tcase.expectedAction != action {
 			t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
 		}
@@ -6958,7 +6957,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) {
 	for i, tcase := range testCases {
 		mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil)
-		action, _ := a.ComputeAction(ctx, a.StorePool, tcase.conf, &tcase.desc)
+		action, _ := a.ComputeAction(ctx, sp, tcase.conf, &tcase.desc)
 		if tcase.expectedAction != action {
 			t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action)
 			continue
@@ -7291,7 +7290,7 @@ func TestAllocatorRemoveLearner(t *testing.T) {
 	defer stopper.Stop(ctx)
 	live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3}
 	mockStorePool(sp, live, nil, dead, nil, nil, nil)
-	action, _ := a.ComputeAction(ctx, a.StorePool, conf, &rangeWithLearnerDesc)
+	action, _ := a.ComputeAction(ctx, sp, conf, &rangeWithLearnerDesc)
 	require.Equal(t, AllocatorRemoveLearner, action)
 }
@@ -7475,7 +7474,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
 		storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */
 		func() int { return numNodes },
 		livenesspb.NodeLivenessStatus_LIVE)
-	a := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
+	a := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, true
 	}, nil)
@@ -7494,11 +7493,11 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
 			desc := makeDescriptor(c.storeList)
 			desc.EndKey = prefixKey
 
-			clusterNodes := a.StorePool.ClusterNodeCount()
+			clusterNodes := sp.ClusterNodeCount()
 			effectiveNumReplicas := GetNeededVoters(conf.NumReplicas, clusterNodes)
 			require.Equal(t, c.expectedNumReplicas, effectiveNumReplicas, "clusterNodes=%d", clusterNodes)
 
-			action, _ := a.ComputeAction(ctx, a.StorePool, conf, &desc)
+			action, _ := a.ComputeAction(ctx, sp, conf, &desc)
 			require.Equal(t, c.expectedAction.String(), action.String())
 		})
 	}
@@ -7581,8 +7580,8 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	a := MakeAllocator(nil, nil, nil, nil)
-	action, priority := a.ComputeAction(context.Background(), a.StorePool, roachpb.SpanConfig{}, nil)
+	a := MakeAllocator(nil, false, nil, nil)
+	action, priority := a.ComputeAction(context.Background(), nil, roachpb.SpanConfig{}, nil)
 	if action != AllocatorNoop {
 		t.Errorf("expected AllocatorNoop, but got %v", action)
 	}
@@ -7905,7 +7904,7 @@ func TestAllocatorRebalanceDeterminism(t *testing.T) {
 	runner := func() func() (roachpb.ReplicationTarget, roachpb.ReplicationTarget) {
 		ctx := context.Background()
-		stopper, g, _, a, _ := CreateTestAllocator(ctx, 7 /* numNodes */, true /* deterministic */)
+		stopper, g, sp, a, _ := CreateTestAllocator(ctx, 7 /* numNodes */, true /* deterministic */)
 		defer stopper.Stop(ctx)
 		gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
 		return func() (roachpb.ReplicationTarget, roachpb.ReplicationTarget) {
@@ -7914,7 +7913,7 @@ func TestAllocatorRebalanceDeterminism(t *testing.T) {
 			// replica count.
 			add, remove, _, _ := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				roachpb.TestingDefaultSpanConfig(),
 				nil,
 				replicas(1, 2, 5),
@@ -7944,7 +7943,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10 /* numNodes */, true /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10 /* numNodes */, true /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	stores := []*roachpb.StoreDescriptor{
@@ -7985,7 +7984,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) {
 	// replica count.
 	_, _, _, ok := a.RebalanceVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		emptySpanConfig(),
 		nil,
 		replicas(1),
@@ -7999,7 +7998,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) {
 	// Ensure that we would produce a rebalance target when running with scatter.
 	_, _, _, ok = a.RebalanceVoter(
 		ctx,
-		a.StorePool,
+		sp,
 		emptySpanConfig(),
 		nil,
 		replicas(1),
@@ -8099,7 +8098,7 @@ func TestAllocatorRebalanceAway(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
@@ -8114,7 +8113,7 @@ func TestAllocatorRebalanceAway(t *testing.T) {
 			var rangeUsageInfo allocator.RangeUsageInfo
 			actual, _, _, ok := a.RebalanceVoter(
 				ctx,
-				a.StorePool,
+				sp,
 				roachpb.SpanConfig{Constraints: []roachpb.ConstraintsConjunction{constraints}},
 				nil,
 				existingReplicas,
@@ -8226,7 +8225,7 @@ func TestAllocatorFullDisks(t *testing.T) {
 		mockNodeLiveness.NodeLivenessFunc,
 		false, /* deterministic */
 	)
-	alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
+	alloc := MakeAllocator(st, false /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, false
 	}, nil)
@@ -8292,7 +8291,7 @@ func TestAllocatorFullDisks(t *testing.T) {
 				var rangeUsageInfo allocator.RangeUsageInfo
 				target, _, details, ok := alloc.RebalanceVoter(
 					ctx,
-					alloc.StorePool,
+					sp,
 					emptySpanConfig(),
 					nil,
 					[]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}},
@@ -8335,11 +8334,11 @@ func TestAllocatorFullDisks(t *testing.T) {
 func Example_rangeCountRebalancing() {
 	testStores := make([]testStore, 20)
-	rebalanceFn := func(ctx context.Context, ts *testStore, testStores []testStore, alloc *Allocator) {
+	rebalanceFn := func(ctx context.Context, ts *testStore, testStores []testStore, alloc *Allocator, storePool *storepool.StorePool) {
 		var rangeUsageInfo allocator.RangeUsageInfo
 		target, _, details, ok := alloc.RebalanceVoter(
 			ctx,
-			alloc.StorePool,
+			storePool,
 			emptySpanConfig(),
 			nil,
 			[]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}},
@@ -8435,7 +8434,11 @@ func Example_rangeCountRebalancing() {
 }
 
 func qpsBasedRebalanceFn(
-	ctx context.Context, candidate *testStore, testStores []testStore, alloc *Allocator,
+	ctx context.Context,
+	candidate *testStore,
+	testStores []testStore,
+	alloc *Allocator,
+	storePool *storepool.StorePool,
 ) {
 	avgQPS := candidate.Capacity.QueriesPerSecond / float64(candidate.Capacity.RangeCount)
 	jitteredQPS := avgQPS * (1 + alloc.randGen.Float64())
@@ -8447,7 +8450,7 @@ func qpsBasedRebalanceFn(
 	var rangeUsageInfo allocator.RangeUsageInfo
 	add, remove, details, ok := alloc.RebalanceVoter(
 		ctx,
-		alloc.StorePool,
+		storePool,
 		emptySpanConfig(),
 		nil,
 		[]roachpb.ReplicaDescriptor{{NodeID: candidate.Node.NodeID, StoreID: candidate.StoreID}},
@@ -8634,7 +8637,7 @@ func Example_qpsRebalancingMultiRegion() {
 
 func exampleRebalancing(
 	testStores []testStore,
-	rebalanceFn func(context.Context, *testStore, []testStore, *Allocator),
+	rebalanceFn func(context.Context, *testStore, []testStore, *Allocator, *storepool.StorePool),
 	printFn func([]testStore, *tablewriter.Table),
 ) {
 	stopper := stop.NewStopper()
@@ -8675,7 +8678,7 @@ func exampleRebalancing(
 		storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc,
 		/* deterministic */ true,
 	)
	alloc := MakeAllocator(st, true /* deterministic */, func(string) (time.Duration, bool) {
 		return 0, false
 	}, nil)
@@ -8728,7 +8731,7 @@ func exampleRebalancing(
 			if ts.Capacity.RangeCount == 0 {
 				continue
 			}
-			rebalanceFn(ctx, ts, testStores, &alloc)
+			rebalanceFn(ctx, ts, testStores, &alloc, sp)
 		}
 
 		printFn(testStores, table)
@@ -8800,7 +8803,7 @@ func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
@@ -8868,7 +8871,7 @@ func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) {
 	}
 
 	for i, tc := range testCases {
-		result, _, _ := a.AllocateVoter(ctx, a.StorePool, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive)
+		result, _, _ := a.AllocateVoter(ctx, sp, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive)
 		assert.Equal(t, tc.expectedTargetAllocate, result, "Unexpected replication target returned by allocate voter in test %d", i)
 	}
}
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
index afc1ea2b6996..da694c987ada 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
@@ -42,7 +42,7 @@ func CreateTestAllocatorWithKnobs(
 		storepool.TestTimeUntilStoreDeadOff, deterministic,
 		func() int { return numNodes },
 		livenesspb.NodeLivenessStatus_LIVE)
-	a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
+	a := MakeAllocator(st, deterministic, func(string) (time.Duration, bool) {
 		return 0, true
 	}, knobs)
 	return stopper, g, storePool, a, manual
diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go
index 6b99c90b3972..2f693cbb92f0 100644
--- a/pkg/kv/kvserver/allocator/storepool/store_pool.go
+++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -1084,8 +1084,7 @@ func (sp *StorePool) GetStoreListFromIDs(
 // from the subset of stores present in the passed in replication targets,
 // converting to a StoreList.
 func (sp *StorePool) GetStoreListForTargets(
-	candidates []roachpb.ReplicationTarget,
-	filter StoreFilter,
+	candidates []roachpb.ReplicationTarget, filter StoreFilter,
 ) (StoreList, int, ThrottledStoreReasons) {
 	sp.DetailsMu.Lock()
 	defer sp.DetailsMu.Unlock()
diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go
index 69ae59877e46..cead053b10a2 100644
--- a/pkg/kv/kvserver/allocator_impl_test.go
+++ b/pkg/kv/kvserver/allocator_impl_test.go
@@ -66,7 +66,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 123)), time.Nanosecond /* maxOffset */)
 	ctx := context.Background()
-	stopper, g, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 5, false /* deterministic */)
+	stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 5, false /* deterministic */)
 	defer stopper.Stop(ctx)
 	// We make 5 stores in this test -- 3 in the same datacenter, and 1 each in
 	// 2 other datacenters. All of our replicas are distributed within these 3
@@ -180,7 +180,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		result, _, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -206,7 +206,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		target, _, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -226,7 +226,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		target, origin, details, ok := a.RebalanceVoter(
 			ctx,
-			a.StorePool,
+			sp,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -251,18 +251,18 @@ func TestAllocatorThrottled(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	stopper, g, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */)
+	stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */)
 	defer stopper.Stop(ctx)
 
 	// First test to make sure we would send the replica to purgatory.
-	_, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	_, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if _, ok := IsPurgatoryError(err); !ok {
 		t.Fatalf("expected a purgatory error, got: %+v", err)
 	}
 
 	// Second, test the normal case in which we can allocate to the store.
 	gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t)
-	result, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	result, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if err != nil {
 		t.Fatalf("unable to perform allocation: %+v", err)
 	}
@@ -272,15 +272,14 @@ func TestAllocatorThrottled(t *testing.T) {
 	// Finally, set that store to be throttled and ensure we don't send the
 	// replica to purgatory.
-	storePool := a.StorePool.(*storepool.StorePool)
-	storePool.DetailsMu.Lock()
-	storeDetail, ok := storePool.DetailsMu.StoreDetails[singleStore[0].StoreID]
+	sp.DetailsMu.Lock()
+	storeDetail, ok := sp.DetailsMu.StoreDetails[singleStore[0].StoreID]
 	if !ok {
 		t.Fatalf("store:%d was not found in the store pool", singleStore[0].StoreID)
 	}
 	storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour)
-	storePool.DetailsMu.Unlock()
-	_, _, err = a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	sp.DetailsMu.Unlock()
+	_, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if _, ok := IsPurgatoryError(err); ok {
 		t.Fatalf("expected a non purgatory error, got: %+v", err)
 	}
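TestAllocatorThrottled above also shows a secondary benefit: the test now holds the concrete *storepool.StorePool directly, so the old a.StorePool.(*storepool.StorePool) type assertion disappears. Distilled into a hypothetical helper (not in the patch; the fields it touches are exactly the ones the test uses):

// markThrottled marks a store as throttled for the given duration, returning
// false if the pool has no detail entry for it. Sketch only.
func markThrottled(sp *storepool.StorePool, id roachpb.StoreID, d time.Duration) bool {
	sp.DetailsMu.Lock()
	defer sp.DetailsMu.Unlock()
	detail, ok := sp.DetailsMu.StoreDetails[id]
	if !ok {
		return false
	}
	detail.ThrottledUntil = timeutil.Now().Add(d)
	return true
}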
diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go
index ef2b2dfddc45..03657eed2139 100644
--- a/pkg/kv/kvserver/asim/asim.go
+++ b/pkg/kv/kvserver/asim/asim.go
@@ -81,6 +81,7 @@ func NewSimulator(
 	for _, store := range initialState.Stores() {
 		storeID := store.StoreID()
 		allocator := initialState.MakeAllocator(storeID)
+		storePool := initialState.StorePool(storeID)
 		// TODO(kvoli): Instead of passing in individual settings to construct
 		// the each ticking component, pass a pointer to the simulation
 		// settings struct. That way, the settings may be adjusted dynamically
@@ -90,6 +91,7 @@ func NewSimulator(
 			changer,
 			settings.ReplicaChangeDelayFn(),
 			allocator,
+			storePool,
 			start,
 		)
 		sqs[storeID] = queue.NewSplitQueue(
@@ -109,6 +111,7 @@ func NewSimulator(
 		controllers[storeID] = op.NewController(
 			changer,
 			allocator,
+			storePool,
 			settings,
 		)
 		srs[storeID] = storerebalancer.NewStoreRebalancer(
@@ -116,6 +119,7 @@ func NewSimulator(
 			storeID,
 			controllers[storeID],
 			allocator,
+			storePool,
 			settings,
 			storerebalancer.GetStateRaftStatusFn(initialState),
 		)
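Condensed from the NewSimulator hunks above (variables as in the surrounding function; illustrative only): the simulator now builds an allocator/store-pool pair per store and threads both into every component that previously reached through allocator.StorePool.

storeID := store.StoreID()
allocator := initialState.MakeAllocator(storeID)
storePool := initialState.StorePool(storeID)

controllers[storeID] = op.NewController(changer, allocator, storePool, settings)
srs[storeID] = storerebalancer.NewStoreRebalancer(
	start, storeID, controllers[storeID],
	allocator, storePool, settings,
	storerebalancer.GetStateRaftStatusFn(initialState),
)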
func NewController( - changer state.Changer, allocator allocatorimpl.Allocator, settings *config.SimulationSettings, + changer state.Changer, + allocator allocatorimpl.Allocator, + storePool storepool.AllocatorStorePool, + settings *config.SimulationSettings, ) Controller { return &controller{ changer: changer, allocator: allocator, + storePool: storePool, settings: settings, pending: &priorityQueue{items: []*queuedOp{}}, tickets: make(map[DispatchedTicket]ControlledOperation), @@ -142,7 +148,7 @@ func (c *controller) processRelocateRange( ctx context.Context, tick time.Time, s state.State, ro *RelocateRangeOp, ) error { rng := s.RangeFor(ro.key) - options := SimRelocateOneOptions{allocator: c.allocator, state: s} + options := SimRelocateOneOptions{allocator: c.allocator, storePool: c.storePool, state: s} ops, leaseTarget, err := kvserver.RelocateOne( ctx, rng.Descriptor(), diff --git a/pkg/kv/kvserver/asim/op/controller_test.go b/pkg/kv/kvserver/asim/op/controller_test.go index 4b6fc1431e42..35a9367fd43a 100644 --- a/pkg/kv/kvserver/asim/op/controller_test.go +++ b/pkg/kv/kvserver/asim/op/controller_test.go @@ -85,7 +85,7 @@ func TestLeaseTransferOp(t *testing.T) { s := state.NewTestStateReplCounts(map[state.StoreID]int{1: tc.ranges + 1, 2: tc.ranges + 1, 3: tc.ranges + 1}, 3, 1000 /* keyspace */) settings := config.DefaultSimulationSettings() changer := state.NewReplicaChanger() - controller := NewController(changer, allocatorimpl.Allocator{}, settings) + controller := NewController(changer, allocatorimpl.Allocator{}, nil /* storePool */, settings) for i := 2; i <= tc.ranges+1; i++ { s.TransferLease(state.RangeID(i), 1) @@ -270,7 +270,8 @@ func TestRelocateRangeOp(t *testing.T) { s := state.NewTestStateReplCounts(map[state.StoreID]int{1: 3, 2: 3, 3: 3, 4: 0, 5: 0, 6: 0}, 3, 1000 /* keyspace */) changer := state.NewReplicaChanger() allocator := s.MakeAllocator(state.StoreID(1)) - controller := NewController(changer, allocator, settings) + storePool := s.StorePool(state.StoreID(1)) + controller := NewController(changer, allocator, storePool, settings) // Transfer the lease to store 1 for all ranges. for i := 2; i < 4; i++ { diff --git a/pkg/kv/kvserver/asim/op/relocate_range.go b/pkg/kv/kvserver/asim/op/relocate_range.go index 822f2e2770f4..37b22cbfaba1 100644 --- a/pkg/kv/kvserver/asim/op/relocate_range.go +++ b/pkg/kv/kvserver/asim/op/relocate_range.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/errors" @@ -54,6 +55,7 @@ func (rro *RelocateRangeOp) error(err error) { // to generate a suggested replication change. type SimRelocateOneOptions struct { allocator allocatorimpl.Allocator + storePool storepool.AllocatorStorePool state state.State } @@ -62,6 +64,11 @@ func (s *SimRelocateOneOptions) Allocator() allocatorimpl.Allocator { return s.allocator } +// StorePool returns the store's configured store pool. +func (s *SimRelocateOneOptions) StorePool() storepool.AllocatorStorePool { + return s.storePool +} + // SpanConfig returns the span configuration for the range with start key. 
func (s *SimRelocateOneOptions) SpanConfig( ctx context.Context, startKey roachpb.RKey, diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue.go b/pkg/kv/kvserver/asim/queue/replicate_queue.go index db9b9a700bd3..4819cee8b980 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue.go @@ -25,6 +25,7 @@ import ( type replicateQueue struct { baseQueue allocator allocatorimpl.Allocator + storePool storepool.AllocatorStorePool delay func(rangeSize int64, add bool) time.Duration } @@ -34,6 +35,7 @@ func NewReplicateQueue( stateChanger state.Changer, delay func(rangeSize int64, add bool) time.Duration, allocator allocatorimpl.Allocator, + storePool storepool.AllocatorStorePool, start time.Time, ) RangeQueue { return &replicateQueue{ @@ -45,6 +47,7 @@ func NewReplicateQueue( }, delay: delay, allocator: allocator, + storePool: storePool, } } @@ -59,7 +62,7 @@ func (rq *replicateQueue) MaybeAdd( return false } - action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor()) + action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, rng.SpanConfig(), rng.Descriptor()) if action == allocatorimpl.AllocatorNoop { return false } @@ -98,7 +101,7 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat return } - action, _ := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor()) + action, _ := rq.allocator.ComputeAction(ctx, rq.storePool, rng.SpanConfig(), rng.Descriptor()) switch action { case allocatorimpl.AllocatorConsiderRebalance: @@ -125,7 +128,7 @@ func (rq *replicateQueue) considerRebalance( ) { add, remove, _, ok := rq.allocator.RebalanceVoter( ctx, - rq.allocator.StorePool, + rq.storePool, rng.SpanConfig(), nil, /* raftStatus */ rng.Descriptor().Replicas().VoterDescriptors(), diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go index 3a0e4f3362c3..3485ca9a9225 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go @@ -129,6 +129,7 @@ func TestReplicateQueue(t *testing.T) { changer, testSettings.ReplicaChangeDelayFn(), s.MakeAllocator(store.StoreID()), + s.StorePool(store.StoreID()), start, ) s.TickClock(start) diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index b50d53901dd9..c879741898c2 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -760,7 +760,7 @@ func (s *state) NodeCountFn() storepool.NodeCountFunc { func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { return allocatorimpl.MakeAllocator( s.stores[storeID].settings, - s.stores[storeID].storepool, + s.stores[storeID].storepool.IsDeterministic(), func(addr string) (time.Duration, bool) { return 0, true }, &allocator.TestingKnobs{ AllowLeaseTransfersToReplicasNeedingSnapshots: true, @@ -768,6 +768,11 @@ func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { ) } +// StorePool returns the store pool for the given storeID. +func (s *state) StorePool(storeID StoreID) storepool.AllocatorStorePool { + return s.stores[storeID].storepool +} + // LeaseHolderReplica returns the replica which holds a lease for the range // with ID RangeID, if the range exists, otherwise returning false. 
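With MakeAllocator reduced to a boolean, callers that do hold a pool derive the determinism flag themselves, tolerating the nil pools some tests construct; the same guard appears in NewStore later in this patch. A sketch of that call shape, where makeAllocatorFromPool is a hypothetical helper:

package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// makeAllocatorFromPool shows the new call shape: only the determinism flag
// crosses into MakeAllocator, while the pool itself is handed to individual
// allocator methods at their call sites.
func makeAllocatorFromPool(
	st *cluster.Settings,
	sp storepool.AllocatorStorePool,
	latencyFn func(addr string) (time.Duration, bool),
	knobs *allocator.TestingKnobs,
) allocatorimpl.Allocator {
	// Some tests build a store without gossip or a store pool, so tolerate a
	// nil pool and fall back to non-deterministic ordering.
	deterministic := sp != nil && sp.IsDeterministic()
	return allocatorimpl.MakeAllocator(st, deterministic, latencyFn, knobs)
}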
func (s *state) LeaseHolderReplica(rangeID RangeID) (Replica, bool) { diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index 68c7eb3a51da..d8af55e83453 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -152,6 +152,8 @@ type State interface { // the allocator and storepool should both be separated out of this // interface, instead using it to populate themselves. MakeAllocator(StoreID) allocatorimpl.Allocator + // StorePool returns the store pool for the given storeID. + StorePool(StoreID) storepool.AllocatorStorePool // LoadSplitterFor returns the load splitter for the Store with ID StoreID. LoadSplitterFor(StoreID) LoadSplitter // RaftStatus returns the current raft status for the replica of the Range diff --git a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel index 315c36b7168d..09fae4d064d9 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel +++ b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", + "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/op", "//pkg/kv/kvserver/asim/state", diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go index cfd8ae408a08..6e4cfae15273 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" @@ -79,10 +80,11 @@ func NewStoreRebalancer( storeID state.StoreID, controller op.Controller, allocator allocatorimpl.Allocator, + storePool storepool.AllocatorStorePool, settings *config.SimulationSettings, getRaftStatusFn func(replica kvserver.CandidateReplica) *raft.Status, ) StoreRebalancer { - return newStoreRebalancerControl(start, storeID, controller, allocator, settings, getRaftStatusFn) + return newStoreRebalancerControl(start, storeID, controller, allocator, storePool, settings, getRaftStatusFn) } func newStoreRebalancerControl( @@ -90,12 +92,14 @@ func newStoreRebalancerControl( storeID state.StoreID, controller op.Controller, allocator allocatorimpl.Allocator, + storePool storepool.AllocatorStorePool, settings *config.SimulationSettings, getRaftStatusFn func(replica kvserver.CandidateReplica) *raft.Status, ) *storeRebalancerControl { sr := kvserver.SimulatorStoreRebalancer( roachpb.StoreID(storeID), allocator, + storePool, getRaftStatusFn, ) diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go index f1f8be3e22fc..d997dfb98eb7 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go @@ -190,9 +190,10 @@ func TestStoreRebalancer(t *testing.T) { s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) allocator := s.MakeAllocator(testingStore) + storePool := 
s.StorePool(testingStore) changer := state.NewReplicaChanger() - controller := op.NewController(changer, allocator, testSettings) - src := newStoreRebalancerControl(start, testingStore, controller, allocator, testSettings, GetStateRaftStatusFn(s)) + controller := op.NewController(changer, allocator, storePool, testSettings) + src := newStoreRebalancerControl(start, testingStore, controller, allocator, storePool, testSettings, GetStateRaftStatusFn(s)) s.TickClock(start) resultsQPS := []map[state.StoreID]float64{} @@ -298,9 +299,10 @@ func TestStoreRebalancerBalances(t *testing.T) { s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) allocator := s.MakeAllocator(testingStore) + storePool := s.StorePool(testingStore) changer := state.NewReplicaChanger() - controller := op.NewController(changer, allocator, testSettings) - src := newStoreRebalancerControl(start, testingStore, controller, allocator, testSettings, GetStateRaftStatusFn(s)) + controller := op.NewController(changer, allocator, storePool, testSettings) + src := newStoreRebalancerControl(start, testingStore, controller, allocator, storePool, testSettings, GetStateRaftStatusFn(s)) s.TickClock(start) results := []map[state.StoreID]float64{} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 9760248e6104..7ef521c1993f 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3223,6 +3223,8 @@ func (r *relocationArgs) finalRelocationTargets() []roachpb.ReplicationTarget { type RelocateOneOptions interface { // Allocator returns the allocator for the store this replica is on. Allocator() allocatorimpl.Allocator + // StorePool returns the store's configured store pool. + StorePool() storepool.AllocatorStorePool // SpanConfig returns the span configuration for the range with start key. SpanConfig(ctx context.Context, startKey roachpb.RKey) (roachpb.SpanConfig, error) // LeaseHolder returns the descriptor of the replica which holds the lease @@ -3239,6 +3241,11 @@ func (roo *replicaRelocateOneOptions) Allocator() allocatorimpl.Allocator { return roo.store.allocator } +// StorePool returns the store's configured store pool. +func (roo *replicaRelocateOneOptions) StorePool() storepool.AllocatorStorePool { + return roo.store.cfg.StorePool +} + // SpanConfig returns the span configuration for the range with start key. func (roo *replicaRelocateOneOptions) SpanConfig( ctx context.Context, startKey roachpb.RKey, @@ -3289,7 +3296,7 @@ func RelocateOne( } allocator := options.Allocator() - storePool := allocator.StorePool + storePool := options.StorePool() conf, err := options.SpanConfig(ctx, desc.StartKey) if err != nil { @@ -3339,7 +3346,7 @@ func RelocateOne( additionTarget, _ = allocator.AllocateTargetFromList( ctx, - allocator.StorePool, + storePool, candidateStoreList, conf, existingVoters, @@ -3408,12 +3415,12 @@ func RelocateOne( // (s1,s2,s3,s4) which is a reasonable request; that replica set is // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it // may not want to do that due to constraints. 
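Both implementations of RelocateOneOptions now answer for the pool: the store-backed replicaRelocateOneOptions above and the simulator's SimRelocateOneOptions earlier. A hedged sketch of consuming that contract; relocateOptions and relocateDeps are illustrative names, and the real interface also exposes SpanConfig and the leaseholder:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
)

// relocateOptions is the subset of the RelocateOneOptions contract this
// sketch relies on.
type relocateOptions interface {
	Allocator() allocatorimpl.Allocator
	StorePool() storepool.AllocatorStorePool
}

// relocateDeps shows the shape of the new contract: RelocateOne reads both
// dependencies off its options up front, rather than pulling the pool out
// of the allocator.
func relocateDeps(opts relocateOptions) (allocatorimpl.Allocator, storepool.AllocatorStorePool) {
	return opts.Allocator(), opts.StorePool()
}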
- candidatesStoreList, _, _ := allocator.StorePool.GetStoreListForTargets( + candidatesStoreList, _, _ := storePool.GetStoreListForTargets( args.targetsToRemove(), storepool.StoreFilterNone, ) targetStore, _, err := allocator.RemoveTarget( ctx, - allocator.StorePool, + storePool, conf, candidatesStoreList, existingVoters, @@ -3683,7 +3690,7 @@ func (r *Replica) adminScatter( if args.RandomizeLeases && r.OwnsValidLease(ctx, r.store.Clock().NowAsClockTimestamp()) { desc := r.Desc() potentialLeaseTargets := r.store.allocator.ValidLeaseTargets( - ctx, r.store.allocator.StorePool, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{}) + ctx, r.store.cfg.StorePool, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{}) if len(potentialLeaseTargets) > 0 { newLeaseholderIdx := rand.Intn(len(potentialLeaseTargets)) targetStoreID := potentialLeaseTargets[newLeaseholderIdx].StoreID diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 424191721fd9..7db0f2f2c9c5 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -611,7 +611,7 @@ func (rq *replicateQueue) shouldQueue( ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader, ) (shouldQueue bool, priority float64) { desc, conf := repl.DescAndSpanConfig() - action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc) + action, priority := rq.allocator.ComputeAction(ctx, rq.store.cfg.StorePool, conf, desc) if action == allocatorimpl.AllocatorNoop { log.KvDistribution.VEventf(ctx, 2, "no action to take") @@ -627,7 +627,7 @@ func (rq *replicateQueue) shouldQueue( rangeUsageInfo := rangeUsageInfoForRepl(repl) _, _, _, ok := rq.allocator.RebalanceVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, repl.RaftStatus(), voterReplicas, @@ -642,7 +642,7 @@ func (rq *replicateQueue) shouldQueue( } _, _, _, ok = rq.allocator.RebalanceNonVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, repl.RaftStatus(), voterReplicas, @@ -662,7 +662,7 @@ func (rq *replicateQueue) shouldQueue( if rq.canTransferLeaseFrom(ctx, repl) && rq.allocator.ShouldTransferLease( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, voterReplicas, repl, @@ -983,7 +983,7 @@ func (rq *replicateQueue) processOneChange( // Update the local storepool state to reflect the successful application // of the change. - change.Op.applyImpact(rq.allocator.StorePool) + change.Op.applyImpact(rq.store.cfg.StorePool) // Requeue the replica if it meets the criteria in ShouldRequeue. return rq.ShouldRequeue(ctx, change), nil @@ -1039,7 +1039,7 @@ func (rq *replicateQueue) PlanOneChange( // unavailability; see: _ = execChangeReplicasTxn - action, allocatorPrio := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc) + action, allocatorPrio := rq.allocator.ComputeAction(ctx, rq.store.cfg.StorePool, conf, desc) log.KvDistribution.VEventf(ctx, 1, "next replica action: %s", action) var err error @@ -1252,7 +1252,7 @@ func (rq *replicateQueue) addOrReplaceVoters( // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that // fills the gap removeRepl leaves once it's gone. 
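Each allocator entry point in the replicate queue now threads the store's configured pool per call, as in shouldQueue and PlanOneChange above. A compact sketch of that convention, with the ComputeAction signature taken from the call sites in this patch:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// computeActionSketch illustrates the per-call threading: the pool the store
// was configured with is handed to each allocator method explicitly.
func computeActionSketch(
	ctx context.Context,
	a allocatorimpl.Allocator,
	sp storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (allocatorimpl.AllocatorAction, float64) {
	return a.ComputeAction(ctx, sp, conf, desc)
}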
- newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus) + newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.store.cfg.StorePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus) if err != nil { return nil, err } @@ -1284,7 +1284,7 @@ func (rq *replicateQueue) addOrReplaceVoters( oldPlusNewReplicas, roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID}, ) - _, _, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus) + _, _, err := rq.allocator.AllocateVoter(ctx, rq.store.cfg.StorePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus) if err != nil { // It does not seem possible to go to the next odd replica state. Note // that AllocateVoter returns an allocatorError (a PurgatoryError) @@ -1365,7 +1365,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( existingNonVoters := desc.Replicas().NonVoterDescriptors() effects := effectBuilder{} - newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.allocator.StorePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus) + newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.store.cfg.StorePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus) if err != nil { return nil, err } @@ -1488,7 +1488,7 @@ func (rq *replicateQueue) findRemoveVoter( return rq.allocator.RemoveVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, zone, candidates, existingVoters, @@ -1531,7 +1531,7 @@ func (rq *replicateQueue) maybeTransferLeaseAwayTarget( // a replica needs to be removed for constraint violations. target := rq.allocator.TransferLeaseTarget( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, desc.Replicas().VoterDescriptors(), repl, @@ -1611,7 +1611,7 @@ func (rq *replicateQueue) removeNonVoter( _, conf := repl.DescAndSpanConfig() removeNonVoter, details, err := rq.allocator.RemoveNonVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, existingNonVoters, existingVoters, @@ -1771,7 +1771,7 @@ func (rq *replicateQueue) considerRebalance( rangeUsageInfo := rangeUsageInfoForRepl(repl) addTarget, removeTarget, details, ok := rq.allocator.RebalanceVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, repl.RaftStatus(), existingVoters, @@ -1786,7 +1786,7 @@ func (rq *replicateQueue) considerRebalance( log.KvDistribution.Infof(ctx, "no suitable rebalance target for voters") addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, repl.RaftStatus(), existingVoters, @@ -1970,7 +1970,7 @@ func (rq *replicateQueue) shedLease( // so only consider the `VoterDescriptors` replicas. 
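The bookkeeping side follows the same rule: after a successful transfer, the queue updates its local pool copy directly (see TransferLease just below) rather than reaching through the allocator. A sketch, assuming UpdateLocalStoresAfterLeaseTransfer is part of the AllocatorStorePool surface as its use in this patch suggests, with rangeQPS assumed to be a float64:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// recordTransfer mirrors the bookkeeping done after a lease transfer: the
// store-local pool copy is updated so later decisions in the same pass see
// the moved load.
func recordTransfer(
	sp storepool.AllocatorStorePool, source, target roachpb.StoreID, rangeQPS float64,
) {
	sp.UpdateLocalStoresAfterLeaseTransfer(source, target, rangeQPS)
}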
target := rq.allocator.TransferLeaseTarget( ctx, - rq.allocator.StorePool, + rq.store.cfg.StorePool, conf, desc.Replicas().VoterDescriptors(), repl, @@ -2047,7 +2047,7 @@ func (rq *replicateQueue) TransferLease( return errors.Wrapf(err, "%s: unable to transfer lease to s%d", rlm, target) } - rq.allocator.StorePool.UpdateLocalStoresAfterLeaseTransfer(source, target, rangeQPS) + rq.store.cfg.StorePool.UpdateLocalStoresAfterLeaseTransfer(source, target, rangeQPS) rq.lastLeaseTransfer.Store(timeutil.Now()) return nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b2c22defe993..bf37867ef01e 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1234,15 +1234,29 @@ func NewStore( } s.ioThreshold.t = &admissionpb.IOThreshold{} var allocatorStorePool storepool.AllocatorStorePool + var storePoolIsDeterministic bool if cfg.StorePool != nil { + // There are a number of test cases that make a test store but don't add + // gossip or a store pool. So we can't rely on the existence of the + // store pool in those cases. allocatorStorePool = cfg.StorePool + storePoolIsDeterministic = allocatorStorePool.IsDeterministic() } if cfg.RPCContext != nil { - s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs) + s.allocator = allocatorimpl.MakeAllocator( + cfg.Settings, + storePoolIsDeterministic, + cfg.RPCContext.RemoteClocks.Latency, + cfg.TestingKnobs.AllocatorKnobs, + ) } else { - s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, allocatorStorePool, func(string) (time.Duration, bool) { - return 0, false - }, cfg.TestingKnobs.AllocatorKnobs) + s.allocator = allocatorimpl.MakeAllocator( + cfg.Settings, + storePoolIsDeterministic, + func(string) (time.Duration, bool) { + return 0, false + }, cfg.TestingKnobs.AllocatorKnobs, + ) } if s.metrics != nil { s.metrics.registry.AddMetricStruct(s.allocator.Metrics.LoadBasedLeaseTransferMetrics) } diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index abe243e1af7b..1615a2136ab5 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -783,7 +783,7 @@ func (s *Store) raftTickLoop(ctx context.Context) { func (s *Store) updateIOThresholdMap() { ioThresholdMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} - for _, sd := range s.allocator.StorePool.GetStores() { + for _, sd := range s.cfg.StorePool.GetStores() { ioThreshold := sd.Capacity.IOThreshold // need a copy ioThresholdMap[sd.StoreID] = &ioThreshold } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 65dba708ec21..733a64401018 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -123,6 +123,7 @@ type StoreRebalancer struct { st *cluster.Settings storeID roachpb.StoreID allocator allocatorimpl.Allocator + storePool storepool.AllocatorStorePool rr RangeRebalancer replicaRankings *ReplicaRankings getRaftStatusFn func(replica CandidateReplica) *raft.Status @@ -134,6 +135,10 @@ type StoreRebalancer struct { func NewStoreRebalancer( ambientCtx log.AmbientContext, st *cluster.Settings, rq *replicateQueue, rr *ReplicaRankings, ) *StoreRebalancer { + var storePool storepool.AllocatorStorePool + if rq.store.cfg.StorePool != nil { + storePool = rq.store.cfg.StorePool + } sr := &StoreRebalancer{ AmbientContext: ambientCtx, metrics: makeStoreRebalancerMetrics(), @@ -141,6 +146,7 @@ func NewStoreRebalancer( storeID: rq.store.StoreID(), rr: rq, allocator:
rq.allocator, + storePool: storePool, replicaRankings: rr, getRaftStatusFn: func(replica CandidateReplica) *raft.Status { return replica.RaftStatus() @@ -159,6 +165,7 @@ func NewStoreRebalancer( func SimulatorStoreRebalancer( storeID roachpb.StoreID, alocator allocatorimpl.Allocator, + storePool storepool.AllocatorStorePool, getRaftStatusFn func(replica CandidateReplica) *raft.Status, ) *StoreRebalancer { sr := &StoreRebalancer{ @@ -167,6 +174,7 @@ func SimulatorStoreRebalancer( st: &cluster.Settings{}, storeID: storeID, allocator: alocator, + storePool: storePool, getRaftStatusFn: getRaftStatusFn, } return sr @@ -238,7 +246,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.QPSScorerOptions { return &allocatorimpl.QPSScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), - Deterministic: sr.allocator.StorePool.IsDeterministic(), + Deterministic: sr.storePool.IsDeterministic(), QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&sr.st.SV), MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&sr.st.SV), } @@ -252,7 +260,7 @@ func (sr *StoreRebalancer) NewRebalanceContext( hottestRanges []CandidateReplica, rebalancingMode LBRebalancingMode, ) *RebalanceContext { - allStoresList, _, _ := sr.allocator.StorePool.GetStoreList(storepool.StoreFilterSuspect) + allStoresList, _, _ := sr.storePool.GetStoreList(storepool.StoreFilterSuspect) // Find the store descriptor for the local store. localDesc, ok := allStoresList.FindStoreByID(sr.storeID) if !ok { @@ -438,7 +446,7 @@ func (sr *StoreRebalancer) applyLeaseRebalance( // latest storepool information. After a rebalance or lease transfer the // storepool is updated. func (sr *StoreRebalancer) RefreshRebalanceContext(ctx context.Context, rctx *RebalanceContext) { - allStoresList, _, _ := sr.allocator.StorePool.GetStoreList(storepool.StoreFilterSuspect) + allStoresList, _, _ := sr.storePool.GetStoreList(storepool.StoreFilterSuspect) // Find the local descriptor in the all store list. If the store descriptor // doesn't exist, then log an error rather than just a warning. @@ -611,7 +619,7 @@ func (sr *StoreRebalancer) PostRangeRebalance( // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more // up-to-date info. 
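The store rebalancer's reads now go through its own handle as well, for example the suspect-filtered store list its decisions start from. A sketch of that lookup, with the three-value return shape matching the calls above; liveStores is a hypothetical wrapper:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
)

// liveStores sources the rebalancer's candidate list from its own pool
// handle rather than allocator.StorePool; StoreFilterSuspect excludes
// stores recently marked as suspect.
func liveStores(sp storepool.AllocatorStorePool) storepool.StoreList {
	list, _, _ := sp.GetStoreList(storepool.StoreFilterSuspect)
	return list
}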
- sr.allocator.StorePool.UpdateLocalStoreAfterRelocate( + sr.storePool.UpdateLocalStoreAfterRelocate( voterTargets, nonVoterTargets, oldVoters, oldNonVoters, rctx.LocalDesc.StoreID, @@ -624,7 +632,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, rctx *RebalanceContext, ) (CandidateReplica, roachpb.ReplicaDescriptor, []CandidateReplica) { var considerForRebalance []CandidateReplica - now := sr.allocator.StorePool.Clock().NowAsClockTimestamp() + now := sr.storePool.Clock().NowAsClockTimestamp() for { if len(rctx.hottestRanges) == 0 { return nil, roachpb.ReplicaDescriptor{}, considerForRebalance @@ -669,7 +677,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( candidate := sr.allocator.TransferLeaseTarget( ctx, - sr.allocator.StorePool, + sr.storePool, conf, candidates, candidateReplica, @@ -695,7 +703,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( filteredStoreList := rctx.allStoresList.ExcludeInvalid(conf.VoterConstraints) if sr.allocator.FollowTheWorkloadPrefersLocal( ctx, - sr.allocator.StorePool, + sr.storePool, filteredStoreList, rctx.LocalDesc, candidate.StoreID, @@ -738,7 +746,7 @@ type rangeRebalanceContext struct { func (sr *StoreRebalancer) chooseRangeToRebalance( ctx context.Context, rctx *RebalanceContext, ) (candidateReplica CandidateReplica, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { - now := sr.allocator.StorePool.Clock().NowAsClockTimestamp() + now := sr.storePool.Clock().NowAsClockTimestamp() if len(rctx.rebalanceCandidates) == 0 && len(rctx.hottestRanges) >= 0 { // NB: In practice, the rebalanceCandidates will be populated with // hottest ranges by the preceding function call, rebalance leases. @@ -778,7 +786,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( } rangeDesc, conf := candidateReplica.DescAndSpanConfig() - clusterNodes := sr.allocator.StorePool.ClusterNodeCount() + clusterNodes := sr.storePool.ClusterNodeCount() numDesiredVoters := allocatorimpl.GetNeededVoters(conf.GetNumVoters(), clusterNodes) numDesiredNonVoters := allocatorimpl.GetNeededNonVoters(numDesiredVoters, int(conf.GetNumNonVoters()), clusterNodes) if expected, actual := numDesiredVoters, len(rangeDesc.Replicas().VoterDescriptors()); expected != actual { @@ -866,7 +874,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( // misconfiguration.
validTargets := sr.allocator.ValidLeaseTargets( ctx, - sr.allocator.StorePool, + sr.storePool, rebalanceCtx.conf, targetVoterRepls, rebalanceCtx.candidateReplica, @@ -938,7 +946,7 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS( // `AdminRelocateRange` so that these decisions show up in system.rangelog add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget( ctx, - sr.allocator.StorePool, + sr.storePool, rbCtx.conf, rbCtx.candidateReplica.RaftStatus(), finalVoterTargets, @@ -1003,7 +1011,7 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS( for i := 0; i < len(finalNonVoterTargets); i++ { add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget( ctx, - sr.allocator.StorePool, + sr.storePool, rbCtx.conf, rbCtx.candidateReplica.RaftStatus(), finalVoterTargets, diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index e51cb22ad259..aaf66d59abf1 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -735,7 +734,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { for i := 0; i < numIterations; i++ { t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { ctx := context.Background() - stopper, g, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */) + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */) defer stopper.Stop(context.Background()) stores, actualQPSMean := randomNoLocalityStores(numNodes, qpsMultiplier) @@ -769,7 +768,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { return TestingRaftStatusFn(r) } - storePool := a.StorePool.(*storepool.StorePool) + storePool := sp storePool.OverrideIsStoreReadyForRoutineReplicaTransferFn = func(_ context.Context, this roachpb.StoreID) bool { for _, deadStore := range deadStores { // NodeID match StoreIDs here, so this comparison is valid. 
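On the test side the change is mechanical: the *storepool.StorePool returned by the allocator test helpers is kept and used directly, where previously it was recovered from the allocator by a type assertion. A sketch naming that identity; poolForTest is hypothetical:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
)

// poolForTest makes the replacement explicit: the concrete pool created for
// the test already satisfies the AllocatorStorePool interface, so the old
// a.StorePool.(*storepool.StorePool) assertion becomes unnecessary.
func poolForTest(sp *storepool.StorePool) storepool.AllocatorStorePool {
	return sp
}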
@@ -1190,7 +1189,7 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := allocatorimpl.CreateTestAllocatorWithKnobs( + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocatorWithKnobs( ctx, 10, false, /* deterministic */ @@ -1201,7 +1200,7 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { localDesc := *noLocalityStores[len(noLocalityStores)-1] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool.(*storepool.StorePool) + cfg.StorePool = sp cfg.DefaultSpanConfig.NumVoters = 1 cfg.DefaultSpanConfig.NumReplicas = 1 s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) @@ -1435,7 +1434,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - stopper, g, _, a, _ := allocatorimpl.CreateTestAllocatorWithKnobs(ctx, + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocatorWithKnobs(ctx, 10, false, /* deterministic */ &allocator.TestingKnobs{ @@ -1448,7 +1447,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool.(*storepool.StorePool) + cfg.StorePool = sp s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} @@ -1633,13 +1632,13 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { - stopper, g, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) + stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) defer stopper.Stop(ctx) localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) cfg.Gossip = g - cfg.StorePool = a.StorePool.(*storepool.StorePool) + cfg.StorePool = sp s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(test.stores, t) s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
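Taken together, the calling convention after this patch looks roughly like the sketch below: the allocator carries no pool of its own, and every allocation entry point receives one explicitly, so tests can pair a single allocator with whichever pool they configure. Argument and return shapes follow the AllocateVoter call sites in this patch; allocateVoterSketch is a hypothetical wrapper:

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// allocateVoterSketch exercises the post-patch convention: the pool is a
// per-call argument. Dead marks the replica being replaced as dead, as in
// the up-replication tests above.
func allocateVoterSketch(
	ctx context.Context,
	a allocatorimpl.Allocator,
	sp storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	voters, nonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicationTarget, error) {
	target, _, err := a.AllocateVoter(ctx, sp, conf, voters, nonVoters, allocatorimpl.Dead)
	return target, err
}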