From 18b278aa9a1ad6a5416267414a88cc80cdfb9299 Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Fri, 17 Jul 2020 13:00:21 -0700
Subject: [PATCH] kvserver: Allow rebalances between stores on the same nodes.

Closes #6782

This change modifies the replicate queue to allow rebalances between
multiple stores within a single node.

To make this correct, we add a number of guard rails to the Allocator
that prevent a rebalance from placing multiple replicas of a range on
the same node. This is undesirable because, in a simple 3x replication
scenario, a single node crash may result in a whole range becoming
unavailable.

Unfortunately, these changes alone are not enough: in the naive
scenario, the allocator ends up in an endless rebalance loop between
the two stores on the same node. An additional set of changes to the
allocator heuristics was necessary to better detect when the stores on
a single node are balanced and to stop attempting to move ranges
around.

Release note (performance improvement): This change removes the last
roadblock to running CockroachDB with multiple stores (i.e. disks) per
node. The allocation algorithm now supports intra-node rebalances,
which means CRDB can fully utilize the additional stores on the same
node.
---
 pkg/kv/kvserver/allocator.go            | 124 ++++++++++++++--------
 pkg/kv/kvserver/allocator_scorer.go     |  28 +++--
 pkg/kv/kvserver/allocator_test.go       | 133 ++++++++++++++++++++++--
 pkg/kv/kvserver/replica_command.go      |  93 ++++++++++++-----
 pkg/kv/kvserver/replica_command_test.go | 116 +++++++++++++++++++++
 pkg/roachpb/metadata.go                 |  28 +++++
 6 files changed, 430 insertions(+), 92 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index b2d38985bdb6..8857af55e3b9 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -526,7 +526,8 @@ func (a Allocator) simulateRemoveTarget(
 	candidates []roachpb.ReplicaDescriptor,
 	existingReplicas []roachpb.ReplicaDescriptor,
 	rangeUsageInfo RangeUsageInfo,
-) (roachpb.ReplicaDescriptor, string, error) {
+	storeFilter roachpb.StoreDescriptorFilter,
+) (*candidate, string, error) {
 	// Update statistics first
 	// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
 	// but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
@@ -537,7 +538,7 @@ func (a Allocator) simulateRemoveTarget(
 		a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_REPLICA)
 	}()
 	log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore)
-	return a.RemoveTarget(ctx, zone, candidates, existingReplicas)
+	return a.doRemoveTarget(ctx, zone, candidates, existingReplicas, storeFilter)
 }
 
 // RemoveTarget returns a suitable replica to remove from the provided replica
@@ -551,41 +552,16 @@ func (a Allocator) RemoveTarget(
 	candidates []roachpb.ReplicaDescriptor,
 	existingReplicas []roachpb.ReplicaDescriptor,
 ) (roachpb.ReplicaDescriptor, string, error) {
-	if len(candidates) == 0 {
-		return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
-	}
-
-	// Retrieve store descriptors for the provided candidates from the StorePool.
- existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates)) - for i, exist := range candidates { - existingStoreIDs[i] = exist.StoreID + removeCandidate, removeDetails, err := a.doRemoveTarget(ctx, zone, candidates, + existingReplicas, roachpb.NoOpStoreDescriptorFilter{}) + if err != nil { + return roachpb.ReplicaDescriptor{}, removeDetails, err } - sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone) - - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) - options := a.scorerOptions() - rankedCandidates := removeCandidates( - sl, - analyzedConstraints, - a.storePool.getLocalities(existingReplicas), - options, - ) - log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates) - if bad := rankedCandidates.selectBad(a.randGen); bad != nil { - for _, exist := range existingReplicas { - if exist.StoreID == bad.store.StoreID { - log.VEventf(ctx, 3, "remove target: %s", bad) - details := decisionDetails{Target: bad.compactString(options)} - detailsBytes, err := json.Marshal(details) - if err != nil { - log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err) - } - return exist, string(detailsBytes), nil - } + for _, exist := range existingReplicas { + if exist.StoreID == removeCandidate.store.StoreID { + return exist, removeDetails, nil } } - return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed") } @@ -675,14 +651,13 @@ func (a Allocator) RebalanceTarget( // pretty sure we won't want to remove immediately after adding it. // If we would, we don't want to actually rebalance to that target. var target *candidate - var removeReplica roachpb.ReplicaDescriptor + var removalTarget *candidate var existingCandidates candidateList for { target, existingCandidates = bestRebalanceTarget(a.randGen, results) if target == nil { return zero, zero, "", false } - // Add a fake new replica to our copy of the range descriptor so that we can // simulate the removal logic. If we decide not to go with this target, note // that this needs to be removed from desc before we try any other target. 
@@ -710,23 +685,26 @@ func (a Allocator) RebalanceTarget( return zero, zero, "", false } - var removeDetails string var err error - removeReplica, removeDetails, err = a.simulateRemoveTarget( + var removeDetails string + var storeFilter roachpb.StoreDescriptorFilter = roachpb.NoOpStoreDescriptorFilter{} + if isSameNodeRebalanceAttempt(existingPlusOneNew, target) { + storeFilter = roachpb.SameNodeStoreDescriptorFilter{NodeID: target.store.Node.NodeID} + } + removalTarget, removeDetails, err = a.simulateRemoveTarget( ctx, target.store.StoreID, zone, replicaCandidates, existingPlusOneNew, rangeUsageInfo, + storeFilter, ) if err != nil { log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err) return zero, zero, "", false } - if target.store.StoreID != removeReplica.StoreID { - // Successfully populated these variables - _, _ = target, removeReplica + if target.store.StoreID != removalTarget.store.StoreID && removalTarget.less(*target) { break } @@ -750,8 +728,8 @@ func (a Allocator) RebalanceTarget( StoreID: target.store.StoreID, } removeTarget := roachpb.ReplicationTarget{ - NodeID: removeReplica.NodeID, - StoreID: removeReplica.StoreID, + NodeID: removalTarget.store.Node.NodeID, + StoreID: removalTarget.store.StoreID, } return addTarget, removeTarget, string(detailsBytes), true } @@ -1378,3 +1356,63 @@ func maxReplicaID(replicas []roachpb.ReplicaDescriptor) roachpb.ReplicaID { } return max } + +// doRemoveTarget returns a suitable replica to remove from the provided replica +// set. It first attempts to randomly select a target from the set of stores +// that have greater than the average number of replicas. Failing that, it +// falls back to selecting a random target from any of the existing +// replicas. +func (a Allocator) doRemoveTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + candidates []roachpb.ReplicaDescriptor, + existingReplicas []roachpb.ReplicaDescriptor, + storeFilter roachpb.StoreDescriptorFilter, +) (*candidate, string, error) { + if len(candidates) == 0 { + return nil, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") + } + + // Retrieve store descriptors for the provided candidates from the StorePool. + existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates)) + for i, exist := range candidates { + existingStoreIDs[i] = exist.StoreID + } + sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone) + + analyzedConstraints := constraint.AnalyzeConstraints( + ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) + options := a.scorerOptions() + rankedCandidates := removeCandidates( + sl, + analyzedConstraints, + a.storePool.getLocalities(existingReplicas), + options, + storeFilter, + ) + log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates) + if bad := rankedCandidates.selectBad(a.randGen); bad != nil { + log.VEventf(ctx, 3, "remove target: %s", bad) + details := decisionDetails{Target: bad.compactString(options)} + detailsBytes, err := json.Marshal(details) + if err != nil { + log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err) + } + return bad, string(detailsBytes), nil + } + + return nil, "", errors.New("could not select an appropriate replica to be removed") +} + +// isSameNodeRebalanceAttempt returns true if the target candidate is on the +//same node as an existing replica. 
+func isSameNodeRebalanceAttempt( + existingReplicas []roachpb.ReplicaDescriptor, target *candidate, +) bool { + for _, repl := range existingReplicas { + if target.store.Node.NodeID == repl.NodeID && target.store.StoreID != repl.StoreID { + return true + } + } + return false +} diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index a13b65eab46c..23f3c8d5fbd1 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -209,6 +209,18 @@ func (c candidate) compare(o candidate) float64 { if c.rangeCount == 0 && o.rangeCount == 0 { return 0 } + // For same node rebalances we need to manually consider the + // minRangeRebalanceThreshold, since in lopsided clusters their under/over + // replication is not properly captured in the balanceScore. + if c.store.Node.NodeID == o.store.Node.NodeID && c.store.Node.NodeID != 0 { + if c.rangeCount < o.rangeCount { + if o.rangeCount-c.rangeCount <= minRangeRebalanceThreshold { + return 0 + } + } else if c.rangeCount-o.rangeCount <= minRangeRebalanceThreshold { + return 0 + } + } if c.rangeCount < o.rangeCount { return float64(o.rangeCount-c.rangeCount) / float64(o.rangeCount) } @@ -465,6 +477,7 @@ func removeCandidates( constraints constraint.AnalyzedConstraints, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality, options scorerOptions, + storeFilter roachpb.StoreDescriptorFilter, ) candidateList { var candidates candidateList for _, s := range sl.stores { @@ -478,6 +491,11 @@ func removeCandidates( }) continue } + // If the candidate does not pass the filter then we cannot remove it, + // as in it must be a replica that remains. + if !storeFilter.Filter(s) { + continue + } diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, options) var convergesScore int @@ -610,16 +628,6 @@ func rebalanceCandidates( } var comparableCands candidateList for _, store := range allStores.stores { - // Nodes that already have a replica on one of their stores aren't valid - // rebalance targets. We do include stores that currently have a replica - // because we want them to be considered as valid stores in the - // ConvergesOnMean calculations below. This is subtle but important. 
- if nodeHasReplica(store.Node.NodeID, existingReplicas) && - !storeHasReplica(store.StoreID, existingReplicas) { - log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true", - store.Node.NodeID, existingReplicas) - continue - } constraintsOK, necessary := rebalanceFromConstraintsCheck( store, existing.cand.store.StoreID, constraints) maxCapacityOK := maxCapacityCheck(store) diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 8a21b41c551d..d7ec1215f8f6 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -550,28 +550,32 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(stores, t) testCases := []struct { - existing []roachpb.ReplicaDescriptor - expectTarget bool + existing []roachpb.ReplicaDescriptor + expectTargetAllocate bool + expectTargetRebalance bool }{ { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 2}, {NodeID: 2, StoreID: 3}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 2}, {NodeID: 3, StoreID: 6}, }, - expectTarget: true, + expectTargetAllocate: true, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ @@ -579,7 +583,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { {NodeID: 2, StoreID: 3}, {NodeID: 3, StoreID: 5}, }, - expectTarget: false, + expectTargetAllocate: false, + expectTargetRebalance: true, }, { existing: []roachpb.ReplicaDescriptor{ @@ -587,7 +592,8 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { {NodeID: 2, StoreID: 4}, {NodeID: 3, StoreID: 6}, }, - expectTarget: false, + expectTargetAllocate: false, + expectTargetRebalance: false, }, } @@ -598,9 +604,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { zonepb.EmptyCompleteZoneConfig(), tc.existing, ) - if e, a := tc.expectTarget, result != nil; e != a { + if e, a := tc.expectTargetAllocate, result != nil; e != a { t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v", - tc.existing, result, err, tc.expectTarget) + tc.existing, result, err, tc.expectTargetAllocate) } } @@ -614,14 +620,118 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { rangeUsageInfo, storeFilterThrottled, ) - if e, a := tc.expectTarget, ok; e != a { + if e, a := tc.expectTargetRebalance, ok; e != a { t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v", - tc.existing, target, details, tc.expectTarget) + tc.existing, target, details, tc.expectTargetRebalance) } } } } +func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + store1 := roachpb.StoreDescriptor{ + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40}, + } + store2 := roachpb.StoreDescriptor{ + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 0}, + } + + // We start out with 40 ranges on 3 nodes and 3 stores, we then add a new store + // on Node 1 and try to rebalance all the ranges. What we want to see happen + // is an equilibrium where 20 ranges move from Stopre 1 to Store 2. 
+ stores := []*roachpb.StoreDescriptor{ + &store1, + &store2, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40}, + }, + } + + ranges := make([]roachpb.RangeDescriptor, 40) + for i := 0; i < 40; i++ { + ranges[i] = roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 2, StoreID: 3}, + {NodeID: 3, StoreID: 4}, + }, + } + } + + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(context.Background()) + storeGossiper := gossiputil.NewStoreGossiper(g) + storeGossiper.GossipStores(stores, t) + + // We run through all the ranges once to get the cluster to balance, + // after that we should not be seeing replicas move + var rangeUsageInfo RangeUsageInfo + for i := 1; i < 40; i++ { + add, remove, _, ok := a.RebalanceTarget( + context.Background(), + zonepb.EmptyCompleteZoneConfig(), + nil, /* raftStatus */ + ranges[i].InternalReplicas, + rangeUsageInfo, + storeFilterThrottled, + ) + if ok { + // Update the descriptor + newReplicas := make([]roachpb.ReplicaDescriptor, len(ranges[i].InternalReplicas)) + for _, repl := range ranges[i].InternalReplicas { + if remove.StoreID != repl.StoreID { + newReplicas = append(newReplicas, repl) + } + } + newReplicas = append(newReplicas, roachpb.ReplicaDescriptor{ + StoreID: add.StoreID, + NodeID: add.NodeID, + }) + ranges[i].InternalReplicas = newReplicas + + for _, store := range stores { + if store.StoreID == add.StoreID { + store.Capacity.RangeCount = store.Capacity.RangeCount + 1 + } else if store.StoreID == remove.StoreID { + store.Capacity.RangeCount = store.Capacity.RangeCount - 1 + } + } + storeGossiper.GossipStores(stores, t) + } + } + + // Verify that the stores are reasonably balanced + require.True(t, math.Abs(float64( + store1.Capacity.RangeCount-store2.Capacity.RangeCount)) <= minRangeRebalanceThreshold) + // We dont expect any range wanting to move since the system should have + // reached a stable state at this point + for i := 1; i < 40; i++ { + _, _, _, ok := a.RebalanceTarget( + context.Background(), + zonepb.EmptyCompleteZoneConfig(), + nil, /* raftStatus */ + ranges[i].InternalReplicas, + rangeUsageInfo, + storeFilterThrottled, + ) + require.False(t, ok) + } +} + // TestAllocatorRebalance verifies that rebalance targets are chosen // randomly from amongst stores under the maxFractionUsedThreshold. func TestAllocatorRebalance(t *testing.T) { @@ -2784,6 +2894,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { analyzed, a.storePool.getLocalities(existingRepls), a.scorerOptions(), + roachpb.NoOpStoreDescriptorFilter{}, ) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v", diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 851c1933afbb..b084150241b7 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1125,50 +1125,87 @@ func validateReplicationChanges( ) error { // First make sure that the changes don't self-overlap (i.e. we're not adding // a replica twice, or removing and immediately re-adding it). 
- byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs)) + byNodeAndStoreID := make(map[roachpb.NodeID]map[roachpb.StoreID]roachpb.ReplicationChange, len(chgs)) for _, chg := range chgs { - if _, ok := byNodeID[chg.Target.NodeID]; ok { - return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID) + byStoreID, ok := byNodeAndStoreID[chg.Target.NodeID] + if !ok { + byStoreID = make(map[roachpb.StoreID]roachpb.ReplicationChange) + byNodeAndStoreID[chg.Target.NodeID] = byStoreID + } else { + // The only operation that is allowed within a node is an Add/Remove + for _, prevChg := range byStoreID { + if prevChg.ChangeType == chg.ChangeType { + return fmt.Errorf("changes %+v refer to n%d twice for change %v", + chgs, chg.Target.NodeID, chg.ChangeType) + } + } + } + if _, ok := byStoreID[chg.Target.StoreID]; ok { + return fmt.Errorf("changes %+v refer to n%d and s%d twice", chgs, + chg.Target.NodeID, chg.Target.StoreID) } - byNodeID[chg.Target.NodeID] = chg + byStoreID[chg.Target.StoreID] = chg } // Then, check that we're not adding a second replica on nodes that already - // have one, or "re-add" an existing replica. We delete from byNodeID so that - // after this loop, it contains only StoreIDs that we haven't seen in desc. + // have one, or "re-add" an existing replica. We delete from byNodeAndStoreID so that + // after this loop, it contains only Nodes that we haven't seen in desc. for _, rDesc := range desc.Replicas().All() { - chg, ok := byNodeID[rDesc.NodeID] - delete(byNodeID, rDesc.NodeID) - if !ok || chg.ChangeType != roachpb.ADD_REPLICA { + byStoreID, ok := byNodeAndStoreID[rDesc.NodeID] + if !ok { continue } - // We're adding a replica that's already there. This isn't allowed, even - // when the newly added one would be on a different store. - if rDesc.StoreID != chg.Target.StoreID { - return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc) + delete(byNodeAndStoreID, rDesc.NodeID) + if len(byStoreID) == 2 { + chg, k := byStoreID[rDesc.StoreID] + // We should be removing the replica from the existing store during a + // rebalance within the node + if !k || chg.ChangeType != roachpb.REMOVE_REPLICA { + return errors.Errorf( + "Expected replica to be removed from %v during a lateral rebalance within the node.", rDesc) + } + continue } + chg, ok := byStoreID[rDesc.StoreID] + // The only valid condition here is that if this is a removal + // of an existing store + if ok { + if chg.ChangeType == roachpb.REMOVE_REPLICA { + continue + } + // Looks like we found a replica with the same store and node id. If the + // replica is already a learner, then either some previous leaseholder was + // trying to add it with the learner+snapshot+voter cycle and got + // interrupted or else we hit a race between the replicate queue and + // AdminChangeReplicas. + if rDesc.GetType() == roachpb.LEARNER { + return errors.Errorf( + "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + } - // Looks like we found a replica with the same store and node id. If the - // replica is already a learner, then either some previous leaseholder was - // trying to add it with the learner+snapshot+voter cycle and got - // interrupted or else we hit a race between the replicate queue and - // AdminChangeReplicas. 
- if rDesc.GetType() == roachpb.LEARNER { - return errors.Errorf( - "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + // Otherwise, we already had a full voter replica. Can't add another to + // this store. + return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) } - // Otherwise, we already had a full voter replica. Can't add another to - // this store. - return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) + for _, c := range byStoreID { + // We're adding a replica that's already there. This isn't allowed, even + // when the newly added one would be on a different store. + if c.ChangeType == roachpb.ADD_REPLICA { + return errors.Errorf("unable to add replica %v; node already has a replica in %s", c.Target.StoreID, desc) + } + return errors.Errorf("removing %v which is not in %s", c.Target, desc) + } } // Any removals left in the map now refer to nonexisting replicas, and we refuse them. - for _, chg := range byNodeID { - if chg.ChangeType != roachpb.REMOVE_REPLICA { - continue + for _, byStoreID := range byNodeAndStoreID { + for _, chg := range byStoreID { + if chg.ChangeType != roachpb.REMOVE_REPLICA { + continue + } + return errors.Errorf("removing %v which is not in %s", chg.Target, desc) } - return errors.Errorf("removing %v which is not in %s", chg.Target, desc) } return nil } diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index 75bb0c030787..085530fd3f14 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -91,3 +91,119 @@ func TestRangeDescriptorUpdateProtoChangedAcrossVersions(t *testing.T) { t.Fatal(err) } } + +func TestValidateReplicationChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + + learnerType := roachpb.LEARNER + desc := &roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1}, + {NodeID: 3, StoreID: 1}, + {NodeID: 4, StoreID: 1, Type: &learnerType}, + }, + } + + // Test Case 1: Add a new replica to another node. + err := validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}}, + }) + require.NoError(t, err) + + // Test Case 2: Remove a replica from an existing node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }) + require.NoError(t, err) + + // Test Case 3: Remove a replica from wrong node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}}, + }) + require.Error(t, err) + + // Test Case 4: Remove a replica from wrong store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 5: Re-balance a replica within a store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.NoError(t, err) + + // Test Case 6: Re-balance a replica within a store, but attempt remove from + // the wrong one. 
+ err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}}, + }) + require.Error(t, err) + + // Test Case 7: Add replica to same node and store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + }) + require.Error(t, err) + + // Test Case 8: Add replica to same node and different store. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 9: Try to rebalance a replica on the same node, but also add an extra. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}}, + }) + require.Error(t, err) + + // Test Case 10: Try to add twice to the same node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 1}}, + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 11: Try to remove twice to the same node. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}}, + {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 12: Try to add where there is already a learner. + err = validateReplicationChanges(desc, roachpb.ReplicationChanges{ + {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 2}}, + }) + require.Error(t, err) + + // Test Case 13: Add/Remove multiple replicas. 
+	err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 5, StoreID: 1}},
+		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 6, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+	// Test Case 14: Remove a replica while a rebalance within the node is in flight.
+	descRebalancing := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1},
+			{NodeID: 2, StoreID: 1},
+			{NodeID: 1, StoreID: 2, Type: &learnerType},
+		},
+	}
+	err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+	})
+	require.NoError(t, err)
+
+}
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 4c5cd699bac6..1372be9b2aad 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -644,3 +644,31 @@ var DefaultLocationInformation = []struct {
 		Longitude: "3.81886",
 	},
 }
+
+// StoreDescriptorFilter defines an interface that filters out
+// StoreDescriptors.
+type StoreDescriptorFilter interface {
+	Filter(descriptor StoreDescriptor) bool
+}
+
+// SameNodeStoreDescriptorFilter is a StoreDescriptorFilter that matches only
+// the stores located on the given node.
+type SameNodeStoreDescriptorFilter struct {
+	NodeID NodeID
+}
+
+// Filter returns true if the given store descriptor belongs to a store
+// located on the node identified by NodeID.
+func (filter SameNodeStoreDescriptorFilter) Filter(descriptor StoreDescriptor) bool {
+	return descriptor.Node.NodeID == filter.NodeID
+}
+
+// NoOpStoreDescriptorFilter is a no-op implementation of StoreDescriptorFilter.
+type NoOpStoreDescriptorFilter struct {
+}
+
+// Filter on NoOpStoreDescriptorFilter matches every store descriptor and
+// always returns true.
+func (filter NoOpStoreDescriptorFilter) Filter(descriptor StoreDescriptor) bool {
+	return true
+}
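
For readers skimming the patch, the following standalone sketch illustrates how the new StoreDescriptorFilter is intended to be used: when the rebalance target lands on a node that already holds a replica, the removal simulation is restricted to stores on that same node, so the only replica eligible to be dropped is the one being vacated. This is a minimal sketch, not the real implementation; the NodeID, StoreID and StoreDescriptor types below are simplified stand-ins for the roachpb types, and removalCandidates stands in for the filtering step inside doRemoveTarget/removeCandidates.

    package main

    import "fmt"

    // Simplified stand-ins for roachpb.NodeID, roachpb.StoreID and
    // roachpb.StoreDescriptor (hypothetical, for illustration only).
    type NodeID int32
    type StoreID int32

    type StoreDescriptor struct {
    	StoreID StoreID
    	NodeID  NodeID
    }

    // StoreDescriptorFilter mirrors the interface added in pkg/roachpb/metadata.go.
    type StoreDescriptorFilter interface {
    	Filter(desc StoreDescriptor) bool
    }

    // SameNodeStoreDescriptorFilter keeps only stores that live on NodeID.
    type SameNodeStoreDescriptorFilter struct {
    	NodeID NodeID
    }

    func (f SameNodeStoreDescriptorFilter) Filter(desc StoreDescriptor) bool {
    	return desc.NodeID == f.NodeID
    }

    // NoOpStoreDescriptorFilter keeps every store.
    type NoOpStoreDescriptorFilter struct{}

    func (NoOpStoreDescriptorFilter) Filter(StoreDescriptor) bool { return true }

    // removalCandidates mimics the filtering step performed when simulating a
    // removal: only stores passing the filter remain eligible to lose a replica.
    func removalCandidates(stores []StoreDescriptor, filter StoreDescriptorFilter) []StoreDescriptor {
    	var out []StoreDescriptor
    	for _, s := range stores {
    		if filter.Filter(s) {
    			out = append(out, s)
    		}
    	}
    	return out
    }

    func main() {
    	// Replicas currently live on (n1,s1), (n2,s3) and (n3,s4); the rebalance
    	// target is (n1,s2), i.e. a second store on node 1.
    	existing := []StoreDescriptor{
    		{StoreID: 1, NodeID: 1},
    		{StoreID: 3, NodeID: 2},
    		{StoreID: 4, NodeID: 3},
    	}
    	target := StoreDescriptor{StoreID: 2, NodeID: 1}

    	// Because the target shares node 1 with an existing replica (a same-node
    	// rebalance attempt), restrict the removal simulation to stores on node 1;
    	// otherwise any store may be considered for removal.
    	var filter StoreDescriptorFilter = NoOpStoreDescriptorFilter{}
    	for _, s := range existing {
    		if s.NodeID == target.NodeID && s.StoreID != target.StoreID {
    			filter = SameNodeStoreDescriptorFilter{NodeID: target.NodeID}
    			break
    		}
    	}

    	fmt.Println(removalCandidates(existing, filter)) // prints [{1 1}]: only s1 may be removed
    }

Without this restriction, the simulated removal could pick a replica on another node, which would leave two replicas on node 1 and defeat the guard rails described in the commit message.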