Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: kvserver: improve suspect replica GC heuristics #65186

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA
if removeBeforeTruncateAndReAdd {
// Verify old replica is GC'd. Wait out the replica gc queue
// inactivity threshold and force a gc scan.
manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1))
manualClock.Increment(int64(kvserver.ReplicaGCQueueCheckInterval + 1))
testutils.SucceedsSoon(t, func() error {
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()
_, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
Expand Down Expand Up @@ -2156,7 +2156,7 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) {

// Wait out the range lease and the unleased duration to make the replica GC'able.
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(kvserver.ReplicaGCQueueInactivityThreshold + 1))
manualClock.Increment(int64(kvserver.ReplicaGCQueueCheckInterval + 1))
tc.GetFirstStoreFromServer(t, 1).SetReplicaGCQueueActive(true)
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()

Expand Down Expand Up @@ -3303,7 +3303,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(
kvserver.ReplicaGCQueueInactivityThreshold) + 1)
kvserver.ReplicaGCQueueCheckInterval) + 1)
tc.GetFirstStoreFromServer(t, 1).MustForceReplicaGCScanAndProcess()

actual := tc.ReadIntFromStores(key)
Expand Down Expand Up @@ -3386,7 +3386,7 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
tc.GetFirstStoreFromServer(t, 2).SetReplicaGCQueueActive(true)
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
manualClock.Increment(int64(
kvserver.ReplicaGCQueueInactivityThreshold) + 1)
kvserver.ReplicaGCQueueCheckInterval) + 1)
tc.GetFirstStoreFromServer(t, 2).MustForceReplicaGCScanAndProcess()
tc.WaitForValues(t, key, []int64{16, 0, 0})

Expand Down
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,6 @@ func SetMockAddSSTable() (undo func()) {
}
}

// IsQuiescent returns whether the replica is quiescent or not.
func (r *Replica) IsQuiescent() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.quiescent
}

// GetQueueLastProcessed returns the last processed timestamp for the
// specified queue, or the zero timestamp if not available.
func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ func (r *Replica) isDestroyedRLocked() (DestroyReason, error) {
return r.mu.destroyStatus.reason, r.mu.destroyStatus.err
}

// IsQuiescent returns whether the replica is quiescent or not.
func (r *Replica) IsQuiescent() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.quiescent
}

// DescAndZone returns the authoritative range descriptor as well
// as the zone config for the replica.
func (r *Replica) DescAndZone() (*roachpb.RangeDescriptor, *zonepb.ZoneConfig) {
Expand Down
156 changes: 74 additions & 82 deletions pkg/kv/kvserver/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,16 @@ import (

const (
// replicaGCQueueTimerDuration is the duration between GCs of queued replicas.
replicaGCQueueTimerDuration = 50 * time.Millisecond
replicaGCQueueTimerDuration = 100 * time.Millisecond

// ReplicaGCQueueInactivityThreshold is the inactivity duration after which
// ReplicaGCQueueCheckInterval is the inactivity duration after which
// a range will be considered for garbage collection. Exported for testing.
ReplicaGCQueueInactivityThreshold = 12 * time.Hour
// ReplicaGCQueueSuspectTimeout is the duration after which a Replica which
// is suspected to be removed should be processed by the queue.
// A Replica is suspected to have been removed if either it is in the
// candidate Raft state (which is a typical sign of having been removed
// from the group) or it is not in the VOTER_FULL state. Replicas which are
// in the LEARNER state will never become candidates. It seems possible that
// a range will quiesce and never tell a VOTER_OUTGOING that is was removed.
// Cases where a replica gets stuck in VOTER_INCOMING seem farfetched and
// would require the replica to be removed from the range before it ever
// learned about its promotion but that state shouldn't last long so we
// also treat idle replicas in that state as suspect.
//
// A leader unable to make progress (e.g. because it's lost a quorum) is
// also considered suspect, since Node.ResetQuorum() may be used to restore
// the range elsewhere.
ReplicaGCQueueSuspectTimeout = 1 * time.Second
ReplicaGCQueueCheckInterval = 12 * time.Hour
// ReplicaGCQueueSuspectCheckInterval is the duration after which a Replica
// which is suspected to be removed should be considered for garbage
// collection. See replicaIsSuspect() for details on what makes a replica
// suspect.
ReplicaGCQueueSuspectCheckInterval = 3 * time.Second
)

// Priorities for the replica GC queue.
Expand Down Expand Up @@ -129,96 +118,99 @@ func newReplicaGCQueue(store *Store, db *kv.DB, gossip *gossip.Gossip) *replicaG
func (rgcq *replicaGCQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ *config.SystemConfig,
) (shouldQ bool, prio float64) {

if _, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID()); !currentMember {
return true, replicaGCPriorityRemoved
}
lastCheck, err := repl.GetLastReplicaGCTimestamp(ctx)
if err != nil {
log.Errorf(ctx, "could not read last replica GC timestamp: %+v", err)
return false, 0
}
replDesc, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID())
if !currentMember {
return true, replicaGCPriorityRemoved
}

lastActivity := hlc.Timestamp{
WallTime: repl.store.startedAt,
}
isSuspect := replicaIsSuspect(repl)

if lease, _ := repl.GetLease(); lease.ProposedTS != nil {
lastActivity.Forward(lease.ProposedTS.ToTimestamp())
}
return replicaGCShouldQueueImpl(now.ToTimestamp(), lastCheck, isSuspect)
}

