From f6fc512c36d0b7a371dca6ec2423edfe38a4780a Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Tue, 10 Aug 2021 18:01:40 -0400
Subject: [PATCH] kvserver: count draining nodes as live when computing quorum

Similar to https://github.com/cockroachdb/cockroach/pull/67714

Draining nodes were considered non-live by the allocator when it made the
determination of whether a range could achieve quorum. This meant that, for
instance, on a cluster with a replication factor of 5, if we had 3 or more
nodes marked draining, we (with a high likelihood) wouldn't be able to
decommission nodes from the cluster.

Furthermore, for the same reason, the system would also incorrectly decide
not to rebalance ranges that had a quorum (or more) of their replicas on
draining nodes.

This patch fixes this problem by considering replicas on draining nodes as
live for the purposes of determining whether a range has quorum. This likely
fixes a considerable subset of "stuck decommissioning" issues we've seen in
the wild.

Follows from https://github.com/cockroachlabs/support/issues/1105

Release note: None
---
 pkg/kv/kvserver/allocator.go       | 10 ++++-----
 pkg/kv/kvserver/replica_command.go | 12 +++++-----
 pkg/kv/kvserver/replicate_queue.go |  4 ++--
 pkg/kv/kvserver/store_pool.go      | 35 ++++++++++++++++++++----------
 pkg/kv/kvserver/store_pool_test.go | 12 +++++-----
 5 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 53b33bf2afdc..5c4ac86230d5 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -549,8 +549,8 @@ func (a *Allocator) computeAction(
   // stores to be unavailable, just because their nodes have failed a liveness
   // heartbeat in the recent past. This means we won't move those replicas
   // elsewhere (for a regular rebalance or for decommissioning).
-  const includeSuspectStores = true
-  liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)
+  const includeSuspectAndDrainingStores = true
+  liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
 
   if len(liveVoters) < quorum {
     // Do not take any replacement/removal action if we do not have a quorum of
@@ -630,7 +630,7 @@ func (a *Allocator) computeAction(
   }
 
   liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
-    nonVoterReplicas, includeSuspectStores,
+    nonVoterReplicas, includeSuspectAndDrainingStores,
   )
   if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
     // The range has non-voter(s) on a dead node that we should replace.
@@ -1297,7 +1297,7 @@ func (a *Allocator) TransferLeaseTarget(
       return roachpb.ReplicaDescriptor{}
     }
     // Verify that the preferred replica is eligible to receive the lease.
-    preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
+    preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */)
     if len(preferred) == 1 {
       return preferred[0]
     }
@@ -1312,7 +1312,7 @@ func (a *Allocator) TransferLeaseTarget(
   }
 
   // Only consider live, non-draining, non-suspect replicas.
-  existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)
+  existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)
 
   // Short-circuit if there are no valid targets out there.
   if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 10d48087b433..88a9aaf4c93f 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2042,11 +2042,11 @@ type changeReplicasTxnArgs struct {
   //
   // - Replicas on decommissioning node/store are considered live.
   //
-  // - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
+  // - If `includeSuspectAndDrainingStores` is true, stores that are marked suspect (i.e.
   //   stores that have failed a liveness heartbeat in the recent past) are
   //   considered live. Otherwise, they are excluded from the returned slices.
   liveAndDeadReplicas func(
-    repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+    repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
   ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)
 
   logChange logChangeFn
@@ -2141,7 +2141,7 @@ func execChangeReplicasTxn(
   // Note that the allocator will avoid rebalancing to stores that are
   // currently marked suspect. See uses of StorePool.getStoreList() in
   // allocator.go.
-  liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
+  liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
   if !replicas.CanMakeProgress(
     func(rDesc roachpb.ReplicaDescriptor) bool {
       for _, inner := range liveReplicas {
@@ -2895,8 +2895,10 @@ func (s *Store) relocateOne(
   for _, candidate := range candidateTargets {
     store, ok := storeMap[candidate.StoreID]
     if !ok {
-      return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor",
-        candidate.StoreID)
+      return nil, nil, fmt.Errorf(
+        "cannot up-replicate to s%d; missing gossiped StoreDescriptor"+
+          " (the store is likely dead, draining or decommissioning)", candidate.StoreID,
+      )
     }
     candidateDescs = append(candidateDescs, *store)
   }
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 976bc0603e8c..7dc487564347 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -365,10 +365,10 @@ func (rq *replicateQueue) processOneChange(
   voterReplicas := desc.Replicas().VoterDescriptors()
   nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
   liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-    voterReplicas, true, /* includeSuspectStores */
+    voterReplicas, true, /* includeSuspectAndDrainingStores */
   )
   liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-    nonVoterReplicas, true, /* includeSuspectStores */
+    nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
   )
 
   // NB: the replication layer ensures that the below operations don't cause
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index 11fa84324420..d77eb725a890 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -236,8 +236,14 @@ const (
   storeStatusAvailable
   // The store is decommissioning.
   storeStatusDecommissioning
-  // The store failed it's liveness heartbeat recently and is considered suspect.
+  // The store failed its liveness heartbeat recently and is considered
+  // suspect. Consequently, stores always move from `storeStatusUnknown`
+  // (indicating a node that has a non-live node liveness record) to
+  // `storeStatusSuspect`.
   storeStatusSuspect
+  // The store is alive but is currently marked as draining, so it is not a
+  // candidate for lease transfers or replica rebalancing.
+  storeStatusDraining
 )
 
 func (sd *storeDetail) status(
@@ -282,6 +288,9 @@ func (sd *storeDetail) status(
   // Even if the store has been updated via gossip, we still rely on
   // the node liveness to determine whether it is considered live.
+  //
+  // Store statuses are checked in the following order:
+  // dead -> decommissioning -> unknown -> draining -> suspect -> available.
   switch nl(sd.desc.Node.NodeID, now, threshold) {
   case livenesspb.NodeLivenessStatus_DEAD, livenesspb.NodeLivenessStatus_DECOMMISSIONED:
     return storeStatusDead
@@ -302,7 +311,7 @@ func (sd *storeDetail) status(
   // and we may not see a store in this state. To help with that we perform
   // a similar clear of lastAvailable on a DEAD store.
   sd.lastAvailable = time.Time{}
-  return storeStatusUnknown
+  return storeStatusDraining
 }
 
 if sd.isThrottled(now) {
@@ -668,11 +677,12 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {
 //
 // - Replicas on decommissioning node/store are considered live.
 //
-// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
-// stores that have failed a liveness heartbeat in the recent past) are
-// considered live. Otherwise, they are excluded from the returned slices.
+// - If `includeSuspectAndDrainingStores` is true, stores that are marked
+// suspect (i.e. stores that have failed a liveness heartbeat in the recent
+// past) and stores that are marked as draining are considered live. Otherwise,
+// they are excluded from the returned slices.
 func (sp *StorePool) liveAndDeadReplicas(
-  repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+  repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
 ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
   sp.detailsMu.Lock()
   defer sp.detailsMu.Unlock()
@@ -696,8 +706,8 @@ func (sp *StorePool) liveAndDeadReplicas(
       liveReplicas = append(liveReplicas, repl)
     case storeStatusUnknown:
       // No-op.
-    case storeStatusSuspect:
-      if includeSuspectStores {
+    case storeStatusSuspect, storeStatusDraining:
+      if includeSuspectAndDrainingStores {
         liveReplicas = append(liveReplicas, repl)
       }
     default:
@@ -824,7 +834,8 @@ type throttledStoreReasons []string
 // getStoreList returns a storeList that contains all active stores that contain
 // the required attributes and their associated stats. The storeList is filtered
 // according to the provided storeFilter. It also returns the total number of
-// alive and throttled stores.
+// alive stores and a list of throttled stores with a reason why they're
+// throttled.
 func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) {
   sp.detailsMu.Lock()
   defer sp.detailsMu.Unlock()
@@ -881,9 +892,11 @@ func (sp *StorePool) getStoreListFromIDsLocked(
     case storeStatusAvailable:
       aliveStoreCount++
       storeDescriptors = append(storeDescriptors, *detail.desc)
+    case storeStatusDraining:
+      throttled = append(throttled, fmt.Sprintf("s%d: draining", storeID))
     case storeStatusSuspect:
      aliveStoreCount++
-      throttled = append(throttled, "throttled because the node is considered suspect")
+      throttled = append(throttled, fmt.Sprintf("s%d: suspect", storeID))
       if filter != storeFilterThrottled && filter != storeFilterSuspect {
         storeDescriptors = append(storeDescriptors, *detail.desc)
       }
@@ -1006,7 +1019,7 @@ func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal(
     log.VEventf(ctx, 3,
       "s%d is a live target, candidate for rebalancing", targetStoreID)
     return true
-  case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect:
+  case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect, storeStatusDraining:
     log.VEventf(ctx, 3,
       "not considering non-live store s%d (%v)", targetStoreID, status)
     return false
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index 4e8355142df1..3010375795ae 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -754,7 +754,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
     mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
   }
 
-  liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+  liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
   if len(liveReplicas) != 5 {
     t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
   }
@@ -776,7 +776,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
   // Mark node 4 as merely unavailable.
   mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)
 
-  liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+  liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
   if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
     t.Fatalf("expected live replicas %+v; got %+v", e, a)
   }
@@ -803,7 +803,7 @@ func TestStorePoolDefaultState(t *testing.T) {
   liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
     []roachpb.ReplicaDescriptor{{StoreID: 1}},
-    false, /* includeSuspectStores */
+    false, /* includeSuspectAndDrainingStores */
   )
   if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
     t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
   }
@@ -913,7 +913,7 @@ func TestStorePoolSuspected(t *testing.T) {
   s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
   sp.detailsMu.Unlock()
-  require.Equal(t, s, storeStatusUnknown)
+  require.Equal(t, s, storeStatusDraining)
   require.True(t, detail.lastAvailable.IsZero())
 
   mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
@@ -1071,7 +1071,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
     mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
   }
 
-  liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+  liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
   if len(liveReplicas) != 5 {
     t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
   }
@@ -1083,7 +1083,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
   // Mark node 5 as dead.
   mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)
 
-  liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+  liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
   // Decommissioning replicas are considered live.
   if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
     t.Fatalf("expected live replicas %+v; got %+v", e, a)
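
Reviewer's note (not part of the patch): the standalone Go sketch below illustrates
the behavior change described in the commit message. All types and helpers in it
(replica, storeStatus, liveAndDead) are hypothetical stand-ins, not CockroachDB's
actual StorePool API; it only shows why counting draining replicas as live lets the
allocator act on a range whose quorum sits on draining nodes.

// Illustrative sketch only; mimics the shape of liveAndDeadReplicas but is not
// the real kvserver code.
package main

import "fmt"

type storeStatus int

const (
	statusAvailable storeStatus = iota
	statusDead
	statusSuspect
	statusDraining
)

type replica struct {
	storeID int
	status  storeStatus
}

// liveAndDead mirrors the flag's semantics: suspect and draining replicas
// count as live only when includeSuspectAndDraining is set.
func liveAndDead(repls []replica, includeSuspectAndDraining bool) (live, dead []replica) {
	for _, r := range repls {
		switch r.status {
		case statusAvailable:
			live = append(live, r)
		case statusSuspect, statusDraining:
			if includeSuspectAndDraining {
				live = append(live, r)
			}
		case statusDead:
			dead = append(dead, r)
		}
	}
	return live, dead
}

func main() {
	// An RF=5 range with three of its replicas on draining nodes, as in the
	// commit message's example.
	repls := []replica{
		{1, statusAvailable}, {2, statusAvailable},
		{3, statusDraining}, {4, statusDraining}, {5, statusDraining},
	}
	quorum := len(repls)/2 + 1

	// Old behavior: draining replicas are not counted as live, so the
	// allocator sees fewer than a quorum of live voters and refuses to act.
	live, _ := liveAndDead(repls, false /* includeSuspectAndDraining */)
	fmt.Printf("draining excluded: %d live, quorum %d -> no action\n", len(live), quorum)

	// New behavior: draining replicas count as live, so the quorum check
	// passes and decommissioning/rebalancing can proceed.
	live, _ = liveAndDead(repls, true /* includeSuspectAndDraining */)
	fmt.Printf("draining included: %d live, quorum %d -> allocator can act\n", len(live), quorum)
}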