From 8383f6a6a680af1e606a81b92794e720c55e30ff Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 10 Aug 2021 18:01:40 -0400 Subject: [PATCH] kvserver: count draining nodes as live when computing quorum Similar to https://github.com/cockroachdb/cockroach/pull/67714 Draining nodes were considered non-live by the allocator when it made the determination of whether a range could achieve quorum. This meant that, for instance, on a cluster with a replication factor of 5, if we had 3 or more nodes marked draining, we (with a high likelihood) wouldn't be able to decommission nodes from the cluster. Furthermore, due to the same reason as above the system also would incorrectly decide to not rebalance ranges that had more than a quorum of replicas on draining nodes. This patch fixes this problem by considering replicas on draining nodes as live for the purposes of determining whether a range has quorum. This likely fixes a considerable subset of "stuck decommissioning" issues we've seen in the wild. Follows from https://github.com/cockroachlabs/support/issues/1105 Release note (bug fix): Previously, draining a quorum of nodes (i.e. >=2 if replication factor is 3, >=3 if replication factor is 5, etc) would block the subsequent decommissioning of any other nodes in the cluster. This patch fixes this bug. Now, if the lowest replication factor of some zone in the cluster is RF, operators should be able to safely drain up to RF-1 nodes simultaneously. --- pkg/kv/kvserver/allocator.go | 10 +-- pkg/kv/kvserver/replica_command.go | 17 +++-- pkg/kv/kvserver/replicate_queue.go | 4 +- pkg/kv/kvserver/store_pool.go | 37 +++++++--- pkg/kv/kvserver/store_pool_test.go | 113 +++++++++++++++++++++++++++-- 5 files changed, 149 insertions(+), 32 deletions(-) diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 7b94b584ae9f..b9c30fd95e4a 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -550,8 +550,8 @@ func (a *Allocator) computeAction( // stores to be unavailable, just because their nodes have failed a liveness // heartbeat in the recent past. This means we won't move those replicas // elsewhere (for a regular rebalance or for decommissioning). - const includeSuspectStores = true - liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores) + const includeSuspectAndDrainingStores = true + liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores) if len(liveVoters) < quorum { // Do not take any replacement/removal action if we do not have a quorum of @@ -631,7 +631,7 @@ func (a *Allocator) computeAction( } liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas( - nonVoterReplicas, includeSuspectStores, + nonVoterReplicas, includeSuspectAndDrainingStores, ) if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 { // The range has non-voter(s) on a dead node that we should replace. @@ -1298,7 +1298,7 @@ func (a *Allocator) TransferLeaseTarget( return roachpb.ReplicaDescriptor{} } // Verify that the preferred replica is eligible to receive the lease. - preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */) + preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */) if len(preferred) == 1 { return preferred[0] } @@ -1313,7 +1313,7 @@ func (a *Allocator) TransferLeaseTarget( } // Only consider live, non-draining, non-suspect replicas. - existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */) + existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */) // Short-circuit if there are no valid targets out there. if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) { diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index d8732a03bedf..1b5cf919041b 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2042,11 +2042,12 @@ type changeReplicasTxnArgs struct { // // - Replicas on decommissioning node/store are considered live. // - // - If `includeSuspectStores` is true, stores that are marked suspect (i.e. - // stores that have failed a liveness heartbeat in the recent past) are - // considered live. Otherwise, they are excluded from the returned slices. + // - If `includeSuspectAndDrainingStores` is true, stores that are marked + // suspect (i.e. stores that have failed a liveness heartbeat in the recent + // past) are considered live. Otherwise, they are excluded from the returned + // slices. liveAndDeadReplicas func( - repls []roachpb.ReplicaDescriptor, includeSuspectStores bool, + repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) logChange logChangeFn @@ -2141,7 +2142,7 @@ func execChangeReplicasTxn( // Note that the allocator will avoid rebalancing to stores that are // currently marked suspect. See uses of StorePool.getStoreList() in // allocator.go. - liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */) + liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */) if !replicas.CanMakeProgress( func(rDesc roachpb.ReplicaDescriptor) bool { for _, inner := range liveReplicas { @@ -2895,8 +2896,10 @@ func (s *Store) relocateOne( for _, candidate := range candidateTargets { store, ok := storeMap[candidate.StoreID] if !ok { - return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor", - candidate.StoreID) + return nil, nil, fmt.Errorf( + "cannot up-replicate to s%d; missing gossiped StoreDescriptor"+ + " (the store is likely dead, draining or decommissioning)", candidate.StoreID, + ) } candidateDescs = append(candidateDescs, *store) } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 88eb11a6c8bb..d52e1ab42e37 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -366,10 +366,10 @@ func (rq *replicateQueue) processOneChange( voterReplicas := desc.Replicas().VoterDescriptors() nonVoterReplicas := desc.Replicas().NonVoterDescriptors() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas( - voterReplicas, true, /* includeSuspectStores */ + voterReplicas, true, /* includeSuspectAndDrainingStores */ ) liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas( - nonVoterReplicas, true, /* includeSuspectStores */ + nonVoterReplicas, true, /* includeSuspectAndDrainingStores */ ) // NB: the replication layer ensures that the below operations don't cause diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 55bcfdf3a1cf..4a0fce63bf7e 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -235,10 +235,17 @@ const ( storeStatusThrottled // The store is alive and available. storeStatusAvailable - // The store is decommissioning. + // The store is decommissioning. If draining or suspect stores are + // decommissioning, this status takes precedence over `storeStatusDraining` + // and `storeStatusSuspect`. storeStatusDecommissioning - // The store failed it's liveness heartbeat recently and is considered suspect. + // The store failed it's liveness heartbeat recently and is considered + // suspect. Note that stores must always pass from `storeStatusUnknown` first + // before being marked suspect. storeStatusSuspect + // The store is alive but is currently marked as draining, so it is not a + // candidate for lease transfers or replica rebalancing. + storeStatusDraining ) func (sd *storeDetail) status( @@ -303,7 +310,7 @@ func (sd *storeDetail) status( // and we may not see a store in this state. To help with that we perform // a similar clear of lastAvailable on a DEAD store. sd.lastAvailable = time.Time{} - return storeStatusUnknown + return storeStatusDraining } if sd.isThrottled(now) { @@ -669,11 +676,15 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { // // - Replicas on decommissioning node/store are considered live. // -// - If `includeSuspectStores` is true, stores that are marked suspect (i.e. -// stores that have failed a liveness heartbeat in the recent past) are -// considered live. Otherwise, they are excluded from the returned slices. +// - If `includeSuspectAndDrainingStores` is true, stores that are marked +// suspect (i.e. stores that have failed a liveness heartbeat in the recent +// past), and stores that are marked as draining are considered live. Otherwise, +// they are excluded from the returned slices. +// +// TODO(aayush): Clean up the return type / contract of this method to make it +// more explicit which callers care about computing quorum and which ones dont. func (sp *StorePool) liveAndDeadReplicas( - repls []roachpb.ReplicaDescriptor, includeSuspectStores bool, + repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { sp.detailsMu.Lock() defer sp.detailsMu.Unlock() @@ -697,8 +708,8 @@ func (sp *StorePool) liveAndDeadReplicas( liveReplicas = append(liveReplicas, repl) case storeStatusUnknown: // No-op. - case storeStatusSuspect: - if includeSuspectStores { + case storeStatusSuspect, storeStatusDraining: + if includeSuspectAndDrainingStores { liveReplicas = append(liveReplicas, repl) } default: @@ -825,7 +836,7 @@ type throttledStoreReasons []string // getStoreList returns a storeList that contains all active stores that contain // the required attributes and their associated stats. The storeList is filtered // according to the provided storeFilter. It also returns the total number of -// alive and throttled stores. +// alive stores and a list of throttled stores explaining why. func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) { sp.detailsMu.Lock() defer sp.detailsMu.Unlock() @@ -882,9 +893,11 @@ func (sp *StorePool) getStoreListFromIDsLocked( case storeStatusAvailable: aliveStoreCount++ storeDescriptors = append(storeDescriptors, *detail.desc) + case storeStatusDraining: + throttled = append(throttled, fmt.Sprintf("[s%d draining]", storeID)) case storeStatusSuspect: aliveStoreCount++ - throttled = append(throttled, "throttled because the node is considered suspect") + throttled = append(throttled, fmt.Sprintf("[s%d suspect]", storeID)) if filter != storeFilterThrottled && filter != storeFilterSuspect { storeDescriptors = append(storeDescriptors, *detail.desc) } @@ -1007,7 +1020,7 @@ func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( log.VEventf(ctx, 3, "s%d is a live target, candidate for rebalancing", targetStoreID) return true - case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect: + case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect, storeStatusDraining: log.VEventf(ctx, 3, "not considering non-live store s%d (%v)", targetStoreID, status) return false diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 0f1a55662a9c..b291d87cc84e 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) } - liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */) + liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) if len(liveReplicas) != 5 { t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas) } @@ -777,7 +777,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { // Mark node 4 as merely unavailable. mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE) - liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */) + liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) { t.Fatalf("expected live replicas %+v; got %+v", e, a) } @@ -804,7 +804,7 @@ func TestStorePoolDefaultState(t *testing.T) { liveReplicas, deadReplicas := sp.liveAndDeadReplicas( []roachpb.ReplicaDescriptor{{StoreID: 1}}, - false, /* includeSuspectStores */ + false, /* includeSuspectAndDrainingStores */ ) if len(liveReplicas) != 0 || len(deadReplicas) != 0 { t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas) @@ -914,7 +914,7 @@ func TestStorePoolSuspected(t *testing.T) { s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) sp.detailsMu.Unlock() - require.Equal(t, s, storeStatusUnknown) + require.Equal(t, s, storeStatusDraining) require.True(t, detail.lastAvailable.IsZero()) mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) @@ -1006,6 +1006,107 @@ func TestGetLocalities(t *testing.T) { } } +func TestStorePoolDrainingReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + stopper, g, _, sp, mnl := createTestStorePool( + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 5 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_LIVE) + defer stopper.Stop(context.Background()) + sg := gossiputil.NewStoreGossiper(g) + + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + }, + } + + replicas := []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + { + NodeID: 3, + StoreID: 3, + ReplicaID: 4, + }, + { + NodeID: 4, + StoreID: 4, + ReplicaID: 4, + }, + { + NodeID: 5, + StoreID: 5, + ReplicaID: 5, + }, + } + + sg.GossipStores(stores, t) + liveReplicas, deadReplicas := sp.liveAndDeadReplicas( + replicas, + false, /* includeSuspectAndDrainingStores */ + ) + if len(liveReplicas) != 5 { + t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas) + } + if len(deadReplicas) > 0 { + t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas) + } + // Mark node 4 as decommissioning. + mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DECOMMISSIONING) + // Mark node 5 as draining. + mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING) + + liveReplicas, deadReplicas = sp.liveAndDeadReplicas( + replicas, false, /* includeSuspectAndDrainingStores */ + ) + // Decommissioning replicas are considered live but draining replicas + // shouldn't be. + if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected live replicas %+v; got %+v", e, a) + } + if len(deadReplicas) != 0 { + t.Fatalf("expected 0 dead replicas found %+v", deadReplicas) + } + + liveReplicas, deadReplicas = sp.liveAndDeadReplicas( + replicas, true, /* includeSuspectAndDrainingStores */ + ) + // Draining replicas must also be considered live. + if a, e := liveReplicas, replicas; !reflect.DeepEqual(a, e) { + t.Fatalf("expected live replicas %+v; got %+v", e, a) + } + if len(deadReplicas) != 0 { + t.Fatalf("expected 0 dead replicas found %+v", deadReplicas) + } +} + func TestStorePoolDecommissioningReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1072,7 +1173,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) } - liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */) + liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) if len(liveReplicas) != 5 { t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas) } @@ -1084,7 +1185,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { // Mark node 5 as dead. mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD) - liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */) + liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) // Decommissioning replicas are considered live. if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) { t.Fatalf("expected live replicas %+v; got %+v", e, a)