Skip to content

Commit

Permalink
storage: Consider recently-added replicas healthy when rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
a-robinson committed Aug 25, 2017
1 parent 2c4f14b commit 39ea368
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 54 deletions.
21 changes: 15 additions & 6 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,12 @@ func computeQuorum(nodes int) int {

// filterBehindReplicas removes any "behind" replicas from the supplied
// slice. A "behind" replica is one which is not at or past the quorum commit
// index.
// index. We forgive brandNewReplicaID for being at commit index 0, indicating
// that it's so new it hasn't finished initializing yet.
func filterBehindReplicas(
raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
if raftStatus == nil || len(raftStatus.Progress) == 0 {
// raftStatus.Progress is only populated on the Raft leader which means we
Expand All @@ -867,7 +870,8 @@ func filterBehindReplicas(
if progress, ok := raftStatus.Progress[uint64(r.ReplicaID)]; ok {
if uint64(r.ReplicaID) == raftStatus.Lead ||
(progress.State == raft.ProgressStateReplicate &&
progress.Match >= raftStatus.Commit) {
progress.Match >= raftStatus.Commit) ||
(progress.Match == 0 && r.ReplicaID == brandNewReplicaID) {
candidates = append(candidates, r)
}
}
Expand All @@ -877,11 +881,16 @@ func filterBehindReplicas(

// filterUnremovableReplicas removes any unremovable replicas from the supplied
// slice. An unremovable replica is one which is a necessary part of the
// quorum that will result from removing 1 replica.
// quorum that will result from removing 1 replica. We forgive brandNewReplicaID
// for being at commit index 0, indicating that it's so new it hasn't finished
// initializing yet. This is important when we've just added a replica in order
// to rebalance to it, since it may not be done initializing yet (#17879).
func filterUnremovableReplicas(
raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
upToDateReplicas := filterBehindReplicas(raftStatus, replicas)
upToDateReplicas := filterBehindReplicas(raftStatus, replicas, brandNewReplicaID)
quorum := computeQuorum(len(replicas) - 1)
if len(upToDateReplicas) < quorum {
// The number of up-to-date replicas is less than quorum. No replicas can
Expand Down
112 changes: 66 additions & 46 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2299,33 +2299,43 @@ func TestFilterBehindReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
leader uint64
progress []uint64
expected []uint64
commit uint64
leader uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{0, 99, []uint64{0}, nil},
{1, 99, []uint64{1}, []uint64{1}},
{2, 99, []uint64{2}, []uint64{2}},
{1, 99, []uint64{0, 1}, []uint64{1}},
{1, 99, []uint64{1, 2}, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, []uint64{6, 5, 4}},
{0, 0, []uint64{0}, []uint64{0}},
{0, 0, []uint64{0, 0, 0}, []uint64{0}},
{1, 0, []uint64{2, 0, 1}, []uint64{2, 1}},
{1, 1, []uint64{0, 2, 1}, []uint64{2, 1}},
{0, 99, []uint64{0}, 0, nil},
{1, 99, []uint64{1}, 0, []uint64{1}},
{2, 99, []uint64{2}, 0, []uint64{2}},
{1, 99, []uint64{0, 1}, 0, []uint64{1}},
{1, 99, []uint64{1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, 0, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, 0, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, 0, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, 0, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, 0, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, 0, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, 0, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, 0, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, 0, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, 0, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, 0, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{0, 1, []uint64{0}, 0, []uint64{0}},
{0, 1, []uint64{0, 0, 0}, 0, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 0, []uint64{2, 1}},
{1, 2, []uint64{0, 2, 1}, 0, []uint64{2, 1}},
{1, 99, []uint64{0, 1}, 1, []uint64{0, 1}},
{1, 99, []uint64{0, 1}, 2, []uint64{1}},
{9, 99, []uint64{0, 9}, 1, []uint64{0, 9}},
{9, 99, []uint64{0, 1}, 1, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 2, []uint64{2, 0, 1}},
{1, 1, []uint64{2, 0, 1}, 3, []uint64{2, 1}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 5, []uint64{6, 5, 4}},
{4, 99, []uint64{6, 5, 4, 3, 0}, 5, []uint64{6, 5, 4, 0}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand All @@ -2343,13 +2353,14 @@ func TestFilterBehindReplicas(t *testing.T) {
if v == 0 {
p.State = raft.ProgressStateProbe
}
status.Progress[uint64(j)] = p
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(j),
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}
candidates := filterBehindReplicas(status, replicas)
candidates := filterBehindReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand All @@ -2365,21 +2376,29 @@ func TestFilterUnremovableReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
expected []uint64
commit uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{0, []uint64{0}, nil},
{1, []uint64{1}, nil},
{1, []uint64{0, 1}, []uint64{0}},
{1, []uint64{1, 2, 3}, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, []uint64{1}},
{3, []uint64{1, 2, 3}, nil},
{1, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, []uint64{1, 2}},
{2, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2}},
{0, []uint64{0}, 0, nil},
{1, []uint64{1}, 0, nil},
{1, []uint64{0, 1}, 0, []uint64{0}},
{1, []uint64{1, 2, 3}, 0, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, 0, []uint64{1}},
{3, []uint64{1, 2, 3}, 0, nil},
{1, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2}},
{2, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2}},
{1, []uint64{1, 0}, 2, []uint64{1, 0}},
{1, []uint64{1, 0}, 1, []uint64{0}},
{3, []uint64{3, 2, 1}, 3, nil},
{3, []uint64{3, 2, 0}, 3, []uint64{2}},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{2, 1}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{2, 0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2, 0}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand All @@ -2399,14 +2418,15 @@ func TestFilterUnremovableReplicas(t *testing.T) {
if v == 0 {
p.State = raft.ProgressStateProbe
}
status.Progress[uint64(j)] = p
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(j),
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}

candidates := filterUnremovableReplicas(status, replicas)
candidates := filterUnremovableReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand Down
31 changes: 31 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ type Replica struct {
// The ID of the leader replica within the Raft group. Used to determine
// when the leadership changes.
leaderID roachpb.ReplicaID
// The time at which the most recently added replica for the range was added.
// Used to determine whether a replica is new enough that it might not have
// finished receiving/applying its first snapshot yet.
lastReplicaAdded time.Time

// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
Expand Down Expand Up @@ -1454,10 +1458,37 @@ func (r *Replica) setDescWithoutProcessUpdate(desc *roachpb.RangeDescriptor) {
r.mu.state.Desc, desc)
}

prevMaxID := maxReplicaID(r.mu.state.Desc)
newMaxID := maxReplicaID(desc)
if newMaxID > prevMaxID {
r.mu.lastReplicaAdded = timeutil.Now()
}

r.rangeStr.store(r.mu.replicaID, desc)
r.mu.state.Desc = desc
}

// maxReplicaID returns the largest ReplicaID found among desc.Replicas. It
// returns 0 when desc is nil, when desc is not initialized, or when no replica
// has an ID greater than zero.
func maxReplicaID(desc *roachpb.RangeDescriptor) roachpb.ReplicaID {
	var highest roachpb.ReplicaID
	if desc != nil && desc.IsInitialized() {
		for _, rd := range desc.Replicas {
			if highest < rd.ReplicaID {
				highest = rd.ReplicaID
			}
		}
	}
	return highest
}

// LastReplicaAdded reports the highest replica ID present in the replica's
// current range descriptor together with the recorded time at which the most
// recent replica addition was observed. The highest ID is treated as the most
// recently added replica (presumably because replica IDs are handed out in
// increasing order; confirm against the ID allocation code). Both values are
// read while holding the replica mutex in read mode.
func (r *Replica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	newestID := maxReplicaID(r.mu.state.Desc)
	addedAt := r.mu.lastReplicaAdded
	return newestID, addedAt
}

// GetReplicaDescriptor returns the replica for this range from the range
// descriptor. Returns a *RangeNotFoundError if the replica is not found.
// No other errors are returned.
Expand Down
15 changes: 13 additions & 2 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ const (
// for rebalancing. It does not prevent transferring leases in order to allow
// a replica to be removed from a range.
minLeaseTransferInterval = time.Second

// newReplicaGracePeriod is the amount of time that we allow for a new
// replica's raft state to catch up to the leader's before we start
// considering it to be behind for the sake of rebalancing.
newReplicaGracePeriod = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -338,7 +343,13 @@ func (rq *replicateQueue) processOneChange(
if log.V(1) {
log.Infof(ctx, "removing a replica")
}
candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas)
// Only forgive the most recently added replica for being behind while it is
// still within newReplicaGracePeriod of being added. Once the grace period
// has elapsed (or if no addition has been recorded, in which case the zero
// time is far in the past), clear the ID so that the replica is judged by
// its raft progress like every other replica.
lastReplAdded, lastAddedTime := repl.LastReplicaAdded()
if timeutil.Since(lastAddedTime) > newReplicaGracePeriod {
	lastReplAdded = 0
}
candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas, lastReplAdded)
log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal",
desc.Replicas, candidates)
removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone.Constraints, candidates, rangeInfo)
if err != nil {
return false, err
Expand Down Expand Up @@ -530,7 +541,7 @@ func (rq *replicateQueue) transferLease(
checkTransferLeaseSource bool,
checkCandidateFullness bool,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas)
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* lastAddedRepl */)
if target := rq.allocator.TransferLeaseTarget(
ctx,
zone.Constraints,
Expand Down

0 comments on commit 39ea368

Please sign in to comment.