105041: kvserver: clean up replica unquiescence r=erikgrinaker a=erikgrinaker

**kvserver: remove stale `maybeQuiesce` TODO**

The leader won't quiesce if followers aren't caught up.

**kvserver: remove `maybeUnquiesceAndWakeLeaderLocked`**

This patch merges `maybeUnquiesceAndWakeLeaderLocked()` into `maybeUnquiesceWithOptionsLocked()`, using a separate parameter to wake the leader. Care is taken to make this purely mechanical, with no logical changes at all.

**kvserver: remove `maybeUnquiesceWithOptionsLocked`**

This patch merges `maybeUnquiesceWithOptionsLocked()` into `maybeUnquiesceLocked()`, requiring callers to always specify options. The subtlety around unquiescence, and the small number of call sites, make it beneficial to be explicit.

This is a purely mechanical change, with no logical changes. A couple of tests have been changed to now wake the leader when unquiescing, but this has no bearing on the tests.

**kvserver: set last update times on leader unquiesce**

Previously, `Replica.lastUpdateTimes` was updated whenever a replica unquiesced without attempting to wake the leader. This had two flaws. First, a leader could fail to update it if it hit a code path that did attempt to wake the leader (even though it was the leader itself), e.g. by returning `true` from `withRaftGroup()`. Second, it could be updated on a follower, where it has no effect.

This patch instead updates it when unquiescing the leader, regardless of unquiesce options.

**kvserver: only wake leader when unquiescing a follower**

Previously, any replica would wake the leader when unquiescing, if requested by the caller. However, this could cause the leader to propose an empty command to wake itself, which commonly happens in `handleRaftReady()` via `withRaftGroupLocked()`. This appears unnecessary, and likely causes a large number of Raft proposals on ranges that frequently (un)quiesce.

This patch instead only attempts to wake the leader from followers.
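
As a rough illustration of the resulting behavior, the standalone Go sketch below models which side effect an unquiescing replica performs depending on its Raft state. The names `raftState`, `unquiesceAction`, and `decideUnquiesce` are hypothetical and exist only for this example; the real logic lives in `maybeUnquiesceLocked()` and operates on the actual Raft group.

```go
package main

import "fmt"

// raftState is a hypothetical stand-in for the Raft state of a replica.
type raftState int

const (
	stateFollower raftState = iota
	stateCandidate
	stateLeader
)

// unquiesceAction describes the side effects chosen when unquiescing.
// This type is an assumption made for illustration only.
type unquiesceAction struct {
	updateLastUpdateTimes bool // leader refreshes its follower activity times
	proposeEmptyCommand   bool // follower proposes an empty command to wake the leader
}

// decideUnquiesce mirrors the branch structure described above: a leader
// only refreshes its last update times, and only a follower that was asked
// to wake the leader proposes an empty command.
func decideUnquiesce(state raftState, wakeLeader bool) unquiesceAction {
	switch {
	case state == stateLeader:
		return unquiesceAction{updateLastUpdateTimes: true}
	case state == stateFollower && wakeLeader:
		return unquiesceAction{proposeEmptyCommand: true}
	default:
		return unquiesceAction{}
	}
}

func main() {
	fmt.Printf("leader, wakeLeader=true:    %+v\n", decideUnquiesce(stateLeader, true))
	fmt.Printf("follower, wakeLeader=true:  %+v\n", decideUnquiesce(stateFollower, true))
	fmt.Printf("follower, wakeLeader=false: %+v\n", decideUnquiesce(stateFollower, false))
}
```

In this model a leader never proposes to wake itself, regardless of the `wakeLeader` option.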

**kvserver: wake leader before campaigning when unquiescing**

We should wake the leader before campaigning when unquiescing, since we won't send the proposal in the candidate state and thus won't give the leader a chance to assert leadership if we're wrong about it being dead.

**kvserver: omit unnecessary campaign checks on leader unquiescence**

This patch changes a couple of `maybeUnquiesce()` call sites to not attempt to campaign when we know the replica must already be the leader.

Epic: none
Release note: None


Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Jun 20, 2023
2 parents aae9989 + 3dd1a37 commit a416398
Showing 10 changed files with 181 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
@@ -709,7 +709,7 @@ func mergeCheckingTimestampCaches(
// Make sure the LHS range in uniquiesced so that it elects a new
// Raft leader after the partition is established.
for _, r := range lhsRepls {
-r.MaybeUnquiesceAndWakeLeader()
+r.MaybeUnquiesce()
}

// Issue an increment on the range. The leaseholder should evaluate
119 changes: 113 additions & 6 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1177,7 +1177,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
// Make sure this replica has not inadvertently quiesced. We need the
// replica ticking so that it campaigns.
if otherRepl.IsQuiescent() {
-otherRepl.MaybeUnquiesceAndWakeLeader()
+otherRepl.MaybeUnquiesce()
}
lead := otherRepl.RaftStatus().Lead
if lead == raft.None {
@@ -3841,7 +3841,7 @@ func TestReplicaTooOldGC(t *testing.T) {
} else if replica != nil {
// Make sure the replica is unquiesced so that it will tick and
// contact the leader to discover it's no longer part of the range.
-replica.MaybeUnquiesceAndWakeLeader()
+replica.MaybeUnquiesce()
}
return errors.Errorf("found %s, waiting for it to be GC'd", replica)
})
@@ -6415,11 +6415,12 @@ func TestRaftCheckQuorum(t *testing.T) {
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)

-// Unquiesce the leader if necessary. We have to use AndWakeLeader to
-// submit a proposal, otherwise it will immediately quiesce again without
-// ticking.
+// Unquiesce the leader if necessary. We have to do so by submitting an
+// empty proposal, otherwise the leader will immediately quiesce again.
if quiesce {
-require.True(t, repl1.MaybeUnquiesceAndWakeLeader())
+ok, err := repl1.MaybeUnquiesceAndPropose()
+require.NoError(t, err)
+require.True(t, ok)
t.Logf("n1 unquiesced")
} else {
require.False(t, repl1.IsQuiescent())
@@ -6582,3 +6583,109 @@ func TestRaftLeaderRemovesItself(t *testing.T) {
return false
}, 10*time.Second, 500*time.Millisecond)
}
+
+// TestRaftUnquiesceLeaderNoProposal tests that unquiescing a Raft leader does
+// not result in a proposal, since this is unnecessary and expensive.
+func TestRaftUnquiesceLeaderNoProposal(t *testing.T) {
+defer leaktest.AfterTest(t)()
+defer log.Scope(t).Close(t)
+
+// Timing-sensitive, so skip under deadlock detector and stressrace.
+skip.UnderDeadlock(t)
+skip.UnderStressRace(t)
+
+ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+defer cancel()
+
+// Disable lease extensions and expiration-based lease transfers,
+// since these cause range writes and prevent quiescence.
+st := cluster.MakeTestingClusterSettings()
+kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, &st.SV, false)
+kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false)
+
+// Block writes to the range, to prevent spurious proposals (typically due to
+// txn record GC).
+var blockRange atomic.Int64
+reqFilter := func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
+if rangeID := roachpb.RangeID(blockRange.Load()); rangeID > 0 && rangeID == ba.RangeID {
+t.Logf("r%d write rejected: %s", rangeID, ba)
+return kvpb.NewError(errors.New("rejected"))
+}
+return nil
+}
+
+tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+ReplicationMode: base.ReplicationManual,
+ServerArgs: base.TestServerArgs{
+Settings: st,
+RaftConfig: base.RaftConfig{
+RaftTickInterval: 100 * time.Millisecond, // speed up test
+},
+Knobs: base.TestingKnobs{
+Store: &kvserver.StoreTestingKnobs{
+TestingRequestFilter: reqFilter,
+},
+},
+},
+})
+defer tc.Stopper().Stop(ctx)
+
+logStatus := func(s *raft.Status) {
+t.Helper()
+require.NotNil(t, s)
+t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
+}
+
+sender := tc.GetFirstStoreFromServer(t, 0).TestSender()
+
+// Create a range, upreplicate it, and replicate a write.
+key := tc.ScratchRange(t)
+desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
+_, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, 1))
+require.NoError(t, pErr.GoError())
+tc.WaitForValues(t, key, []int64{1, 1, 1})
+
+repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+require.NoError(t, err)
+repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
+require.NoError(t, err)
+repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
+require.NoError(t, err)
+repls := []*kvserver.Replica{repl1, repl2, repl3}
+
+// Block writes.
+blockRange.Store(int64(desc.RangeID))
+defer blockRange.Store(0)
+
+// Wait for the range to quiesce.
+require.Eventually(t, func() bool {
+for _, repl := range repls {
+if !repl.IsQuiescent() {
+return false
+}
+}
+return true
+}, 10*time.Second, 100*time.Millisecond)
+t.Logf("range quiesced")
+
+// Make sure n1 is still leader.
+initialStatus := repl1.RaftStatus()
+require.Equal(t, raft.StateLeader, initialStatus.RaftState)
+logStatus(initialStatus)
+t.Logf("n1 leader")
+
+// Unquiesce n1. This may result in it immediately quiescing again, which is
+// fine, but it shouldn't submit a proposal to wake up the followers.
+require.True(t, repl1.MaybeUnquiesce())
+t.Logf("n1 unquiesced")
+
+require.Eventually(t, repl1.IsQuiescent, 10*time.Second, 100*time.Millisecond)
+t.Logf("n1 quiesced")
+
+status := repl1.RaftStatus()
+logStatus(status)
+require.Equal(t, raft.StateLeader, status.RaftState)
+require.Equal(t, initialStatus.Term, status.Term)
+require.Equal(t, initialStatus.Progress[1].Match, status.Progress[1].Match)
+t.Logf("n1 still leader with no new proposals at log index %d", status.Progress[1].Match)
+}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3463,7 +3463,7 @@ func TestReplicaTombstone(t *testing.T) {
})
tc.RemoveVotersOrFatal(t, key, tc.Target(2))
testutils.SucceedsSoon(t, func() error {
-repl.MaybeUnquiesceAndWakeLeader()
+repl.MaybeUnquiesce()
if len(sawTooOld) == 0 {
return errors.New("still haven't seen ReplicaTooOldError")
}
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/flow_control_integration_test.go
@@ -1975,7 +1975,8 @@ ORDER BY name ASC;

h.comment(`-- (Unquiesce the range.)`)
testutils.SucceedsSoon(t, func() error {
-tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndWakeLeader()
+_, err := tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndPropose()
+require.NoError(t, err)
return h.checkAllTokensReturned(ctx, 3)
})

19 changes: 14 additions & 5 deletions pkg/kv/kvserver/helpers_test.go
@@ -495,15 +495,24 @@ func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.
}

func (r *Replica) MaybeUnquiesce() bool {
-r.mu.Lock()
-defer r.mu.Unlock()
-return r.maybeUnquiesceWithOptionsLocked(false /* campaignOnWake */)
+return r.maybeUnquiesce(true /* wakeLeader */, true /* mayCampaign */)
}

-func (r *Replica) MaybeUnquiesceAndWakeLeader() bool {
+// MaybeUnquiesceAndPropose will unquiesce the range and submit a noop proposal.
+// This is useful when unquiescing the leader and wanting to also unquiesce
+// followers, since the leader may otherwise simply quiesce again immediately.
+func (r *Replica) MaybeUnquiesceAndPropose() (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()
-return r.maybeUnquiesceAndWakeLeaderLocked()
+if !r.canUnquiesceRLocked() {
+return false, nil
+}
+return true, r.withRaftGroupLocked(false, func(r *raft.RawNode) (bool, error) {
+if err := r.Propose(nil); err != nil {
+return false, err
+}
+return true /* unquiesceAndWakeLeader */, nil
+})
}

func (r *Replica) ReadCachedProtectedTS() (readAt, earliestProtectionTimestamp hlc.Timestamp) {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -2065,7 +2065,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// orphaned followers would fail to queue themselves for GC.) Unquiesce the
// range in case it managed to quiesce between when the Subsume request
// arrived and now, which is rare but entirely legal.
-r.maybeUnquiesceLocked()
+r.maybeUnquiesceLocked(false /* wakeLeader */, true /* mayCampaign */)

taskCtx := r.AnnotateCtx(context.Background())
err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal_buf.go
@@ -1253,7 +1253,7 @@ func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error
return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command here so there is no need to wake the leader
// if we were quiesced. However, we should make sure we are unquiesced.
-(*Replica)(rp).maybeUnquiesceLocked()
+(*Replica)(rp).maybeUnquiesceLocked(false /* wakeLeader */, true /* mayCampaign */)
return false /* maybeUnquiesceLocked */, fn(raftGroup)
})
}
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
@@ -668,13 +668,14 @@ func (r *Replica) stepRaftGroup(req *kvserverpb.RaftMessageRequest) error {
st := r.raftBasicStatusRLocked()
hasLeader := st.RaftState == raft.StateFollower && st.Lead != 0
fromLeader := uint64(req.FromReplica.ReplicaID) == st.Lead
+
+var wakeLeader, mayCampaign bool
if hasLeader && !fromLeader {
// TODO(erikgrinaker): This is likely to result in election ties, find
// some way to avoid that.
-r.maybeUnquiesceAndWakeLeaderLocked()
-} else {
-r.maybeUnquiesceWithOptionsLocked(false /* campaignOnWake */)
+wakeLeader, mayCampaign = true, true
}
+r.maybeUnquiesceLocked(wakeLeader, mayCampaign)
}
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, timeutil.Now())
if req.Message.Type == raftpb.MsgSnap {
@@ -2117,7 +2118,7 @@ func (r *Replica) withRaftGroupLocked(
unquiesce = true
}
if unquiesce {
-r.maybeUnquiesceAndWakeLeaderLocked()
+r.maybeUnquiesceLocked(true /* wakeLeader */, true /* mayCampaign */)
}
return err
}
88 changes: 42 additions & 46 deletions pkg/kv/kvserver/replica_raft_quiesce.go
@@ -40,69 +40,74 @@ var raftDisableQuiescence = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_QUIESCEN
func (r *Replica) quiesceLocked(ctx context.Context, lagging laggingReplicaSet) {
if !r.mu.quiescent {
if log.V(3) {
-log.Infof(ctx, "quiescing %d", r.RangeID)
+log.Infof(ctx, "quiescing r%d", r.RangeID)
}
r.mu.quiescent = true
r.mu.laggingFollowersOnQuiesce = lagging
r.store.unquiescedReplicas.Lock()
delete(r.store.unquiescedReplicas.m, r.RangeID)
r.store.unquiescedReplicas.Unlock()
} else if log.V(4) {
-log.Infof(ctx, "already quiesced")
+log.Infof(ctx, "r%d already quiesced", r.RangeID)
}
}

-func (r *Replica) maybeUnquiesce() bool {
+// maybeUnquiesce unquiesces the replica if it is quiesced and can be
+// unquiesced, returning true in that case. See maybeUnquiesceLocked() for
+// details.
+func (r *Replica) maybeUnquiesce(wakeLeader, mayCampaign bool) bool {
r.mu.Lock()
defer r.mu.Unlock()
-return r.maybeUnquiesceLocked()
+return r.maybeUnquiesceLocked(wakeLeader, mayCampaign)
}

-func (r *Replica) maybeUnquiesceLocked() bool {
-return r.maybeUnquiesceWithOptionsLocked(true /* campaignOnWake */)
-}
-
-func (r *Replica) maybeUnquiesceWithOptionsLocked(campaignOnWake bool) bool {
+// maybeUnquiesceLocked unquiesces the replica if it is quiesced and can be
+// unquiesced, returning true in that case.
+//
+// If wakeLeader is true, wake the leader by proposing an empty command. Should
+// typically be true, unless e.g. the caller is either about to propose a
+// command anyway, or it knows the leader is awake because it received a message
+// from it.
+//
+// If mayCampaign is true, the replica may campaign if appropriate. This will
+// respect PreVote and CheckQuorum, and thus won't disrupt a current leader.
+// Should typically be true, unless the caller wants to avoid election ties.
+func (r *Replica) maybeUnquiesceLocked(wakeLeader, mayCampaign bool) bool {
if !r.canUnquiesceRLocked() {
return false
}
ctx := r.AnnotateCtx(context.TODO())
if log.V(3) {
-log.Infof(ctx, "unquiescing %d", r.RangeID)
+log.Infof(ctx, "unquiescing r%d", r.RangeID)
}
r.mu.quiescent = false
r.mu.laggingFollowersOnQuiesce = nil
r.store.unquiescedReplicas.Lock()
r.store.unquiescedReplicas.m[r.RangeID] = struct{}{}
r.store.unquiescedReplicas.Unlock()
-if campaignOnWake {
-r.maybeCampaignOnWakeLocked(ctx)
-}
-// NB: we know there's a non-nil RaftStatus because internalRaftGroup isn't nil.
-r.mu.lastUpdateTimes.updateOnUnquiesce(
-r.mu.state.Desc.Replicas().Descriptors(), r.raftSparseStatusRLocked().Progress, timeutil.Now(),
-)
-return true
-}

-func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool {
-if !r.canUnquiesceRLocked() {
-return false
+st := r.raftSparseStatusRLocked()
+if st.RaftState == raft.StateLeader {
+r.mu.lastUpdateTimes.updateOnUnquiesce(
+r.mu.state.Desc.Replicas().Descriptors(), st.Progress, timeutil.Now())
+
+} else if st.RaftState == raft.StateFollower && wakeLeader {
+// Propose an empty command which will wake the leader.
+if log.V(3) {
+log.Infof(ctx, "waking r%d leader", r.RangeID)
+}
+data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil)
+_ = r.mu.internalRaftGroup.Propose(data)
+r.mu.lastProposalAtTicks = r.mu.ticks // delay imminent quiescence
}
-ctx := r.AnnotateCtx(context.TODO())
-if log.V(3) {
-log.Infof(ctx, "unquiescing %d: waking leader", r.RangeID)
+
+// NB: campaign after attempting to wake leader, since we won't send the
+// proposal in candidate state. This gives it a chance to assert leadership if
+// we're wrong about it being dead.
+if mayCampaign {
+r.maybeCampaignOnWakeLocked(ctx)
}
-r.mu.quiescent = false
-r.mu.laggingFollowersOnQuiesce = nil
-r.store.unquiescedReplicas.Lock()
-r.store.unquiescedReplicas.m[r.RangeID] = struct{}{}
-r.store.unquiescedReplicas.Unlock()
-r.maybeCampaignOnWakeLocked(ctx)
-// Propose an empty command which will wake the leader.
-data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil)
-_ = r.mu.internalRaftGroup.Propose(data)
-r.mu.lastProposalAtTicks = r.mu.ticks // delay imminent quiescence
+
return true
}

@@ -184,14 +189,6 @@ func (r *Replica) canUnquiesceRLocked() bool {
// are behind, whether or not they are live. If any entry in the livenessMap is
// nil, then the missing node ID is treated as live and will prevent the range
// from quiescing.
-//
-// TODO(peter): There remains a scenario in which a follower is left unquiesced
-// while the leader is quiesced: the follower's receive queue is full and the
-// "quiesce" message is dropped. This seems very very unlikely because if the
-// follower isn't keeping up with raft messages it is unlikely that the leader
-// would quiesce. The fallout from this situation are undesirable raft
-// elections which will cause throughput hiccups to the range, but not
-// correctness issues.
func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked(
ctx context.Context, leaseStatus kvserverpb.LeaseStatus, livenessMap livenesspb.IsLiveMap,
) bool {
@@ -468,13 +465,12 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
if roachpb.ReplicaID(id) == r.replicaID {
continue
}
-toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
-roachpb.ReplicaID(id), lastFromReplica)
+toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(id), lastFromReplica)
if toErr != nil {
if log.V(4) {
log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id)
}
-r.maybeUnquiesceLocked()
+r.maybeUnquiesceLocked(false /* wakeLeader */, false /* mayCampaign */) // already leader
return false
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
@@ -705,7 +705,7 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) {
lagging := r.mu.laggingFollowersOnQuiesce
r.mu.RUnlock()
if quiescent && lagging.MemberStale(l) {
-r.maybeUnquiesce()
+r.maybeUnquiesce(false /* wakeLeader */, false /* mayCampaign */) // already leader
}
})
}