kvserver: consider suspect stores "live" for computing quorum
Previously, when making the determination of whether a range could achieve
quorum, the allocator ignored "suspect" stores. In other words, a range with 3
replicas would be considered unavailable for rebalancing decisions if it had 2
or more replicas on stores that are marked suspect.

This meant that if a given cluster had multiple nodes missing their liveness
heartbeats intermittently, operations like node decommissioning would never
make progress past a certain point (the replicate queue would never decide to
move replicas away because it would think their ranges are unavailable, even
though they're really not).

This patch fixes this by slightly altering the state transitions for how stores
go in and out of "suspect" and by having the replica rebalancing code
specifically ask for suspect stores to be included in the set of "live"
replicas when it determines whether a given range can achieve quorum.

Release note (bug fix): Fixed a bug, introduced in 21.1.5, that prevented nodes
from being decommissioned when the cluster had multiple nodes intermittently
missing their liveness heartbeats.
aayushshah15 committed Jul 18, 2021

1 parent 36ed61e commit 295338b
Showing 7 changed files with 109 additions and 40 deletions.
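For readers who want the gist before the diff, here is a minimal, self-contained Go sketch of the new contract. The liveAndDeadReplicas name and the includeSuspectStores parameter mirror the change below, but the replica and storeStatus types here are simplified stand-ins, not the real kvserver API.

package main

import "fmt"

type storeStatus int

const (
	storeStatusUnknown storeStatus = iota // liveness cannot currently be ascertained
	storeStatusDead
	storeStatusSuspect // live right now, but failed a liveness heartbeat recently
	storeStatusAvailable
)

type replica struct {
	storeID int
	status  storeStatus
}

// liveAndDeadReplicas mirrors the semantics introduced by this commit: replicas
// on suspect stores count as live only when the caller explicitly asks for it.
func liveAndDeadReplicas(repls []replica, includeSuspectStores bool) (live, dead []replica) {
	for _, r := range repls {
		switch r.status {
		case storeStatusAvailable:
			live = append(live, r)
		case storeStatusDead:
			dead = append(dead, r)
		case storeStatusSuspect:
			if includeSuspectStores {
				live = append(live, r)
			}
		case storeStatusUnknown:
			// Excluded from both slices.
		}
	}
	return live, dead
}

func main() {
	// A 3x-replicated range with two replicas on suspect stores.
	repls := []replica{
		{storeID: 1, status: storeStatusAvailable},
		{storeID: 2, status: storeStatusSuspect},
		{storeID: 3, status: storeStatusSuspect},
	}
	quorum := len(repls)/2 + 1

	live, _ := liveAndDeadReplicas(repls, true /* includeSuspectStores */)
	fmt.Println(len(live) >= quorum) // true: the range still has quorum and can be rebalanced

	live, _ = liveAndDeadReplicas(repls, false /* includeSuspectStores */)
	fmt.Println(len(live) >= quorum) // false: the old behavior, the range looked unavailable
}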
23 changes: 16 additions & 7 deletions pkg/kv/kvserver/allocator.go
@@ -544,7 +544,14 @@ func (a *Allocator) computeAction(
return action, adjustedPriority
}

liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)
// NB: For the purposes of determining whether a range has quorum, we
// consider stores marked as "suspect" to be live. This is necessary because
// we would otherwise spuriously consider ranges with replicas on suspect
// stores to be unavailable, just because their nodes have failed a liveness
// heartbeat in the recent past. This means we won't move those replicas
// elsewhere (for a regular rebalance or for decommissioning).
const includeSuspectStores = true
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas, includeSuspectStores)

if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
@@ -623,7 +630,9 @@ func (a *Allocator) computeAction(
return action, action.Priority()
}

liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(
nonVoterReplicas, includeSuspectStores,
)
if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
// The range has non-voter(s) on a dead node that we should replace.
action = AllocatorReplaceDeadNonVoter
@@ -1289,7 +1298,7 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}
// Verify that the preferred replica is eligible to receive the lease.
preferred, _ = a.storePool.liveAndDeadReplicas(preferred)
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectStores */)
if len(preferred) == 1 {
return preferred[0]
}
@@ -1303,8 +1312,8 @@ func (a *Allocator) TransferLeaseTarget(
}
}

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectStores */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
@@ -1408,8 +1417,8 @@ func (a *Allocator) ShouldTransferLease(
sl = sl.filter(zone.VoterConstraints)
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)

// Only consider live, non-draining replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing)
// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectNodes */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == source.StoreID) {
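To make the effect of the hunk above concrete, here is a hedged sketch of the quorum gate in computeAction. The helper and type names below (computeActionSketch, allocatorAction) are hypothetical; only AllocatorRangeUnavailable and AllocatorConsiderRebalance correspond to real allocator actions. With suspect replicas counted as live, a 3x range with two suspect replicas clears the gate instead of being reported unavailable.

package allocatorsketch

// allocatorAction is a simplified stand-in for the real AllocatorAction enum.
type allocatorAction int

const (
	allocatorRangeUnavailable allocatorAction = iota
	allocatorConsiderRebalance
)

// computeActionSketch models the gate changed in allocator.go: liveVoters is
// now computed with includeSuspectStores=true, so replicas on suspect stores
// count toward the quorum check and the range is not declared unavailable.
func computeActionSketch(liveVoters, totalVoters int) allocatorAction {
	quorum := totalVoters/2 + 1
	if liveVoters < quorum {
		// Do not take any replacement/removal action without a quorum of live voters.
		return allocatorRangeUnavailable
	}
	return allocatorConsiderRebalance
}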
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
@@ -5641,12 +5641,14 @@ func TestAllocatorComputeActionSuspect(t *testing.T) {
suspect: []roachpb.StoreID{3},
expectedAction: AllocatorConsiderRebalance,
},
// Needs three replicas, two are suspect (i.e. the range lacks a quorum).
{
// When trying to determine whether a range can achieve quorum, we count
// suspect nodes as live because they _currently_ have a "live" node
// liveness record.
desc: threeReplDesc,
live: []roachpb.StoreID{1, 4},
suspect: []roachpb.StoreID{2, 3},
expectedAction: AllocatorRangeUnavailable,
expectedAction: AllocatorConsiderRebalance,
},
}
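The changed expectation above (two suspect stores out of three no longer yield AllocatorRangeUnavailable) can also be expressed against the simplified computeActionSketch helper from the previous sketch as a small table-driven test. This is hypothetical test code in the same sketch package, not the real allocator test.

package allocatorsketch

import "testing"

func TestComputeActionSketchSuspect(t *testing.T) {
	testCases := []struct {
		name        string
		liveVoters  int // includes replicas on suspect stores
		totalVoters int
		expected    allocatorAction
	}{
		// Two of three stores suspect: previously AllocatorRangeUnavailable,
		// now AllocatorConsiderRebalance because suspect stores count as live.
		{name: "two suspect, one available", liveVoters: 3, totalVoters: 3, expected: allocatorConsiderRebalance},
		// Two of three stores genuinely dead or unknown: still no quorum.
		{name: "two dead, one available", liveVoters: 1, totalVoters: 3, expected: allocatorRangeUnavailable},
	}
	for _, tc := range testCases {
		if got := computeActionSketch(tc.liveVoters, tc.totalVoters); got != tc.expected {
			t.Errorf("%s: expected %v, got %v", tc.name, tc.expected, got)
		}
	}
}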

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
@@ -889,12 +889,12 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
for _, i := range []int{2, 3} {
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsSuspect(tc.Target(1).StoreID)
suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsUnknown(tc.Target(1).StoreID)
if err != nil {
return err
}
if !suspect {
return errors.Errorf("Expected server 1 to be suspect on server %d", i)
return errors.Errorf("Expected server 1 to be in `storeStatusUnknown` on server %d", i)
}
}
return nil
26 changes: 20 additions & 6 deletions pkg/kv/kvserver/replica_command.go
@@ -2035,12 +2035,18 @@ type changeReplicasTxnArgs struct {
db *kv.DB

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas. Replicas for
// which liveness or deadness cannot be ascertained are excluded from the
// returned slices. Replicas on decommissioning node/store are considered
// live.
// first for live replicas, and the second for dead replicas.
//
// - Replicas for which liveness or deadness cannot be ascertained are
// excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
liveAndDeadReplicas func(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor)

logChange logChangeFn
@@ -2127,7 +2133,15 @@ func execChangeReplicasTxn(
// See:
// https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553
replicas := crt.Desc.Replicas()
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors())
// We consider stores marked as "suspect" to be alive for the purposes of
// determining whether the range can achieve quorum since these stores are
// known to be currently live but have failed a liveness heartbeat in the
// recent past.
//
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
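The hunk above passes the live set into CanMakeProgress through a closure. Below is a hedged sketch of that pattern. The descriptor type and both function names are hypothetical, and the simple majority count stands in for the real raft-configuration-aware CanMakeProgress, which also understands joint configurations and non-voters.

package changereplicasketch

type replicaDescriptor struct {
	replicaID int
	storeID   int
}

// canMakeProgressSketch is a simplified stand-in for ReplicaSet.CanMakeProgress:
// it reports whether a majority of the descriptors satisfy the liveness predicate.
func canMakeProgressSketch(descs []replicaDescriptor, isLive func(replicaDescriptor) bool) bool {
	liveCount := 0
	for _, d := range descs {
		if isLive(d) {
			liveCount++
		}
	}
	return liveCount >= len(descs)/2+1
}

// checkQuorumBeforeChange mirrors execChangeReplicasTxn's safeguard: classify
// replicas with suspect stores counted as live, then refuse the replication
// change if the resulting descriptor could not make progress.
func checkQuorumBeforeChange(
	descs []replicaDescriptor,
	liveAndDeadReplicas func(repls []replicaDescriptor, includeSuspectStores bool) (live, dead []replicaDescriptor),
) bool {
	liveReplicas, _ := liveAndDeadReplicas(descs, true /* includeSuspectStores */)
	return canMakeProgressSketch(descs, func(d replicaDescriptor) bool {
		for _, live := range liveReplicas {
			if d.replicaID == live.replicaID {
				return true
			}
		}
		return false
	})
}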
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replicate_queue.go
@@ -368,12 +368,16 @@ func (rq *replicateQueue) processOneChange(
// range descriptor.
desc, zone := repl.DescAndZone()

// Avoid taking action if the range has too many dead replicas to make
// quorum.
// Avoid taking action if the range has too many dead replicas to make quorum.
// Consider stores marked suspect as live in order to make this determination.
voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas)
liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
voterReplicas, true, /* includeSuspectStores */
)
liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectStores */
)

// NB: the replication layer ensures that the below operations don't cause
// unavailability; see:
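One detail worth noting in the hunk above: both voters and non-voters are classified with includeSuspectStores=true, but only the voter counts feed the quorum decision. A hedged caller-side sketch follows; the types and the shouldSkipForLostQuorum helper are hypothetical, not the real replicate queue code.

package replicatequeuesketch

type replica struct {
	storeID int
}

// shouldSkipForLostQuorum mirrors the guard at the top of processOneChange:
// classify the voters with suspect stores counted as live and bail out only if
// the voters cannot form a quorum. Non-voters are classified the same way (to
// find dead ones that need replacement) but never count toward quorum.
func shouldSkipForLostQuorum(
	voters []replica,
	liveAndDeadReplicas func(repls []replica, includeSuspectStores bool) (live, dead []replica),
) bool {
	liveVoters, _ := liveAndDeadReplicas(voters, true /* includeSuspectStores */)
	quorum := len(voters)/2 + 1
	return len(liveVoters) < quorum
}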
54 changes: 42 additions & 12 deletions pkg/kv/kvserver/store_pool.go
@@ -244,6 +244,26 @@ const (
func (sd *storeDetail) status(
now time.Time, threshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration,
) storeStatus {
// During normal operation, we expect the state transitions for stores to look like the following:
//
//                                Successful heartbeats
//                                throughout the suspect
//                                       duration
// +----------------------+<----------------------------------+--------------------+
// | storeStatusAvailable |                                    | storeStatusSuspect |
// +----------------------+                                    +--------------------+
//            |                                                          ^
//            | Failed liveness                                          | Successful
//            | heartbeat                                                | liveness
//            v                                                          | heartbeat
// +----------------------+                                              |
// |  storeStatusUnknown  |---------------------------------------------+
// +----------------------+
//
// The store is considered dead if it hasn't been updated via gossip
// within the liveness threshold. Note that lastUpdatedTime is set
// when the store detail is created and will have a non-zero value
@@ -270,11 +290,9 @@ func (sd *storeDetail) status(
return storeStatusDecommissioning
case livenesspb.NodeLivenessStatus_UNAVAILABLE:
// We don't want to suspect a node on startup or when it's first added to a
// cluster, because we dont know it's liveness yet. A node is only considered
// suspect if it's been alive and fails to heartbeat liveness.
// cluster, because we dont know its liveness yet.
if !sd.lastAvailable.IsZero() {
sd.lastUnavailable = now
return storeStatusSuspect
}
return storeStatusUnknown
case livenesspb.NodeLivenessStatus_UNKNOWN:
@@ -605,14 +623,16 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error
return false, deadAsOf.Sub(now), nil
}

// IsSuspect returns true if the node is suspected by the store pool or an error
// if the store is not found in the pool.
func (sp *StorePool) IsSuspect(storeID roachpb.StoreID) (bool, error) {
// IsUnknown returns true if the given store's status is `storeStatusUnknown`
// (i.e. it just failed a liveness heartbeat and we cannot ascertain its
// liveness or deadness at the moment) or an error if the store is not found in
// the pool.
func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) {
status, err := sp.storeStatus(storeID)
if err != nil {
return false, err
}
return status == storeStatusSuspect, nil
return status == storeStatusUnknown, nil
}

// IsLive returns true if the node is considered alive by the store pool or an error
@@ -643,11 +663,17 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) {

// liveAndDeadReplicas divides the provided repls slice into two slices: the
// first for live replicas, and the second for dead replicas.
// Replicas for which liveness or deadness cannot be ascertained are excluded
// from the returned slices. Replicas on decommissioning node/store are
// considered live.
//
// - Replicas for which liveness or deadness cannot be ascertained
// (storeStatusUnknown) are excluded from the returned slices.
//
// - Replicas on decommissioning node/store are considered live.
//
// - If `includeSuspectStores` is true, stores that are marked suspect (i.e.
// stores that have failed a liveness heartbeat in the recent past) are
// considered live. Otherwise, they are excluded from the returned slices.
func (sp *StorePool) liveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor,
repls []roachpb.ReplicaDescriptor, includeSuspectStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
sp.detailsMu.Lock()
defer sp.detailsMu.Unlock()
@@ -669,8 +695,12 @@ func (sp *StorePool) liveAndDeadReplicas(
// We count decommissioning replicas to be alive because they are readable
// and should be used for up-replication if necessary.
liveReplicas = append(liveReplicas, repl)
case storeStatusUnknown, storeStatusSuspect:
case storeStatusUnknown:
// No-op.
case storeStatusSuspect:
if includeSuspectStores {
liveReplicas = append(liveReplicas, repl)
}
default:
log.Fatalf(context.TODO(), "unknown store status %d", status)
}
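The new transitions in storeDetail.status() can be hard to follow from the diff alone. Below is a hedged, self-contained sketch of just the available/unknown/suspect portion of that state machine; the lastAvailable and lastUnavailable field names mirror the diff, but everything else is simplified (the real method also handles dead, decommissioning, and draining states). The main function walks through the same sequence that TestStorePoolSuspected asserts.

package main

import (
	"fmt"
	"time"
)

type storeStatus string

const (
	storeStatusUnknown   storeStatus = "unknown"
	storeStatusSuspect   storeStatus = "suspect"
	storeStatusAvailable storeStatus = "available"
)

type livenessStatus int

const (
	livenessLive livenessStatus = iota
	livenessUnavailable
)

// storeDetailSketch keeps the two timestamps the transitions depend on.
type storeDetailSketch struct {
	lastAvailable   time.Time
	lastUnavailable time.Time
}

// status sketches the new transition rules:
//   - a failed heartbeat moves a previously-available store to "unknown" and
//     records lastUnavailable;
//   - a successful heartbeat within suspectDuration of lastUnavailable yields
//     "suspect";
//   - once suspectDuration has elapsed with successful heartbeats, the store
//     is "available" again.
func (sd *storeDetailSketch) status(
	now time.Time, liveness livenessStatus, suspectDuration time.Duration,
) storeStatus {
	switch liveness {
	case livenessUnavailable:
		// Don't suspect a store we have never seen alive; its liveness is
		// simply unknown at this point.
		if !sd.lastAvailable.IsZero() {
			sd.lastUnavailable = now
		}
		return storeStatusUnknown
	case livenessLive:
		sd.lastAvailable = now
		if !sd.lastUnavailable.IsZero() && now.Sub(sd.lastUnavailable) < suspectDuration {
			return storeStatusSuspect
		}
		return storeStatusAvailable
	}
	return storeStatusUnknown
}

func main() {
	const suspectDuration = 30 * time.Second
	now := time.Now()
	sd := &storeDetailSketch{}

	fmt.Println(sd.status(now, livenessLive, suspectDuration))        // available
	fmt.Println(sd.status(now, livenessUnavailable, suspectDuration)) // unknown: heartbeat just failed
	fmt.Println(sd.status(now, livenessLive, suspectDuration))        // suspect: failed a heartbeat recently
	fmt.Println(sd.status(now.Add(suspectDuration+time.Millisecond),
		livenessLive, suspectDuration)) // available again after the suspect duration
}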
24 changes: 17 additions & 7 deletions pkg/kv/kvserver/store_pool_test.go
@@ -755,7 +755,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
@@ -766,7 +766,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_DEAD)
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectNodes */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
@@ -777,7 +777,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
// Mark node 4 as merely unavailable.
mnl.setNodeStatus(4, livenesspb.NodeLivenessStatus_UNAVAILABLE)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
}
@@ -802,7 +802,10 @@ func TestStorePoolDefaultState(t *testing.T) {
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(context.Background())

liveReplicas, deadReplicas := sp.liveAndDeadReplicas([]roachpb.ReplicaDescriptor{{StoreID: 1}})
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(
[]roachpb.ReplicaDescriptor{{StoreID: 1}},
false, /* includeSuspectStores */
)
if len(liveReplicas) != 0 || len(deadReplicas) != 0 {
t.Errorf("expected 0 live and 0 dead replicas; got %v and %v", liveReplicas, deadReplicas)
}
@@ -875,6 +878,8 @@ func TestStorePoolSuspected(t *testing.T) {
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

// See state transition diagram in storeDetail.status() for a visual
// representation of what this test asserts.
mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
detail := sp.getStoreDetailLocked(store.StoreID)
@@ -888,11 +893,16 @@ func TestStorePoolSuspected(t *testing.T) {
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)
require.Equal(t, s, storeStatusUnknown)
require.False(t, detail.lastAvailable.IsZero())
require.False(t, detail.lastUnavailable.IsZero())

mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE)
sp.detailsMu.Lock()
s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
sp.detailsMu.Unlock()
require.Equal(t, s, storeStatusSuspect)

sp.detailsMu.Lock()
s = detail.status(now.Add(timeAfterStoreSuspect).Add(time.Millisecond),
timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect)
@@ -1062,7 +1072,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE)
}

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
if len(liveReplicas) != 5 {
t.Fatalf("expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas)
}
@@ -1074,7 +1084,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
// Mark node 5 as dead.
mnl.setNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD)

liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas)
liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas, false /* includeSuspectStores */)
// Decommissioning replicas are considered live.
if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) {
t.Fatalf("expected live replicas %+v; got %+v", e, a)
