diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index b2d38985bdb6..8857af55e3b9 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -526,7 +526,8 @@ func (a Allocator) simulateRemoveTarget( candidates []roachpb.ReplicaDescriptor, existingReplicas []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, -) (roachpb.ReplicaDescriptor, string, error) { + storeFilter roachpb.StoreDescriptorFilter, +) (*candidate, string, error) { // Update statistics first // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue @@ -537,7 +538,7 @@ func (a Allocator) simulateRemoveTarget( a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_REPLICA) }() log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveTarget(ctx, zone, candidates, existingReplicas) + return a.doRemoveTarget(ctx, zone, candidates, existingReplicas, storeFilter) } // RemoveTarget returns a suitable replica to remove from the provided replica @@ -551,41 +552,16 @@ func (a Allocator) RemoveTarget( candidates []roachpb.ReplicaDescriptor, existingReplicas []roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, string, error) { - if len(candidates) == 0 { - return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") - } - - // Retrieve store descriptors for the provided candidates from the StorePool. - existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates)) - for i, exist := range candidates { - existingStoreIDs[i] = exist.StoreID + removeCandidate, removeDetails, err := a.doRemoveTarget(ctx, zone, candidates, + existingReplicas, roachpb.NoOpStoreDescriptorFilter{}) + if err != nil { + return roachpb.ReplicaDescriptor{}, removeDetails, err } - sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone) - - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) - options := a.scorerOptions() - rankedCandidates := removeCandidates( - sl, - analyzedConstraints, - a.storePool.getLocalities(existingReplicas), - options, - ) - log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates) - if bad := rankedCandidates.selectBad(a.randGen); bad != nil { - for _, exist := range existingReplicas { - if exist.StoreID == bad.store.StoreID { - log.VEventf(ctx, 3, "remove target: %s", bad) - details := decisionDetails{Target: bad.compactString(options)} - detailsBytes, err := json.Marshal(details) - if err != nil { - log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err) - } - return exist, string(detailsBytes), nil - } + for _, exist := range existingReplicas { + if exist.StoreID == removeCandidate.store.StoreID { + return exist, removeDetails, nil } } - return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed") } @@ -675,14 +651,13 @@ func (a Allocator) RebalanceTarget( // pretty sure we won't want to remove immediately after adding it. // If we would, we don't want to actually rebalance to that target. 
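Note: the new storeFilter parameter above works together with isSameNodeRebalanceAttempt, added later in this diff. A minimal standalone sketch of that decision, with an invented replica layout (the real helper is internal to package kvserver and operates on the allocator's candidate type):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sameNodeRebalance mirrors the intent of isSameNodeRebalanceAttempt: the
// rebalance target sits on a node that already holds a replica, but on a
// different store.
func sameNodeRebalance(
	existing []roachpb.ReplicaDescriptor, targetNode roachpb.NodeID, targetStore roachpb.StoreID,
) bool {
	for _, repl := range existing {
		if repl.NodeID == targetNode && repl.StoreID != targetStore {
			return true
		}
	}
	return false
}

func main() {
	existing := []roachpb.ReplicaDescriptor{
		{NodeID: 1, StoreID: 1}, {NodeID: 2, StoreID: 3}, {NodeID: 3, StoreID: 4},
	}
	// A target on (n1,s2) shares n1 with the replica on (n1,s1), so the removal
	// simulation must be limited to n1's stores.
	fmt.Println(sameNodeRebalance(existing, 1, 2)) // true
	// A target on a node with no existing replica needs no such restriction.
	fmt.Println(sameNodeRebalance(existing, 4, 7)) // false
}
```

When this check fires, RebalanceTarget swaps the default NoOpStoreDescriptorFilter for a SameNodeStoreDescriptorFilter, so the simulated removal can only pick one of that node's stores.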
var target *candidate - var removeReplica roachpb.ReplicaDescriptor + var removalTarget *candidate var existingCandidates candidateList for { target, existingCandidates = bestRebalanceTarget(a.randGen, results) if target == nil { return zero, zero, "", false } - // Add a fake new replica to our copy of the range descriptor so that we can // simulate the removal logic. If we decide not to go with this target, note // that this needs to be removed from desc before we try any other target. @@ -710,23 +685,26 @@ func (a Allocator) RebalanceTarget( return zero, zero, "", false } - var removeDetails string var err error - removeReplica, removeDetails, err = a.simulateRemoveTarget( + var removeDetails string + var storeFilter roachpb.StoreDescriptorFilter = roachpb.NoOpStoreDescriptorFilter{} + if isSameNodeRebalanceAttempt(existingPlusOneNew, target) { + storeFilter = roachpb.SameNodeStoreDescriptorFilter{NodeID: target.store.Node.NodeID} + } + removalTarget, removeDetails, err = a.simulateRemoveTarget( ctx, target.store.StoreID, zone, replicaCandidates, existingPlusOneNew, rangeUsageInfo, + storeFilter, ) if err != nil { log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err) return zero, zero, "", false } - if target.store.StoreID != removeReplica.StoreID { - // Successfully populated these variables - _, _ = target, removeReplica + if target.store.StoreID != removalTarget.store.StoreID && removalTarget.less(*target) { break } @@ -750,8 +728,8 @@ func (a Allocator) RebalanceTarget( StoreID: target.store.StoreID, } removeTarget := roachpb.ReplicationTarget{ - NodeID: removeReplica.NodeID, - StoreID: removeReplica.StoreID, + NodeID: removalTarget.store.Node.NodeID, + StoreID: removalTarget.store.StoreID, } return addTarget, removeTarget, string(detailsBytes), true } @@ -1378,3 +1356,63 @@ func maxReplicaID(replicas []roachpb.ReplicaDescriptor) roachpb.ReplicaID { } return max } + +// doRemoveTarget returns a suitable replica to remove from the provided replica +// set. It first attempts to randomly select a target from the set of stores +// that have greater than the average number of replicas. Failing that, it +// falls back to selecting a random target from any of the existing +// replicas. +func (a Allocator) doRemoveTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + candidates []roachpb.ReplicaDescriptor, + existingReplicas []roachpb.ReplicaDescriptor, + storeFilter roachpb.StoreDescriptorFilter, +) (*candidate, string, error) { + if len(candidates) == 0 { + return nil, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") + } + + // Retrieve store descriptors for the provided candidates from the StorePool. 
+	existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
+	for i, exist := range candidates {
+		existingStoreIDs[i] = exist.StoreID
+	}
+	sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
+
+	analyzedConstraints := constraint.AnalyzeConstraints(
+		ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
+	options := a.scorerOptions()
+	rankedCandidates := removeCandidates(
+		sl,
+		analyzedConstraints,
+		a.storePool.getLocalities(existingReplicas),
+		options,
+		storeFilter,
+	)
+	log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
+	if bad := rankedCandidates.selectBad(a.randGen); bad != nil {
+		log.VEventf(ctx, 3, "remove target: %s", bad)
+		details := decisionDetails{Target: bad.compactString(options)}
+		detailsBytes, err := json.Marshal(details)
+		if err != nil {
+			log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err)
+		}
+		return bad, string(detailsBytes), nil
+	}
+
+	return nil, "", errors.New("could not select an appropriate replica to be removed")
+}
+
+// isSameNodeRebalanceAttempt returns true if the target candidate is on the
+// same node as an existing replica, but on a different store.
+func isSameNodeRebalanceAttempt(
+	existingReplicas []roachpb.ReplicaDescriptor, target *candidate,
+) bool {
+	for _, repl := range existingReplicas {
+		if target.store.Node.NodeID == repl.NodeID && target.store.StoreID != repl.StoreID {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go
index a13b65eab46c..23f3c8d5fbd1 100644
--- a/pkg/kv/kvserver/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator_scorer.go
@@ -209,6 +209,18 @@ func (c candidate) compare(o candidate) float64 {
 	if c.rangeCount == 0 && o.rangeCount == 0 {
 		return 0
 	}
+	// For same-node rebalances we need to manually consider the
+	// minRangeRebalanceThreshold, since in lopsided clusters the stores'
+	// under/over-replication is not properly captured in the balanceScore.
+	if c.store.Node.NodeID == o.store.Node.NodeID && c.store.Node.NodeID != 0 {
+		if c.rangeCount < o.rangeCount {
+			if o.rangeCount-c.rangeCount <= minRangeRebalanceThreshold {
+				return 0
+			}
+		} else if c.rangeCount-o.rangeCount <= minRangeRebalanceThreshold {
+			return 0
+		}
+	}
 	if c.rangeCount < o.rangeCount {
 		return float64(o.rangeCount-c.rangeCount) / float64(o.rangeCount)
 	}
@@ -465,6 +477,7 @@ func removeCandidates(
 	constraints constraint.AnalyzedConstraints,
 	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
 	options scorerOptions,
+	storeFilter roachpb.StoreDescriptorFilter,
 ) candidateList {
 	var candidates candidateList
 	for _, s := range sl.stores {
@@ -478,6 +491,11 @@ func removeCandidates(
 			})
 			continue
 		}
+		// If the candidate does not pass the filter then we cannot remove it,
+		// i.e. it must be a replica that remains.
+		if !storeFilter.Filter(s) {
+			continue
+		}
 		diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
 		balanceScore := balanceScore(sl, s.Capacity, options)
 		var convergesScore int
@@ -610,16 +628,6 @@ func rebalanceCandidates(
 	}
 	var comparableCands candidateList
 	for _, store := range allStores.stores {
-		// Nodes that already have a replica on one of their stores aren't valid
-		// rebalance targets. We do include stores that currently have a replica
-		// because we want them to be considered as valid stores in the
-		// ConvergesOnMean calculations below. This is subtle but important.
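Note: a rough worked example of the new same-node guard in candidate.compare. The threshold value below is an assumption for illustration only; the real constant is minRangeRebalanceThreshold in allocator_scorer.go.

```go
package main

import "fmt"

const assumedThreshold = 2 // stand-in for minRangeRebalanceThreshold

// withinSameNodeThreshold mirrors the guard in candidate.compare: for two
// candidate stores on the same node, a range-count difference within the
// threshold yields compare() == 0, i.e. no preference and no needless
// shuffling of replicas between sibling stores.
func withinSameNodeThreshold(cRangeCount, oRangeCount int) bool {
	if cRangeCount < oRangeCount {
		return oRangeCount-cRangeCount <= assumedThreshold
	}
	return cRangeCount-oRangeCount <= assumedThreshold
}

func main() {
	fmt.Println(withinSameNodeThreshold(20, 22)) // true: close enough, leave the replicas alone
	fmt.Println(withinSameNodeThreshold(40, 0))  // false: lopsided, the emptier sibling still wins
}
```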
- if nodeHasReplica(store.Node.NodeID, existingReplicas) && - !storeHasReplica(store.StoreID, existingReplicas) { - log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true", - store.Node.NodeID, existingReplicas) - continue - } constraintsOK, necessary := rebalanceFromConstraintsCheck( store, existing.cand.store.StoreID, constraints) maxCapacityOK := maxCapacityCheck(store) diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 8a21b41c551d..d7ec1215f8f6 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -550,28 +550,32 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(stores, t) testCases := []struct { - existing []roachpb.ReplicaDescriptor - expectTarget bool + existing []roachpb.ReplicaDescriptor + expectTargetAllocate bool + expectTargetRebalance bool }{ { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 2}, {NodeID: 2, StoreID: 3}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 2}, {NodeID: 3, StoreID: 6}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ @@ -579,7 +583,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { {NodeID: 2, StoreID: 3}, {NodeID: 3, StoreID: 5}, }, - expectTarget: false, + expectTargetAllocate: false, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ @@ -587,7 +592,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { {NodeID: 2, StoreID: 4}, {NodeID: 3, StoreID: 6}, }, - expectTarget: false, + expectTargetAllocate: false, + expectTargetRebalance: false, }, } @@ -598,9 +604,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { zonepb.EmptyCompleteZoneConfig(), tc.existing, ) - if e, a := tc.expectTarget, result != nil; e != a { + if e, a := tc.expectTargetAllocate, result != nil; e != a { t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v", - tc.existing, result, err, tc.expectTarget) + tc.existing, result, err, tc.expectTargetAllocate) } } @@ -614,14 +620,118 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { rangeUsageInfo, storeFilterThrottled, ) - if e, a := tc.expectTarget, ok; e != a { + if e, a := tc.expectTargetRebalance, ok; e != a { t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v", - tc.existing, target, details, tc.expectTarget) + tc.existing, target, details, tc.expectTargetRebalance) } } } } +func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + store1 := roachpb.StoreDescriptor{ + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40}, + } + store2 := roachpb.StoreDescriptor{ + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 0}, + } + + // We start out with 40 ranges on 3 nodes and 3 stores, we then add a new store + // on Node 1 and try to rebalance all the ranges. What we want to see happen + // is an equilibrium where 20 ranges move from Stopre 1 to Store 2. 
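Note: rough arithmetic behind the equilibrium this test expects; the threshold value is assumed here for illustration (see minRangeRebalanceThreshold). The assertion further down checks that the two sibling stores end up within the threshold of each other rather than at exactly 20/20.

```go
package main

import "fmt"

func main() {
	// All 40 of n1's replicas start on s1; the new s2 starts empty.
	s1, s2 := 40, 0
	const assumedThreshold = 2 // stand-in for minRangeRebalanceThreshold

	// Each same-node rebalance moves one range from s1 to s2; movement stops
	// once the two stores are within the threshold of each other.
	for s1-s2 > assumedThreshold {
		s1, s2 = s1-1, s2+1
	}
	fmt.Println(s1, s2) // 21 19: roughly half of n1's ranges on each store
}
```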
+	stores := []*roachpb.StoreDescriptor{
+		&store1,
+		&store2,
+		{
+			StoreID:  3,
+			Node:     roachpb.NodeDescriptor{NodeID: 2},
+			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+		},
+		{
+			StoreID:  4,
+			Node:     roachpb.NodeDescriptor{NodeID: 3},
+			Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+		},
+	}
+
+	ranges := make([]roachpb.RangeDescriptor, 40)
+	for i := 0; i < 40; i++ {
+		ranges[i] = roachpb.RangeDescriptor{
+			InternalReplicas: []roachpb.ReplicaDescriptor{
+				{NodeID: 1, StoreID: 1},
+				{NodeID: 2, StoreID: 3},
+				{NodeID: 3, StoreID: 4},
+			},
+		}
+	}
+
+	stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
+	defer stopper.Stop(context.Background())
+	storeGossiper := gossiputil.NewStoreGossiper(g)
+	storeGossiper.GossipStores(stores, t)
+
+	// We run through all the ranges once to get the cluster to balance.
+	// After that we should not see any replicas move.
+	var rangeUsageInfo RangeUsageInfo
+	for i := 1; i < 40; i++ {
+		add, remove, _, ok := a.RebalanceTarget(
+			context.Background(),
+			zonepb.EmptyCompleteZoneConfig(),
+			nil, /* raftStatus */
+			ranges[i].InternalReplicas,
+			rangeUsageInfo,
+			storeFilterThrottled,
+		)
+		if ok {
+			// Update the descriptor.
+			newReplicas := make([]roachpb.ReplicaDescriptor, 0, len(ranges[i].InternalReplicas))
+			for _, repl := range ranges[i].InternalReplicas {
+				if remove.StoreID != repl.StoreID {
+					newReplicas = append(newReplicas, repl)
+				}
+			}
+			newReplicas = append(newReplicas, roachpb.ReplicaDescriptor{
+				StoreID: add.StoreID,
+				NodeID:  add.NodeID,
+			})
+			ranges[i].InternalReplicas = newReplicas
+
+			for _, store := range stores {
+				if store.StoreID == add.StoreID {
+					store.Capacity.RangeCount = store.Capacity.RangeCount + 1
+				} else if store.StoreID == remove.StoreID {
+					store.Capacity.RangeCount = store.Capacity.RangeCount - 1
+				}
+			}
+			storeGossiper.GossipStores(stores, t)
+		}
+	}
+
+	// Verify that the stores are reasonably balanced.
+	require.True(t, math.Abs(float64(
+		store1.Capacity.RangeCount-store2.Capacity.RangeCount)) <= minRangeRebalanceThreshold)
+	// We don't expect any ranges to move since the system should have
+	// reached a stable state at this point.
+	for i := 1; i < 40; i++ {
+		_, _, _, ok := a.RebalanceTarget(
+			context.Background(),
+			zonepb.EmptyCompleteZoneConfig(),
+			nil, /* raftStatus */
+			ranges[i].InternalReplicas,
+			rangeUsageInfo,
+			storeFilterThrottled,
+		)
+		require.False(t, ok)
+	}
+}
+
 // TestAllocatorRebalance verifies that rebalance targets are chosen
 // randomly from amongst stores under the maxFractionUsedThreshold.
 func TestAllocatorRebalance(t *testing.T) {
@@ -2784,6 +2894,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
 			analyzed,
 			a.storePool.getLocalities(existingRepls),
 			a.scorerOptions(),
+			roachpb.NoOpStoreDescriptorFilter{},
 		)
 		if !expectedStoreIDsMatch(tc.expected, candidates.worst()) {
 			t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v",
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 851c1933afbb..b084150241b7 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1125,50 +1125,87 @@ func validateReplicationChanges(
 ) error {
 	// First make sure that the changes don't self-overlap (i.e. we're not adding
 	// a replica twice, or removing and immediately re-adding it).
- byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs)) + byNodeAndStoreID := make(map[roachpb.NodeID]map[roachpb.StoreID]roachpb.ReplicationChange, len(chgs)) for _, chg := range chgs { - if _, ok := byNodeID[chg.Target.NodeID]; ok { - return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID) + byStoreID, ok := byNodeAndStoreID[chg.Target.NodeID] + if !ok { + byStoreID = make(map[roachpb.StoreID]roachpb.ReplicationChange) + byNodeAndStoreID[chg.Target.NodeID] = byStoreID + } else { + // The only operation that is allowed within a node is an Add/Remove + for _, prevChg := range byStoreID { + if prevChg.ChangeType == chg.ChangeType { + return fmt.Errorf("changes %+v refer to n%d twice for change %v", + chgs, chg.Target.NodeID, chg.ChangeType) + } + } + } + if _, ok := byStoreID[chg.Target.StoreID]; ok { + return fmt.Errorf("changes %+v refer to n%d and s%d twice", chgs, + chg.Target.NodeID, chg.Target.StoreID) } - byNodeID[chg.Target.NodeID] = chg + byStoreID[chg.Target.StoreID] = chg } // Then, check that we're not adding a second replica on nodes that already - // have one, or "re-add" an existing replica. We delete from byNodeID so that - // after this loop, it contains only StoreIDs that we haven't seen in desc. + // have one, or "re-add" an existing replica. We delete from byNodeAndStoreID so that + // after this loop, it contains only Nodes that we haven't seen in desc. for _, rDesc := range desc.Replicas().All() { - chg, ok := byNodeID[rDesc.NodeID] - delete(byNodeID, rDesc.NodeID) - if !ok || chg.ChangeType != roachpb.ADD_REPLICA { + byStoreID, ok := byNodeAndStoreID[rDesc.NodeID] + if !ok { continue } - // We're adding a replica that's already there. This isn't allowed, even - // when the newly added one would be on a different store. - if rDesc.StoreID != chg.Target.StoreID { - return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc) + delete(byNodeAndStoreID, rDesc.NodeID) + if len(byStoreID) == 2 { + chg, k := byStoreID[rDesc.StoreID] + // We should be removing the replica from the existing store during a + // rebalance within the node + if !k || chg.ChangeType != roachpb.REMOVE_REPLICA { + return errors.Errorf( + "Expected replica to be removed from %v during a lateral rebalance within the node.", rDesc) + } + continue } + chg, ok := byStoreID[rDesc.StoreID] + // The only valid condition here is that if this is a removal + // of an existing store + if ok { + if chg.ChangeType == roachpb.REMOVE_REPLICA { + continue + } + // Looks like we found a replica with the same store and node id. If the + // replica is already a learner, then either some previous leaseholder was + // trying to add it with the learner+snapshot+voter cycle and got + // interrupted or else we hit a race between the replicate queue and + // AdminChangeReplicas. + if rDesc.GetType() == roachpb.LEARNER { + return errors.Errorf( + "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + } - // Looks like we found a replica with the same store and node id. If the - // replica is already a learner, then either some previous leaseholder was - // trying to add it with the learner+snapshot+voter cycle and got - // interrupted or else we hit a race between the replicate queue and - // AdminChangeReplicas. 
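Note: a standalone sketch of just the grouping step above, with an invented change set, to show the shape the descriptor scan relies on for a lateral rebalance: one REMOVE and one ADD under the same NodeID, keyed by StoreID.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// Hypothetical change set: a lateral rebalance from (n1,s1) to (n1,s2).
	chgs := []roachpb.ReplicationChange{
		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
	}

	// The same node -> store -> change grouping validateReplicationChanges builds.
	byNodeAndStoreID := make(map[roachpb.NodeID]map[roachpb.StoreID]roachpb.ReplicationChange)
	for _, chg := range chgs {
		byStoreID, ok := byNodeAndStoreID[chg.Target.NodeID]
		if !ok {
			byStoreID = make(map[roachpb.StoreID]roachpb.ReplicationChange)
			byNodeAndStoreID[chg.Target.NodeID] = byStoreID
		}
		byStoreID[chg.Target.StoreID] = chg
	}

	// n1 carries two entries (s1: REMOVE, s2: ADD). When the descriptor scan
	// finds two entries for a node, it insists the change on the store that
	// currently holds the replica is the REMOVE.
	fmt.Println(len(byNodeAndStoreID[1])) // 2
}
```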
- if rDesc.GetType() == roachpb.LEARNER { - return errors.Errorf( - "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + // Otherwise, we already had a full voter replica. Can't add another to + // this store. + return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) } - // Otherwise, we already had a full voter replica. Can't add another to - // this store. - return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) + for _, c := range byStoreID { + // We're adding a replica that's already there. This isn't allowed, even + // when the newly added one would be on a different store. + if c.ChangeType == roachpb.ADD_REPLICA { + return errors.Errorf("unable to add replica %v; node already has a replica in %s", c.Target.StoreID, desc) + } + return errors.Errorf("removing %v which is not in %s", c.Target, desc) + } } // Any removals left in the map now refer to nonexisting replicas, and we refuse them. - for _, chg := range byNodeID { - if chg.ChangeType != roachpb.REMOVE_REPLICA { - continue + for _, byStoreID := range byNodeAndStoreID { + for _, chg := range byStoreID { + if chg.ChangeType != roachpb.REMOVE_REPLICA { + continue + } + return errors.Errorf("removing %v which is not in %s", chg.Target, desc) } - return errors.Errorf("removing %v which is not in %s", chg.Target, desc) } return nil } diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index 75bb0c030787..085530fd3f14 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -91,3 +91,119 @@ func TestRangeDescriptorUpdateProtoChangedAcrossVersions(t *testing.T) { t.Fatal(err) } } + +func TestValidateReplicationChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + + learnerType := roachpb.LEARNER + desc := &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 3, StoreID: 1}, + {NodeID: 4, StoreID: 1, Type: &learnerType}, + }, + } + + // Test Case 1: Add a new replica to another node. + err := validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}}, + }) + require.NoError(t, err) + + // Test Case 2: Remove a replica from an existing node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }) + require.NoError(t, err) + + // Test Case 3: Remove a replica from wrong node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}}, + }) + require.Error(t, err) + + // Test Case 4: Remove a replica from wrong store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 5: Re-balance a replica within a store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.NoError(t, err) + + // Test Case 6: Re-balance a replica within a store, but attempt remove from + // the wrong one. 
+ err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}}, + }) + require.Error(t, err) + + // Test Case 7: Add replica to same node and store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }) + require.Error(t, err) + + // Test Case 8: Add replica to same node and different store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 9: Try to rebalance a replica on the same node, but also add an extra. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}}, + }) + require.Error(t, err) + + // Test Case 10: Try to add twice to the same node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 11: Try to remove twice to the same node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 12: Try to add where there is already a learner. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 13: Add/Remove multiple replicas. 
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 5, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 6, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 14: Remove a replica while a rebalance within the node is in flight.
+	descRebalancing := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1},
+			{NodeID: 2, StoreID: 1},
+			{NodeID: 1, StoreID: 2, Type: &learnerType},
+		},
+	}
+	err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+}
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 4c5cd699bac6..1372be9b2aad 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -644,3 +644,31 @@ var DefaultLocationInformation = []struct {
 		Longitude: "3.81886",
 	},
 }
+
+// StoreDescriptorFilter defines an interface that filters out
+// StoreDescriptors.
+type StoreDescriptorFilter interface {
+	Filter(descriptor StoreDescriptor) bool
+}
+
+// SameNodeStoreDescriptorFilter is a StoreDescriptorFilter that accepts only
+// StoreDescriptors that live on the node given by NodeID.
+type SameNodeStoreDescriptorFilter struct {
+	NodeID NodeID
+}
+
+// Filter returns true if the store descriptor belongs to the node specified
+// by the filter's NodeID.
+func (filter SameNodeStoreDescriptorFilter) Filter(descriptor StoreDescriptor) bool {
+	return descriptor.Node.NodeID == filter.NodeID
+}
+
+// NoOpStoreDescriptorFilter is a StoreDescriptorFilter that accepts every
+// StoreDescriptor.
+type NoOpStoreDescriptorFilter struct {
+}
+
+// Filter on NoOpStoreDescriptorFilter always returns true.
+func (filter NoOpStoreDescriptorFilter) Filter(descriptor StoreDescriptor) bool {
+	return true
+}
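Note: a short usage sketch of the two filters defined above, with made-up store descriptors. Under a same-node rebalance only the stores on the target's node remain removal candidates; the no-op filter (the default in RemoveTarget) keeps everything.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	stores := []roachpb.StoreDescriptor{
		{StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}},
		{StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 1}},
		{StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 2}},
	}

	// Restrict removal candidates to n1's stores, as removeCandidates does when
	// the allocator simulates a same-node rebalance.
	sameNode := roachpb.SameNodeStoreDescriptorFilter{NodeID: 1}
	for _, s := range stores {
		fmt.Printf("s%d eligible for removal: %t\n", s.StoreID, sameNode.Filter(s))
	}
	// s1 and s2 pass; s3 on n2 does not, so it must remain.

	// The default path keeps every candidate.
	noop := roachpb.NoOpStoreDescriptorFilter{}
	fmt.Println(noop.Filter(stores[2])) // true
}
```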