kvserver: count draining nodes as live when computing quorum
Similar to #67714

Draining nodes were considered non-live by the allocator when it determined
whether a range could achieve quorum. This meant that, for instance, on a
cluster with a replication factor of 5, if 3 or more nodes were marked
draining, we would (with high likelihood) be unable to decommission nodes from
the cluster.

Furthermore, for the same reason, the system would also incorrectly decide not
to rebalance ranges that had a quorum (or more) of their replicas on draining
nodes.

This patch fixes this problem by considering replicas on draining nodes as live
for the purposes of determining whether a range has quorum. This likely fixes a
considerable subset of "stuck decommissioning" issues we've seen in the wild.
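
To make the quorum arithmetic concrete, here is a minimal, self-contained Go
sketch (not the actual allocator code; `replica`, `liveVoters`, and the
`includeDraining` flag are simplified stand-ins for
`StorePool.liveAndDeadReplicas` and its `includeSuspectAndDrainingStores`
parameter):

```go
package main

import "fmt"

type replica struct {
	storeID  int
	draining bool
}

// liveVoters mimics the liveness filter: replicas on draining stores are
// dropped unless the caller opts in to counting them as live.
func liveVoters(repls []replica, includeDraining bool) []replica {
	var live []replica
	for _, r := range repls {
		if !r.draining || includeDraining {
			live = append(live, r)
		}
	}
	return live
}

func main() {
	// Replication factor 5, with 3 of the 5 voters on draining nodes.
	voters := []replica{
		{1, false}, {2, false}, {3, true}, {4, true}, {5, true},
	}
	quorum := len(voters)/2 + 1 // 3

	// Old behavior: draining replicas are excluded, so only 2 voters count
	// as live and 2 < 3 means the allocator refuses to act.
	fmt.Println(len(liveVoters(voters, false)) >= quorum) // false

	// New behavior: draining replicas count as live, so 5 >= 3 and the
	// allocator can still make replication changes (e.g. decommissioning).
	fmt.Println(len(liveVoters(voters, true)) >= quorum) // true
}
```

With the old behavior, 3 draining nodes out of 5 leave only 2 replicas counted
as live, below the quorum of 3, so the allocator declines to act; counting
draining replicas as live restores its ability to make replication changes.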

Follows from https://github.com/cockroachlabs/support/issues/1105

Release note: None
aayushshah15 committed Aug 18, 2021
1 parent 78a788f commit 6631a38
Showing 5 changed files with 138 additions and 28 deletions.
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator.go
@@ -550,8 +550,8 @@ func (a *Allocator) computeAction(
// stores to be unavailable, just because their nodes have failed a liveness
// heartbeat in the recent past. This means we won't move those replicas
// elsewhere (for a regular rebalance or for decommissioning).
const includeSuspectStores = true
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)
const includeSuspectAndDrainingStores = true
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)

if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
@@ -631,7 +631,7 @@ func (a *Allocator) computeAction(
}

liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
nonVoterReplicas, includeSuspectStores,
nonVoterReplicas, includeSuspectAndDrainingStores,
)
if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
// The range has non-voter(s) on a dead node that we should replace.
@@ -1298,7 +1298,7 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}
// Verify that the preferred replica is eligible to receive the lease.
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */)
if len(preferred) == 1 {
return preferred[0]
}
@@ -1313,7 +1313,7 @@
}

// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/replica_command.go
@@ -2042,11 +2042,11 @@ type changeReplicasTxnArgs struct {
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// - If `includeSuspectAndDrainingStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
liveAndDeadReplicas func(
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)

logChange logChangeFn
@@ -2141,7 +2141,7 @@ func execChangeReplicasTxn(
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
@@ -2895,8 +2895,10 @@ func (s *Store) relocateOne(
for _, candidate := range candidateTargets {
store, ok := storeMap[candidate.StoreID]
if !ok {
return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor",
candidate.StoreID)
return nil, nil, fmt.Errorf(
"cannot up-replicate to s%d; missing gossiped StoreDescriptor"+
" (the store is likely dead, draining or decommissioning)", candidate.StoreID,
)
}
candidateDescs = append(candidateDescs, *store)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replicate_queue.go
@@ -373,10 +373,10 @@ func (rq *replicateQueue) processOneChange(
voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
voterReplicas, true, /* includeSuspectStores */
voterReplicas, true, /* includeSuspectAndDrainingStores */
)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectStores */
nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
)

// NB: the replication layer ensures that the below operations don't cause
27 changes: 17 additions & 10 deletions pkg/kv/kvserver/store_pool.go
@@ -237,8 +237,12 @@ const (
storeStatusAvailable
// The store is decommissioning.
storeStatusDecommissioning
// The store failed its liveness heartbeat recently and is considered suspect.
// The store failed its liveness heartbeat recently and is considered
// suspect.
storeStatusSuspect
// The store is alive but is currently marked as draining, so it is not a
// candidate for lease transfers or replica rebalancing.
storeStatusDraining
)

func (sd *storeDetail) status(
@@ -303,7 +307,7 @@ func (sd *storeDetail) status(
// and we may not see a store in this state. To help with that we perform
// a similar clear of lastAvailable on a DEAD store.
sd.lastAvailable = time.Time{}
return storeStatusUnknown
return storeStatusDraining
}

if sd.isThrottled(now) {
@@ -669,11 +673,12 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
// - If `includeSuspectAndDrainingStores` is true, stores that are marked
// suspect (i.e. stores that have failed a liveness heartbeat in the recent
// past), and stores that are marked as draining are considered live. Otherwise,
// they are excluded from the returned slices.
func (sp *StorePool) liveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
@@ -697,8 +702,8 @@ func (sp *StorePool) liveAndDeadReplicas(
liveReplicas = append(liveReplicas, repl)
case storeStatusUnknown:
// No-op.
case storeStatusSuspect:
if includeSuspectStores {
case storeStatusSuspect, storeStatusDraining:
if includeSuspectAndDrainingStores {
liveReplicas = append(liveReplicas, repl)
}
default:
@@ -882,9 +887,11 @@ func (sp *StorePool) getStoreListFromIDsLocked(
case storeStatusAvailable:
aliveStoreCount++
storeDescriptors = append(storeDescriptors, *detail.desc)
case storeStatusDraining:
throttled = append(throttled, "draining")
case storeStatusSuspect:
aliveStoreCount++
throttled = append(throttled, "throttled because the node is considered suspect")
throttled = append(throttled, "suspect")
if filter != storeFilterThrottled && filter != storeFilterSuspect {
storeDescriptors = append(storeDescriptors, *detail.desc)
}
@@ -1007,7 +1014,7 @@ func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal(
log.VEventf(ctx, 3,
"s%d is a live target, candidate for rebalancing", targetStoreID)
return true
case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect:
case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect, storeStatusDraining:
log.VEventf(ctx, 3,
"not considering non-live store s%d (%v)", targetStoreID, status)
return false
113 changes: 107 additions & 6 deletions pkg/kv/kvserver/store_pool_test.go
@@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
@@ -777,7 +777,7 @@
// Mark node 4 as merely unavailable.
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
@@ -804,7 +804,7 @@ func TestStorePoolDefaultState(t *testing.T) {

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
[]roachpb.ReplicaDescriptor{{StoreID: 1}},
false, /* includeSuspectStores */
false, /* includeSuspectAndDrainingStores */
)
if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
@@ -914,7 +914,7 @@ func TestStorePoolSuspected(t *testing.T) {
s = detail.status(now,
timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusUnknown)
require.Equal(t, s, storeStatusDraining)
require.True(t, detail.lastAvailable.IsZero())

mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
@@ -1006,6 +1006,107 @@ func TestGetLocalities(t *testing.T) {
}
}

func TestStorePoolDrainingReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stopper, g, _, sp, mnl := createTestStorePool(
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 5 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
defer stopper.Stop(context.Background())
sg := gossiputil.NewStoreGossiper(g)

stores := []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
},
{
StoreID: 4,
Node: roachpb.NodeDescriptor{NodeID: 4},
},
{
StoreID: 5,
Node: roachpb.NodeDescriptor{NodeID: 5},
},
}

replicas := []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
ReplicaID: 1,
},
{
NodeID: 2,
StoreID: 2,
ReplicaID: 2,
},
{
NodeID: 3,
StoreID: 3,
ReplicaID: 3,
},
{
NodeID: 4,
StoreID: 4,
ReplicaID: 4,
},
{
NodeID: 5,
StoreID: 5,
ReplicaID: 5,
},
}

sg.GossipStores(stores, t)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
replicas,
false, /* includeSuspectAndDrainingStores */
)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
if len(deadReplicas) > 0 {
t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas)
}
// Mark node 4 as decommissioning.
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DECOMMISSIONING)
// Mark node 5 as draining.
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(
replicas, false, /* includeSuspectAndDrainingStores */
)
// Decommissioning replicas are considered live but draining replicas
// shouldn't be.
if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
if len(deadReplicas) != 0 {
t.Fatalf("expected 0 dead replicas found %+v", deadReplicas)
}

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(
replicas, true, /* includeSuspectAndDrainingStores */
)
// Draining replicas must also be considered live.
if a, e := liveReplicas, replicas; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
if len(deadReplicas) != 0 {
t.Fatalf("expected 0 dead replicas found %+v", deadReplicas)
}
}

func TestStorePoolDecommissioningReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1072,7 +1173,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
@@ -1084,7 +1185,7 @@
// Mark node 5 as dead.
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */)
// Decommissioning replicas are considered live.
if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
