kvserver: only pause followers when holding active lease #85732

Merged · 4 commits · Aug 10, 2022
48 changes: 1 addition & 47 deletions pkg/kv/kvserver/replica_raft.go
@@ -1186,53 +1186,7 @@ func (r *Replica) tick(
return false, nil
}

if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) {
// When multiple followers are overloaded, we may not be able to exclude all
// of them from replication traffic due to quorum constraints. We would like
// a given Range to deterministically exclude the same store (chosen
// randomly), so that across multiple Ranges we have a chance of removing
// load from all overloaded Stores in the cluster. (It would be a bad idea
// to roll a per-Range dice here on every tick, since that would rapidly
// include and exclude individual followers from replication traffic, which
// would be akin to a high rate of packet loss. Once we've decided to ignore
// a follower, this decision should be somewhat stable for at least a few
// seconds).
//
// Note that we don't enable this mechanism for the liveness range (see
// quotaPoolEnabledForRange), simply to play it safe, as we know that the
// liveness range is unlikely to be a major contributor to any follower's
// I/O and wish to reduce the likelihood of a problem in replication pausing
// contributing to an outage of that critical range.
seed := int64(r.RangeID)
now := r.store.Clock().Now().GoTime()
d := computeExpendableOverloadedFollowersInput{
replDescs: r.descRLocked().Replicas(),
ioOverloadMap: ioOverloadMap,
getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
prs := r.mu.internalRaftGroup.Status().Progress
updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration())
})
return prs
},
minLiveMatchIndex: r.mu.proposalQuotaBaseIndex,
seed: seed,
}
r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d)
for replicaID := range r.mu.pausedFollowers {
// We're dropping messages to those followers (see handleRaftReady) but
// it's a good idea to tell raft not to even bother sending in the first
// place. Raft will react to this by moving the follower to probing state
// where it will be contacted only sporadically until it responds to an
// MsgApp (which it can only do once we stop dropping messages). Something
// similar would result naturally if we didn't report as unreachable, but
// with more wasted work.
r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID))
}
} else if len(r.mu.pausedFollowers) > 0 {
// No store in the cluster is overloaded, or this replica is not raft leader.
r.mu.pausedFollowers = nil
}
r.updatePausedFollowersLocked(ctx, ioOverloadMap)

now := r.store.Clock().NowAsClockTimestamp()
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
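
Note on the hunk above: the tick path now delegates the whole pausing decision to updatePausedFollowersLocked (added in replica_raft_overload.go below). Conceptually the helper is a chain of early returns: clear the paused set by default, then repopulate it only if some store is overloaded, this replica is the raft leader, the quota pool is enabled for the range, and, new in this PR, the replica holds a valid lease. The following is a standalone, simplified sketch of that guard shape, using hypothetical toy types rather than the real CockroachDB code:

package main

import "fmt"

// Sketch only: toy state standing in for the relevant Replica fields.
type replica struct {
	replicaID, leaderID int
	quotaPoolEnabled    bool
	holdsValidLease     bool
	pausedFollowers     map[int]struct{}
}

// updatePausedFollowers mirrors the guard structure of
// updatePausedFollowersLocked: clear first, then bail out unless every
// precondition holds.
func (r *replica) updatePausedFollowers(overloaded map[int]struct{}) {
	r.pausedFollowers = nil // default: pause nobody

	if len(overloaded) == 0 {
		return // no store in the cluster is overloaded
	}
	if r.replicaID != r.leaderID {
		return // only the raft leader pauses followers
	}
	if !r.quotaPoolEnabled {
		return // e.g. the liveness range: play it safe
	}
	if !r.holdsValidLease {
		return // the change in this PR: require an active lease
	}
	// The real code calls computeExpendableOverloadedFollowers here; this toy
	// version just pauses every overloaded follower other than ourselves.
	r.pausedFollowers = map[int]struct{}{}
	for id := range overloaded {
		if id != r.replicaID {
			r.pausedFollowers[id] = struct{}{}
		}
	}
}

func main() {
	r := &replica{replicaID: 1, leaderID: 1, quotaPoolEnabled: true, holdsValidLease: false}
	r.updatePausedFollowers(map[int]struct{}{2: {}})
	fmt.Println(len(r.pausedFollowers)) // 0: without a valid lease, nothing is paused
}
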
75 changes: 73 additions & 2 deletions pkg/kv/kvserver/replica_raft_overload.go
@@ -43,6 +43,7 @@ var pauseReplicationIOThreshold = settings.RegisterFloatSetting(
)

type computeExpendableOverloadedFollowersInput struct {
self roachpb.ReplicaID
replDescs roachpb.ReplicaSet
// TODO(tbg): all entries are overloaded, so consider removing the IOThreshold here
// because it's confusing.
@@ -104,11 +105,10 @@ func computeExpendableOverloadedFollowers(
var nonLive map[roachpb.ReplicaID]nonLiveReason
var liveOverloadedVoterCandidates map[roachpb.ReplicaID]struct{}
var liveOverloadedNonVoterCandidates map[roachpb.ReplicaID]struct{}

var prs map[uint64]tracker.Progress

for _, replDesc := range d.replDescs.AsProto() {
if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded {
if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded || replDesc.ReplicaID == d.self {
continue
}
// There's at least one overloaded follower, so initialize
@@ -206,3 +206,74 @@ func (osm *overloadedStoresMap) Swap(
v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold)
return v
}

func (r *Replica) updatePausedFollowersLocked(
ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
) {
r.mu.pausedFollowers = nil

if len(ioOverloadMap) == 0 {
return
}

if r.replicaID != r.mu.leaderID {
// Only the raft leader pauses followers. Followers never send meaningful
// amounts of data in raft messages, so pausing doesn't make sense on them.
return
}

if !quotaPoolEnabledForRange(*r.descRLocked()) {
// If the quota pool isn't enabled (as for the liveness range), play it
// safe. The range is unlikely to be a major contributor to any follower's
// I/O, and we wish to reduce the likelihood of a problem in replication
// pausing contributing to an outage of that critical range.
return
}

status := r.leaseStatusAtRLocked(ctx, r.Clock().NowAsClockTimestamp())
if !status.IsValid() || !status.OwnedBy(r.StoreID()) {
// If we're not the leaseholder (which includes the case in which we just
// transferred the lease away), leave all followers unpaused. Otherwise, the
// leaseholder won't learn that the entries it submitted were committed,
// which effectively causes range unavailability.
return
}

// When multiple followers are overloaded, we may not be able to exclude all
// of them from replication traffic due to quorum constraints. We would like
// a given Range to deterministically exclude the same store (chosen
// randomly), so that across multiple Ranges we have a chance of removing
// load from all overloaded Stores in the cluster. (It would be a bad idea
// to roll a per-Range dice here on every tick, since that would rapidly
// include and exclude individual followers from replication traffic, which
// would be akin to a high rate of packet loss. Once we've decided to ignore
// a follower, this decision should be somewhat stable for at least a few
// seconds).
seed := int64(r.RangeID)
now := r.store.Clock().Now().GoTime()
d := computeExpendableOverloadedFollowersInput{
self: r.replicaID,
replDescs: r.descRLocked().Replicas(),
ioOverloadMap: ioOverloadMap,
getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
prs := r.mu.internalRaftGroup.Status().Progress
updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration())
})
return prs
},
minLiveMatchIndex: r.mu.proposalQuotaBaseIndex,
seed: seed,
}
r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d)
for replicaID := range r.mu.pausedFollowers {
// We're dropping messages to those followers (see handleRaftReady) but
// it's a good idea to tell raft not to even bother sending in the first
// place. Raft will react to this by moving the follower to probing state
// where it will be contacted only sporadically until it responds to an
// MsgApp (which it can only do once we stop dropping messages). Something
// similar would result naturally if we didn't report as unreachable, but
// with more wasted work.
r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID))
}
}
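
The seed above is worth calling out: seeding the selection with the RangeID makes the choice of which overloaded follower to pause deterministic for a given range across ticks, while still varying between ranges, which is what spreads the relief across all overloaded stores. Below is a minimal sketch of that idea, assuming a simplified selection that pauses at most one follower; the real computeExpendableOverloadedFollowers also accounts for quorum constraints, liveness, and the minimum live match index.

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pickPaused deterministically picks at most one overloaded follower to
// pause for a given range: never the leader itself, and always the same
// follower for the same (rangeID, candidate set).
func pickPaused(rangeID int64, self int, overloaded []int) []int {
	var candidates []int
	for _, id := range overloaded {
		if id != self { // the leader never pauses itself
			candidates = append(candidates, id)
		}
	}
	if len(candidates) == 0 {
		return nil
	}
	sort.Ints(candidates)                    // stable input order
	rng := rand.New(rand.NewSource(rangeID)) // stable per-range seed
	return []int{candidates[rng.Intn(len(candidates))]}
}

func main() {
	// Same range and inputs on every tick: the same follower stays paused.
	fmt.Println(pickPaused(7, 1, []int{2, 3}))
	fmt.Println(pickPaused(7, 1, []int{2, 3}))
	// A different range may shed load off a different overloaded store.
	fmt.Println(pickPaused(8, 1, []int{2, 3}))
}
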
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_raft_overload_test.go
@@ -46,6 +46,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
require.Equal(t, "run", d.Cmd)
var seed uint64
var replDescs roachpb.ReplicaSet
var self roachpb.ReplicaID
ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
snapshotMap := map[roachpb.ReplicaID]struct{}{}
downMap := map[roachpb.ReplicaID]struct{}{}
@@ -65,6 +66,8 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
switch arg.Key {
case "min-live-match-index":
minLiveMatchIndex = id
case "self":
self = roachpb.ReplicaID(id)
case "voters", "learners":
replicaID := roachpb.ReplicaID(id)
if matchS != "" {
@@ -134,6 +137,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
}

m, _ := computeExpendableOverloadedFollowers(ctx, computeExpendableOverloadedFollowersInput{
self: self,
replDescs: replDescs,
ioOverloadMap: ioOverloadMap,
getProgressMap: getProgressMap,
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testdata/replica_raft_overload/self.txt
@@ -0,0 +1,4 @@
# Won't consider itself for pausing.
run voters=(1,2,3) overloaded=(1) self=1
----
[]
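
The expected output [] in this new datadriven case exercises the self filter added above: the only overloaded store (s1) is the leader's own, so computeExpendableOverloadedFollowers skips it via the replDesc.ReplicaID == d.self check and returns an empty paused set.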