diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 7b94b584ae9f..b9c30fd95e4a 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -550,8 +550,8 @@ func (a *Allocator) computeAction(
 	// stores to be unavailable, just because their nodes have failed a liveness
 	// heartbeat in the recent past. This means we won't move those replicas
 	// elsewhere (for a regular rebalance or for decommissioning).
-	const includeSuspectStores = true
-	liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)
+	const includeSuspectAndDrainingStores = true
+	liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
 
 	if len(liveVoters) < quorum {
 		// Do not take any replacement/removal action if we do not have a quorum of
@@ -631,7 +631,7 @@ func (a *Allocator) computeAction(
 	}
 
 	liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
-		nonVoterReplicas, includeSuspectStores,
+		nonVoterReplicas, includeSuspectAndDrainingStores,
 	)
 	if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
 		// The range has non-voter(s) on a dead node that we should replace.
@@ -1298,7 +1298,7 @@ func (a *Allocator) TransferLeaseTarget(
 			return roachpb.ReplicaDescriptor{}
 		}
 		// Verify that the preferred replica is eligible to receive the lease.
-		preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
+		preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */)
 		if len(preferred) == 1 {
 			return preferred[0]
 		}
@@ -1313,7 +1313,7 @@ func (a *Allocator) TransferLeaseTarget(
 	}
 
 	// Only consider live, non-draining, non-suspect replicas.
-	existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)
+	existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)
 
 	// Short-circuit if there are no valid targets out there.
 	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index d8732a03bedf..d19f1268a6b0 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2042,11 +2042,11 @@ type changeReplicasTxnArgs struct {
 	//
 	// - Replicas on decommissioning node/store are considered live.
 	//
-	// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
+	// - If `includeSuspectAndDrainingStores` is true, stores that are marked suspect (i.e.
 	//   stores that have failed a liveness heartbeat in the recent past) are
 	//   considered live. Otherwise, they are excluded from the returned slices.
 	liveAndDeadReplicas func(
-		repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+		repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
 	) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)
 
 	logChange logChangeFn
@@ -2141,7 +2141,7 @@ func execChangeReplicasTxn(
 		// Note that the allocator will avoid rebalancing to stores that are
 		// currently marked suspect. See uses of StorePool.getStoreList() in
 		// allocator.go.
-		liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
+		liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
 		if !replicas.CanMakeProgress(
 			func(rDesc roachpb.ReplicaDescriptor) bool {
 				for _, inner := range liveReplicas {
@@ -2895,8 +2895,10 @@ func (s *Store) relocateOne(
 	for _, candidate := range candidateTargets {
 		store, ok := storeMap[candidate.StoreID]
 		if !ok {
-			return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor",
-				candidate.StoreID)
+			return nil, nil, fmt.Errorf(
+				"cannot up-replicate to s%d; missing gossiped StoreDescriptor"+
+					" (the store is likely dead, draining or decommissioning)", candidate.StoreID,
+			)
 		}
 		candidateDescs = append(candidateDescs, *store)
 	}
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index feedd6045a4b..ce14e500fa43 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -373,10 +373,10 @@ func (rq *replicateQueue) processOneChange(
 	voterReplicas := desc.Replicas().VoterDescriptors()
 	nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
 	liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-		voterReplicas, true, /* includeSuspectStores */
+		voterReplicas, true, /* includeSuspectAndDrainingStores */
 	)
 	liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
-		nonVoterReplicas, true, /* includeSuspectStores */
+		nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
 	)
 
 	// NB: the replication layer ensures that the below operations don't cause
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index 55bcfdf3a1cf..8a5f49dc7ca2 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -235,10 +235,17 @@ const (
 	storeStatusThrottled
 	// The store is alive and available.
 	storeStatusAvailable
-	// The store is decommissioning.
+	// The store is decommissioning. If draining or suspect stores are
+	// decommissioned, this status takes precedence over `storeStatusDraining`
+	// and `storeStatusSuspect`.
 	storeStatusDecommissioning
-	// The store failed it's liveness heartbeat recently and is considered suspect.
+	// The store failed its liveness heartbeat recently and is considered
+	// suspect. Note that stores must always pass through `storeStatusUnknown`
+	// before being marked suspect.
 	storeStatusSuspect
+	// The store is alive but is currently marked as draining, so it is not a
+	// candidate for lease transfers or replica rebalancing.
+	storeStatusDraining
 )
 
 func (sd *storeDetail) status(
@@ -303,7 +310,7 @@ func (sd *storeDetail) status(
 		// and we may not see a store in this state. To help with that we perform
 		// a similar clear of lastAvailable on a DEAD store.
 		sd.lastAvailable = time.Time{}
-		return storeStatusUnknown
+		return storeStatusDraining
 	}
 
 	if sd.isThrottled(now) {
@@ -669,11 +676,12 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {
 //
 // - Replicas on decommissioning node/store are considered live.
 //
-// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
-//   stores that have failed a liveness heartbeat in the recent past) are
-//   considered live. Otherwise, they are excluded from the returned slices.
+// - If `includeSuspectAndDrainingStores` is true, stores that are marked
+//   suspect (i.e. stores that have failed a liveness heartbeat in the recent
+//   past), and stores that are marked as draining are considered live. Otherwise,
+//   they are excluded from the returned slices.
 func (sp *StorePool) liveAndDeadReplicas(
-	repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
+	repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
 ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -697,8 +705,8 @@ func (sp *StorePool) liveAndDeadReplicas(
 			liveReplicas = append(liveReplicas, repl)
 		case storeStatusUnknown:
 			// No-op.
-		case storeStatusSuspect:
-			if includeSuspectStores {
+		case storeStatusSuspect, storeStatusDraining:
+			if includeSuspectAndDrainingStores {
 				liveReplicas = append(liveReplicas, repl)
 			}
 		default:
@@ -825,7 +833,7 @@ type throttledStoreReasons []string
 // getStoreList returns a storeList that contains all active stores that contain
 // the required attributes and their associated stats. The storeList is filtered
 // according to the provided storeFilter. It also returns the total number of
-// alive and throttled stores.
+// alive stores and a list of throttled stores with the reason they are throttled.
 func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -882,9 +890,11 @@ func (sp *StorePool) getStoreListFromIDsLocked(
 		case storeStatusAvailable:
 			aliveStoreCount++
 			storeDescriptors = append(storeDescriptors, *detail.desc)
+		case storeStatusDraining:
+			throttled = append(throttled, fmt.Sprintf("[s%d draining]", storeID))
 		case storeStatusSuspect:
 			aliveStoreCount++
-			throttled = append(throttled, "throttled because the node is considered suspect")
+			throttled = append(throttled, fmt.Sprintf("[s%d suspect]", storeID))
 			if filter != storeFilterThrottled && filter != storeFilterSuspect {
 				storeDescriptors = append(storeDescriptors, *detail.desc)
 			}
@@ -1007,7 +1017,7 @@ func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal(
 		log.VEventf(ctx, 3,
 			"s%d is a live target, candidate for rebalancing", targetStoreID)
 		return true
-	case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect:
+	case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect, storeStatusDraining:
 		log.VEventf(ctx, 3,
 			"not considering non-live store s%d (%v)", targetStoreID, status)
 		return false
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index 0f1a55662a9c..b291d87cc84e 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -777,7 +777,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
 	}
 	// Mark node 4 as merely unavailable.
 	mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
 		t.Fatalf("expected live replicas %+v; got %+v", e, a)
 	}
@@ -804,7 +804,7 @@ func TestStorePoolDefaultState(t *testing.T) {
 
 	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
 		[]roachpb.ReplicaDescriptor{{StoreID: 1}},
-		false, /* includeSuspectStores */
+		false, /* includeSuspectAndDrainingStores */
 	)
 	if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
 		t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
@@ -914,7 +914,7 @@ func TestStorePoolSuspected(t *testing.T) {
 	s = detail.status(now,
 		timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
 	sp.detailsMu.Unlock()
-	require.Equal(t, s, storeStatusUnknown)
+	require.Equal(t, s, storeStatusDraining)
 	require.True(t, detail.lastAvailable.IsZero())
 
 	mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
@@ -1006,6 +1006,107 @@ func TestGetLocalities(t *testing.T) {
 	}
 }
 
+func TestStorePoolDrainingReplicas(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	stopper, g, _, sp, mnl := createTestStorePool(
+		TestTimeUntilStoreDead, false, /* deterministic */
+		func() int { return 5 }, /* nodeCount */
+		livenesspb.NodeLivenessStatus_LIVE)
+	defer stopper.Stop(context.Background())
+	sg := gossiputil.NewStoreGossiper(g)
+
+	stores := []*roachpb.StoreDescriptor{
+		{
+			StoreID: 1,
+			Node:    roachpb.NodeDescriptor{NodeID: 1},
+		},
+		{
+			StoreID: 2,
+			Node:    roachpb.NodeDescriptor{NodeID: 2},
+		},
+		{
+			StoreID: 3,
+			Node:    roachpb.NodeDescriptor{NodeID: 3},
+		},
+		{
+			StoreID: 4,
+			Node:    roachpb.NodeDescriptor{NodeID: 4},
+		},
+		{
+			StoreID: 5,
+			Node:    roachpb.NodeDescriptor{NodeID: 5},
+		},
+	}
+
+	replicas := []roachpb.ReplicaDescriptor{
+		{
+			NodeID:    1,
+			StoreID:   1,
+			ReplicaID: 1,
+		},
+		{
+			NodeID:    2,
+			StoreID:   2,
+			ReplicaID: 2,
+		},
+		{
+			NodeID:    3,
+			StoreID:   3,
+			ReplicaID: 4,
+		},
+		{
+			NodeID:    4,
+			StoreID:   4,
+			ReplicaID: 4,
+		},
+		{
+			NodeID:    5,
+			StoreID:   5,
+			ReplicaID: 5,
+		},
+	}
+
+	sg.GossipStores(stores, t)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
+		replicas,
+		false, /* includeSuspectAndDrainingStores */
+	)
+	if len(liveReplicas) != 5 {
+		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
+	}
+	if len(deadReplicas) > 0 {
+		t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas)
+	}
+	// Mark node 4 as decommissioning.
+	mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DECOMMISSIONING)
+	// Mark node 5 as draining.
+	mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING)
+
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(
+		replicas, false, /* includeSuspectAndDrainingStores */
+	)
+	// Decommissioning replicas are considered live but draining replicas
+	// shouldn't be.
+	if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
+		t.Fatalf("expected live replicas %+v; got %+v", e, a)
+	}
+	if len(deadReplicas) != 0 {
+		t.Fatalf("expected 0 dead replicas found %+v", deadReplicas)
+	}
+
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(
+		replicas, true, /* includeSuspectAndDrainingStores */
+	)
+	// Draining replicas must also be considered live.
+	if a, e := liveReplicas, replicas; !reflect.DeepEqual(a, e) {
+		t.Fatalf("expected live replicas %+v; got %+v", e, a)
+	}
+	if len(deadReplicas) != 0 {
+		t.Fatalf("expected 0 dead replicas found %+v", deadReplicas)
+	}
+}
+
 func TestStorePoolDecommissioningReplicas(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -1072,7 +1173,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 		mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
 	}
 
-	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	if len(liveReplicas) != 5 {
 		t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
 	}
@@ -1084,7 +1185,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
 	// Mark node 5 as dead.
 	mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)
 
-	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
+	liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
 	// Decommissioning replicas are considered live.
 	if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
		t.Fatalf("expected live replicas %+v; got %+v", e, a)
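
For readers skimming the patch, the following is a standalone sketch (not part of the diff itself) of the classification rule that the renamed includeSuspectAndDrainingStores flag controls. The storeStatus constants, the replica type, and the liveAndDead helper below are simplified stand-ins rather than the real StorePool types; only the suspect/draining switch arm is taken from the store_pool.go hunk above, and the remaining arms follow the doc comment ("replicas on decommissioning node/store are considered live").

// Standalone sketch, assuming simplified stand-in types for the real
// StorePool plumbing.
package main

import "fmt"

type storeStatus int

const (
	storeStatusDead storeStatus = iota
	storeStatusUnknown
	storeStatusAvailable
	storeStatusDecommissioning
	storeStatusSuspect
	storeStatusDraining
)

// replica pairs a store ID with the status its store currently reports; in
// the real code this status comes from the store pool's bookkeeping.
type replica struct {
	storeID int
	status  storeStatus
}

// liveAndDead sketches the classification done by StorePool.liveAndDeadReplicas:
// dead stores yield dead replicas; available and decommissioning stores yield
// live replicas; unknown stores are skipped; suspect and draining stores count
// as live only when the caller passes includeSuspectAndDrainingStores=true.
func liveAndDead(repls []replica, includeSuspectAndDrainingStores bool) (live, dead []replica) {
	for _, repl := range repls {
		switch repl.status {
		case storeStatusDead:
			dead = append(dead, repl)
		case storeStatusAvailable, storeStatusDecommissioning:
			live = append(live, repl)
		case storeStatusUnknown:
			// No-op: neither live nor dead.
		case storeStatusSuspect, storeStatusDraining:
			if includeSuspectAndDrainingStores {
				live = append(live, repl)
			}
		}
	}
	return live, dead
}

func main() {
	repls := []replica{
		{storeID: 1, status: storeStatusAvailable},
		{storeID: 2, status: storeStatusDecommissioning},
		{storeID: 3, status: storeStatusDraining},
		{storeID: 4, status: storeStatusDead},
	}

	live, dead := liveAndDead(repls, false /* includeSuspectAndDrainingStores */)
	fmt.Printf("strict: %d live, %d dead\n", len(live), len(dead)) // 2 live, 1 dead: the draining store is neither.

	live, dead = liveAndDead(repls, true /* includeSuspectAndDrainingStores */)
	fmt.Printf("lenient: %d live, %d dead\n", len(live), len(dead)) // 3 live, 1 dead: the draining store now counts as live.
}

This split mirrors how the patch picks the flag at each call site: computeAction and the replicate queue pass true so replicas on suspect or draining stores are not treated as unavailable and shuffled elsewhere, while TransferLeaseTarget passes false so leases are only handed to live, non-draining, non-suspect replicas.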