Skip to content

Commit

Permalink
storage: Consider recently-added replicas healthy when rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
a-robinson committed Aug 28, 2017
1 parent 2c4f14b commit 54618ea
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 53 deletions.
19 changes: 14 additions & 5 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,12 @@ func computeQuorum(nodes int) int {

// filterBehindReplicas removes any "behind" replicas from the supplied
// slice. A "behind" replica is one which is not at or past the quorum commit
// index.
// index. We forgive brandNewReplicaID for being behind, since a new range can
// take a little while to fully catch up.
func filterBehindReplicas(
raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
if raftStatus == nil || len(raftStatus.Progress) == 0 {
// raftStatus.Progress is only populated on the Raft leader which means we
Expand All @@ -866,6 +869,7 @@ func filterBehindReplicas(
for _, r := range replicas {
if progress, ok := raftStatus.Progress[uint64(r.ReplicaID)]; ok {
if uint64(r.ReplicaID) == raftStatus.Lead ||
r.ReplicaID == brandNewReplicaID ||
(progress.State == raft.ProgressStateReplicate &&
progress.Match >= raftStatus.Commit) {
candidates = append(candidates, r)
Expand All @@ -877,11 +881,16 @@ func filterBehindReplicas(

// filterUnremovableReplicas removes any unremovable replicas from the supplied
// slice. An unremovable replicas is one which is a necessary part of the
// quorum that will result from removing 1 replica.
// quorum that will result from removing 1 replica. We forgive brandNewReplicaID
// for being behind, since a new range can take a little while to catch up.
// This is important when we've just added a replica in order to rebalance to
// it (#17879).
func filterUnremovableReplicas(
raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
upToDateReplicas := filterBehindReplicas(raftStatus, replicas)
upToDateReplicas := filterBehindReplicas(raftStatus, replicas, brandNewReplicaID)
quorum := computeQuorum(len(replicas) - 1)
if len(upToDateReplicas) < quorum {
// The number of up-to-date replicas is less than quorum. No replicas can
Expand Down
112 changes: 66 additions & 46 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,33 +2299,43 @@ func TestFilterBehindReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
leader uint64
progress []uint64
expected []uint64
commit uint64
leader uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{0, 99, []uint64{0}, nil},
{1, 99, []uint64{1}, []uint64{1}},
{2, 99, []uint64{2}, []uint64{2}},
{1, 99, []uint64{0, 1}, []uint64{1}},
{1, 99, []uint64{1, 2}, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, []uint64{6, 5, 4}},
{0, 0, []uint64{0}, []uint64{0}},
{0, 0, []uint64{0, 0, 0}, []uint64{0}},
{1, 0, []uint64{2, 0, 1}, []uint64{2, 1}},
{1, 1, []uint64{0, 2, 1}, []uint64{2, 1}},
{0, 99, []uint64{0}, 0, nil},
{1, 99, []uint64{1}, 0, []uint64{1}},
{2, 99, []uint64{2}, 0, []uint64{2}},
{1, 99, []uint64{0, 1}, 0, []uint64{1}},
{1, 99, []uint64{1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, 0, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, 0, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, 0, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, 0, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, 0, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, 0, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, 0, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, 0, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, 0, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, 0, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, 0, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{0, 1, []uint64{0}, 0, []uint64{0}},
{0, 1, []uint64{0, 0, 0}, 0, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 0, []uint64{2, 1}},
{1, 2, []uint64{0, 2, 1}, 0, []uint64{2, 1}},
{1, 99, []uint64{0, 1}, 1, []uint64{0, 1}},
{1, 99, []uint64{0, 1}, 2, []uint64{1}},
{9, 99, []uint64{0, 9}, 1, []uint64{0, 9}},
{9, 99, []uint64{0, 1}, 1, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 2, []uint64{2, 0, 1}},
{1, 1, []uint64{2, 0, 1}, 3, []uint64{2, 1}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 5, []uint64{6, 5, 4, 2}},
{4, 99, []uint64{6, 5, 4, 3, 0}, 5, []uint64{6, 5, 4, 0}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand All @@ -2343,13 +2353,14 @@ func TestFilterBehindReplicas(t *testing.T) {
if v == 0 {
p.State = raft.ProgressStateProbe
}
status.Progress[uint64(j)] = p
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(j),
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}
candidates := filterBehindReplicas(status, replicas)
candidates := filterBehindReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand All @@ -2365,21 +2376,29 @@ func TestFilterUnremovableReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
expected []uint64
commit uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{0, []uint64{0}, nil},
{1, []uint64{1}, nil},
{1, []uint64{0, 1}, []uint64{0}},
{1, []uint64{1, 2, 3}, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, []uint64{1}},
{3, []uint64{1, 2, 3}, nil},
{1, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, []uint64{1, 2}},
{2, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2}},
{0, []uint64{0}, 0, nil},
{1, []uint64{1}, 0, nil},
{1, []uint64{0, 1}, 0, []uint64{0}},
{1, []uint64{1, 2, 3}, 0, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, 0, []uint64{1}},
{3, []uint64{1, 2, 3}, 0, nil},
{1, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2}},
{2, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2}},
{1, []uint64{1, 0}, 2, []uint64{1, 0}},
{1, []uint64{1, 0}, 1, []uint64{0}},
{3, []uint64{3, 2, 1}, 3, []uint64{2}},
{3, []uint64{3, 2, 0}, 3, []uint64{2}},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{4, 3, 2, 1}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{4, 3, 2, 0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2, 0}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand All @@ -2399,14 +2418,15 @@ func TestFilterUnremovableReplicas(t *testing.T) {
if v == 0 {
p.State = raft.ProgressStateProbe
}
status.Progress[uint64(j)] = p
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(j),
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}

candidates := filterUnremovableReplicas(status, replicas)
candidates := filterUnremovableReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand Down
40 changes: 40 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ type Replica struct {
// The ID of the leader replica within the Raft group. Used to determine
// when the leadership changes.
leaderID roachpb.ReplicaID
// The most recently added replica for the range and when it was added.
// Used to determine whether a replica is new enough that we shouldn't
// penalize it for being slightly behind. These field gets cleared out once
// we know that the replica has caught up.
lastReplicaAdded roachpb.ReplicaID
lastReplicaAddedTime time.Time

// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
Expand Down Expand Up @@ -992,6 +998,13 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
if progress.Match > 0 && progress.Match < minIndex {
minIndex = progress.Match
}
// If this is the most recently added replica and it has caught up, clear
// our state that was tracking it. This is unrelated to managing proposal
// quota, but this is a convenient place to do so.
if rep.ReplicaID == r.mu.lastReplicaAdded && progress.Match >= status.Commit {
r.mu.lastReplicaAdded = 0
r.mu.lastReplicaAddedTime = time.Time{}
}
}
}

Expand Down Expand Up @@ -1454,10 +1467,37 @@ func (r *Replica) setDescWithoutProcessUpdate(desc *roachpb.RangeDescriptor) {
r.mu.state.Desc, desc)
}

newMaxID := maxReplicaID(desc)
if newMaxID > r.mu.lastReplicaAdded {
r.mu.lastReplicaAdded = newMaxID
r.mu.lastReplicaAddedTime = timeutil.Now()
}

r.rangeStr.store(r.mu.replicaID, desc)
r.mu.state.Desc = desc
}

func maxReplicaID(desc *roachpb.RangeDescriptor) roachpb.ReplicaID {
if desc == nil || !desc.IsInitialized() {
return 0
}
var maxID roachpb.ReplicaID
for _, repl := range desc.Replicas {
if repl.ReplicaID > maxID {
maxID = repl.ReplicaID
}
}
return maxID
}

// LastReplicaAdded returns the ID of the most recently added replica and the
// time at which it was added.
func (r *Replica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.lastReplicaAdded, r.mu.lastReplicaAddedTime
}

// GetReplicaDescriptor returns the replica for this range from the range
// descriptor. Returns a *RangeNotFoundError if the replica is not found.
// No other errors are returned.
Expand Down
18 changes: 16 additions & 2 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ const (
// for rebalancing. It does not prevent transferring leases in order to allow
// a replica to be removed from a range.
minLeaseTransferInterval = time.Second

// newReplicaGracePeriod is the amount of time that we allow for a new
// replica's raft state to catch up to the leader's before we start
// considering it to be behind for the sake of rebalancing. We choose a
// large value here because snapshots of large replicas can take a while
// in high latency clusters, and not allowing enough of a cushion can
// make rebalance thrashing more likely (#17879).
newReplicaGracePeriod = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -338,7 +346,13 @@ func (rq *replicateQueue) processOneChange(
if log.V(1) {
log.Infof(ctx, "removing a replica")
}
candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas)
lastReplAdded, lastAddedTime := repl.LastReplicaAdded()
if timeutil.Since(lastAddedTime) < newReplicaGracePeriod {
lastReplAdded = 0
}
candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas, lastReplAdded)
log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal",
desc.Replicas, candidates)
removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone.Constraints, candidates, rangeInfo)
if err != nil {
return false, err
Expand Down Expand Up @@ -530,7 +544,7 @@ func (rq *replicateQueue) transferLease(
checkTransferLeaseSource bool,
checkCandidateFullness bool,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas)
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */)
if target := rq.allocator.TransferLeaseTarget(
ctx,
zone.Constraints,
Expand Down

0 comments on commit 54618ea

Please sign in to comment.