func replicaIsSuspect(repl *Replica) bool {
// It is critical to think of the replica as suspect if it is a learner as
// it both shouldn't be a learner for long but will never become a candidate.
// It is less critical to consider joint configuration members as suspect
// but in cases where a replica is removed but only ever hears about the
// command which sets it to VOTER_OUTGOING we would conservatively wait
// 10 days before removing the node. Finally we consider replicas which are
// 12 hours before removing the node. Finally we consider replicas which are
// VOTER_INCOMING as suspect because no replica should stay in that state for
// too long and being conservative here doesn't seem worthwhile.
var isSuspect bool
if raftStatus := repl.RaftStatus(); raftStatus == nil {
// If a replica doesn't have an active raft group, we should check
// whether or not it is active. If not, we should process the replica
// because it has probably already been removed from its raft group but
// doesn't know it. Without this, node decommissioning can stall on such
// dormant ranges. Make sure NodeLiveness isn't nil because it can be in
// tests/benchmarks.
if repl.store.cfg.NodeLiveness != nil {
if liveness, ok := repl.store.cfg.NodeLiveness.Self(); ok && !liveness.Membership.Active() {
return true, replicaGCPriorityDefault
}
replDesc, ok := repl.Desc().GetReplicaDescriptor(repl.store.StoreID())
if !ok {
return true
}
if t := replDesc.GetType(); t != roachpb.VOTER_FULL && t != roachpb.NON_VOTER {
return true
}

// NodeLiveness can be nil in tests/benchmarks.
if repl.store.cfg.NodeLiveness == nil {
return false
}

// If a replica doesn't have an active raft group, we should check whether
// or not the node is active. If not, we should consider the replica suspect
// because it has probably already been removed from its raft group but
// doesn't know it. Without this, node decommissioning can stall on such
// dormant ranges.
raftStatus := repl.RaftStatus()
if raftStatus == nil {
liveness, ok := repl.store.cfg.NodeLiveness.Self()
return ok && !liveness.Membership.Active()
}

livenessMap := repl.store.cfg.NodeLiveness.GetIsLiveMap()
switch raftStatus.SoftState.RaftState {
// If a replica is a candidate, then by definition it has lost contact with
// its leader and possibly the rest of the Raft group, so consider it suspect.
case raft.StateCandidate, raft.StatePreCandidate:
return true

// If the replica is a follower, check that the leader is in our range
// descriptor and that we're still in touch with it. This handles e.g. a
// non-voting replica which has lost its leader. It also attempts to handle
// a quiesced follower which was partitioned away from the Raft group during
// its own removal from the range -- this case is vulnerable to race
// conditions, but if it fails it will be GCed within 12 hours anyway.
case raft.StateFollower:
leadDesc, ok := repl.Desc().GetReplicaDescriptorByID(roachpb.ReplicaID(raftStatus.Lead))
if !ok || !livenessMap[leadDesc.NodeID].IsLive {
return true
}
} else if t := replDesc.GetType(); t != roachpb.VOTER_FULL && t != roachpb.NON_VOTER {
isSuspect = true
} else {
switch raftStatus.SoftState.RaftState {
case raft.StateCandidate, raft.StatePreCandidate:
isSuspect = true
case raft.StateLeader:
// If the replica is the leader, we check whether it has a quorum.
// Otherwise, it's possible that e.g. Node.ResetQuorum will be used
// to recover the range elsewhere, and we should relinquish our
// lease and GC the range.
if repl.store.cfg.NodeLiveness != nil {
livenessMap := repl.store.cfg.NodeLiveness.GetIsLiveMap()
isSuspect = !repl.Desc().Replicas().CanMakeProgress(func(d roachpb.ReplicaDescriptor) bool {
return livenessMap[d.NodeID].IsLive
})
}

// If the replica is a leader, check that it has a quorum. This handles e.g.
// a stuck leader with a lost quorum being replaced via Node.ResetQuorum,
// which must cause the stale leader to relinquish its lease and GC itself.
case raft.StateLeader:
if !repl.Desc().Replicas().CanMakeProgress(func(d roachpb.ReplicaDescriptor) bool {
return livenessMap[d.NodeID].IsLive
}) {
return true
}
}
return replicaGCShouldQueueImpl(now.ToTimestamp(), lastCheck, lastActivity, isSuspect)

return false
}

func replicaGCShouldQueueImpl(
now, lastCheck, lastActivity hlc.Timestamp, isSuspect bool,
) (bool, float64) {
timeout := ReplicaGCQueueInactivityThreshold
func replicaGCShouldQueueImpl(now, lastCheck hlc.Timestamp, isSuspect bool) (bool, float64) {
timeout := ReplicaGCQueueCheckInterval
priority := replicaGCPriorityDefault

if isSuspect {
// If the range is suspect (which happens if its former replica set
// ignores it), let it expire much earlier.
timeout = ReplicaGCQueueSuspectTimeout
timeout = ReplicaGCQueueSuspectCheckInterval
priority = replicaGCPrioritySuspect
} else if now.Less(lastCheck.Add(ReplicaGCQueueInactivityThreshold.Nanoseconds(), 0)) {
// Return false immediately if the previous check was less than the
// check interval in the past. Note that we don't do this if the
// replica is in candidate state, in which case we want to be more
// aggressive - a failed rebalance attempt could have checked this
// range, and candidate state suggests that a retry succeeded. See
// #7489.
return false, 0
}

shouldQ := lastActivity.Add(timeout.Nanoseconds(), 0).Less(now)

if !shouldQ {
// Only queue for GC if the timeout interval has passed since the last check.
if !lastCheck.Add(timeout.Nanoseconds(), 0).Less(now) {
return false, 0
}

return shouldQ, priority
return true, priority
}

// process performs a consistent lookup on the range descriptor to see if we are
Expand Down
67 changes: 34 additions & 33 deletions pkg/kv/kvserver/replica_gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
package kvserver

import (
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestReplicaGCShouldQueue(t *testing.T) {
Expand All @@ -27,45 +29,44 @@ func TestReplicaGCShouldQueue(t *testing.T) {
return hlc.Timestamp{WallTime: t.Nanoseconds()}
}

base := 2 * (ReplicaGCQueueSuspectTimeout + ReplicaGCQueueInactivityThreshold)
base := 2 * (ReplicaGCQueueSuspectCheckInterval + ReplicaGCQueueCheckInterval)
var (
z = ts(0)
bTS = ts(base)
cTS = ts(base + ReplicaGCQueueSuspectTimeout)
cTSnext = ts(base + ReplicaGCQueueSuspectTimeout + 1)
iTSprev = ts(base + ReplicaGCQueueInactivityThreshold - 1)
iTS = ts(base + ReplicaGCQueueInactivityThreshold)
sTS = ts(base + ReplicaGCQueueSuspectCheckInterval)
sTSnext = ts(base + ReplicaGCQueueSuspectCheckInterval + 1)
iTS = ts(base + ReplicaGCQueueCheckInterval)
iTSnext = ts(base + ReplicaGCQueueCheckInterval + 1)
)

for i, test := range []struct {
now, lastCheck, lastActivity hlc.Timestamp
isCandidate bool

shouldQ bool
priority float64
testcases := []struct {
now hlc.Timestamp
lastCheck hlc.Timestamp
isSuspect bool
expectQueue bool
expectPriority float64
}{
// Test outcomes when range is in candidate state.

// All timestamps current: candidacy plays no role.
{now: z, lastCheck: z, lastActivity: z, isCandidate: true, shouldQ: false, priority: 0},
// All timestamps current: suspect plays no role.
{now: z, lastCheck: z, isSuspect: true, expectQueue: false, expectPriority: 0},
// Threshold: no action taken.
{now: cTS, lastCheck: z, lastActivity: bTS, isCandidate: true, shouldQ: false, priority: 0},
// Queue with priority.
{now: cTSnext, lastCheck: z, lastActivity: bTS, isCandidate: true, shouldQ: true, priority: 1},
// Last processed recently: candidate still gets processed eagerly.
{now: cTSnext, lastCheck: bTS, lastActivity: z, isCandidate: true, shouldQ: true, priority: 1},
// Last processed recently: non-candidate stays put.
{now: cTSnext, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: false, priority: 0},
// Still no effect until iTS reached.
{now: iTSprev, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: false, priority: 0},
{now: iTS, lastCheck: bTS, lastActivity: z, isCandidate: true, shouldQ: true, priority: 1},
// Verify again that candidacy increases priority.
{now: iTS, lastCheck: bTS, lastActivity: z, isCandidate: false, shouldQ: true, priority: 0},
} {
if sq, pr := replicaGCShouldQueueImpl(
test.now, test.lastCheck, test.lastActivity, test.isCandidate,
); sq != test.shouldQ || pr != test.priority {
t.Errorf("%d: %+v: got (%t,%f)", i, test, sq, pr)
}
{now: sTS, lastCheck: bTS, isSuspect: true, expectQueue: false, expectPriority: 0},
// Last processed recently: suspect still gets processed eagerly.
{now: sTSnext, lastCheck: bTS, isSuspect: true, expectQueue: true, expectPriority: 1},
// Last processed recently: non-suspect stays put.
{now: sTSnext, lastCheck: bTS, isSuspect: false, expectQueue: false, expectPriority: 0},
// No effect until iTS crossed.
{now: iTS, lastCheck: bTS, isSuspect: false, expectQueue: false, expectPriority: 0},
{now: iTSnext, lastCheck: bTS, isSuspect: false, expectQueue: true, expectPriority: 0},
// Verify again that suspect increases priority.
{now: iTSnext, lastCheck: bTS, isSuspect: true, expectQueue: true, expectPriority: 1},
}
for _, tc := range testcases {
tc := tc
name := fmt.Sprintf("now=%s lastCheck=%s isSuspect=%v", tc.now, tc.lastCheck, tc.isSuspect)
t.Run(name, func(t *testing.T) {
shouldQueue, priority := replicaGCShouldQueueImpl(tc.now, tc.lastCheck, tc.isSuspect)
require.Equal(t, tc.expectQueue, shouldQueue)
require.Equal(t, tc.expectPriority, priority)
})
}
}