diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index b2d38985bdb6..e5b9cf363d12 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -502,8 +502,8 @@ func (a *Allocator) allocateTargetFromList(
 	analyzedConstraints := constraint.AnalyzeConstraints(
 		ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
 	candidates := allocateCandidates(
-		sl, analyzedConstraints, candidateReplicas, a.storePool.getLocalities(candidateReplicas),
-		options,
+		sl, analyzedConstraints, candidateReplicas,
+		a.storePool.getLocalitiesByStore(candidateReplicas), options,
 	)
 	log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
 	if target := candidates.selectGood(a.randGen); target != nil {
@@ -568,7 +568,7 @@ func (a Allocator) RemoveTarget(
 	rankedCandidates := removeCandidates(
 		sl,
 		analyzedConstraints,
-		a.storePool.getLocalities(existingReplicas),
+		a.storePool.getLocalitiesByStore(existingReplicas),
 		options,
 	)
 	log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
@@ -663,8 +663,7 @@ func (a Allocator) RebalanceTarget(
 		sl,
 		analyzedConstraints,
 		existingReplicas,
-		a.storePool.getLocalities(existingReplicas),
-		a.storePool.getNodeLocalityString,
+		a.storePool.getLocalitiesByStore(existingReplicas),
 		options,
 	)
@@ -742,7 +741,7 @@ func (a Allocator) RebalanceTarget(
 	}
 	detailsBytes, err := json.Marshal(dDetails)
 	if err != nil {
-		log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %+v", err)
+		log.VEventf(ctx, 2, "failed to marshal details for choosing rebalance target: %+v", err)
 	}
 
 	addTarget := roachpb.ReplicationTarget{
@@ -1005,7 +1004,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
 	if stats == nil || !enableLoadBasedLeaseRebalancing.Get(&a.storePool.st.SV) {
 		return decideWithoutStats, roachpb.ReplicaDescriptor{}
 	}
-	replicaLocalities := a.storePool.getLocalities(existing)
+	replicaLocalities := a.storePool.getLocalitiesByNode(existing)
 	for _, locality := range replicaLocalities {
 		if len(locality.Tiers) == 0 {
 			return decideWithoutStats, roachpb.ReplicaDescriptor{}
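Reviewer summary before the scorer changes below: the allocator now keys locality lookups by StoreID instead of NodeID, and folds the node itself into the locality as a synthetic trailing tier. Two stores on the same node then share an identical locality (pairwise diversity 0), so the diversity scorer penalizes same-node placement on its own rather than a hard node-level filter ruling it out. A minimal sketch of the two map shapes, with simplified types (ints for IDs; the real types are `roachpb.Tier`/`roachpb.Locality`):

```go
package main

import "fmt"

// Simplified shapes only; not the roachpb definitions.
type Tier struct{ Key, Value string }
type Locality struct{ Tiers []Tier }

func main() {
	// Before: getLocalities keyed localities by NodeID, so two stores on the
	// same node were indistinguishable for scoring purposes.
	byNode := map[int]Locality{
		1: {Tiers: []Tier{{"region", "us"}, {"zone", "a"}}},
	}

	// After: getLocalitiesByStore keys by StoreID and appends a synthetic
	// "node" tier. Stores s1 and s2 on n1 end up with identical localities,
	// i.e. zero diversity between them.
	byStore := map[int]Locality{
		1: {Tiers: []Tier{{"region", "us"}, {"zone", "a"}, {"node", "1"}}},
		2: {Tiers: []Tier{{"region", "us"}, {"zone", "a"}, {"node", "1"}}},
	}
	fmt.Println(byNode, byStore)
}
```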
diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go
index a13b65eab46c..bf882745acc1 100644
--- a/pkg/kv/kvserver/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator_scorer.go
@@ -410,7 +410,7 @@ func allocateCandidates(
 	sl StoreList,
 	constraints constraint.AnalyzedConstraints,
 	existing []roachpb.ReplicaDescriptor,
-	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 	options scorerOptions,
 ) candidateList {
 	var candidates candidateList
@@ -425,7 +425,7 @@ func allocateCandidates(
 		if !maxCapacityCheck(s) {
 			continue
 		}
-		diversityScore := diversityAllocateScore(s, existingNodeLocalities)
+		diversityScore := diversityAllocateScore(s, existingStoreLocalities)
 		balanceScore := balanceScore(sl, s.Capacity, options)
 		var convergesScore int
 		if options.qpsRebalanceThreshold > 0 {
@@ -463,7 +463,7 @@ func removeCandidates(
 	sl StoreList,
 	constraints constraint.AnalyzedConstraints,
-	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 	options scorerOptions,
 ) candidateList {
 	var candidates candidateList
@@ -478,7 +478,7 @@ func removeCandidates(
 			})
 			continue
 		}
-		diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
+		diversityScore := diversityRemovalScore(s.StoreID, existingStoreLocalities)
 		balanceScore := balanceScore(sl, s.Capacity, options)
 		var convergesScore int
 		if !rebalanceFromConvergesOnMean(sl, s.Capacity) {
@@ -522,18 +522,13 @@ func rebalanceCandidates(
 	allStores StoreList,
 	constraints constraint.AnalyzedConstraints,
 	existingReplicas []roachpb.ReplicaDescriptor,
-	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
-	localityLookupFn func(roachpb.NodeID) string,
+	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 	options scorerOptions,
 ) []rebalanceOptions {
 	// 1. Determine whether existing replicas are valid and/or necessary.
-	type existingStore struct {
-		cand        candidate
-		localityStr string
-	}
-	existingStores := make(map[roachpb.StoreID]existingStore)
+	existingStores := make(map[roachpb.StoreID]candidate)
 	var needRebalanceFrom bool
-	curDiversityScore := rangeDiversityScore(existingNodeLocalities)
+	curDiversityScore := rangeDiversityScore(existingStoreLocalities)
 	for _, store := range allStores.stores {
 		for _, repl := range existingReplicas {
 			if store.StoreID != repl.StoreID {
@@ -544,7 +539,7 @@ func rebalanceCandidates(
 			if !valid {
 				if !needRebalanceFrom {
 					log.VEventf(ctx, 2, "s%d: should-rebalance(invalid): locality:%q",
-						store.StoreID, store.Node.Locality)
+						store.StoreID, store.Locality())
 				}
 				needRebalanceFrom = true
 			}
@@ -555,15 +550,12 @@ func rebalanceCandidates(
 				}
 				needRebalanceFrom = true
 			}
-			existingStores[store.StoreID] = existingStore{
-				cand: candidate{
-					store:          store,
-					valid:          valid,
-					necessary:      necessary,
-					fullDisk:       fullDisk,
-					diversityScore: curDiversityScore,
-				},
-				localityStr: localityLookupFn(store.Node.NodeID),
+			existingStores[store.StoreID] = candidate{
+				store:          store,
+				valid:          valid,
+				necessary:      necessary,
+				fullDisk:       fullDisk,
+				diversityScore: curDiversityScore,
 			}
 		}
 	}
@@ -599,8 +591,8 @@ func rebalanceCandidates(
 		// include Node/Store Attributes because they affect constraints.
 		var matchedOtherExisting bool
 		for i, stores := range comparableStores {
-			if sameLocalityAndAttrs(stores.existing[0], existing.cand.store) {
-				comparableStores[i].existing = append(comparableStores[i].existing, existing.cand.store)
+			if sameLocalityAndAttrs(stores.existing[0], existing.store) {
+				comparableStores[i].existing = append(comparableStores[i].existing, existing.store)
 				matchedOtherExisting = true
 				break
 			}
@@ -610,21 +602,11 @@ func rebalanceCandidates(
 		}
 		var comparableCands candidateList
 		for _, store := range allStores.stores {
-			// Nodes that already have a replica on one of their stores aren't valid
-			// rebalance targets. We do include stores that currently have a replica
-			// because we want them to be considered as valid stores in the
-			// ConvergesOnMean calculations below. This is subtle but important.
-			if nodeHasReplica(store.Node.NodeID, existingReplicas) &&
-				!storeHasReplica(store.StoreID, existingReplicas) {
-				log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true",
-					store.Node.NodeID, existingReplicas)
-				continue
-			}
 			constraintsOK, necessary := rebalanceFromConstraintsCheck(
-				store, existing.cand.store.StoreID, constraints)
+				store, existing.store.StoreID, constraints)
 			maxCapacityOK := maxCapacityCheck(store)
 			diversityScore := diversityRebalanceFromScore(
-				store, existing.cand.store.Node.NodeID, existingNodeLocalities)
+				store, existing.store.StoreID, existingStoreLocalities)
 			cand := candidate{
 				store:          store,
 				valid:          constraintsOK,
@@ -632,15 +614,15 @@ func rebalanceCandidates(
 				fullDisk:       !maxCapacityOK,
 				diversityScore: diversityScore,
 			}
-			if !cand.less(existing.cand) {
+			if !cand.less(existing) {
 				comparableCands = append(comparableCands, cand)
-				if !needRebalanceFrom && !needRebalanceTo && existing.cand.less(cand) {
+				if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) {
 					needRebalanceTo = true
 					log.VEventf(ctx, 2,
 						"s%d: should-rebalance(necessary/diversity=s%d): oldNecessary:%t, newNecessary:%t, "+
 							"oldDiversity:%f, newDiversity:%f, locality:%q",
-						existing.cand.store.StoreID, store.StoreID, existing.cand.necessary, cand.necessary,
-						existing.cand.diversityScore, cand.diversityScore, store.Node.Locality)
+						existing.store.StoreID, store.StoreID, existing.necessary, cand.necessary,
+						existing.diversityScore, cand.diversityScore, store.Locality())
 				}
 			}
 		}
@@ -655,7 +637,7 @@ func rebalanceCandidates(
 			bestStores[i] = bestCands[i].store
 		}
 		comparableStores = append(comparableStores, comparableStoreList{
-			existing:   []roachpb.StoreDescriptor{existing.cand.store},
+			existing:   []roachpb.StoreDescriptor{existing.store},
 			sl:         makeStoreList(bestStores),
 			candidates: bestCands,
 		})
@@ -673,7 +655,7 @@ func rebalanceCandidates(
 	outer:
 		for _, comparable := range comparableStores {
 			for _, existingCand := range comparable.existing {
-				if existing.cand.store.StoreID == existingCand.StoreID {
+				if existing.store.StoreID == existingCand.StoreID {
 					sl = comparable.sl
 					break outer
 				}
@@ -681,7 +663,7 @@ func rebalanceCandidates(
 		}
 		// TODO(a-robinson): Some moderate refactoring could extract this logic out
 		// into the loop below, avoiding duplicate balanceScore calculations.
-		if shouldRebalance(ctx, existing.cand.store, sl, options) {
+		if shouldRebalance(ctx, existing.store, sl, options) {
 			shouldRebalanceCheck = true
 			break
 		}
@@ -705,24 +687,24 @@ func rebalanceCandidates(
 					existingDesc, existingStores)
 				continue
 			}
-			if !existing.cand.valid {
-				existing.cand.details = "constraint check fail"
-				existingCandidates = append(existingCandidates, existing.cand)
+			if !existing.valid {
+				existing.details = "constraint check fail"
+				existingCandidates = append(existingCandidates, existing)
 				continue
 			}
-			balanceScore := balanceScore(comparable.sl, existing.cand.store.Capacity, options)
+			balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options)
 			var convergesScore int
-			if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity) {
+			if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) {
 				// Similarly to in removeCandidates, any replica whose removal
 				// would not converge the range stats to their means is given a
 				// constraint score boost of 1 to make it less attractive for
 				// removal.
 				convergesScore = 1
 			}
-			existing.cand.convergesScore = convergesScore
-			existing.cand.balanceScore = balanceScore
-			existing.cand.rangeCount = int(existing.cand.store.Capacity.RangeCount)
-			existingCandidates = append(existingCandidates, existing.cand)
+			existing.convergesScore = convergesScore
+			existing.balanceScore = balanceScore
+			existing.rangeCount = int(existing.store.Capacity.RangeCount)
+			existingCandidates = append(existingCandidates, existing)
 		}
 
 		for _, cand := range comparable.candidates {
@@ -898,7 +880,7 @@ func storeHasReplica(storeID roachpb.StoreID, existing []roachpb.ReplicaDescript
 }
 
 func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool {
-	if !s1.Node.Locality.Equals(s2.Node.Locality) {
+	if !s1.Locality().Equals(s2.Locality()) {
 		return false
 	}
 	if !s1.Node.Attrs.Equals(s2.Node.Attrs) {
@@ -1058,14 +1040,14 @@ func constraintsCheck(
 // given range is. A higher score means the range is more diverse.
 // All below diversity-scoring methods should in theory be implemented by
 // calling into this one, but they aren't to avoid allocations.
-func rangeDiversityScore(existingNodeLocalities map[roachpb.NodeID]roachpb.Locality) float64 {
+func rangeDiversityScore(existingStoreLocalities map[roachpb.StoreID]roachpb.Locality) float64 {
 	var sumScore float64
 	var numSamples int
-	for n1, l1 := range existingNodeLocalities {
-		for n2, l2 := range existingNodeLocalities {
+	for s1, l1 := range existingStoreLocalities {
+		for s2, l2 := range existingStoreLocalities {
 			// Only compare pairs of replicas where s2 > s1 to avoid computing the
 			// diversity score between each pair of localities twice.
-			if n2 <= n1 {
+			if s2 <= s1 {
 				continue
 			}
 			sumScore += l1.DiversityScore(l2)
@@ -1082,7 +1064,7 @@ func rangeDiversityScore(existingNodeLocalities map[roachpb.NodeID]roachpb.Local
 // desirable it would be to add a replica to store. A higher score means the
 // store is a better fit.
 func diversityAllocateScore(
-	store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
 	var sumScore float64
 	var numSamples int
@@ -1090,8 +1072,8 @@ func diversityAllocateScore(
 	// how well the new store would fit, because for any store that we might
 	// consider adding the pairwise average diversity of the existing replicas
 	// is the same.
-	for _, locality := range existingNodeLocalities {
-		newScore := store.Node.Locality.DiversityScore(locality)
+	for _, locality := range existingStoreLocalities {
+		newScore := store.Locality().DiversityScore(locality)
 		sumScore += newScore
 		numSamples++
 	}
@@ -1106,15 +1088,15 @@ func diversityAllocateScore(
 // it would be to remove a node's replica of a range. A higher score indicates
 // that the node is a better fit (i.e. keeping it around is good for diversity).
 func diversityRemovalScore(
-	nodeID roachpb.NodeID, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	storeID roachpb.StoreID, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
 	var sumScore float64
 	var numSamples int
-	locality := existingNodeLocalities[nodeID]
+	locality := existingStoreLocalities[storeID]
 	// We don't need to calculate the overall diversityScore for the range, because the original overall diversityScore
 	// of this range is always the same.
-	for otherNodeID, otherLocality := range existingNodeLocalities {
-		if otherNodeID == nodeID {
+	for otherStoreID, otherLocality := range existingStoreLocalities {
+		if otherStoreID == storeID {
 			continue
 		}
 		newScore := locality.DiversityScore(otherLocality)
@@ -1134,16 +1116,16 @@ func diversityRemovalScore(
 // higher score indicates that the provided store is a better fit for the
 // range.
 func diversityRebalanceScore(
-	store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
-	if len(existingNodeLocalities) == 0 {
+	if len(existingStoreLocalities) == 0 {
 		return roachpb.MaxDiversityScore
 	}
 	var maxScore float64
 	// For every existing node, calculate what the diversity score would be if we
 	// remove that node's replica to replace it with one on the provided store.
-	for removedNodeID := range existingNodeLocalities {
-		score := diversityRebalanceFromScore(store, removedNodeID, existingNodeLocalities)
+	for removedStoreID := range existingStoreLocalities {
+		score := diversityRebalanceFromScore(store, removedStoreID, existingStoreLocalities)
 		if score > maxScore {
 			maxScore = score
 		}
@@ -1159,24 +1141,24 @@ func diversityRebalanceScore(
 // range.
 func diversityRebalanceFromScore(
 	store roachpb.StoreDescriptor,
-	fromNodeID roachpb.NodeID,
-	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+	fromStoreID roachpb.StoreID,
+	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
 	// Compute the pairwise diversity score of all replicas that will exist
 	// after adding store and removing fromNodeID.
 	var sumScore float64
 	var numSamples int
-	for nodeID, locality := range existingNodeLocalities {
-		if nodeID == fromNodeID {
+	for storeID, locality := range existingStoreLocalities {
+		if storeID == fromStoreID {
 			continue
 		}
-		newScore := store.Node.Locality.DiversityScore(locality)
+		newScore := store.Locality().DiversityScore(locality)
 		sumScore += newScore
 		numSamples++
-		for otherNodeID, otherLocality := range existingNodeLocalities {
+		for otherStoreID, otherLocality := range existingStoreLocalities {
 			// Only compare pairs of replicas where otherNodeID > nodeID to avoid
 			// computing the diversity score between each pair of localities twice.
-			if otherNodeID <= nodeID || otherNodeID == fromNodeID {
+			if otherStoreID <= storeID || otherStoreID == fromStoreID {
 				continue
 			}
 			newScore := locality.DiversityScore(otherLocality)
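As a cross-check on the scoring functions above: rangeDiversityScore is the mean of Locality.DiversityScore over all unordered pairs of replica localities, and diversityRebalanceFromScore recomputes that same mean as if fromStoreID were swapped out for the candidate store. Below is a standalone sketch of the pairwise mean with a simplified stand-in for DiversityScore, inferred from the test expectations in this PR (the real implementation lives in pkg/roachpb/metadata.go, so treat the stand-in as an approximation):

```go
package main

import "fmt"

// diversityScore approximates roachpb's Locality.DiversityScore: compare tier
// values top-down and score by how early the two localities diverge;
// 1 means fully diverse, 0 means identical.
func diversityScore(a, b []string) float64 {
	min, max := len(a), len(b)
	if min > max {
		min, max = max, min
	}
	for i := 0; i < min; i++ {
		if a[i] != b[i] {
			return float64(min-i) / float64(min)
		}
	}
	if max != min {
		// One locality is a strict prefix of the other.
		return float64(max-min) / float64(max)
	}
	return 0
}

// rangeDiversity mirrors rangeDiversityScore: the mean pairwise score over
// all unordered pairs of (now store-keyed) localities.
func rangeDiversity(locs [][]string) float64 {
	var sum float64
	var n int
	for i := 0; i < len(locs); i++ {
		for j := i + 1; j < len(locs); j++ {
			sum += diversityScore(locs[i], locs[j])
			n++
		}
	}
	if n == 0 {
		return 1 // maximally diverse by convention (roachpb.MaxDiversityScore)
	}
	return sum / float64(n)
}

func main() {
	// us/a/1/5 on node 1 vs us/b on node 4: with the synthetic node tier the
	// localities diverge at the second of three comparable tiers, so the
	// score is (3-1)/3 = 2/3.
	fmt.Println(rangeDiversity([][]string{
		{"us", "a", "1", "5", "1"},
		{"us", "b", "4"},
	}))
}
```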
diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go
index a59c022f1b83..941a365d3f7b 100644
--- a/pkg/kv/kvserver/allocator_scorer_test.go
+++ b/pkg/kv/kvserver/allocator_scorer_test.go
@@ -482,7 +482,7 @@ var (
 			Attrs: []string{"a"},
 		},
 		Node: roachpb.NodeDescriptor{
-			NodeID: roachpb.NodeID(testStoreUSa15Dupe),
+			NodeID: roachpb.NodeID(testStoreUSa15),
 			Locality: roachpb.Locality{
 				Tiers: testStoreTierSetup("us", "a", "1", "5"),
 			},
@@ -1082,10 +1082,10 @@ func TestShouldRebalanceDiversity(t *testing.T) {
 			},
 		}
 	}
-	localityForNodeID := func(sl StoreList, id roachpb.NodeID) roachpb.Locality {
+	localityForStoreID := func(sl StoreList, id roachpb.StoreID) roachpb.Locality {
 		for _, store := range sl.stores {
-			if store.Node.NodeID == id {
-				return store.Node.Locality
+			if store.StoreID == id {
+				return store.Locality()
 			}
 		}
 		t.Fatalf("no locality for n%d in StoreList %+v", id, sl)
@@ -1191,14 +1191,15 @@ func TestShouldRebalanceDiversity(t *testing.T) {
 			}
 			filteredSL := tc.sl
 			filteredSL.stores = append([]roachpb.StoreDescriptor(nil), filteredSL.stores...)
-			existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+			existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
 			var replicas []roachpb.ReplicaDescriptor
 			for _, nodeID := range tc.existingNodeIDs {
+				storeID := roachpb.StoreID(nodeID)
 				replicas = append(replicas, roachpb.ReplicaDescriptor{
 					NodeID:  nodeID,
-					StoreID: roachpb.StoreID(nodeID),
+					StoreID: storeID,
 				})
-				existingNodeLocalities[nodeID] = localityForNodeID(tc.sl, nodeID)
+				existingStoreLocalities[storeID] = localityForStoreID(tc.sl, storeID)
 				// For the sake of testing, remove all other existing stores from the
 				// store list to only test whether we want to remove the replica on tc.s.
 				if nodeID != tc.s.Node.NodeID {
@@ -1211,11 +1212,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
 				filteredSL,
 				constraint.AnalyzedConstraints{},
 				replicas,
-				existingNodeLocalities,
-				func(nodeID roachpb.NodeID) string {
-					locality := localityForNodeID(tc.sl, nodeID)
-					return locality.String()
-				},
+				existingStoreLocalities,
 				options)
 			actual := len(targets) > 0
 			if actual != tc.expected {
@@ -1254,17 +1251,17 @@ func TestAllocateDiversityScore(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+			existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
 			for _, s := range tc.stores {
-				existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+				existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
 			}
 			var scores storeScores
 			for _, s := range testStores {
-				if _, ok := existingNodeLocalities[s.Node.NodeID]; ok {
+				if _, ok := existingStoreLocalities[s.StoreID]; ok {
 					continue
 				}
 				var score storeScore
-				actualScore := diversityAllocateScore(s, existingNodeLocalities)
+				actualScore := diversityAllocateScore(s, existingStoreLocalities)
 				score.storeID = s.StoreID
 				score.score = actualScore
 				scores = append(scores, score)
@@ -1329,17 +1326,17 @@ func TestRebalanceToDiversityScore(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+			existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
 			for _, s := range tc.stores {
-				existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+				existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
 			}
 			var scores storeScores
 			for _, s := range testStores {
-				if _, ok := existingNodeLocalities[s.Node.NodeID]; ok {
+				if _, ok := existingStoreLocalities[s.StoreID]; ok {
 					continue
 				}
 				var score storeScore
-				actualScore := diversityRebalanceScore(s, existingNodeLocalities)
+				actualScore := diversityRebalanceScore(s, existingStoreLocalities)
 				score.storeID = s.StoreID
 				score.score = actualScore
 				scores = append(scores, score)
@@ -1400,15 +1397,15 @@ func TestRemovalDiversityScore(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+			existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
 			for _, s := range tc.stores {
-				existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+				existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
 			}
 			var scores storeScores
 			for _, storeID := range tc.stores {
 				s := testStores[storeID]
 				var score storeScore
-				actualScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
+				actualScore := diversityRemovalScore(s.StoreID, existingStoreLocalities)
 				score.storeID = s.StoreID
 				score.score = actualScore
 				scores = append(scores, score)
@@ -1436,19 +1433,19 @@ func TestDiversityScoreEquivalence(t *testing.T) {
 		{[]roachpb.StoreID{testStoreUSa15}, 1.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe}, 0.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1}, 0.25},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSb}, 0.5},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSb}, 2.0 / 3.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreEurope}, 1.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1}, 1.0 / 6.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb}, 1.0 / 3.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb}, 4.0 / 9.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreEurope}, 2.0 / 3.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb}, 5.0 / 12.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb}, 19.0 / 36.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreEurope}, 3.0 / 4.0},
-		{[]roachpb.StoreID{testStoreUSa1, testStoreUSb, testStoreEurope}, 5.0 / 6.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb}, 1.0 / 3.0},
+		{[]roachpb.StoreID{testStoreUSa1, testStoreUSb, testStoreEurope}, 8.0 / 9.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb}, 5.0 / 12.0},
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreEurope}, 7.0 / 12.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb, testStoreEurope}, 2.0 / 3.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb, testStoreEurope}, 17.0 / 24.0},
-		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 3.0 / 5.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb, testStoreEurope}, 26.0 / 36.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb, testStoreEurope}, 55.0 / 72.0},
+		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 13.0 / 20.0},
 	}
 
 	// Ensure that rangeDiversityScore and diversityRebalanceFromScore return
@@ -1457,27 +1454,27 @@ func TestDiversityScoreEquivalence(t *testing.T) {
 	// diversityAllocateScore and diversityRemovalScore as of their initial
 	// creation or else we would test them here as well.
 	for _, tc := range testCases {
-		existingLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+		existingLocalities := make(map[roachpb.StoreID]roachpb.Locality)
 		for _, storeID := range tc.stores {
 			s := testStores[storeID]
-			existingLocalities[s.Node.NodeID] = s.Node.Locality
+			existingLocalities[s.StoreID] = s.Locality()
 		}
 		rangeScore := rangeDiversityScore(existingLocalities)
-		if a, e := rangeScore, tc.expected; a != e {
+		if a, e := rangeScore, tc.expected; !scoresAlmostEqual(a, e) {
 			t.Errorf("rangeDiversityScore(%v) got %f, want %f", existingLocalities, a, e)
 		}
 		for _, storeID := range tc.stores {
 			s := testStores[storeID]
-			fromNodeID := s.Node.NodeID
-			s.Node.NodeID = 99
-			rebalanceScore := diversityRebalanceFromScore(s, fromNodeID, existingLocalities)
-			if a, e := rebalanceScore, tc.expected; a != e {
+			fromStoreID := s.StoreID
+			s.StoreID = 99
+			rebalanceScore := diversityRebalanceFromScore(s, fromStoreID, existingLocalities)
+			if a, e := rebalanceScore, tc.expected; !scoresAlmostEqual(a, e) {
 				t.Errorf("diversityRebalanceFromScore(%v, %d, %v) got %f, want %f",
-					s, fromNodeID, existingLocalities, a, e)
+					s, fromStoreID, existingLocalities, a, e)
 			}
-			if a, e := rebalanceScore, rangeScore; a != e {
+			if a, e := rebalanceScore, rangeScore; !scoresAlmostEqual(a, e) {
 				t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to rangeDiversityScore(%v)=%f",
-					s, fromNodeID, existingLocalities, a, existingLocalities, e)
+					s, fromStoreID, existingLocalities, a, existingLocalities, e)
 			}
 		}
 	}
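The updated expectations above follow mechanically from the synthetic node tier. Worked example for {testStoreUSa15, testStoreUSb}: without the node tier the localities are (us, a, 1, 5) and (us, b), diverging at the second of two comparable tiers, giving 1/2; with the node tier appended they become (us, a, 1, 5, node=1) and (us, b, node=4), diverging at the second of three comparable tiers, giving (3-1)/3 = 2/3. Likewise, the fixture fix that puts testStoreUSa15Dupe on testStoreUSa15's node keeps that pair at 0.0, since their localities are now identical through the node tier. Because several of the new values are repeating decimals, exact float equality is replaced by scoresAlmostEqual, which this diff doesn't define; presumably it is an epsilon comparison along these lines (hypothetical reconstruction, epsilon is a guess):

```go
package kvserver

import "math"

// scoresAlmostEqual is assumed to tolerate floating-point rounding when
// comparing averaged diversity scores; the real helper is not shown in this
// diff and its epsilon may differ.
func scoresAlmostEqual(score1, score2 float64) bool {
	return math.Abs(score1-score2) < 1e-10
}
```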
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 7b5b40aa316a..c793adf8de63 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -550,28 +550,32 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 	gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
 
 	testCases := []struct {
-		existing     []roachpb.ReplicaDescriptor
-		expectTarget bool
+		existing              []roachpb.ReplicaDescriptor
+		expectTargetAllocate  bool
+		expectTargetRebalance bool
 	}{
 		{
 			existing: []roachpb.ReplicaDescriptor{
 				{NodeID: 1, StoreID: 1},
 			},
-			expectTarget: true,
+			expectTargetAllocate:  true,
+			expectTargetRebalance: true,
 		},
 		{
 			existing: []roachpb.ReplicaDescriptor{
 				{NodeID: 1, StoreID: 2},
 				{NodeID: 2, StoreID: 3},
 			},
-			expectTarget: true,
+			expectTargetAllocate:  true,
+			expectTargetRebalance: true,
 		},
 		{
 			existing: []roachpb.ReplicaDescriptor{
 				{NodeID: 1, StoreID: 2},
 				{NodeID: 3, StoreID: 6},
 			},
-			expectTarget: true,
+			expectTargetAllocate:  true,
+			expectTargetRebalance: true,
 		},
 		{
 			existing: []roachpb.ReplicaDescriptor{
@@ -579,7 +583,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 				{NodeID: 2, StoreID: 3},
 				{NodeID: 3, StoreID: 5},
 			},
-			expectTarget: false,
+			expectTargetAllocate:  false,
+			expectTargetRebalance: true,
 		},
 		{
 			existing: []roachpb.ReplicaDescriptor{
@@ -587,7 +592,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 				{NodeID: 2, StoreID: 4},
 				{NodeID: 3, StoreID: 6},
 			},
-			expectTarget: false,
+			expectTargetAllocate:  false,
+			expectTargetRebalance: false,
 		},
 	}
 
@@ -598,9 +604,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 			zonepb.EmptyCompleteZoneConfig(),
 			tc.existing,
 		)
-		if e, a := tc.expectTarget, result != nil; e != a {
+		if e, a := tc.expectTargetAllocate, result != nil; e != a {
 			t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v",
-				tc.existing, result, err, tc.expectTarget)
+				tc.existing, result, err, tc.expectTargetAllocate)
 		}
 	}
 
@@ -614,14 +620,118 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
 			rangeUsageInfo,
 			storeFilterThrottled,
 		)
-		if e, a := tc.expectTarget, ok; e != a {
+		if e, a := tc.expectTargetRebalance, ok; e != a {
 			t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v",
-				tc.existing, target, details, tc.expectTarget)
+				tc.existing, target, details, tc.expectTargetRebalance)
 		}
 	}
 }
 
+func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	store1 := roachpb.StoreDescriptor{
+		StoreID:  1,
+		Node:     roachpb.NodeDescriptor{NodeID: 1},
+		Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+	}
+	store2 := roachpb.StoreDescriptor{
+		StoreID:  2,
+		Node:     roachpb.NodeDescriptor{NodeID: 1},
+		Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 0},
+	}
+
+	// We start out with 40 ranges on 3 nodes and 3 stores; we then add a new
+	// store on Node 1 and try to rebalance all the ranges. What we want to see
+	// happen is an equilibrium where 20 ranges move from Store 1 to Store 2.
+	stores := []*roachpb.StoreDescriptor{
+		&store1,
+		&store2,
+		{
+			StoreID:  3,
+			Node:     roachpb.NodeDescriptor{NodeID: 2},
+			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+		},
+		{
+			StoreID:  4,
+			Node:     roachpb.NodeDescriptor{NodeID: 3},
+			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+		},
+	}
+
+	ranges := make([]roachpb.RangeDescriptor, 40)
+	for i := 0; i < 40; i++ {
+		ranges[i] = roachpb.RangeDescriptor{
+			InternalReplicas: []roachpb.ReplicaDescriptor{
+				{NodeID: 1, StoreID: 1},
+				{NodeID: 2, StoreID: 3},
+				{NodeID: 3, StoreID: 4},
+			},
+		}
+	}
+
+	stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
+	defer stopper.Stop(context.Background())
+	storeGossiper := gossiputil.NewStoreGossiper(g)
+	storeGossiper.GossipStores(stores, t)
+
+	// We run through all the ranges once to get the cluster to balance.
+	// After that we should not be seeing replicas move.
+	var rangeUsageInfo RangeUsageInfo
+	for i := 1; i < 40; i++ {
+		add, remove, _, ok := a.RebalanceTarget(
+			context.Background(),
+			zonepb.EmptyCompleteZoneConfig(),
+			nil, /* raftStatus */
+			ranges[i].InternalReplicas,
+			rangeUsageInfo,
+			storeFilterThrottled,
+		)
+		if ok {
+			// Update the descriptor.
+			newReplicas := make([]roachpb.ReplicaDescriptor, 0, len(ranges[i].InternalReplicas))
+			for _, repl := range ranges[i].InternalReplicas {
+				if remove.StoreID != repl.StoreID {
+					newReplicas = append(newReplicas, repl)
+				}
+			}
+			newReplicas = append(newReplicas, roachpb.ReplicaDescriptor{
+				StoreID: add.StoreID,
+				NodeID:  add.NodeID,
+			})
+			ranges[i].InternalReplicas = newReplicas
+
+			for _, store := range stores {
+				if store.StoreID == add.StoreID {
+					store.Capacity.RangeCount = store.Capacity.RangeCount + 1
+				} else if store.StoreID == remove.StoreID {
+					store.Capacity.RangeCount = store.Capacity.RangeCount - 1
+				}
+			}
+			storeGossiper.GossipStores(stores, t)
+		}
+	}
+
+	// Verify that the stores are reasonably balanced.
+	require.True(t, math.Abs(float64(
+		store1.Capacity.RangeCount-store2.Capacity.RangeCount)) <= minRangeRebalanceThreshold*2)
+	// We don't expect any ranges to want to move, since the system should have
+	// reached a stable state at this point.
+	for i := 1; i < 40; i++ {
+		_, _, _, ok := a.RebalanceTarget(
+			context.Background(),
+			zonepb.EmptyCompleteZoneConfig(),
+			nil, /* raftStatus */
+			ranges[i].InternalReplicas,
+			rangeUsageInfo,
+			storeFilterThrottled,
+		)
+		require.False(t, ok)
+	}
+}
+
 // TestAllocatorRebalance verifies that rebalance targets are chosen
 // randomly from amongst stores under the maxFractionUsedThreshold.
 func TestAllocatorRebalance(t *testing.T) {
@@ -2559,7 +2669,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 			sl,
 			analyzed,
 			existingRepls,
-			a.storePool.getLocalities(existingRepls),
+			a.storePool.getLocalitiesByStore(existingRepls),
 			a.scorerOptions(),
 		)
 		best := candidates.best()
@@ -2782,7 +2892,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 			candidates := removeCandidates(
 				sl,
 				analyzed,
-				a.storePool.getLocalities(existingRepls),
+				a.storePool.getLocalitiesByStore(existingRepls),
 				a.scorerOptions(),
 			)
 			if !expectedStoreIDsMatch(tc.expected, candidates.worst()) {
@@ -3579,8 +3689,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 			sl,
 			analyzed,
 			existingRepls,
-			a.storePool.getLocalities(existingRepls),
-			a.storePool.getNodeLocalityString,
+			a.storePool.getLocalitiesByStore(existingRepls),
 			a.scorerOptions(),
 		)
 		match := true
@@ -5477,9 +5586,9 @@ func TestAllocatorRebalanceAway(t *testing.T) {
 	}
 
 	existingReplicas := []roachpb.ReplicaDescriptor{
-		{StoreID: stores[0].StoreID},
-		{StoreID: stores[1].StoreID},
-		{StoreID: stores[2].StoreID},
+		{StoreID: stores[0].StoreID, NodeID: stores[0].Node.NodeID},
+		{StoreID: stores[1].StoreID, NodeID: stores[1].Node.NodeID},
+		{StoreID: stores[2].StoreID, NodeID: stores[2].Node.NodeID},
 	}
 	testCases := []struct {
 		constraint zonepb.Constraint
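A quick sanity check on the equilibrium TestAllocatorMultipleStoresPerNodeLopsided converges to: node 1's two stores start at 40 and 0 ranges while the cluster mean is (40+0+40+40)/4 = 30, and the final assertion only requires the two stores to land within 2*minRangeRebalanceThreshold of each other rather than at an exact 20/20 split. A back-of-the-envelope sketch of the overfull/underfull bounds; the exact threshold arithmetic lives in allocator_scorer.go and may differ in detail, and both constants here are assumptions (0.05 is the documented default for kv.allocator.range_rebalance_threshold; the floor of 2 is implied by the test's assertion):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	mean := (40.0 + 0.0 + 40.0 + 40.0) / 4 // 30 ranges per store

	const (
		rangeRebalanceThreshold    = 0.05 // assumed cluster-setting default
		minRangeRebalanceThreshold = 2.0  // assumed absolute floor
	)
	slack := math.Max(mean*rangeRebalanceThreshold, minRangeRebalanceThreshold)
	fmt.Printf("stores settle between %.0f and %.0f ranges\n", mean-slack, mean+slack)
	// With a floor of 2 ranges on either side of the mean, s1 and s2 can
	// legitimately stop up to ~4 ranges apart, which is what the
	// 2*minRangeRebalanceThreshold tolerance in the test allows.
}
```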
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 22d13a2ccfe1..2e2780a86162 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1125,50 +1125,95 @@ func validateReplicationChanges(
 ) error {
 	// First make sure that the changes don't self-overlap (i.e. we're not adding
 	// a replica twice, or removing and immediately re-adding it).
-	byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs))
+	byNodeAndStoreID := make(map[roachpb.NodeID]map[roachpb.StoreID]roachpb.ReplicationChange, len(chgs))
 	for _, chg := range chgs {
-		if _, ok := byNodeID[chg.Target.NodeID]; ok {
-			return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID)
+		byStoreID, ok := byNodeAndStoreID[chg.Target.NodeID]
+		if !ok {
+			byStoreID = make(map[roachpb.StoreID]roachpb.ReplicationChange)
+			byNodeAndStoreID[chg.Target.NodeID] = byStoreID
+		} else {
+			// The only operation that is allowed within a node is an Add/Remove.
+			for _, prevChg := range byStoreID {
+				if prevChg.ChangeType == chg.ChangeType {
+					return fmt.Errorf("changes %+v refer to n%d twice for change %v",
+						chgs, chg.Target.NodeID, chg.ChangeType)
+				}
+				if prevChg.ChangeType != roachpb.ADD_REPLICA {
+					return fmt.Errorf("can only add-remove a replica within a node, but got %+v", chgs)
+				}
+			}
 		}
-		byNodeID[chg.Target.NodeID] = chg
+		if _, ok := byStoreID[chg.Target.StoreID]; ok {
+			return fmt.Errorf("changes %+v refer to n%d and s%d twice", chgs,
+				chg.Target.NodeID, chg.Target.StoreID)
+		}
+		byStoreID[chg.Target.StoreID] = chg
 	}
 
 	// Then, check that we're not adding a second replica on nodes that already
-	// have one, or "re-add" an existing replica. We delete from byNodeID so that
-	// after this loop, it contains only StoreIDs that we haven't seen in desc.
+	// have one, or "re-add" an existing replica. We delete from byNodeAndStoreID
+	// so that after this loop, it contains only nodes that we haven't seen in desc.
 	for _, rDesc := range desc.Replicas().All() {
-		chg, ok := byNodeID[rDesc.NodeID]
-		delete(byNodeID, rDesc.NodeID)
-		if !ok || chg.ChangeType != roachpb.ADD_REPLICA {
+		byStoreID, ok := byNodeAndStoreID[rDesc.NodeID]
+		if !ok {
 			continue
 		}
-		// We're adding a replica that's already there. This isn't allowed, even
-		// when the newly added one would be on a different store.
-		if rDesc.StoreID != chg.Target.StoreID {
-			return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc)
+		delete(byNodeAndStoreID, rDesc.NodeID)
+		if len(byStoreID) == 2 {
+			chg, ok := byStoreID[rDesc.StoreID]
+			// We should be removing the replica from the existing store during a
+			// rebalance within the node.
+			if !ok || chg.ChangeType != roachpb.REMOVE_REPLICA {
+				return errors.Errorf(
+					"expected replica to be removed from %v during a lateral rebalance %v within the node", rDesc, chgs)
+			}
+			continue
 		}
+		chg, ok := byStoreID[rDesc.StoreID]
+		// There are two valid conditions here:
+		// (1) removal of the existing replica.
+		// (2) an add on the node, when the range only has one replica.
+		// See https://github.com/cockroachdb/cockroach/issues/40333.
+		if ok {
+			if chg.ChangeType == roachpb.REMOVE_REPLICA {
+				continue
+			}
+			// Looks like we found a replica with the same store and node id. If the
+			// replica is already a learner, then either some previous leaseholder was
+			// trying to add it with the learner+snapshot+voter cycle and got
+			// interrupted or else we hit a race between the replicate queue and
+			// AdminChangeReplicas.
+			if rDesc.GetType() == roachpb.LEARNER {
+				return errors.Errorf(
+					"unable to add replica %v which is already present as a learner in %s", chg.Target, desc)
+			}
 
-		// Looks like we found a replica with the same store and node id. If the
-		// replica is already a learner, then either some previous leaseholder was
-		// trying to add it with the learner+snapshot+voter cycle and got
-		// interrupted or else we hit a race between the replicate queue and
-		// AdminChangeReplicas.
-		if rDesc.GetType() == roachpb.LEARNER {
-			return errors.Errorf(
-				"unable to add replica %v which is already present as a learner in %s", chg.Target, desc)
+			// Otherwise, we already had a full voter replica. Can't add another to
+			// this store.
+			return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc)
 		}
 
-		// Otherwise, we already had a full voter replica. Can't add another to
-		// this store.
-		return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc)
+		for _, c := range byStoreID {
+			// We're adding a replica that's already there. This isn't allowed, even
+			// when the newly added one would be on a different store.
+			if c.ChangeType == roachpb.ADD_REPLICA {
+				if len(desc.Replicas().All()) > 1 {
+					return errors.Errorf("unable to add replica %v; node already has a replica in %s", c.Target.StoreID, desc)
+				}
+			} else {
+				return errors.Errorf("removing %v which is not in %s", c.Target, desc)
+			}
+		}
 	}
 
 	// Any removals left in the map now refer to nonexisting replicas, and we refuse them.
-	for _, chg := range byNodeID {
-		if chg.ChangeType != roachpb.REMOVE_REPLICA {
-			continue
+	for _, byStoreID := range byNodeAndStoreID {
+		for _, chg := range byStoreID {
+			if chg.ChangeType != roachpb.REMOVE_REPLICA {
+				continue
+			}
+			return errors.Errorf("removing %v which is not in %s", chg.Target, desc)
 		}
-		return errors.Errorf("removing %v which is not in %s", chg.Target, desc)
 	}
 	return nil
 }
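Reviewer note on the validation above: within a single node the only multi-change combination accepted is an add paired with a remove (a lateral, within-node rebalance), and because each incoming change is checked against the changes already recorded for that node, the ADD must precede the REMOVE in the change slice — {REMOVE, ADD} on the same node is rejected even though it describes the same end state (Test Case 16 in the next file exercises exactly this). A compressed, hypothetical companion test illustrating the order sensitivity, in the style of the test file that follows:

```go
// Hypothetical test; the fixture mirrors TestValidateReplicationChanges below.
func TestWithinNodeRebalanceOrdering(t *testing.T) {
	desc := &roachpb.RangeDescriptor{
		InternalReplicas: []roachpb.ReplicaDescriptor{
			{NodeID: 1, StoreID: 1},
			{NodeID: 2, StoreID: 2},
			{NodeID: 3, StoreID: 3},
		},
	}
	add := roachpb.ReplicationChange{
		ChangeType: roachpb.ADD_REPLICA,
		Target:     roachpb.ReplicationTarget{NodeID: 1, StoreID: 4},
	}
	remove := roachpb.ReplicationChange{
		ChangeType: roachpb.REMOVE_REPLICA,
		Target:     roachpb.ReplicationTarget{NodeID: 1, StoreID: 1},
	}
	// ADD before REMOVE: a valid lateral rebalance within n1.
	require.NoError(t, validateReplicationChanges(desc, roachpb.ReplicationChanges{add, remove}))
	// REMOVE before ADD: rejected, because the pairwise check only tolerates a
	// second change on a node when the previously recorded one was an ADD.
	require.Regexp(t, "can only add-remove a replica within a node",
		validateReplicationChanges(desc, roachpb.ReplicationChanges{remove, add}))
}
```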
diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go
index 75bb0c030787..48328d6ea408 100644
--- a/pkg/kv/kvserver/replica_command_test.go
+++ b/pkg/kv/kvserver/replica_command_test.go
@@ -91,3 +91,142 @@ func TestRangeDescriptorUpdateProtoChangedAcrossVersions(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestValidateReplicationChanges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	learnerType := roachpb.LEARNER
+	desc := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1},
+			{NodeID: 3, StoreID: 3},
+			{NodeID: 4, StoreID: 4, Type: &learnerType},
+		},
+	}
+
+	// Test Case 1: Add a new replica to another node.
+	err := validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 2: Remove a replica from an existing node.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 3: Remove a replica from the wrong node.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+	})
+	require.Regexp(t, "removing n2,s2 which is not in", err)
+
+	// Test Case 4: Remove a replica from the wrong store.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.Regexp(t, "removing n1,s2 which is not in", err)
+
+	// Test Case 5: Rebalance a replica within a node.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 6: Rebalance a replica within a node, but attempt to remove from
+	// the wrong store.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.Regexp(t, "expected replica to be removed from", err)
+
+	// Test Case 7: Add a replica to the same node and store.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.Regexp(t, "unable to add replica n1,s1 which is already present", err)
+
+	// Test Case 8: Add a replica to the same node but a different store.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.Regexp(t, "unable to add replica 2", err)
+
+	// Test Case 9: Try to rebalance a replica on the same node, but also add an extra.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}},
+	})
+	require.Regexp(t, "can only add-remove a replica within a node", err)
+
+	// Test Case 10: Try to add twice to the same node.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 5}},
+	})
+	require.Regexp(t, "refer to n4 twice for change ADD_REPLICA", err)
+
+	// Test Case 11: Try to remove twice from the same node.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.Regexp(t, "refer to n1 twice for change REMOVE_REPLICA", err)
+
+	// Test Case 12: Try to add where there is already a learner.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 5}},
+	})
+	require.Error(t, err)
+
+	// Test Case 13: Add/Remove multiple replicas.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 5, StoreID: 5}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 6, StoreID: 6}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 14: We are rebalancing within a node and do a remove.
+	descRebalancing := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1},
+			{NodeID: 2, StoreID: 2},
+			{NodeID: 1, StoreID: 2, Type: &learnerType},
+		},
+	}
+	err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 15: Do an add while rebalancing within a node.
+	err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 16: Remove/Add within a node is not allowed, since we expect Add/Remove.
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.Regexp(t, "can only add-remove a replica within a node, but got ", err)
+
+	// Test Case 17: We are rebalancing within a node and have only one replica.
+	descSingle := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1},
+		},
+	}
+	err = validateReplicationChanges(descSingle, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+	})
+	require.NoError(t, err)
+}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index d4582205e695..bc8b433b0fc2 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -6432,6 +6432,12 @@ func TestChangeReplicasDuplicateError(t *testing.T) {
 	defer stopper.Stop(context.Background())
 	tc.Start(t, stopper)
 
+	// We now allow adding a replica to the same node, to support rebalances
+	// within the same node when replication is 1x, so add another replica to the
+	// range descriptor to avoid this case.
+	if _, err := tc.addBogusReplicaToRangeDesc(context.Background()); err != nil {
+		t.Fatalf("Unexpected error %v", err)
+	}
 	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
 		NodeID:  tc.store.Ident.NodeID,
 		StoreID: 9999,
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index ccf3e133edc8..4a3b25b79a7e 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -757,10 +757,34 @@ func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb
 	}
 }
 
-// getLocalities returns the localities for the provided replicas.
+// getLocalitiesByStore returns the localities for the provided replicas. In
+// this case we consider the node to be part of the failure domain, so we add
+// it to the locality data.
+func (sp *StorePool) getLocalitiesByStore(
+	replicas []roachpb.ReplicaDescriptor,
+) map[roachpb.StoreID]roachpb.Locality {
+	sp.localitiesMu.RLock()
+	defer sp.localitiesMu.RUnlock()
+	localities := make(map[roachpb.StoreID]roachpb.Locality)
+	for _, replica := range replicas {
+		nodeTier := roachpb.Tier{Key: "node", Value: replica.NodeID.String()}
+		if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok {
+			localities[replica.StoreID] = locality.locality.AddTier(nodeTier)
+		} else {
+			localities[replica.StoreID] = roachpb.Locality{
+				Tiers: []roachpb.Tier{nodeTier},
+			}
+		}
+	}
+	return localities
+}
+
+// getLocalitiesByNode returns the localities for the provided replicas. In this
+// case we only consider the locality by node, where the node itself is not
+// part of the failure domain.
 // TODO(bram): consider storing a full list of all node to node diversity
 // scores for faster lookups.
-func (sp *StorePool) getLocalities(
+func (sp *StorePool) getLocalitiesByNode(
 	replicas []roachpb.ReplicaDescriptor,
 ) map[roachpb.NodeID]roachpb.Locality {
 	sp.localitiesMu.RLock()
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index adaf281e18c0..ca0ac4cf6258 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/kr/pretty"
+	"github.com/stretchr/testify/require"
 )
 
 var uniqueStore = []*roachpb.StoreDescriptor{
@@ -837,7 +838,7 @@ func TestGetLocalities(t *testing.T) {
 	createDescWithLocality := func(tierCount int) roachpb.NodeDescriptor {
 		return roachpb.NodeDescriptor{
 			NodeID:   roachpb.NodeID(tierCount),
-			Locality: createLocality(tierCount),
+			Locality: createLocality(tierCount - 1),
 		}
 	}
 
@@ -864,22 +865,30 @@ func TestGetLocalities(t *testing.T) {
 
 	var existingReplicas []roachpb.ReplicaDescriptor
 	for _, store := range stores {
-		existingReplicas = append(existingReplicas, roachpb.ReplicaDescriptor{NodeID: store.Node.NodeID})
+		existingReplicas = append(existingReplicas,
+			roachpb.ReplicaDescriptor{
+				NodeID:  store.Node.NodeID,
+				StoreID: store.StoreID,
+			},
+		)
 	}
 
-	localities := sp.getLocalities(existingReplicas)
+	localitiesByStore := sp.getLocalitiesByStore(existingReplicas)
+	localitiesByNode := sp.getLocalitiesByNode(existingReplicas)
 	for _, store := range stores {
+		storeID := store.StoreID
 		nodeID := store.Node.NodeID
-		locality, ok := localities[nodeID]
+		localityByStore, ok := localitiesByStore[storeID]
 		if !ok {
-			t.Fatalf("could not find locality for node %d", nodeID)
-		}
-		if e, a := int(nodeID), len(locality.Tiers); e != a {
-			t.Fatalf("for node %d, expected %d tiers, only got %d", nodeID, e, a)
-		}
-		if e, a := createLocality(int(nodeID)).String(), sp.getNodeLocalityString(nodeID); e != a {
-			t.Fatalf("for getNodeLocalityString(%d), expected %q, got %q", nodeID, e, a)
+			t.Fatalf("could not find locality for store %d", storeID)
 		}
+		localityByNode, ok := localitiesByNode[nodeID]
+		require.Truef(t, ok, "could not find locality for node %d", nodeID)
+		require.Equal(t, int(nodeID), len(localityByStore.Tiers))
+		require.Equal(t, localityByStore.Tiers[len(localityByStore.Tiers)-1],
+			roachpb.Tier{Key: "node", Value: nodeID.String()})
+		require.Equal(t, int(nodeID)-1, len(localityByNode.Tiers))
+		require.Equal(t, createLocality(int(nodeID)-1).String(), sp.getNodeLocalityString(nodeID))
 	}
 }
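One behavioral detail of getLocalitiesByStore worth calling out: when a node is absent from the gossiped locality map, its stores still get a usable locality consisting of just the synthetic node tier, so same-node stores remain mutually non-diverse even on clusters with no locality flags at all. A minimal stand-in with simplified types (ints for IDs; not the StorePool code):

```go
package main

import (
	"fmt"
	"strconv"
)

type Tier struct{ Key, Value string }
type Locality struct{ Tiers []Tier }

// localitiesByStore sketches the mapping: storeID -> node locality plus a
// trailing synthetic "node" tier, falling back to node-tier-only when the
// node has no gossiped locality.
func localitiesByStore(
	nodeLoc map[int]Locality, storeToNode map[int]int,
) map[int]Locality {
	out := make(map[int]Locality)
	for storeID, nodeID := range storeToNode {
		nodeTier := Tier{Key: "node", Value: strconv.Itoa(nodeID)}
		if l, ok := nodeLoc[nodeID]; ok {
			// Copy-on-append, mirroring the new Locality.AddTier helper.
			tiers := make([]Tier, len(l.Tiers), len(l.Tiers)+1)
			copy(tiers, l.Tiers)
			out[storeID] = Locality{Tiers: append(tiers, nodeTier)}
		} else {
			// Fallback: no gossiped locality, node tier only.
			out[storeID] = Locality{Tiers: []Tier{nodeTier}}
		}
	}
	return out
}

func main() {
	nodeLoc := map[int]Locality{2: {Tiers: []Tier{{"region", "us"}}}}
	// s3 lives on n2 (known locality); s9 lives on n7 (unknown locality).
	fmt.Println(localitiesByStore(nodeLoc, map[int]int{3: 2, 9: 7}))
}
```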
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 0fc25b15b30b..1219d8c1024b 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -509,7 +509,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
 		// Check the range's existing diversity score, since we want to ensure we
 		// don't hurt locality diversity just to improve QPS.
 		curDiversity := rangeDiversityScore(
-			sr.rq.allocator.storePool.getLocalities(currentReplicas))
+			sr.rq.allocator.storePool.getLocalitiesByStore(currentReplicas))
 
 		// Check the existing replicas, keeping around those that aren't overloaded.
 		for i := range currentReplicas {
@@ -579,7 +579,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
 				desc.RangeID, len(targets), desiredReplicas)
 			continue
 		}
-		newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(targetReplicas))
+		newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetReplicas))
 		if newDiversity < curDiversity {
 			log.VEventf(ctx, 3,
 				"new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing",
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 638de8524814..167d0ad9e2e0 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -659,3 +659,21 @@ var DefaultLocationInformation = []struct {
 		Longitude: "3.81886",
 	},
 }
+
+// Locality returns the locality of the Store, which is the locality of the
+// node plus an extra tier for the node itself.
+func (s StoreDescriptor) Locality() Locality {
+	return s.Node.Locality.AddTier(
+		Tier{Key: "node", Value: s.Node.NodeID.String()})
+}
+
+// AddTier creates a new Locality with a Tier appended at the end.
+func (l Locality) AddTier(tier Tier) Locality {
+	if len(l.Tiers) > 0 {
+		tiers := make([]Tier, len(l.Tiers), len(l.Tiers)+1)
+		copy(tiers, l.Tiers)
+		tiers = append(tiers, tier)
+		return Locality{Tiers: tiers}
+	}
+	return Locality{Tiers: []Tier{tier}}
+}
diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go
index 76f12b182f95..dc9300fac542 100644
--- a/pkg/roachpb/metadata_test.go
+++ b/pkg/roachpb/metadata_test.go
@@ -268,3 +268,15 @@ func TestDiversityScore(t *testing.T) {
 		})
 	}
 }
+
+func TestAddTier(t *testing.T) {
+	l1 := Locality{}
+	l2 := Locality{
+		Tiers: []Tier{{Key: "foo", Value: "bar"}},
+	}
+	l3 := Locality{
+		Tiers: []Tier{{Key: "foo", Value: "bar"}, {Key: "bar", Value: "foo"}},
+	}
+	require.Equal(t, l2, l1.AddTier(Tier{Key: "foo", Value: "bar"}))
+	require.Equal(t, l3, l2.AddTier(Tier{Key: "bar", Value: "foo"}))
+}
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index d52ab46d5f0b..fb5b1b4a3b61 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -329,6 +329,13 @@ func startServer(t *testing.T) *TestServer {
 			base.DefaultTestStoreSpec,
 			base.DefaultTestStoreSpec,
 		},
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				// Now that we allow same-node rebalances, disable them in these
+				// tests, as they don't expect replicas to move.
+				DisableReplicaRebalancing: true,
+			},
+		},
 	})
 	ts := tsI.(*TestServer)
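Closing note on the new Locality.AddTier helper in pkg/roachpb/metadata.go above: the make/copy dance is not incidental. AddTier is called on localities cached in the store pool, and a naive append could write through the source slice's spare capacity, letting two AddTier calls on the same cached Locality clobber each other's node tier. A standalone demonstration of that aliasing hazard in plain Go (simplified Tier type):

```go
package main

import "fmt"

type Tier struct{ Key, Value string }

func main() {
	// A slice with spare capacity, standing in for a cached locality.
	base := make([]Tier, 2, 3)
	base[0] = Tier{"region", "us"}
	base[1] = Tier{"zone", "a"}

	a := append(base, Tier{"node", "1"}) // shares base's backing array
	b := append(base, Tier{"node", "2"}) // overwrites a's last element
	fmt.Println(a[2], b[2])              // both print {node 2}
}
```

Copying into a fresh slice, as AddTier does, makes each derived locality independent of the cached one.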