diff --git a/pkg/config/zonepb/zone.go b/pkg/config/zonepb/zone.go
index d4b31f064caa..98c45fae0ed6 100644
--- a/pkg/config/zonepb/zone.go
+++ b/pkg/config/zonepb/zone.go
@@ -685,6 +685,37 @@ func (z ZoneConfig) GetSubzoneForKeySuffix(keySuffix []byte) (*Subzone, int32) {
 	return nil, -1
 }
 
+// GetNumVoters returns the number of voting replicas for the given zone config.
+//
+// This method will panic if called on a ZoneConfig with an uninitialized
+// NumReplicas attribute.
+func (z *ZoneConfig) GetNumVoters() int32 {
+	if z.NumReplicas == nil {
+		panic("NumReplicas must not be nil")
+	}
+	if z.NumVoters != nil && *z.NumVoters != 0 {
+		return *z.NumVoters
+	}
+	return *z.NumReplicas
+}
+
+// GetNumNonVoters returns the number of non-voting replicas as defined in the
+// zone config.
+//
+// This method will panic if called on a ZoneConfig with an uninitialized
+// NumReplicas attribute.
+func (z *ZoneConfig) GetNumNonVoters() int32 {
+	if z.NumReplicas == nil {
+		panic("NumReplicas must not be nil")
+	}
+	if z.NumVoters != nil && *z.NumVoters != 0 {
+		return *z.NumReplicas - *z.NumVoters
+	}
+	// `num_voters` hasn't been explicitly configured. Every replica should be a
+	// voting replica.
+	return 0
+}
+
 // SetSubzone installs subzone into the ZoneConfig, overwriting any existing
 // subzone with the same IndexID and PartitionName.
 func (z *ZoneConfig) SetSubzone(subzone Subzone) {
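To make the fallback behavior of the two new accessors concrete, here is a minimal, illustrative sketch (not part of the patch; it assumes an external caller importing zonepb and gogo's proto helpers):

// Illustrative sketch only, not part of this change. Shows how GetNumVoters
// and GetNumNonVoters fall back to NumReplicas when num_voters is unset.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
	"github.com/gogo/protobuf/proto"
)

func main() {
	// num_voters explicitly set: 3 of the 5 replicas vote, 2 are non-voters.
	mixed := zonepb.ZoneConfig{NumReplicas: proto.Int32(5), NumVoters: proto.Int32(3)}
	fmt.Println(mixed.GetNumVoters(), mixed.GetNumNonVoters()) // 3 2

	// num_voters unset: every replica is a voter.
	votersOnly := zonepb.ZoneConfig{NumReplicas: proto.Int32(3)}
	fmt.Println(votersOnly.GetNumVoters(), votersOnly.GetNumNonVoters()) // 3 0
}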
diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 4e334c65c7ef..3fb8ee9a2711 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -49,14 +49,23 @@ const (
 	minReplicaWeight = 0.001
 
 	// Priorities for various repair operations.
+	//
+	// NB: These priorities only influence the replicateQueue's understanding of
+	// which ranges are to be dealt with before others. In other words, these
+	// priorities don't influence the relative order of actions taken on a given
+	// range. Within a given range, the ordering of the various checks inside
+	// `Allocator.computeAction` determines which repair/rebalancing actions are
+	// taken before the others.
 	finalizeAtomicReplicationChangePriority    float64 = 12002
 	removeLearnerReplicaPriority               float64 = 12001
 	addDeadReplacementVoterPriority            float64 = 12000
 	addMissingVoterPriority                    float64 = 10000
 	addDecommissioningReplacementVoterPriority float64 = 5000
 	removeDeadVoterPriority                    float64 = 1000
-	removeDecommissioningVoterPriority         float64 = 200
-	removeExtraVoterPriority                   float64 = 100
+	removeDecommissioningVoterPriority         float64 = 900
+	removeExtraVoterPriority                   float64 = 800
+	addMissingNonVoterPriority                 float64 = 700
+	removeExtraNonVoterPriority                float64 = 600
 )
 
 // MinLeaseTransferStatsDuration configures the minimum amount of time a
@@ -100,7 +109,9 @@ const (
 	_ AllocatorAction = iota
 	AllocatorNoop
 	AllocatorRemoveVoter
+	AllocatorRemoveNonVoter
 	AllocatorAddVoter
+	AllocatorAddNonVoter
 	AllocatorReplaceDeadVoter
 	AllocatorRemoveDeadVoter
 	AllocatorReplaceDecommissioningVoter
@@ -114,7 +125,9 @@ const (
 var allocatorActionNames = map[AllocatorAction]string{
 	AllocatorNoop:                        "noop",
 	AllocatorRemoveVoter:                 "remove voter",
+	AllocatorRemoveNonVoter:              "remove non-voter",
 	AllocatorAddVoter:                    "add voter",
+	AllocatorAddNonVoter:                 "add non-voter",
 	AllocatorReplaceDeadVoter:            "replace dead voter",
 	AllocatorRemoveDeadVoter:             "remove dead voter",
 	AllocatorReplaceDecommissioningVoter: "replace decommissioning voter",
@@ -262,11 +275,11 @@ func MakeAllocator(
 	}
 }
 
-// GetNeededReplicas calculates the number of replicas a range should
-// have given its zone config and the number of nodes available for
-// up-replication (i.e. not dead and not decommissioning).
-func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
-	numZoneReplicas := int(zoneConfigReplicaCount)
+// GetNeededVoters calculates the number of voters a range should have given its
+// zone config and the number of nodes available for up-replication (i.e. not
+// decommissioning).
+func GetNeededVoters(zoneConfigVoterCount int32, clusterNodes int) int {
+	numZoneReplicas := int(zoneConfigVoterCount)
 	need := numZoneReplicas
 
 	// Adjust the replication factor for all ranges if there are fewer
@@ -299,6 +312,27 @@ func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
 	return need
 }
 
+// GetNeededNonVoters calculates the number of non-voters a range should have
+// given the number of voting replicas the range has and the number of nodes
+// available for up-replication.
+//
+// If `num_voters` isn't explicitly configured in the zone config, all
+// `num_replicas` replicas will be voting replicas and this method will always
+// return 0.
+//
+// NB: This method assumes that we have exactly as many voters as we need, since
+// this method should only be consulted after voting replicas have been
+// upreplicated / rebalanced off of dead/decommissioning nodes.
+func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int {
+	need := zoneConfigNonVoterCount
+	if clusterNodes-numVoters < need {
+		// We only need non-voting replicas for the nodes that do not have a voting
+		// replica.
+		need = clusterNodes - numVoters
+	}
+	return need
+}
+
 // ComputeAction determines the exact operation needed to repair the
 // supplied range, as governed by the supplied zone configuration. It
 // returns the required action that should be taken and a priority.
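To make the clamping in GetNeededNonVoters concrete, a small illustrative sketch (not part of the patch; it assumes an external caller importing the kvserver package):

// Illustrative sketch only, not part of this change. GetNeededNonVoters caps
// the configured non-voter count at the number of nodes left over once every
// voter has a home.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
)

func main() {
	// num_replicas=5, num_voters=3 => 2 desired non-voters. With only 4 nodes,
	// just one node lacks a voter, so only 1 non-voter is actually needed.
	fmt.Println(kvserver.GetNeededNonVoters(3, 2, 4)) // 1

	// With 6 nodes there is room for both desired non-voters.
	fmt.Println(kvserver.GetNeededNonVoters(3, 2, 6)) // 2
}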
@@ -357,97 +391,141 @@ func (a *Allocator) ComputeAction(
 		// removeLearnerReplicaPriority as the highest priority.
 		return AllocatorRemoveLearner, removeLearnerReplicaPriority
 	}
-	// computeAction expects to operate only on voters.
-	return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
+	return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors(),
+		desc.Replicas().NonVoterDescriptors())
 }
 
 func (a *Allocator) computeAction(
-	ctx context.Context, zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor,
+	ctx context.Context,
+	zone *zonepb.ZoneConfig,
+	voterReplicas []roachpb.ReplicaDescriptor,
+	nonVoterReplicas []roachpb.ReplicaDescriptor,
 ) (AllocatorAction, float64) {
-	have := len(voterReplicas)
-	decommissioningReplicas := a.storePool.decommissioningReplicas(voterReplicas)
+	// NB: The ordering of the checks in this method is intentional. The order in
+	// which these actions are returned by this method determines the relative
+	// priority of the actions taken on a given range. We want this to be
+	// symmetric with regards to the priorities defined at the top of this file
+	// (which influence the replicateQueue's decision of which range it'll pick to
+	// repair/rebalance before the others).
+	//
+	// In broad strokes, we first handle all voting replica-based actions and then
+	// the actions pertaining to non-voting replicas. Within each replica set, we
+	// first handle operations that correspond to repairing/recovering the range.
+	// After that we handle rebalancing related actions, followed by removal
+	// actions.
+
+	// TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
+	haveVoters := len(voterReplicas)
+	decommissioningVoters := a.storePool.decommissioningReplicas(voterReplicas)
+	// Node count including dead nodes but excluding decommissioning/decommissioned
+	// nodes.
 	clusterNodes := a.storePool.ClusterNodeCount()
-	need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
-	desiredQuorum := computeQuorum(need)
-	quorum := computeQuorum(have)
-
-	if have < need {
-		// Range is under-replicated, and should add an additional replica.
-		// Priority is adjusted by the difference between the current replica
-		// count and the quorum of the desired replica count.
-		priority := addMissingVoterPriority + float64(desiredQuorum-have)
+	neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes)
+	desiredQuorum := computeQuorum(neededVoters)
+	quorum := computeQuorum(haveVoters)
+
+	// TODO(aayush): When haveVoters < neededVoters but we don't have quorum to
+	// actually execute the addition of a new replica, we should be returning an
+	// AllocatorRangeUnavailable.
+	if haveVoters < neededVoters {
+		// Range is under-replicated, and should add an additional voter.
+		// Priority is adjusted by the difference between the current voter
+		// count and the quorum of the desired voter count.
+		priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters)
 		action := AllocatorAddVoter
-		log.VEventf(ctx, 3, "%s - missing replica need=%d, have=%d, priority=%.2f",
-			action, need, have, priority)
+		log.VEventf(ctx, 3, "%s - missing voter need=%d, have=%d, priority=%.2f",
+			action, neededVoters, haveVoters, priority)
 		return action, priority
 	}
 
-	liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(voterReplicas)
+	liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)
 
-	if len(liveVoterReplicas) < quorum {
-		// Do not take any replacement/removal action if we do not have a quorum of live
-		// replicas. If we're correctly assessing the unavailable state of the range, we
-		// also won't be able to add replicas as we try above, but hope springs eternal.
-		log.VEventf(ctx, 1, "unable to take action - live replicas %v don't meet quorum of %d",
-			liveVoterReplicas, quorum)
+	if len(liveVoters) < quorum {
+		// Do not take any replacement/removal action if we do not have a quorum of
+		// live voters. If we're correctly assessing the unavailable state of the
+		// range, we also won't be able to add replicas as we try above, but hope
+		// springs eternal.
+		log.VEventf(ctx, 1, "unable to take action - live voters %v don't meet quorum of %d",
+			liveVoters, quorum)
 		return AllocatorRangeUnavailable, 0
 	}
 
-	if have == need && len(deadVoterReplicas) > 0 {
-		// Range has dead replica(s). We should up-replicate to add another before
+	if haveVoters == neededVoters && len(deadVoters) > 0 {
+		// Range has dead voter(s). We should up-replicate to add another
 		// before removing the dead one. This can avoid permanent data loss in cases
 		// where the node is only temporarily dead, but we remove it from the range
 		// and lose a second node before we can up-replicate (#25392).
-		// The dead replica(s) will be down-replicated later.
+		// The dead voter(s) will be down-replicated later.
 		priority := addDeadReplacementVoterPriority
 		action := AllocatorReplaceDeadVoter
-		log.VEventf(ctx, 3, "%s - replacement for %d dead replicas priority=%.2f",
-			action, len(deadVoterReplicas), priority)
+		log.VEventf(ctx, 3, "%s - replacement for %d dead voters priority=%.2f",
+			action, len(deadVoters), priority)
 		return action, priority
 	}
 
-	if have == need && len(decommissioningReplicas) > 0 {
-		// Range has decommissioning replica(s), which should be replaced.
+	if haveVoters == neededVoters && len(decommissioningVoters) > 0 {
+		// Range has decommissioning voter(s), which should be replaced.
 		priority := addDecommissioningReplacementVoterPriority
 		action := AllocatorReplaceDecommissioningVoter
-		log.VEventf(ctx, 3, "%s - replacement for %d decommissioning replicas priority=%.2f",
-			action, len(decommissioningReplicas), priority)
+		log.VEventf(ctx, 3, "%s - replacement for %d decommissioning voters priority=%.2f",
+			action, len(decommissioningVoters), priority)
 		return action, priority
 	}
 
-	// Removal actions follow.
-	// TODO(a-robinson): There's an additional case related to dead replicas that
-	// we should handle above. If there are one or more dead replicas, have <
-	// need, and there are no available stores to up-replicate to, then we should
-	// try to remove the dead replica(s) to get down to an odd number of
-	// replicas.
-	if len(deadVoterReplicas) > 0 {
+	// Voting replica removal actions follow.
+	// TODO(aayush): There's an additional case related to dead voters that we
+	// should handle above. If there are one or more dead replicas, have < need,
+	// and there are no available stores to up-replicate to, then we should try to
+	// remove the dead replica(s) to get down to an odd number of replicas.
+	if len(deadVoters) > 0 {
 		// The range has dead replicas, which should be removed immediately.
-		priority := removeDeadVoterPriority + float64(quorum-len(liveVoterReplicas))
+		priority := removeDeadVoterPriority + float64(quorum-len(liveVoters))
 		action := AllocatorRemoveDeadVoter
 		log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f",
-			action, len(deadVoterReplicas), len(liveVoterReplicas), quorum, priority)
+			action, len(deadVoters), len(liveVoters), quorum, priority)
 		return action, priority
 	}
 
-	if len(decommissioningReplicas) > 0 {
-		// Range is over-replicated, and has a decommissioning replica which
+	if len(decommissioningVoters) > 0 {
+		// Range is over-replicated, and has a decommissioning voter which
 		// should be removed.
 		priority := removeDecommissioningVoterPriority
 		action := AllocatorRemoveDecommissioningVoter
 		log.VEventf(ctx, 3, "%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
-			action, need, have, len(decommissioningReplicas), priority)
+			action, neededVoters, haveVoters, len(decommissioningVoters), priority)
 		return action, priority
 	}
 
-	if have > need {
-		// Range is over-replicated, and should remove a replica.
-		// Ranges with an even number of replicas get extra priority because
+	if haveVoters > neededVoters {
+		// Range is over-replicated, and should remove a voter.
+		// Ranges with an even number of voters get extra priority because
 		// they have a more fragile quorum.
-		priority := removeExtraVoterPriority - float64(have%2)
+		priority := removeExtraVoterPriority - float64(haveVoters%2)
 		action := AllocatorRemoveVoter
-		log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, need, have, priority)
+		log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, haveVoters, priority)
+		return action, priority
+	}
+
+	// Non-voting replica actions follow.
+	//
+	// Non-voting replica addition.
+	haveNonVoters := len(nonVoterReplicas)
+	neededNonVoters := GetNeededNonVoters(haveVoters, int(zone.GetNumNonVoters()), clusterNodes)
+	if haveNonVoters < neededNonVoters {
+		priority := addMissingNonVoterPriority
+		action := AllocatorAddNonVoter
+		log.VEventf(ctx, 3, "%s - missing non-voter need=%d, have=%d, priority=%.2f",
+			action, neededNonVoters, haveNonVoters, priority)
+		return action, priority
+	}
+
+	// Non-voting replica removal.
+	if haveNonVoters > neededNonVoters {
+		// Like above, the range is over-replicated but should remove a non-voter.
+		priority := removeExtraNonVoterPriority
+		action := AllocatorRemoveNonVoter
+		log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededNonVoters, haveNonVoters, priority)
 		return action, priority
 	}
 
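For readers skimming the new ordering, a tiny illustrative calculation (not part of the patch) of how the AllocatorAddVoter priority above is derived; computeQuorum is assumed to be the usual majority calculation:

// Illustrative sketch only, not part of this change. Mirrors the priority
// arithmetic in computeAction for an under-replicated range.
package main

import "fmt"

// computeQuorum is assumed here to be n/2 + 1.
func computeQuorum(n int) int { return n/2 + 1 }

func main() {
	const addMissingVoterPriority = 10000.0
	neededVoters, haveVoters := 3, 1
	desiredQuorum := computeQuorum(neededVoters) // 2
	// A range that is further from quorum gets a higher priority and is
	// therefore picked up by the replicateQueue earlier.
	priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters)
	fmt.Println(priority) // 10001
}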
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 3eac7a95900b..111c6a7f48d3 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -4306,6 +4306,68 @@ func TestAllocatorComputeAction(t *testing.T) {
 			},
 			expectedAction: AllocatorReplaceDeadVoter,
 		},
+		// Need 1 non-voter but a voter is on a dead store.
+		{
+			zone: zonepb.ZoneConfig{
+				NumReplicas:   proto.Int32(5),
+				NumVoters:     proto.Int32(3),
+				RangeMinBytes: proto.Int64(0),
+				RangeMaxBytes: proto.Int64(64000),
+			},
+			desc: roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{
+						StoreID:   1,
+						NodeID:    1,
+						ReplicaID: 1,
+					},
+					{
+						StoreID:   2,
+						NodeID:    2,
+						ReplicaID: 2,
+					},
+					{
+						StoreID:   6,
+						NodeID:    6,
+						ReplicaID: 6,
+					},
+					{
+						StoreID:   4,
+						NodeID:    4,
+						ReplicaID: 4,
+						Type:      roachpb.ReplicaTypeNonVoter(),
+					},
+				},
+			},
+			expectedAction: AllocatorReplaceDeadVoter,
+		},
+		// Need 3 replicas, have 2, but one of them is dead so we don't have quorum.
+		{
+			zone: zonepb.ZoneConfig{
+				NumReplicas:   proto.Int32(3),
+				Constraints:   []zonepb.ConstraintsConjunction{{Constraints: []zonepb.Constraint{{Value: "us-east", Type: zonepb.Constraint_DEPRECATED_POSITIVE}}}},
+				RangeMinBytes: proto.Int64(0),
+				RangeMaxBytes: proto.Int64(64000),
+			},
+			desc: roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{
+						StoreID:   1,
+						NodeID:    1,
+						ReplicaID: 1,
+					},
+					{
+						StoreID:   6,
+						NodeID:    6,
+						ReplicaID: 6,
+					},
+				},
+			},
+			// TODO(aayush): This test should be returning an
+			// AllocatorRangeUnavailable.
+			expectedAction: AllocatorAddVoter,
+		},
+
 		// Need three replicas, have two.
 		{
 			zone: zonepb.ZoneConfig{
@@ -4330,6 +4392,36 @@ func TestAllocatorComputeAction(t *testing.T) {
 			},
 			expectedAction: AllocatorAddVoter,
 		},
+		// Need a voter and a non-voter.
+		{
+			zone: zonepb.ZoneConfig{
+				NumReplicas:   proto.Int32(5),
+				NumVoters:     proto.Int32(3),
+				RangeMinBytes: proto.Int64(0),
+				RangeMaxBytes: proto.Int64(64000),
+			},
+			desc: roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{
+						StoreID:   1,
+						NodeID:    1,
+						ReplicaID: 1,
+					},
+					{
+						StoreID:   2,
+						NodeID:    2,
+						ReplicaID: 2,
+					},
+					{
+						StoreID:   4,
+						NodeID:    4,
+						ReplicaID: 4,
+						Type:      roachpb.ReplicaTypeNonVoter(),
+					},
+				},
+			},
+			expectedAction: AllocatorAddVoter,
+		},
 		// Need five replicas, have four, one is on a dead store.
 		{
 			zone: zonepb.ZoneConfig{
@@ -4588,6 +4680,66 @@ func TestAllocatorComputeAction(t *testing.T) {
 			},
 			expectedAction: AllocatorRemoveVoter,
 		},
+		// Need 2 non-voting replicas, have none.
+		{
+			zone: zonepb.ZoneConfig{
+				NumReplicas:   proto.Int32(5),
+				NumVoters:     proto.Int32(3),
+				RangeMinBytes: proto.Int64(0),
+				RangeMaxBytes: proto.Int64(64000),
+			},
+			desc: roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{
+						StoreID:   1,
+						NodeID:    1,
+						ReplicaID: 1,
+					},
+					{
+						StoreID:   2,
+						NodeID:    2,
+						ReplicaID: 2,
+					},
+					{
+						StoreID:   3,
+						NodeID:    3,
+						ReplicaID: 3,
+					},
+				},
+			},
+			expectedAction: AllocatorAddNonVoter,
+		},
+		// Need 1 non-voting replica, have 2.
+		{
+			zone: zonepb.ZoneConfig{
+				NumReplicas:   proto.Int32(2),
+				NumVoters:     proto.Int32(1),
+				RangeMinBytes: proto.Int64(0),
+				RangeMaxBytes: proto.Int64(64000),
+			},
+			desc: roachpb.RangeDescriptor{
+				InternalReplicas: []roachpb.ReplicaDescriptor{
+					{
+						StoreID:   1,
+						NodeID:    1,
+						ReplicaID: 1,
+					},
+					{
+						StoreID:   2,
+						NodeID:    2,
+						ReplicaID: 2,
+						Type:      roachpb.ReplicaTypeNonVoter(),
+					},
+					{
+						StoreID:   3,
+						NodeID:    3,
+						ReplicaID: 3,
+						Type:      roachpb.ReplicaTypeNonVoter(),
+					},
+				},
+			},
+			expectedAction: AllocatorRemoveNonVoter,
+		},
 		// Need three replicas, two are on dead stores. Should
 		// be a noop because there aren't enough live replicas for
 		// a quorum.
@@ -5261,7 +5413,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
 			desc.EndKey = prefixKey
 
 			clusterNodes := a.storePool.ClusterNodeCount()
-			effectiveNumReplicas := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
+			effectiveNumReplicas := GetNeededVoters(*zone.NumReplicas, clusterNodes)
 			require.Equal(t, c.expectedNumReplicas, effectiveNumReplicas, "clusterNodes=%d", clusterNodes)
 
 			action, _ := a.ComputeAction(ctx, zone, &desc)
@@ -5280,7 +5432,7 @@ func TestAllocatorGetNeededReplicas(t *testing.T) {
 		availNodes int
 		expected   int
 	}{
-		// If zone.NumReplicas <= 3, GetNeededReplicas should always return zone.NumReplicas.
+		// If zone.NumReplicas <= 3, GetNeededVoters should always return zone.NumReplicas.
 		{1, 0, 1},
 		{1, 1, 1},
 		{2, 0, 2},
@@ -5315,9 +5467,9 @@ func TestAllocatorGetNeededReplicas(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		if e, a := tc.expected, GetNeededReplicas(tc.zoneRepls, tc.availNodes); e != a {
+		if e, a := tc.expected, GetNeededVoters(tc.zoneRepls, tc.availNodes); e != a {
 			t.Errorf(
-				"GetNeededReplicas(zone.NumReplicas=%d, availNodes=%d) got %d; want %d",
+				"GetNeededVoters(zone.NumReplicas=%d, availNodes=%d) got %d; want %d",
 				tc.zoneRepls, tc.availNodes, a, e)
 		}
 	}
diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go
index 6d7ad81c95ec..ba03337e2745 100644
--- a/pkg/kv/kvserver/replica_metrics.go
+++ b/pkg/kv/kvserver/replica_metrics.go
@@ -176,7 +176,7 @@ func calcRangeCounter(
 		unavailable = !desc.Replicas().CanMakeProgress(func(rDesc roachpb.ReplicaDescriptor) bool {
 			return livenessMap[rDesc.NodeID].IsLive
 		})
-		needed := GetNeededReplicas(numReplicas, clusterNodes)
+		needed := GetNeededVoters(numReplicas, clusterNodes)
 		liveVoterReplicas := calcLiveVoterReplicas(desc, livenessMap)
 		if needed > liveVoterReplicas {
 			underreplicated = true
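On the metrics side, a quick illustrative sketch (not part of the patch) of the comparison calcRangeCounter performs above: only live voting replicas count toward the target, so non-voters never satisfy it.

// Illustrative sketch only, not part of this change. Mirrors the
// under-replication check in calcRangeCounter: a range counts as
// under-replicated when it has fewer live voting replicas than needed.
package main

import "fmt"

func main() {
	needed := 3            // e.g. GetNeededVoters(numReplicas, clusterNodes)
	liveVoterReplicas := 2 // e.g. calcLiveVoterReplicas(desc, livenessMap)
	underreplicated := needed > liveVoterReplicas
	fmt.Println(underreplicated) // true
}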
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index fe59733c099a..9494e1cfc9cc 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -404,6 +404,9 @@ func (rq *replicateQueue) processOneChange(
 		// Requeue because either we failed to transition out of a joint state
 		// (bad) or we did and there might be more to do for that range.
 		return true, err
+	case AllocatorAddNonVoter, AllocatorRemoveNonVoter:
+		return false, errors.Errorf("allocator actions pertaining to non-voters are"+
+			" currently not supported: %v", action)
 	default:
 		return false, errors.Errorf("unknown allocator action %v", action)
 	}
@@ -478,7 +481,7 @@ func (rq *replicateQueue) addOrReplace(
 	}
 
 	clusterNodes := rq.allocator.storePool.ClusterNodeCount()
-	need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
+	need := GetNeededVoters(*zone.NumReplicas, clusterNodes)
 
 	// Only up-replicate if there are suitable allocation targets such that,
 	// either the replication goal is met, or it is possible to get to the next
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 96b176fcc575..f14c3a9200a9 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -504,7 +504,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
 		desc.RangeID, replWithStats.qps)
 
 	clusterNodes := sr.rq.allocator.storePool.ClusterNodeCount()
-	desiredReplicas := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
+	desiredReplicas := GetNeededVoters(*zone.NumReplicas, clusterNodes)
 	targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas)
 	targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas)
 	currentReplicas := desc.Replicas().Descriptors()
diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go
index e5650ba9d50f..fa11c8f95111 100644
--- a/pkg/roachpb/metadata_replicas.go
+++ b/pkg/roachpb/metadata_replicas.go
@@ -53,6 +53,13 @@ func ReplicaTypeLearner() *ReplicaType {
 	return &t
 }
 
+// ReplicaTypeNonVoter returns a NON_VOTER pointer suitable for use in
+// a nullable proto field.
+func ReplicaTypeNonVoter() *ReplicaType {
+	t := NON_VOTER
+	return &t
+}
+
 // ReplicaSet is a set of replicas, usually the nodes/stores on which
 // replicas of a range are stored.
 type ReplicaSet struct {
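Finally, for context on how the new helper is consumed (the allocator test cases above use it the same way), a small illustrative sketch, not part of the patch:

// Illustrative sketch only, not part of this change. ReplicaTypeNonVoter fills
// the nullable Type field of a ReplicaDescriptor, as the new test cases do.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	repl := roachpb.ReplicaDescriptor{
		NodeID:    4,
		StoreID:   4,
		ReplicaID: 4,
		Type:      roachpb.ReplicaTypeNonVoter(),
	}
	fmt.Println(*repl.Type == roachpb.NON_VOTER) // true
}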