kvserver: count draining nodes as live when computing quorum
Similar to #67714

Draining nodes were considered non-live by the allocator when it determined
whether a range could achieve quorum. This meant that, for instance, on a
cluster with a replication factor of 5, having 3 or more nodes marked draining
would (with high likelihood) prevent us from decommissioning nodes from the
cluster: a 5-replica range needs 3 live voters for quorum, and only 2 would
remain live.

For the same reason as above, the system would also incorrectly decide not to
rebalance ranges that had more than a quorum of their replicas on draining
nodes.

This patch fixes the problem by considering replicas on draining nodes as live
for the purposes of determining whether a range has quorum. This likely fixes a
considerable subset of the "stuck decommissioning" issues we've seen in the
wild.
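
As an illustration of the quorum arithmetic, here is a minimal, self-contained
Go sketch. It is not the actual allocator code: the replica type and the helper
below are simplified stand-ins for StorePool.liveAndDeadReplicas and the
computeAction check touched by this patch.

package main

import "fmt"

type replica struct {
	storeID  int
	dead     bool
	draining bool
}

// liveAndDeadReplicas is a simplified stand-in for the StorePool method this
// patch changes: when includeSuspectAndDrainingStores is true, replicas on
// draining stores count as live.
func liveAndDeadReplicas(repls []replica, includeSuspectAndDrainingStores bool) (live, dead []replica) {
	for _, r := range repls {
		switch {
		case r.dead:
			dead = append(dead, r)
		case r.draining:
			if includeSuspectAndDrainingStores {
				live = append(live, r)
			}
		default:
			live = append(live, r)
		}
	}
	return live, dead
}

func main() {
	// Replication factor 5, with replicas on 3 draining nodes.
	repls := []replica{
		{storeID: 1}, {storeID: 2},
		{storeID: 3, draining: true},
		{storeID: 4, draining: true},
		{storeID: 5, draining: true},
	}
	quorum := len(repls)/2 + 1 // 3

	live, _ := liveAndDeadReplicas(repls, false) // old behavior
	fmt.Printf("old: %d live, quorum met: %t\n", len(live), len(live) >= quorum) // 2 live, false

	live, _ = liveAndDeadReplicas(repls, true) // new behavior
	fmt.Printf("new: %d live, quorum met: %t\n", len(live), len(live) >= quorum) // 5 live, true
}

Under the old behavior the allocator sees only 2 live voters, concludes the
range lacks quorum, and refuses to take any replacement or rebalancing action,
which is exactly how decommissioning got stuck.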

Follows from https://github.com/cockroachlabs/support/issues/1105

Release note: None
aayushshah15 committed Sep 2, 2021
1 parent a46e3a2 commit f6fc512
Showing 5 changed files with 44 additions and 29 deletions.
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator.go
@@ -549,8 +549,8 @@ func (a *Allocator) computeAction(
 	// stores to be unavailable, just because their nodes have failed a liveness
 	// heartbeat in the recent past. This means we won't move those replicas
 	// elsewhere (for a regular rebalance or for decommissioning).
-	const includeSuspectStores = true
-	liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)
+	const includeSuspectAndDrainingStores = true
+	liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
 
 	if len(liveVoters) < quorum {
 		// Do not take any replacement/removal action if we do not have a quorum of
@@ -630,7 +630,7 @@ func (a *Allocator) computeAction(
 	}
 
 	liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
-		nonVoterReplicas, includeSuspectStores,
+		nonVoterReplicas, includeSuspectAndDrainingStores,
 	)
 	if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
 		// The range has non-voter(s) on a dead node that we should replace.
@@ -1297,7 +1297,7 @@ func (a *Allocator) TransferLeaseTarget(
 		return roachpb.ReplicaDescriptor{}
 	}
 	// Verify that the preferred replica is eligible to receive the lease.
-	preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
+	preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */)
 	if len(preferred) == 1 {
 		return preferred[0]
 	}
@@ -1312,7 +1312,7 @@ func (a *Allocator) TransferLeaseTarget(
 	}
 
 	// Only consider live, non-draining, non-suspect replicas.
-	existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)
+	existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)
 
 	// Short-circuit if there are no valid targets out there.
 	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/replica_command.go
@@ -2042,11 +2042,11 @@ type changeReplicasTxnArgs struct {
 	//
 	// - Replicas on decommissioning node/store are considered live.
 	//
-	// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
+	// - If `includeSuspectAndDrainingStores` is true, stores that are marked suspect (i.e.
 	// stores that have failed a liveness heartbeat in the recent past) are
 	// considered live. Otherwise, they are excluded from the returned slices.
 	liveAndDeadReplicas func(
-		repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+		repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
 	) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)
 
 	logChange logChangeFn
@@ -2141,7 +2141,7 @@ func execChangeReplicasTxn(
 	// Note that the allocator will avoid rebalancing to stores that are
 	// currently marked suspect. See uses of StorePool.getStoreList() in
 	// allocator.go.
-	liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
+	liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
 	if !replicas.CanMakeProgress(
 		func(rDesc roachpb.ReplicaDescriptor) bool {
 			for _, inner := range liveReplicas {
@@ -2895,8 +2895,10 @@ func (s *Store) relocateOne(
 	for _, candidate := range candidateTargets {
 		store, ok := storeMap[candidate.StoreID]
 		if !ok {
-			return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor",
-				candidate.StoreID)
+			return nil, nil, fmt.Errorf(
+				"cannot up-replicate to s%d; missing gossiped StoreDescriptor"+
+					" (the store is likely dead, draining or decommissioning)", candidate.StoreID,
+			)
 		}
 		candidateDescs = append(candidateDescs, *store)
 	}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replicate_queue.go
@@ -365,10 +365,10 @@ func (rq *replicateQueue) processOneChange(
 	voterReplicas := desc.Replicas().VoterDescriptors()
 	nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
 	liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-		voterReplicas, true, /* includeSuspectStores */
+		voterReplicas, true, /* includeSuspectAndDrainingStores */
 	)
 	liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-		nonVoterReplicas, true, /* includeSuspectStores */
+		nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
 	)
 
 	// NB: the replication layer ensures that the below operations don't cause
35 changes: 24 additions & 11 deletions pkg/kv/kvserver/store_pool.go
@@ -236,8 +236,14 @@ const (
 	storeStatusAvailable
 	// The store is decommissioning.
 	storeStatusDecommissioning
-	// The store failed it's liveness heartbeat recently and is considered suspect.
+	// The store failed its liveness heartbeat recently and is considered
+	// suspect. Consequently, stores always move from `storeStatusUnknown`
+	// (indicating a node that has a non-live node liveness record) to
+	// `storeStatusSuspect`.
 	storeStatusSuspect
+	// The store is alive but is currently marked as draining, so it is not a
+	// candidate for lease transfers or replica rebalancing.
+	storeStatusDraining
 )
 
 func (sd *storeDetail) status(
@@ -282,6 +288,9 @@ func (sd *storeDetail) status(
 
 	// Even if the store has been updated via gossip, we still rely on
 	// the node liveness to determine whether it is considered live.
+	//
+	// Store statuses are checked in the following order:
+	// dead -> decommissioning -> unknown -> draining -> suspect -> available.
 	switch nl(sd.desc.Node.NodeID, now, threshold) {
 	case livenesspb.NodeLivenessStatus_DEAD, livenesspb.NodeLivenessStatus_DECOMMISSIONED:
 		return storeStatusDead
@@ -302,7 +311,7 @@ func (sd *storeDetail) status(
 	// and we may not see a store in this state. To help with that we perform
 	// a similar clear of lastAvailable on a DEAD store.
 	sd.lastAvailable = time.Time{}
-	return storeStatusUnknown
+	return storeStatusDraining
 }
 
 	if sd.isThrottled(now) {
@@ -668,11 +677,12 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {
 //
 // - Replicas on decommissioning node/store are considered live.
 //
-// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
-// stores that have failed a liveness heartbeat in the recent past) are
-// considered live. Otherwise, they are excluded from the returned slices.
+// - If `includeSuspectAndDrainingStores` is true, stores that are marked
+// suspect (i.e. stores that have failed a liveness heartbeat in the recent
+// past), and stores that are marked as draining are considered live. Otherwise,
+// they are excluded from the returned slices.
 func (sp *StorePool) liveAndDeadReplicas(
-	repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+	repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
 ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -696,8 +706,8 @@ func (sp *StorePool) liveAndDeadReplicas(
 			liveReplicas = append(liveReplicas, repl)
 		case storeStatusUnknown:
 			// No-op.
-		case storeStatusSuspect:
-			if includeSuspectStores {
+		case storeStatusSuspect, storeStatusDraining:
+			if includeSuspectAndDrainingStores {
 				liveReplicas = append(liveReplicas, repl)
 			}
 		default:
@@ -824,7 +834,8 @@ type throttledStoreReasons []string
 // getStoreList returns a storeList that contains all active stores that contain
 // the required attributes and their associated stats. The storeList is filtered
 // according to the provided storeFilter. It also returns the total number of
-// alive and throttled stores.
+// alive stores and a list of throttled stores with a reason for why they're
+// throttled.
 func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -881,9 +892,11 @@ func (sp *StorePool) getStoreListFromIDsLocked(
 		case storeStatusAvailable:
 			aliveStoreCount++
 			storeDescriptors = append(storeDescriptors, *detail.desc)
+		case storeStatusDraining:
+			throttled = append(throttled, fmt.Sprintf("s%d: draining", storeID))
 		case storeStatusSuspect:
 			aliveStoreCount++
-			throttled = append(throttled, "throttled because the node is considered suspect")
+			throttled = append(throttled, fmt.Sprintf("s%d: suspect", storeID))
 			if filter != storeFilterThrottled && filter != storeFilterSuspect {
 				storeDescriptors = append(storeDescriptors, *detail.desc)
 			}
@@ -1006,7 +1019,7 @@ func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal(
 		log.VEventf(ctx, 3,
 			"s%d is a live target, candidate for rebalancing", targetStoreID)
 		return true
-	case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect:
+	case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect, storeStatusDraining:
 		log.VEventf(ctx, 3,
 			"not considering non-live store s%d (%v)", targetStoreID, status)
 		return false
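For reference, the new status precedence can be condensed into the following
self-contained Go sketch. The liveness and status values here are stand-ins
rather than CockroachDB's actual types; only the ordering (dead ->
decommissioning -> unknown -> draining -> suspect -> available) comes from the
patch.

package main

import "fmt"

// Stand-in liveness states; the real code uses livenesspb.NodeLivenessStatus.
type nodeLiveness int

const (
	nlLive nodeLiveness = iota
	nlDead
	nlDecommissioning
	nlUnavailable
	nlDraining
)

// status condenses storeDetail.status: earlier cases take precedence.
func status(nl nodeLiveness, suspect bool) string {
	switch nl {
	case nlDead:
		return "dead"
	case nlDecommissioning:
		return "decommissioning"
	case nlUnavailable:
		return "unknown"
	case nlDraining:
		// New in this patch: draining was previously folded into "unknown".
		return "draining"
	}
	if suspect {
		return "suspect"
	}
	return "available"
}

func main() {
	// A draining node that also recently failed a heartbeat reports as
	// draining, not suspect, because draining is checked first.
	fmt.Println(status(nlDraining, true)) // draining
	fmt.Println(status(nlLive, true))     // suspect
	fmt.Println(status(nlLive, false))    // available
}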
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/store_pool_test.go
@@ -754,7 +754,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -776,7 +776,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 	// Mark node 4 as merely unavailable.
 	mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
 	}
@@ -803,7 +803,7 @@ func TestStorePoolDefaultState(t *testing.T) {
 
 	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
 		[]roachpb.ReplicaDescriptor{{StoreID: 1}},
-		false, /* includeSuspectStores */
+		false, /* includeSuspectAndDrainingStores */
 	)
 	if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
 		t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
@@ -913,7 +913,7 @@ func TestStorePoolSuspected(t *testing.T) {
 	s = detail.status(now,
 		timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
 	sp.detailsMu.Unlock()
-	require.Equal(t, s, storeStatusUnknown)
+	require.Equal(t, s, storeStatusDraining)
 	require.True(t, detail.lastAvailable.IsZero())
 
 	mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
@@ -1071,7 +1071,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -1083,7 +1083,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 	// Mark node 5 as dead.
 	mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	// Decommissioning replicas are considered live.
 	if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
 	}
