diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
index 40656f9051da..f79e31c96e36 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -820,18 +820,17 @@ func getRemoveIdx(
 // voter and non-voter replicas needed to allocate a target for the given action.
 // NB: This is a convenience method for callers of allocator.AllocateTarget(..).
 func FilterReplicasForAction(
-  storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction,
-) (
-  filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor,
-  isReplacement, nothingToDo bool,
-  err error,
-) {
+  storePool storepool.AllocatorStorePool,
+  desc *roachpb.RangeDescriptor,
+  action AllocatorAction,
+) (filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor, replacing *roachpb.ReplicaDescriptor, nothingToDo bool, err error) {
   voterReplicas, nonVoterReplicas, liveVoterReplicas, deadVoterReplicas, liveNonVoterReplicas, deadNonVoterReplicas :=
     LiveAndDeadVoterAndNonVoterReplicas(storePool, desc)

   removeIdx := -1
-  _, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
+  var existing []roachpb.ReplicaDescriptor
+  existing, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
     storePool,
     action,
     voterReplicas, nonVoterReplicas,
@@ -839,7 +838,11 @@ func FilterReplicasForAction(
     liveNonVoterReplicas, deadNonVoterReplicas,
   )
-  return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err
+  if removeIdx >= 0 {
+    replacing = &existing[removeIdx]
+  }
+
+  return filteredVoters, filteredNonVoters, replacing, nothingToDo, err
 }

 // ComputeAction determines the exact operation needed to repair the
@@ -1181,6 +1184,7 @@ func (a *Allocator) AllocateTarget(
   storePool storepool.AllocatorStorePool,
   conf roachpb.SpanConfig,
   existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+  replacing *roachpb.ReplicaDescriptor,
   replicaStatus ReplicaStatus,
   targetType TargetReplicaType,
 ) (roachpb.ReplicationTarget, string, error) {
@@ -1205,6 +1209,7 @@ func (a *Allocator) AllocateTarget(
     conf,
     existingVoters,
     existingNonVoters,
+    replacing,
     a.ScorerOptions(ctx),
     selector,
     // When allocating a *new* replica, we explicitly disregard nodes with any
@@ -1274,7 +1279,7 @@ func (a *Allocator) CheckAvoidsFragileQuorum(
     roachpb.ReplicaDescriptor{NodeID: newTarget.NodeID, StoreID: newTarget.StoreID},
   )

-  _, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
+  _, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, nil /* replacing */, replicaStatus)
   return err
 }

@@ -1289,9 +1294,10 @@ func (a *Allocator) AllocateVoter(
   storePool storepool.AllocatorStorePool,
   conf roachpb.SpanConfig,
   existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+  replacing *roachpb.ReplicaDescriptor,
   replicaStatus ReplicaStatus,
 ) (roachpb.ReplicationTarget, string, error) {
-  return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
+  return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, VoterTarget)
 }

 // AllocateNonVoter returns a suitable store for a new allocation of a
@@ -1302,9 +1308,10 @@ func (a *Allocator) AllocateNonVoter(
   ctx context.Context,
   storePool storepool.AllocatorStorePool,
   conf roachpb.SpanConfig,
   existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+  replacing *roachpb.ReplicaDescriptor,
   replicaStatus ReplicaStatus,
 ) (roachpb.ReplicationTarget, string, error) {
-  return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
+  return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, NonVoterTarget)
 }

 // AllocateTargetFromList returns a suitable store for a new allocation of a
@@ -1316,13 +1323,14 @@ func (a *Allocator) AllocateTargetFromList(
   candidateStores storepool.StoreList,
   conf roachpb.SpanConfig,
   existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+  replacing *roachpb.ReplicaDescriptor,
   options ScorerOptions,
   selector CandidateSelector,
   allowMultipleReplsPerNode bool,
   targetType TargetReplicaType,
 ) (roachpb.ReplicationTarget, string) {
   return a.allocateTargetFromList(ctx, storePool, candidateStores, conf, existingVoters,
-    existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType)
+    existingNonVoters, replacing, options, selector, allowMultipleReplsPerNode, targetType)
 }

 func (a *Allocator) allocateTargetFromList(
@@ -1331,12 +1339,16 @@ func (a *Allocator) allocateTargetFromList(
   candidateStores storepool.StoreList,
   conf roachpb.SpanConfig,
   existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+  replacing *roachpb.ReplicaDescriptor,
   options ScorerOptions,
   selector CandidateSelector,
   allowMultipleReplsPerNode bool,
   targetType TargetReplicaType,
 ) (roachpb.ReplicationTarget, string) {
   existingReplicas := append(existingVoters, existingNonVoters...)
+  if replacing != nil {
+    existingReplicas = append(existingReplicas, *replacing)
+  }
   analyzedOverallConstraints := constraint.AnalyzeConstraints(
     storePool,
     existingReplicas,
@@ -1350,15 +1362,40 @@ func (a *Allocator) allocateTargetFromList(
     conf.VoterConstraints,
   )

+  var replacingStore roachpb.StoreDescriptor
+  var replacingStoreOK bool
+  if replacing != nil {
+    replacingStore, replacingStoreOK = storePool.GetStoreDescriptor(replacing.StoreID)
+  }
+
   var constraintsChecker constraintsCheckFn
   switch t := targetType; t {
   case VoterTarget:
-    constraintsChecker = voterConstraintsCheckerForAllocation(
-      analyzedOverallConstraints,
-      analyzedVoterConstraints,
-    )
+    // If we are replacing an existing replica, check the constraints to ensure
+    // we are not going from a state in which a constraint is satisfied to one
+    // in which it is not. In that case, we consider no candidates to be valid,
+    // as no ordering of candidates would lead to a satisfying target being
+    // selected.
+    if replacing != nil && replacingStoreOK {
+      constraintsChecker = voterConstraintsCheckerForReplace(
+        analyzedOverallConstraints,
+        analyzedVoterConstraints,
+        replacingStore,
+      )
+    } else {
+      constraintsChecker = voterConstraintsCheckerForAllocation(
+        analyzedOverallConstraints,
+        analyzedVoterConstraints,
+      )
+    }
   case NonVoterTarget:
-    constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints)
+    if replacing != nil && replacingStoreOK {
+      constraintsChecker = nonVoterConstraintsCheckerForReplace(
+        analyzedOverallConstraints, replacingStore,
+      )
+    } else {
+      constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints)
+    }
   default:
     log.KvDistribution.Fatalf(ctx, "unsupported targetReplicaType: %v", t)
   }
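The allocator.go changes above give every allocation entry point an optional replacing argument. As a minimal caller-side sketch (not part of the patch; the wrapper name, package placement, and import paths are assumptions based on this diff's file layout), deriving replacing from a removeIdx and passing it through AllocateVoter looks roughly like this:

```go
package kvserver // hypothetical placement, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// allocateReplacementVoter is an illustrative wrapper around the post-patch
// AllocateVoter signature. removeIdx indexes the dead/decommissioning voter in
// existingVoters; a negative value means a plain up-replication.
func allocateReplacementVoter(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	storePool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
	removeIdx int,
) (roachpb.ReplicationTarget, string, error) {
	var replacing *roachpb.ReplicaDescriptor
	if removeIdx >= 0 {
		replacing = &existingVoters[removeIdx]
	}
	// With replacing != nil, the scorer checks candidates against the
	// constraints satisfied by the outgoing store instead of treating the call
	// as a pure addition.
	return a.AllocateVoter(
		ctx, storePool, conf,
		existingVoters, existingNonVoters,
		replacing,
		allocatorimpl.Decommissioning,
	)
}
```

The replicate_queue.go hunks later in this diff apply the same removeIdx-to-pointer pattern for both voters and non-voters.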
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
index 1d5b07ef653a..832e1dbde45e 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
@@ -975,6 +975,15 @@ func rankedCandidateListForAllocation(
     }
     constraintsOK, necessary := constraintsCheck(s)
     if !constraintsOK {
+      if necessary {
+        log.KvDistribution.VEventf(
+          ctx,
+          3,
+          "cannot allocate necessary %s on s%d",
+          targetType,
+          s.StoreID,
+        )
+      }
       continue
     }

@@ -1789,15 +1798,43 @@ func nonVoterConstraintsCheckerForRebalance(
   }
 }

+// voterConstraintsCheckerForReplace returns a constraintsCheckFn
+// that determines whether a given store is a valid and/or necessary replacement
+// candidate for the given store of an existing voting replica.
+func voterConstraintsCheckerForReplace(
+  overallConstraints, voterConstraints constraint.AnalyzedConstraints,
+  existingStore roachpb.StoreDescriptor,
+) constraintsCheckFn {
+  return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
+    overallConstraintsOK, necessaryOverall := replaceConstraintsCheck(s, existingStore, overallConstraints)
+    voterConstraintsOK, necessaryForVoters := replaceConstraintsCheck(s, existingStore, voterConstraints)
+
+    return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
+  }
+}
+
+// nonVoterConstraintsCheckerForReplace returns a constraintsCheckFn
+// that determines whether a given store is a valid and/or necessary replacement
+// candidate for the given store of an existing non-voting replica.
+func nonVoterConstraintsCheckerForReplace(
+  overallConstraints constraint.AnalyzedConstraints,
+  existingStore roachpb.StoreDescriptor,
+) constraintsCheckFn {
+  return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
+    return replaceConstraintsCheck(s, existingStore, overallConstraints)
+  }
+}
+
 // allocateConstraintsCheck checks the potential allocation target store
 // against all the constraints. If it matches a constraint at all, it's valid.
 // If it matches a constraint that is not already fully satisfied by existing
 // replicas, then it's necessary.
 //
-// NB: This assumes that the sum of all constraints.NumReplicas is equal to
-// configured number of replicas for the range, or that there's just one set of
-// constraints with NumReplicas set to 0. This is meant to be enforced in the
-// config package.
+// NB: Formerly there was an assumption that the sum of all
+// constraints.NumReplicas was equal to the configured number of replicas for
+// the range, or that there was just one set of constraints with NumReplicas
+// set to 0. However, this is not enforced by the config package and no longer
+// holds, as we may have unconstrained replicas.
 func allocateConstraintsCheck(
   store roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints,
 ) (valid bool, necessary bool) {
@@ -1828,6 +1865,57 @@ func allocateConstraintsCheck(
   return valid, false
 }

+// replaceConstraintsCheck checks the potential allocation target store
+// for a replacement operation against all the constraints, including checking
+// that the candidate store matches a constraint satisfied by the existing
+// store. If it matches a constraint, it's valid. If it matches a constraint
+// that is not already overly satisfied by existing replicas (other than the
+// store being replaced), then it's necessary. If any necessary constraint is
+// satisfied by the existing store but not by the candidate, the candidate is
+// considered invalid entirely.
+func replaceConstraintsCheck(store, existingStore roachpb.StoreDescriptor,
+  analyzed constraint.AnalyzedConstraints,
+) (valid bool, necessary bool) {
+  // All stores are valid when there are no constraints.
+  if len(analyzed.Constraints) == 0 {
+    return true, false
+  }
+
+  for i, constraints := range analyzed.Constraints {
+    matchingStores := analyzed.SatisfiedBy[i]
+    satisfiedByExistingStore := containsStore(matchingStores, existingStore.StoreID)
+    satisfiedByCandidateStore := constraint.ConjunctionsCheck(
+      store, constraints.Constraints,
+    )
+    if satisfiedByCandidateStore {
+      valid = true
+    }
+
+    // If the constraint is not already satisfied, it's necessary.
+    // Additionally, if the constraint is only just satisfied by the existing
+    // store being replaced, since that store is going away, the constraint is
+    // also marked as necessary.
+    if len(matchingStores) < int(constraints.NumReplicas) ||
+      (len(matchingStores) == int(constraints.NumReplicas) &&
+        satisfiedByExistingStore) {
+      necessary = true
+    }
+
+    // Check if the existing store matches a constraint that isn't overly
+    // satisfied. If so, then only replacing it with a satisfying store is
+    // valid to ensure that the constraint stays fully satisfied.
+    if necessary && satisfiedByExistingStore && !satisfiedByCandidateStore {
+      return false, necessary
+    }
+  }
+
+  if analyzed.UnconstrainedReplicas {
+    valid = true
+  }
+
+  return valid, necessary
+}
+
 // removeConstraintsCheck checks the existing store against the analyzed
 // constraints, determining whether it's valid (matches some constraint) and
 // necessary (matches some constraint that no other existing replica matches).
@@ -1867,6 +1955,19 @@ func removeConstraintsCheck(
 // against the analyzed constraints, determining whether it's valid whether it
 // will be necessary if fromStoreID (an existing replica) is removed from the
 // range.
+//
+// NB: Formerly there was an assumption that the sum of all
+// constraints.NumReplicas was equal to the configured number of replicas for
+// the range, or that there was just one set of constraints with NumReplicas
+// set to 0. However, this is not enforced by the config package and no longer
+// holds, as we may have unconstrained replicas.
+//
+// Note that rebalance, while seemingly similar to replacement, is distinct
+// because leaving the replica on the existing store is a valid option.
+// Hence, when leaving the existing store (and using it to satisfy a particular
+// constraint) is not a possibility, such as in the case of a decommissioning
+// or dead node, the specialized replacement check is required.
+// See replaceConstraintsCheck(..).
 func rebalanceFromConstraintsCheck(
   store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints,
 ) (valid bool, necessary bool) {
@@ -1879,11 +1980,6 @@ func rebalanceFromConstraintsCheck(
   // all, it's valid. If it matches a constraint that is not already fully
   // satisfied by existing replicas or that is only fully satisfied because of
   // fromStoreID, then it's necessary.
-  //
-  // NB: This assumes that the sum of all constraints.NumReplicas is equal to
-  // configured number of replicas for the range, or that there's just one set
-  // of constraints with NumReplicas set to 0. This is meant to be enforced in
-  // the config package.
   for i, constraints := range analyzed.Constraints {
     if constraintsOK := constraint.ConjunctionsCheck(
       store, constraints.Constraints,
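Before moving on to the tests, it may help to see the per-conjunction decision that replaceConstraintsCheck makes in isolation. The following self-contained toy model (it deliberately does not use the real constraint.AnalyzedConstraints type, and the store IDs are made up) mirrors the valid/necessary logic for a single constraints conjunction:

```go
package main

import "fmt"

// toyReplaceCheck mirrors the per-conjunction logic of replaceConstraintsCheck:
// numReplicas is the conjunction's quota, satisfiedBy lists the store IDs of
// existing replicas matching it, existingStoreID is the store being replaced,
// and candidateMatches reports whether the candidate store matches it.
func toyReplaceCheck(
	numReplicas int, satisfiedBy []int, existingStoreID int, candidateMatches bool,
) (valid, necessary bool) {
	satisfiedByExisting := false
	for _, id := range satisfiedBy {
		if id == existingStoreID {
			satisfiedByExisting = true
		}
	}
	if candidateMatches {
		valid = true
	}
	// Necessary if the conjunction is under-satisfied, or only just satisfied
	// and the store satisfying it is the one going away.
	if len(satisfiedBy) < numReplicas ||
		(len(satisfiedBy) == numReplicas && satisfiedByExisting) {
		necessary = true
	}
	// If the outgoing store was needed for this conjunction and the candidate
	// does not satisfy it, the candidate is rejected outright.
	if necessary && satisfiedByExisting && !candidateMatches {
		return false, necessary
	}
	return valid, necessary
}

func main() {
	// s3 is the only store matching a NumReplicas=1 conjunction and is being
	// replaced: a non-matching candidate is invalid, which is what surfaces as
	// "cannot allocate necessary voter" in the traces asserted below.
	fmt.Println(toyReplaceCheck(1, []int{3}, 3, false)) // false true
	// A matching candidate is both valid and necessary.
	fmt.Println(toyReplaceCheck(1, []int{3}, 3, true)) // true true
	// If a different store (s5) already satisfies the conjunction, this
	// conjunction is neither necessary nor satisfied by the candidate; in the
	// real function, validity can still come from other conjunctions or from
	// unconstrained replica slots.
	fmt.Println(toyReplaceCheck(1, []int{5}, 3, false)) // false false
}
```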
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
index cfef3624ee01..42cfe36ba382 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -128,7 +128,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{
     Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
     Node: roachpb.NodeDescriptor{
       NodeID: 1,
-      Attrs: roachpb.Attributes{Attrs: []string{"a"}},
+      Attrs: roachpb.Attributes{Attrs: []string{"a", "n1"}},
     },
     Capacity: roachpb.StoreCapacity{
       Capacity: 200,
@@ -141,7 +141,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{
     Attrs: roachpb.Attributes{Attrs: []string{"ssd"}},
     Node: roachpb.NodeDescriptor{
       NodeID: 2,
-      Attrs: roachpb.Attributes{Attrs: []string{"a"}},
+      Attrs: roachpb.Attributes{Attrs: []string{"a", "n2"}},
     },
     Capacity: roachpb.StoreCapacity{
       Capacity: 200,
@@ -154,7 +154,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{
     Attrs: roachpb.Attributes{Attrs: []string{"hdd"}},
     Node: roachpb.NodeDescriptor{
       NodeID: 3,
-      Attrs: roachpb.Attributes{Attrs: []string{"a"}},
+      Attrs: roachpb.Attributes{Attrs: []string{"a", "n3"}},
     },
     Capacity: roachpb.StoreCapacity{
       Capacity: 200,
@@ -167,7 +167,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{
     Attrs: roachpb.Attributes{Attrs: []string{"hdd"}},
     Node: roachpb.NodeDescriptor{
       NodeID: 4,
-      Attrs: roachpb.Attributes{Attrs: []string{"a"}},
+      Attrs: roachpb.Attributes{Attrs: []string{"a", "n4"}},
     },
     Capacity: roachpb.StoreCapacity{
       Capacity: 200,
@@ -180,7 +180,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{
     Attrs: roachpb.Attributes{Attrs: []string{"mem"}},
     Node: roachpb.NodeDescriptor{
       NodeID: 5,
-      Attrs: roachpb.Attributes{Attrs: []string{"a"}},
+      Attrs: roachpb.Attributes{Attrs: []string{"a", "n5"}},
     },
     Capacity: roachpb.StoreCapacity{
       Capacity: 200,
@@ -562,7 +562,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) {
     ctx,
     sp,
     simpleSpanConfig,
-    nil /* existingVoters */, nil, /* existingNonVoters */
+    nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */
     Dead,
   )
   if err != nil {
@@ -584,7 +584,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) {
     ctx,
     sp,
     simpleSpanConfig,
-    nil /* existingVoters */, nil, /* existingNonVoters */
+    nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */
     Dead,
   )
   if !roachpb.Empty(result) {
@@ -703,6 +703,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
         test.conf,
         nil,
         nil,
+        nil,
         Alive,
       )
       require.NoError(t, err)
@@ -718,6 +719,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) {
         test.conf,
         nil,
         nil,
+        nil,
         // Dead and Decommissioning should behave the same here, use either.
         func() ReplicaStatus {
           if i%2 == 0 {
@@ -747,7 +749,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
     ctx,
     sp,
     multiDCConfigSSD,
-    nil /* existingVoters */, nil, /* existingNonVoters */
+    nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */
     Dead,
   )
   if err != nil {
@@ -760,7 +762,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
     []roachpb.ReplicaDescriptor{{
       NodeID: result1.NodeID,
       StoreID: result1.StoreID,
-    }}, nil, /* existingNonVoters */
+    }}, nil /* existingNonVoters */, nil, /* replacing */
     Dead,
   )
   if err != nil {
@@ -785,7 +787,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
       NodeID: result2.NodeID,
       StoreID: result2.StoreID,
     },
-  }, nil, /* existingNonVoters */
+  }, nil /* existingNonVoters */, nil, /* replacing */
     Dead,
   )
   if err == nil {
@@ -821,6 +823,7 @@ func TestAllocatorExistingReplica(t *testing.T) {
         StoreID: 2,
       },
     }, nil, /* existingNonVoters */
+    nil, /* replacing */
     Dead,
   )
   if err != nil {
@@ -886,6 +889,11 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) {
         ReplicaID: 2,
       },
     }, nil, /* existingNonVoters */
+    &roachpb.ReplicaDescriptor{
+      NodeID: 3,
+      StoreID: 3,
+      ReplicaID: 3,
+    },
     Decommissioning,
   )
   if err != nil {
@@ -896,6 +904,70 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) {
   }
 }

+func TestAllocatorReplaceFailsOnConstrainedDecommissioningReplica(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  ctx := context.Background()
+  stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
+  defer stopper.Stop(ctx)
+  gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t)
+
+  // Override liveness of n3 to decommissioning so the only available target is s4.
+  oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
+    if nid == roachpb.NodeID(3) {
+      return livenesspb.NodeLivenessStatus_DECOMMISSIONING
+    }
+
+    return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
+  })
+
+  _, _, err := a.AllocateVoter(
+    ctx,
+    oSp,
+    roachpb.SpanConfig{
+      NumReplicas: 3,
+      Constraints: []roachpb.ConstraintsConjunction{
+        {
+          Constraints: []roachpb.Constraint{
+            {Value: "mem", Type: roachpb.Constraint_PROHIBITED},
+          },
+        },
+        {
+          NumReplicas: 1,
+          Constraints: []roachpb.Constraint{
+            {Value: "n3", Type: roachpb.Constraint_REQUIRED},
+          },
+        },
+      },
+    },
+    []roachpb.ReplicaDescriptor{
+      {
+        NodeID: 1,
+        StoreID: 1,
+        ReplicaID: 1,
+      },
+      {
+        NodeID: 2,
+        StoreID: 2,
+        ReplicaID: 2,
+      },
+    }, nil, /* existingNonVoters */
+    &roachpb.ReplicaDescriptor{
+      NodeID: 3,
+      StoreID: 3,
+      ReplicaID: 3,
+    },
+    Decommissioning,
+  )
+  require.Errorf(t, err, "Unable to perform allocation: "+
+    "0 of 4 live stores are able to take a new replica for the range "+
+    "(2 already have a voter, 0 already have a non-voter); "+
+    "replicas must match constraints [{-mem} {+n3:1}]; "+
+    "voting replicas must match voter_constraints []",
+  )
+}
+
 func TestAllocatorMultipleStoresPerNode(t *testing.T) {
   defer leaktest.AfterTest(t)()
   defer log.Scope(t).Close(t)
@@ -989,7 +1061,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
   for _, tc := range testCases {
     {
       result, _, err := a.AllocateVoter(
-        ctx, sp, emptySpanConfig(), tc.existing, nil,
+        ctx, sp, emptySpanConfig(), tc.existing, nil, nil,
         Dead,
       )
       if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a {
@@ -3075,7 +3147,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) {
       // Allocate the voting replica first, before the non-voter. This is the
       // order in which we'd expect the allocator to repair a given range. See
       // TestAllocatorComputeAction.
-      voterTarget, _, err := a.AllocateVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+      voterTarget, _, err := a.AllocateVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil, Dead)
       if test.shouldVoterAllocFail {
        require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget)
       } else {
@@ -3084,7 +3156,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) {
        test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...)
       }

-      nonVoterTarget, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+      nonVoterTarget, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil /* replacing */, Dead)
       if test.shouldNonVoterAllocFail {
        require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget)
       } else {
@@ -3158,7 +3230,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) {
        StoreID: storeID,
      }
    }
-    targetStore, details, err := a.AllocateVoter(ctx, sp, emptySpanConfig(), existingRepls, nil, Dead)
+    targetStore, details, err := a.AllocateVoter(ctx, sp, emptySpanConfig(), existingRepls, nil, nil, Dead)
    if err != nil {
      t.Fatal(err)
    }
@@ -3625,7 +3697,7 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) {
      sg := gossiputil.NewStoreGossiper(g)
      sg.GossipStores(test.stores, t)

-      result, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead)
+      result, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil /* replacing */, Dead)
      if test.shouldFail {
        require.Error(t, err)
        require.Regexp(t, test.expError, err)
@@ -8878,7 +8950,7 @@ func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) {
  }

  for i, tc := range testCases {
-    result, _, _ := a.AllocateVoter(ctx, sp, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive)
+    result, _, _ := a.AllocateVoter(ctx, sp, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, nil, Alive)
    assert.Equal(t, tc.expectedTargetAllocate, result, "Unexpected replication target returned by allocate voter in test %d", i)
  }
 }
diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go
index 00dd644058a6..06793c8f15ec 100644
--- a/pkg/kv/kvserver/allocator_impl_test.go
+++ b/pkg/kv/kvserver/allocator_impl_test.go
@@ -105,22 +105,15 @@ var threeStores = []*roachpb.StoreDescriptor{
 var fourSingleStoreRacks = []*roachpb.StoreDescriptor{
  {
    StoreID: 1,
+    Attrs: roachpb.Attributes{Attrs: []string{"red"}},
    Node: roachpb.NodeDescriptor{
      NodeID: 1,
      Locality: roachpb.Locality{
        Tiers: []roachpb.Tier{
-          {
-            Key: "cloud",
-            Value: "local",
-          },
          {
            Key: "region",
            Value: "local",
          },
-          {
-            Key: "zone",
-            Value: "local",
-          },
          {
            Key: "rack",
            Value: "1",
@@ -136,22 +129,15 @@ var fourSingleStoreRacks = []*roachpb.StoreDescriptor{
  },
  {
    StoreID: 2,
+    Attrs: roachpb.Attributes{Attrs: []string{"red"}},
    Node: roachpb.NodeDescriptor{
      NodeID: 2,
      Locality: roachpb.Locality{
        Tiers: []roachpb.Tier{
-          {
-            Key: "cloud",
-            Value: "local",
-          },
          {
            Key: "region",
            Value: "local",
          },
-          {
-            Key: "zone",
-            Value: "local",
-          },
          {
            Key: "rack",
            Value: "2",
@@ -167,22 +153,15 @@ var fourSingleStoreRacks = []*roachpb.StoreDescriptor{
  },
  {
    StoreID: 3,
+    Attrs: roachpb.Attributes{Attrs: []string{"black"}},
    Node: roachpb.NodeDescriptor{
      NodeID: 3,
      Locality: roachpb.Locality{
        Tiers: []roachpb.Tier{
-          {
-            Key: "cloud",
-            Value: "local",
-          },
          {
            Key: "region",
            Value: "local",
          },
-          {
-            Key: "zone",
-            Value: "local",
-          },
          {
            Key: "rack",
            Value: "3",
@@ -198,22 +177,15 @@ var fourSingleStoreRacks = []*roachpb.StoreDescriptor{
  },
  {
    StoreID: 4,
+    Attrs: roachpb.Attributes{Attrs: []string{"black"}},
    Node: roachpb.NodeDescriptor{
      NodeID: 4,
      Locality: roachpb.Locality{
        Tiers: []roachpb.Tier{
-          {
-            Key: "cloud",
-            Value: "local",
-          },
          {
            Key: "region",
            Value: "local",
          },
-          {
-            Key: "zone",
-            Value: "local",
-          },
          {
            Key: "rack",
            Value: "4",
@@ -425,14 +397,14 @@ func TestAllocatorThrottled(t *testing.T) {
  defer stopper.Stop(ctx)

  // First test to make sure we would send the replica to purgatory.
-  _, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+  _, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead)
  if _, ok := IsPurgatoryError(err); !ok {
    t.Fatalf("expected a purgatory error, got: %+v", err)
  }

  // Second, test the normal case in which we can allocate to the store.
  gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t)
-  result, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+  result, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead)
  if err != nil {
    t.Fatalf("unable to perform allocation: %+v", err)
  }
@@ -449,7 +421,7 @@ func TestAllocatorThrottled(t *testing.T) {
  }
  storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour)
  sp.DetailsMu.Unlock()
-  _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+  _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead)
  if _, ok := IsPurgatoryError(err); ok {
    t.Fatalf("expected a non purgatory error, got: %+v", err)
  }
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index e6129048b450..a2a4f221f7ce 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -3605,6 +3605,7 @@ func RelocateOne(
    conf,
    existingVoters,
    existingNonVoters,
+    nil, /* replacing */
    allocator.ScorerOptions(ctx),
    allocator.NewBestCandidateSelector(),
    // NB: Allow the allocator to return target stores that might be on the
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index d8dd67ba5cd2..c1cba61fba4f 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -1154,16 +1154,21 @@ func (rq *replicateQueue) addOrReplaceVoters(
 ) (op AllocationOp, _ error) {
  effects := effectBuilder{}
  desc, conf := repl.DescAndSpanConfig()
-  isReplace := removeIdx >= 0
+  var replacing *roachpb.ReplicaDescriptor
+  if removeIdx >= 0 {
+    replacing = &existingVoters[removeIdx]
+  }

  // The allocator should not try to re-add this replica since there is a reason
  // we're removing it (i.e. dead or decommissioning). If we left the replica in
  // the slice, the allocator would not be guaranteed to pick a replica that
  // fills the gap removeRepl leaves once it's gone.
-  newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.storePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus)
+  newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.storePool, conf, remainingLiveVoters, remainingLiveNonVoters, replacing, replicaStatus)
  if err != nil {
    return nil, err
  }
+
+  isReplace := removeIdx >= 0
  if isReplace && newVoter.StoreID == existingVoters[removeIdx].StoreID {
    return nil, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newVoter.StoreID)
  }
@@ -1254,8 +1259,12 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
 ) (op AllocationOp, _ error) {
  effects := effectBuilder{}
  conf := repl.SpanConfig()
+  var replacing *roachpb.ReplicaDescriptor
+  if removeIdx >= 0 {
+    replacing = &existingNonVoters[removeIdx]
+  }

-  newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.storePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus)
+  newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.storePool, conf, liveVoterReplicas, liveNonVoterReplicas, replacing, replicaStatus)
  if err != nil {
    return nil, err
  }
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 73ef75c0d3e4..4579b0e2d41b 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3249,7 +3249,7 @@ func (s *Store) AllocatorCheckRange(
    return action, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err
  }

-  liveVoters, liveNonVoters, isReplacement, nothingToDo, err :=
+  filteredVoters, filteredNonVoters, replacing, nothingToDo, err :=
    allocatorimpl.FilterReplicasForAction(storePool, desc, action)

  if nothingToDo || err != nil {
@@ -3257,7 +3257,7 @@
  }

  target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf,
-    liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(),
+    filteredVoters, filteredNonVoters, replacing, action.ReplicaStatus(), action.TargetReplicaType(),
  )
  if err == nil {
    log.Eventf(ctx, "found valid allocation of %s target %v", action.TargetReplicaType(), target)
@@ -3269,11 +3269,11 @@
    storePool,
    conf,
    desc.Replicas().VoterDescriptors(),
-    liveNonVoters,
+    filteredVoters,
    action.ReplicaStatus(),
    action.TargetReplicaType(),
    target,
-    isReplacement,
+    replacing != nil,
  )

  if fragileQuorumErr != nil {
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 2a73cb7d057d..54a685982873 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -3486,9 +3486,9 @@ func TestAllocatorCheckRange(t *testing.T) {
    existingReplicas []roachpb.ReplicaDescriptor
    spanConfig *roachpb.SpanConfig
    livenessOverrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus
-    baselineExpNoop bool
    expectedAction allocatorimpl.AllocatorAction
    expectValidTarget bool
+    expectedTarget roachpb.ReplicationTarget
    expectedLogMessage string
    expectErr bool
    expectAllocatorErr bool
@@ -3583,7 +3583,6 @@ func TestAllocatorCheckRange(t *testing.T) {
      livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
        3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
      },
-      baselineExpNoop: true,
      expectedAction: allocatorimpl.AllocatorRemoveDecommissioningVoter,
      expectErr: false,
      expectValidTarget: false,
@@ -3614,12 +3613,119 @@ func TestAllocatorCheckRange(t *testing.T) {
        },
      },
    },
-      baselineExpNoop: true,
-      expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
-      // We should get an error attempting to break constraints, but this is
-      // currently a bug.
-      // TODO(sarkesian): Change below to true once #94809 is fixed.
-      expectErr: false,
+      expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
+      expectAllocatorErr: true,
+      expectedErrStr: "replicas must match constraints",
+      expectedLogMessage: "cannot allocate necessary voter on s3",
+    },
+    {
+      name: "decommissioning without satisfying multiple partial constraints",
+      stores: fourSingleStoreRacks,
+      existingReplicas: []roachpb.ReplicaDescriptor{
+        {NodeID: 1, StoreID: 1, ReplicaID: 1},
+        {NodeID: 2, StoreID: 2, ReplicaID: 2},
+        {NodeID: 4, StoreID: 4, ReplicaID: 3},
+      },
+      livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
+        4: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
+      },
+      spanConfig: &roachpb.SpanConfig{
+        NumReplicas: 3,
+        Constraints: []roachpb.ConstraintsConjunction{
+          {
+            NumReplicas: 1,
+            Constraints: []roachpb.Constraint{
+              {
+                Type: roachpb.Constraint_REQUIRED,
+                Value: "black",
+              },
+            },
+          },
+          {
+            NumReplicas: 1,
+            Constraints: []roachpb.Constraint{
+              {
+                Type: roachpb.Constraint_REQUIRED,
+                Key: "rack",
+                Value: "4",
+              },
+            },
+          },
+        },
+      },
+      expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
+      expectAllocatorErr: true,
+      expectedErrStr: "replicas must match constraints",
+      expectedLogMessage: "cannot allocate necessary voter on s3",
+    },
+    {
+      name: "decommissioning during upreplication with partial constraints",
+      stores: fourSingleStoreRacks,
+      existingReplicas: []roachpb.ReplicaDescriptor{
+        {NodeID: 1, StoreID: 1, ReplicaID: 1},
+      },
+      livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
+        4: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
+      },
+      spanConfig: &roachpb.SpanConfig{
+        NumReplicas: 3,
+        Constraints: []roachpb.ConstraintsConjunction{
+          {
+            NumReplicas: 1,
+            Constraints: []roachpb.Constraint{
+              {
+                Type: roachpb.Constraint_REQUIRED,
+                Key: "rack",
+                Value: "4",
+              },
+            },
+          },
+        },
+      },
+      expectedAction: allocatorimpl.AllocatorAddVoter,
+      expectValidTarget: true,
+    },
+    {
+      name: "decommissioning with replacement satisfying locality",
+      stores: multiRegionStores,
+      existingReplicas: []roachpb.ReplicaDescriptor{
+        {NodeID: 1, StoreID: 1, ReplicaID: 1},
+        {NodeID: 4, StoreID: 4, ReplicaID: 2},
+        {NodeID: 7, StoreID: 7, ReplicaID: 3},
+      },
+      livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
+        1: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
+        3: livenesspb.NodeLivenessStatus_DEAD,
+      },
+      spanConfig: &roachpb.SpanConfig{
+        NumReplicas: 3,
+        Constraints: []roachpb.ConstraintsConjunction{
+          {
+            NumReplicas: 1,
+            Constraints: []roachpb.Constraint{
+              {
+                Type: roachpb.Constraint_REQUIRED,
+                Key: "region",
+                Value: "a",
+              },
+            },
+          },
+          {
+            NumReplicas: 1,
+            Constraints: []roachpb.Constraint{
+              {
+                Type: roachpb.Constraint_REQUIRED,
+                Key: "region",
+                Value: "b",
+              },
+            },
+          },
+        },
+      },
+      expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
+      expectValidTarget: true,
+      expectedTarget: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2},
+      expectErr: false,
    },
    {
      name: "decommissioning without satisfying fully constrained locality",
@@ -3667,7 +3773,6 @@ func TestAllocatorCheckRange(t *testing.T) {
        },
      },
    },
-      baselineExpNoop: true,
      expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter,
      expectAllocatorErr: true,
      expectedErrStr: "replicas must match constraints",
@@ -3730,19 +3835,6 @@ func TestAllocatorCheckRange(t *testing.T) {
      storePoolOverride = storepool.NewOverrideStorePool(sp, livenessOverride, nodeCountOverride)
    }

-    // Check if our baseline action without overrides is a noop; i.e., the
-    // range is fully replicated as configured and needs no actions.
-    if tc.baselineExpNoop {
-      action, _, _, err := s.AllocatorCheckRange(ctx, desc,
-        false /* collectTraces */, nil, /* overrideStorePool */
-      )
-      require.NoError(t, err, "expected baseline check without error")
-      require.Containsf(t, []allocatorimpl.AllocatorAction{
-        allocatorimpl.AllocatorNoop,
-        allocatorimpl.AllocatorConsiderRebalance,
-      }, action, "expected baseline noop, got %s", action)
-    }
-
    // Execute actual allocator range repair check.
    action, target, recording, err := s.AllocatorCheckRange(ctx, desc,
      true /* collectTraces */, storePoolOverride,
@@ -3771,12 +3863,18 @@ func TestAllocatorCheckRange(t *testing.T) {
      )

      if tc.expectValidTarget {
-        require.NotEqualf(t, roachpb.ReplicationTarget{}, target, "expected valid target")
+        require.Falsef(t, roachpb.Empty(target), "expected valid target")
+      }
+
+      if !roachpb.Empty(tc.expectedTarget) {
+        require.Equalf(t, tc.expectedTarget, target, "expected target %s, got %s",
+          tc.expectedTarget, target,
+        )
      }

      if tc.expectedLogMessage != "" {
        _, ok := recording.FindLogMessage(tc.expectedLogMessage)
-        require.Truef(t, ok, "expected to find trace \"%s\"", tc.expectedLogMessage)
+        require.Truef(t, ok, "expected to find \"%s\" in trace:\n%s", tc.expectedLogMessage, recording)
      }
    })
  }
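Taken together with the store.go changes above, the check path followed by AllocatorCheckRange can be condensed into the sketch below (schematic only: error handling is trimmed, the surrounding Store plumbing is omitted, and the package placement and import paths are assumptions based on this diff's layout):

```go
package kvserver // hypothetical placement, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// checkRangeSketch condenses the post-patch AllocatorCheckRange flow: the
// replica being replaced is now carried as a *roachpb.ReplicaDescriptor
// rather than a boolean, so it can be handed straight to AllocateTarget, and
// "is this a replacement?" becomes simply replacing != nil.
func checkRangeSketch(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	storePool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
	action allocatorimpl.AllocatorAction,
) (roachpb.ReplicationTarget, error) {
	filteredVoters, filteredNonVoters, replacing, nothingToDo, err :=
		allocatorimpl.FilterReplicasForAction(storePool, desc, action)
	if err != nil || nothingToDo {
		return roachpb.ReplicationTarget{}, err
	}
	target, _, err := a.AllocateTarget(ctx, storePool, conf,
		filteredVoters, filteredNonVoters, replacing,
		action.ReplicaStatus(), action.TargetReplicaType(),
	)
	// The fragile-quorum check in store.go now receives replacing != nil where
	// it previously received the isReplacement boolean.
	return target, err
}
```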