Merge #65062
65062: kvserver: improve suspect replica GC heuristics r=tbg a=erikgrinaker

The replica GC queue will normally check a replica against the canonical
range descriptor every 12 hours. Under some circumstances the replica
may be considered suspect, which causes it to be checked against the
canonical descriptor every second instead. However, these heuristics
were fairly limited and missed a couple of cases that could cause stale
replicas to linger.

This patch adds two conditions to the suspect replica heuristics:
followers that have lost contact with their leader (which in particular
handles non-voting replicas), and quiescent replicas that have lost
contact with any other voter (a stale quiescent replica could otherwise
linger and later trigger false underreplication alerts).
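
As a rough illustration, here is a minimal, self-contained sketch of just
these two conditions. The `replicaState` struct and `isSuspect` function
are hypothetical stand-ins; the real heuristic lives in `replicaIsSuspect()`
below and reads the `Replica`, its raft status, and the node liveness map
directly.

```go
package main

import "fmt"

// Hypothetical stand-in for the state the real heuristic reads from the
// Replica, its raft.Status, and the node liveness map.
type replicaState struct {
	isFollower  bool   // raft follower (includes non-voting replicas)
	leaderLive  bool   // is the current raft leader live?
	isQuiescent bool   // has the replica quiesced?
	votersLive  []bool // liveness of the range's voting replicas
}

// isSuspect sketches only the two conditions added by this patch.
func isSuspect(s replicaState) bool {
	// A follower that has lost contact with its leader is suspect; this in
	// particular covers non-voting replicas, which never campaign.
	if s.isFollower && !s.leaderLive {
		return true
	}
	// A quiesced replica that has lost contact with any other voter is
	// suspect, since a stale quiesced replica can otherwise linger and later
	// trigger false underreplication alerts.
	if s.isQuiescent {
		for _, live := range s.votersLive {
			if !live {
				return true
			}
		}
	}
	return false
}

func main() {
	fmt.Println(isSuspect(replicaState{isFollower: true, leaderLive: false}))                // true
	fmt.Println(isSuspect(replicaState{isQuiescent: true, votersLive: []bool{true, false}})) // true
	fmt.Println(isSuspect(replicaState{isFollower: true, leaderLive: true}))                 // false
}
```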

Since this change is expected to increase suspect replica matches, the
`ReplicaGCQueueSuspectCheckInterval` duration between checking suspect
replica descriptors was also increased from 1 to 5 seconds, and the
`replicaGCQueueTimerDuration` interval between replica GCs was increased
from 50 to 100 ms.

The previous logic used replica activity, such as store startup and
lease proposals, as the offset for the timeouts, but this did not appear
to have any significant benefit over simply using the last check time,
so the activity-based offsets have been removed and the timeouts given
more appropriate names. The previous logic also failed to enforce the
check interval for suspect replicas and would instead check them in a
tight 50 ms loop; this has been fixed as well.
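
For illustration, here is a minimal sketch of the simplified queueing
decision, assuming only the last check time and the suspect flag as
inputs. The constants are redeclared locally with the new values; the
real code uses `hlc.Timestamp` and the exported
`ReplicaGCQueueCheckInterval` and `ReplicaGCQueueSuspectCheckInterval`
constants.

```go
package main

import (
	"fmt"
	"time"
)

// Constants redeclared here for illustration; they mirror the new values of
// ReplicaGCQueueCheckInterval and ReplicaGCQueueSuspectCheckInterval.
const (
	checkInterval        = 12 * time.Hour
	suspectCheckInterval = 5 * time.Second
)

// shouldQueue mirrors the simplified decision: queue a replica for a
// consistent descriptor check once the relevant interval has elapsed since
// the last check, using a shorter interval and higher priority for suspect
// replicas. The real implementation works on hlc.Timestamp, not time.Time.
func shouldQueue(now, lastCheck time.Time, isSuspect bool) (bool, float64) {
	interval, priority := checkInterval, 0.0
	if isSuspect {
		interval, priority = suspectCheckInterval, 1.0
	}
	// Only queue if the interval has fully elapsed since the last check.
	if !lastCheck.Add(interval).Before(now) {
		return false, 0
	}
	return true, priority
}

func main() {
	now := time.Now()
	lastCheck := now.Add(-10 * time.Second)
	fmt.Println(shouldQueue(now, lastCheck, true))  // true 1: suspect interval elapsed
	fmt.Println(shouldQueue(now, lastCheck, false)) // false 0: 12h interval not elapsed
}
```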

Resolves #62075, resolves #60259.

Release note (bug fix): Improved garbage collection of stale replicas by
proactively checking certain replicas that have lost contact with other
voting replicas.

/cc @cockroachdb/kv 

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed May 13, 2021
2 parents 7364dde + 1041afb commit 9279c8f
Showing 5 changed files with 129 additions and 125 deletions.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_raft_test.go
@@ -2016,7 +2016,7 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA
if removeBeforeTruncateAndReAdd {
// Verify old replica is GC'd. Wait out the replica gc queue
// inactivity threshold and force a gc scan.
manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1))
manualClock.Increment(int64(kvserver.ReplicaGCQueueCheckInterval + 1))
testutils.SucceedsSoon(t, func() error {
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()
_, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
@@ -2126,7 +2126,7 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) {

// Wait out the range lease and the unleased duration to make the replica GC'able.
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1))
manualClock.Increment(int64(kvserver.ReplicaGCQueueCheckInterval + 1))
tc.GetFirstStoreFromServer(t, 1).SetReplicaGCQueueActive(true)
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()

@@ -3275,7 +3275,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(
kvserver.ReplicaGCQueueInactivityThreshold) + 1)
kvserver.ReplicaGCQueueCheckInterval) + 1)
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()

actual := tc.ReadIntFromStores(key)
@@ -3365,7 +3365,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
tc.GetFirstStoreFromServer(t, 2).SetReplicaGCQueueActive(true)
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(
kvserver.ReplicaGCQueueInactivityThreshold) + 1)
kvserver.ReplicaGCQueueCheckInterval) + 1)
tc.GetFirstStoreFromServer(t, 2).MustForceReplicaGCScanAndProcess()
tc.WaitForValues(t, key, []int64{16, 0, 0})

7 changes: 0 additions & 7 deletions pkg/kv/kvserver/helpers_test.go
@@ -469,13 +469,6 @@ func SetMockAddSSTable() (undo func()) {
}
}

// IsQuiescent returns whether the replica is quiescent or not.
func (r *Replica) IsQuiescent() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.quiescent
}

// GetQueueLastProcessed returns the last processed timestamp for the
// specified queue, or the zero timestamp if not available.
func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -731,6 +731,13 @@ func (r *Replica) isDestroyedRLocked() (DestroyReason, error) {
return r.mu.destroyStatus.reason, r.mu.destroyStatus.err
}

// IsQuiescent returns whether the replica is quiescent or not.
func (r *Replica) IsQuiescent() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.quiescent
}

// DescAndZone returns the authoritative range descriptor as well
// as the zone config for the replica.
func (r *Replica) DescAndZone() (*roachpb.RangeDescriptor, *zonepb.ZoneConfig) {
165 changes: 84 additions & 81 deletions pkg/kv/kvserver/replica_gc_queue.go
@@ -27,27 +27,16 @@ import (

const (
// replicaGCQueueTimerDuration is the duration between GCs of queued replicas.
replicaGCQueueTimerDuration = 50 * time.Millisecond
replicaGCQueueTimerDuration = 100 * time.Millisecond

// ReplicaGCQueueInactivityThreshold is the inactivity duration after which
// ReplicaGCQueueCheckInterval is the inactivity duration after which
// a range will be considered for garbage collection. Exported for testing.
ReplicaGCQueueInactivityThreshold = 12 * time.Hour
// ReplicaGCQueueSuspectTimeout is the duration after which a Replica which
// is suspected to be removed should be processed by the queue.
// A Replica is suspected to have been removed if either it is in the
// candidate Raft state (which is a typical sign of having been removed
// from the group) or it is not in the VOTER_FULL state. Replicas which are
// in the LEARNER state will never become candidates. It seems possible that
// a range will quiesce and never tell a VOTER_OUTGOING that is was removed.
// Cases where a replica gets stuck in VOTER_INCOMING seem farfetched and
// would require the replica to be removed from the range before it ever
// learned about its promotion but that state shouldn't last long so we
// also treat idle replicas in that state as suspect.
//
// A leader unable to make progress (e.g. because it's lost a quorum) is
// also considered suspect, since Node.ResetQuorum() may be used to restore
// the range elsewhere.
ReplicaGCQueueSuspectTimeout = 1 * time.Second
ReplicaGCQueueCheckInterval = 12 * time.Hour
// ReplicaGCQueueSuspectCheckInterval is the duration after which a Replica
// which is suspected to be removed should be considered for garbage
// collection. See replicaIsSuspect() for details on what makes a replica
// suspect.
ReplicaGCQueueSuspectCheckInterval = 5 * time.Second
)

// Priorities for the replica GC queue.
@@ -129,96 +118,110 @@ func newReplicaGCQueue(store *Store, db *kv.DB, gossip *gossip.Gossip) *replicaG
func (rgcq *replicaGCQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ *config.SystemConfig,
) (shouldQ bool, prio float64) {

if _, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID()); !currentMember {
return true, replicaGCPriorityRemoved
}
lastCheck, err := repl.GetLastReplicaGCTimestamp(ctx)
if err != nil {
log.Errorf(ctx, "could not read last replica GC timestamp: %+v", err)
return false, 0
}
replDesc, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID())
if !currentMember {
return true, replicaGCPriorityRemoved
}

lastActivity := hlc.Timestamp{
WallTime: repl.store.startedAt,
}
isSuspect := replicaIsSuspect(repl)

if lease, _ := repl.GetLease(); lease.ProposedTS != nil {
lastActivity.Forward(lease.ProposedTS.ToTimestamp())
}
return replicaGCShouldQueueImpl(now.ToTimestamp(), lastCheck, isSuspect)
}

func replicaIsSuspect(repl *Replica) bool {
// It is critical to think of the replica as suspect if it is a learner as
// it both shouldn't be a learner for long but will never become a candidate.
// It is less critical to consider joint configuration members as suspect
// but in cases where a replica is removed but only ever hears about the
// command which sets it to VOTER_OUTGOING we would conservatively wait
// 10 days before removing the node. Finally we consider replicas which are
// 12 hours before removing the node. Finally we consider replicas which are
// VOTER_INCOMING as suspect because no replica should stay in that state for
// too long and being conservative here doesn't seem worthwhile.
var isSuspect bool
if raftStatus := repl.RaftStatus(); raftStatus == nil {
// If a replica doesn't have an active raft group, we should check
// whether or not it is active. If not, we should process the replica
// because it has probably already been removed from its raft group but
// doesn't know it. Without this, node decommissioning can stall on such
// dormant ranges. Make sure NodeLiveness isn't nil because it can be in
// tests/benchmarks.
if repl.store.cfg.NodeLiveness != nil {
if liveness, ok := repl.store.cfg.NodeLiveness.Self(); ok && !liveness.Membership.Active() {
return true, replicaGCPriorityDefault
}
replDesc, ok := repl.Desc().GetReplicaDescriptor(repl.store.StoreID())
if !ok {
return true
}
if t := replDesc.GetType(); t != roachpb.VOTER_FULL && t != roachpb.NON_VOTER {
return true
}

// NodeLiveness can be nil in tests/benchmarks.
if repl.store.cfg.NodeLiveness == nil {
return false
}

// If a replica doesn't have an active raft group, we should check whether
// or not the node is active. If not, we should consider the replica suspect
// because it has probably already been removed from its raft group but
// doesn't know it. Without this, node decommissioning can stall on such
// dormant ranges.
raftStatus := repl.RaftStatus()
if raftStatus == nil {
liveness, ok := repl.store.cfg.NodeLiveness.Self()
return ok && !liveness.Membership.Active()
}

livenessMap := repl.store.cfg.NodeLiveness.GetIsLiveMap()
switch raftStatus.SoftState.RaftState {
// If a replica is a candidate, then by definition it has lost contact with
// its leader and possibly the rest of the Raft group, so consider it suspect.
case raft.StateCandidate, raft.StatePreCandidate:
return true

// If the replica is a follower, check that the leader is in our range
// descriptor and that we're still in touch with it. This handles e.g. a
// non-voting replica which has lost its leader.
case raft.StateFollower:
leadDesc, ok := repl.Desc().GetReplicaDescriptorByID(roachpb.ReplicaID(raftStatus.Lead))
if !ok || !livenessMap[leadDesc.NodeID].IsLive {
return true
}
} else if t := replDesc.GetType(); t != roachpb.VOTER_FULL && t != roachpb.NON_VOTER {
isSuspect = true
} else {
switch raftStatus.SoftState.RaftState {
case raft.StateCandidate, raft.StatePreCandidate:
isSuspect = true
case raft.StateLeader:
// If the replica is the leader, we check whether it has a quorum.
// Otherwise, it's possible that e.g. Node.ResetQuorum will be used
// to recover the range elsewhere, and we should relinquish our
// lease and GC the range.
if repl.store.cfg.NodeLiveness != nil {
livenessMap := repl.store.cfg.NodeLiveness.GetIsLiveMap()
isSuspect = !repl.Desc().Replicas().CanMakeProgress(func(d roachpb.ReplicaDescriptor) bool {
return livenessMap[d.NodeID].IsLive
})

// If the replica is a leader, check that it has a quorum. This handles e.g.
// a stuck leader with a lost quorum being replaced via Node.ResetQuorum,
// which must cause the stale leader to relinquish its lease and GC itself.
case raft.StateLeader:
if !repl.Desc().Replicas().CanMakeProgress(func(d roachpb.ReplicaDescriptor) bool {
return livenessMap[d.NodeID].IsLive
}) {
return true
}
}

// If the replica is quiesced, consider it suspect if any of the other
// voters are unavailable. This tries to detect cases where a quiesced
// replica does not notice that it's been removed from the range and then
// triggers an underreplicated alert when the range membership changes again
// later. In other cases where a quiesced replica fails to notice that it's
// been removed it will be GCed during the next periodic check (every 12 hours).
if repl.IsQuiescent() {
for _, rd := range repl.Desc().Replicas().VoterDescriptors() {
if !livenessMap[rd.NodeID].IsLive {
return true
}
}
}
return replicaGCShouldQueueImpl(now.ToTimestamp(), lastCheck, lastActivity, isSuspect)

return false
}

func replicaGCShouldQueueImpl(
now, lastCheck, lastActivity hlc.Timestamp, isSuspect bool,
) (bool, float64) {
timeout := ReplicaGCQueueInactivityThreshold
func replicaGCShouldQueueImpl(now, lastCheck hlc.Timestamp, isSuspect bool) (bool, float64) {
timeout := ReplicaGCQueueCheckInterval
priority := replicaGCPriorityDefault

if isSuspect {
// If the range is suspect (which happens if its former replica set
// ignores it), let it expire much earlier.
timeout = ReplicaGCQueueSuspectTimeout
timeout = ReplicaGCQueueSuspectCheckInterval
priority = replicaGCPrioritySuspect
} else if now.Less(lastCheck.Add(ReplicaGCQueueInactivityThreshold.Nanoseconds(), 0)) {
// Return false immediately if the previous check was less than the
// check interval in the past. Note that we don't do this if the
// replica is in candidate state, in which case we want to be more
// aggressive - a failed rebalance attempt could have checked this
// range, and candidate state suggests that a retry succeeded. See
// #7489.
return false, 0
}

shouldQ := lastActivity.Add(timeout.Nanoseconds(), 0).Less(now)

if !shouldQ {
// Only queue for GC if the timeout interval has passed since the last check.
if !lastCheck.Add(timeout.Nanoseconds(), 0).Less(now) {
return false, 0
}

return shouldQ, priority
return true, priority
}

// process performs a consistent lookup on the range descriptor to see if we are
67 changes: 34 additions & 33 deletions pkg/kv/kvserver/replica_gc_queue_test.go
@@ -11,12 +11,14 @@
package kvserver

import (
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestReplicaGCShouldQueue(t *testing.T) {
@@ -27,45 +27,44 @@ func TestReplicaGCShouldQueue(t *testing.T) {
return hlc.Timestamp{WallTime: t.Nanoseconds()}
}

base := 2 * (ReplicaGCQueueSuspectTimeout + ReplicaGCQueueInactivityThreshold)
base := 2 * (ReplicaGCQueueSuspectCheckInterval + ReplicaGCQueueCheckInterval)
var (
z = ts(0)
bTS = ts(base)
cTS = ts(base + ReplicaGCQueueSuspectTimeout)
cTSnext = ts(base + ReplicaGCQueueSuspectTimeout + 1)
iTSprev = ts(base + ReplicaGCQueueInactivityThreshold - 1)
iTS = ts(base + ReplicaGCQueueInactivityThreshold)
sTS = ts(base + ReplicaGCQueueSuspectCheckInterval)
sTSnext = ts(base + ReplicaGCQueueSuspectCheckInterval + 1)
iTS = ts(base + ReplicaGCQueueCheckInterval)
iTSnext = ts(base + ReplicaGCQueueCheckInterval + 1)
)

for i, test := range []struct {
now, lastCheck, lastActivity hlc.Timestamp
isCandidate bool

shouldQ bool
priority float64
testcases := []struct {
now hlc.Timestamp
lastCheck hlc.Timestamp
isSuspect bool
expectQueue bool
expectPriority float64
}{
// Test outcomes when range is in candidate state.

// All timestamps current: candidacy plays no role.
{now: z, lastCheck: z, lastActivity: z, isCandidate: true, shouldQ: false, priority: 0},
// All timestamps current: suspect plays no role.
{now: z, lastCheck: z, isSuspect: true, expectQueue: false, expectPriority: 0},
// Threshold: no action taken.
{now: cTS, lastCheck: z, lastActivity: bTS, isCandidate: true, shouldQ: false, priority: 0},
// Queue with priority.
{now: cTSnext, lastCheck: z, lastActivity: bTS, isCandidate: true, shouldQ: true, priority: 1},
// Last processed recently: candidate still gets processed eagerly.
{now: cTSnext, lastCheck: bTS, lastActivity: z, isCandidate: true, shouldQ: true, priority: 1},
// Last processed recently: non-candidate stays put.
{now: cTSnext, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: false, priority: 0},
// Still no effect until iTS reached.
{now: iTSprev, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: false, priority: 0},
{now: iTS, lastCheck: bTS, lastActivity: z, isCandidate: true, shouldQ: true, priority: 1},
// Verify again that candidacy increases priority.
{now: iTS, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: true, priority: 0},
} {
if sq, pr := replicaGCShouldQueueImpl(
test.now, test.lastCheck, test.lastActivity, test.isCandidate,
); sq != test.shouldQ || pr != test.priority {
t.Errorf("%d: %+v: got (%t,%f)", i, test, sq, pr)
}
{now: sTS, lastCheck: bTS, isSuspect: true, expectQueue: false, expectPriority: 0},
// Last processed recently: suspect still gets processed eagerly.
{now: sTSnext, lastCheck: bTS, isSuspect: true, expectQueue: true, expectPriority: 1},
// Last processed recently: non-suspect stays put.
{now: sTSnext, lastCheck: bTS, isSuspect: false, expectQueue: false, expectPriority: 0},
// No effect until iTS crossed.
{now: iTS, lastCheck: bTS, isSuspect: false, expectQueue: false, expectPriority: 0},
{now: iTSnext, lastCheck: bTS, isSuspect: false, expectQueue: true, expectPriority: 0},
// Verify again that suspect increases priority.
{now: iTSnext, lastCheck: bTS, isSuspect: true, expectQueue: true, expectPriority: 1},
}
for _, tc := range testcases {
tc := tc
name := fmt.Sprintf("now=%s lastCheck=%s isSuspect=%v", tc.now, tc.lastCheck, tc.isSuspect)
t.Run(name, func(t *testing.T) {
shouldQueue, priority := replicaGCShouldQueueImpl(tc.now, tc.lastCheck, tc.isSuspect)
require.Equal(t, tc.expectQueue, shouldQueue)
require.Equal(t, tc.expectPriority, priority)
})
}
}
