From 39ea3684f3f2c64474603b54c3c9250dbfb4748a Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 25 Aug 2017 15:11:43 -0400 Subject: [PATCH] storage: Consider recently-added replicas healthy when rebalancing Fixes #17879 --- pkg/storage/allocator.go | 21 +++++-- pkg/storage/allocator_test.go | 112 +++++++++++++++++++-------------- pkg/storage/replica.go | 31 +++++++++ pkg/storage/replicate_queue.go | 15 ++++- 4 files changed, 125 insertions(+), 54 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ad241b70d08d..a70903376d09 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -847,9 +847,12 @@ func computeQuorum(nodes int) int { // filterBehindReplicas removes any "behind" replicas from the supplied // slice. A "behind" replica is one which is not at or past the quorum commit -// index. +// index. We forgive brandNewReplicaID for being at commit index 0, indicating +// that it's so new it hasn't finished initialized yet. func filterBehindReplicas( - raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor, + raftStatus *raft.Status, + replicas []roachpb.ReplicaDescriptor, + brandNewReplicaID roachpb.ReplicaID, ) []roachpb.ReplicaDescriptor { if raftStatus == nil || len(raftStatus.Progress) == 0 { // raftStatus.Progress is only populated on the Raft leader which means we @@ -867,7 +870,8 @@ func filterBehindReplicas( if progress, ok := raftStatus.Progress[uint64(r.ReplicaID)]; ok { if uint64(r.ReplicaID) == raftStatus.Lead || (progress.State == raft.ProgressStateReplicate && - progress.Match >= raftStatus.Commit) { + progress.Match >= raftStatus.Commit) || + (progress.Match == 0 && r.ReplicaID == brandNewReplicaID) { candidates = append(candidates, r) } } @@ -877,11 +881,16 @@ func filterBehindReplicas( // filterUnremovableReplicas removes any unremovable replicas from the supplied // slice. An unremovable replicas is one which is a necessary part of the -// quorum that will result from removing 1 replica. +// quorum that will result from removing 1 replica. We forgive brandNewReplicaID +// for being at commit index 0, indicating that it's so new it hasn't finished +// initialized yet. This is important when we've just added a replica in order +// to rebalance to it, since it may not be done initializing yet (#17879). func filterUnremovableReplicas( - raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor, + raftStatus *raft.Status, + replicas []roachpb.ReplicaDescriptor, + brandNewReplicaID roachpb.ReplicaID, ) []roachpb.ReplicaDescriptor { - upToDateReplicas := filterBehindReplicas(raftStatus, replicas) + upToDateReplicas := filterBehindReplicas(raftStatus, replicas, brandNewReplicaID) quorum := computeQuorum(len(replicas) - 1) if len(upToDateReplicas) < quorum { // The number of up-to-date replicas is less than quorum. No replicas can diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index c27d3ae9effd..c084f6dc1d04 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -2299,33 +2299,43 @@ func TestFilterBehindReplicas(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { - commit uint64 - leader uint64 - progress []uint64 - expected []uint64 + commit uint64 + leader uint64 + progress []uint64 + brandNewReplicaID roachpb.ReplicaID + expected []uint64 }{ - {0, 99, []uint64{0}, nil}, - {1, 99, []uint64{1}, []uint64{1}}, - {2, 99, []uint64{2}, []uint64{2}}, - {1, 99, []uint64{0, 1}, []uint64{1}}, - {1, 99, []uint64{1, 2}, []uint64{1, 2}}, - {2, 99, []uint64{3, 2}, []uint64{3, 2}}, - {1, 99, []uint64{0, 0, 1}, []uint64{1}}, - {1, 99, []uint64{0, 1, 2}, []uint64{1, 2}}, - {2, 99, []uint64{1, 2, 3}, []uint64{2, 3}}, - {3, 99, []uint64{4, 3, 2}, []uint64{4, 3}}, - {1, 99, []uint64{1, 1, 1}, []uint64{1, 1, 1}}, - {1, 99, []uint64{1, 1, 2}, []uint64{1, 1, 2}}, - {2, 99, []uint64{1, 2, 2}, []uint64{2, 2}}, - {2, 99, []uint64{0, 1, 2, 3}, []uint64{2, 3}}, - {2, 99, []uint64{1, 2, 3, 4}, []uint64{2, 3, 4}}, - {3, 99, []uint64{5, 4, 3, 2}, []uint64{5, 4, 3}}, - {3, 99, []uint64{1, 2, 3, 4, 5}, []uint64{3, 4, 5}}, - {4, 99, []uint64{6, 5, 4, 3, 2}, []uint64{6, 5, 4}}, - {0, 0, []uint64{0}, []uint64{0}}, - {0, 0, []uint64{0, 0, 0}, []uint64{0}}, - {1, 0, []uint64{2, 0, 1}, []uint64{2, 1}}, - {1, 1, []uint64{0, 2, 1}, []uint64{2, 1}}, + {0, 99, []uint64{0}, 0, nil}, + {1, 99, []uint64{1}, 0, []uint64{1}}, + {2, 99, []uint64{2}, 0, []uint64{2}}, + {1, 99, []uint64{0, 1}, 0, []uint64{1}}, + {1, 99, []uint64{1, 2}, 0, []uint64{1, 2}}, + {2, 99, []uint64{3, 2}, 0, []uint64{3, 2}}, + {1, 99, []uint64{0, 0, 1}, 0, []uint64{1}}, + {1, 99, []uint64{0, 1, 2}, 0, []uint64{1, 2}}, + {2, 99, []uint64{1, 2, 3}, 0, []uint64{2, 3}}, + {3, 99, []uint64{4, 3, 2}, 0, []uint64{4, 3}}, + {1, 99, []uint64{1, 1, 1}, 0, []uint64{1, 1, 1}}, + {1, 99, []uint64{1, 1, 2}, 0, []uint64{1, 1, 2}}, + {2, 99, []uint64{1, 2, 2}, 0, []uint64{2, 2}}, + {2, 99, []uint64{0, 1, 2, 3}, 0, []uint64{2, 3}}, + {2, 99, []uint64{1, 2, 3, 4}, 0, []uint64{2, 3, 4}}, + {3, 99, []uint64{5, 4, 3, 2}, 0, []uint64{5, 4, 3}}, + {3, 99, []uint64{1, 2, 3, 4, 5}, 0, []uint64{3, 4, 5}}, + {4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}}, + {4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}}, + {0, 1, []uint64{0}, 0, []uint64{0}}, + {0, 1, []uint64{0, 0, 0}, 0, []uint64{0}}, + {1, 1, []uint64{2, 0, 1}, 0, []uint64{2, 1}}, + {1, 2, []uint64{0, 2, 1}, 0, []uint64{2, 1}}, + {1, 99, []uint64{0, 1}, 1, []uint64{0, 1}}, + {1, 99, []uint64{0, 1}, 2, []uint64{1}}, + {9, 99, []uint64{0, 9}, 1, []uint64{0, 9}}, + {9, 99, []uint64{0, 1}, 1, []uint64{0}}, + {1, 1, []uint64{2, 0, 1}, 2, []uint64{2, 0, 1}}, + {1, 1, []uint64{2, 0, 1}, 3, []uint64{2, 1}}, + {4, 99, []uint64{6, 5, 4, 3, 2}, 5, []uint64{6, 5, 4}}, + {4, 99, []uint64{6, 5, 4, 3, 0}, 5, []uint64{6, 5, 4, 0}}, } for _, c := range testCases { t.Run("", func(t *testing.T) { @@ -2343,13 +2353,14 @@ func TestFilterBehindReplicas(t *testing.T) { if v == 0 { p.State = raft.ProgressStateProbe } - status.Progress[uint64(j)] = p + replicaID := uint64(j + 1) + status.Progress[replicaID] = p replicas = append(replicas, roachpb.ReplicaDescriptor{ - ReplicaID: roachpb.ReplicaID(j), + ReplicaID: roachpb.ReplicaID(replicaID), StoreID: roachpb.StoreID(v), }) } - candidates := filterBehindReplicas(status, replicas) + candidates := filterBehindReplicas(status, replicas, c.brandNewReplicaID) var ids []uint64 for _, c := range candidates { ids = append(ids, uint64(c.StoreID)) @@ -2365,21 +2376,29 @@ func TestFilterUnremovableReplicas(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { - commit uint64 - progress []uint64 - expected []uint64 + commit uint64 + progress []uint64 + brandNewReplicaID roachpb.ReplicaID + expected []uint64 }{ - {0, []uint64{0}, nil}, - {1, []uint64{1}, nil}, - {1, []uint64{0, 1}, []uint64{0}}, - {1, []uint64{1, 2, 3}, []uint64{1, 2, 3}}, - {2, []uint64{1, 2, 3}, []uint64{1}}, - {3, []uint64{1, 2, 3}, nil}, - {1, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}}, - {2, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}}, - {3, []uint64{1, 2, 3, 4}, []uint64{1, 2}}, - {2, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2, 3, 4, 5}}, - {3, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2}}, + {0, []uint64{0}, 0, nil}, + {1, []uint64{1}, 0, nil}, + {1, []uint64{0, 1}, 0, []uint64{0}}, + {1, []uint64{1, 2, 3}, 0, []uint64{1, 2, 3}}, + {2, []uint64{1, 2, 3}, 0, []uint64{1}}, + {3, []uint64{1, 2, 3}, 0, nil}, + {1, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}}, + {2, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}}, + {3, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2}}, + {2, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2, 3, 4, 5}}, + {3, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2}}, + {1, []uint64{1, 0}, 2, []uint64{1, 0}}, + {1, []uint64{1, 0}, 1, []uint64{0}}, + {3, []uint64{3, 2, 1}, 3, nil}, + {3, []uint64{3, 2, 0}, 3, []uint64{2}}, + {3, []uint64{4, 3, 2, 1}, 4, []uint64{2, 1}}, + {3, []uint64{4, 3, 2, 0}, 3, []uint64{2, 0}}, + {3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2, 0}}, } for _, c := range testCases { t.Run("", func(t *testing.T) { @@ -2399,14 +2418,15 @@ func TestFilterUnremovableReplicas(t *testing.T) { if v == 0 { p.State = raft.ProgressStateProbe } - status.Progress[uint64(j)] = p + replicaID := uint64(j + 1) + status.Progress[replicaID] = p replicas = append(replicas, roachpb.ReplicaDescriptor{ - ReplicaID: roachpb.ReplicaID(j), + ReplicaID: roachpb.ReplicaID(replicaID), StoreID: roachpb.StoreID(v), }) } - candidates := filterUnremovableReplicas(status, replicas) + candidates := filterUnremovableReplicas(status, replicas, c.brandNewReplicaID) var ids []uint64 for _, c := range candidates { ids = append(ids, uint64(c.StoreID)) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 9534e4cc9da8..dd2bc9fbd732 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -338,6 +338,10 @@ type Replica struct { // The ID of the leader replica within the Raft group. Used to determine // when the leadership changes. leaderID roachpb.ReplicaID + // The time at which the most recently added replica for the range was added. + // Used to determine whether a replica is new enough that it might not have + // finished receiving/applying its first snapshot yet. + lastReplicaAdded time.Time // The last seen replica descriptors from incoming Raft messages. These are // stored so that the replica still knows the replica descriptors for itself @@ -1454,10 +1458,37 @@ func (r *Replica) setDescWithoutProcessUpdate(desc *roachpb.RangeDescriptor) { r.mu.state.Desc, desc) } + prevMaxID := maxReplicaID(r.mu.state.Desc) + newMaxID := maxReplicaID(desc) + if newMaxID > prevMaxID { + r.mu.lastReplicaAdded = timeutil.Now() + } + r.rangeStr.store(r.mu.replicaID, desc) r.mu.state.Desc = desc } +func maxReplicaID(desc *roachpb.RangeDescriptor) roachpb.ReplicaID { + if desc == nil || !desc.IsInitialized() { + return 0 + } + var maxID roachpb.ReplicaID + for _, repl := range desc.Replicas { + if repl.ReplicaID > maxID { + maxID = repl.ReplicaID + } + } + return maxID +} + +// LastReplicaAdded returns the ID of the most recently added replica and the +// time at which it was added. +func (r *Replica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) { + r.mu.RLock() + defer r.mu.RUnlock() + return maxReplicaID(r.mu.state.Desc), r.mu.lastReplicaAdded +} + // GetReplicaDescriptor returns the replica for this range from the range // descriptor. Returns a *RangeNotFoundError if the replica is not found. // No other errors are returned. diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index eb44a1187ae7..fb83f8648610 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -43,6 +43,11 @@ const ( // for rebalancing. It does not prevent transferring leases in order to allow // a replica to be removed from a range. minLeaseTransferInterval = time.Second + + // newReplicaGracePeriod is the amount of time that we allow for a new + // replica's raft state to catch up to the leader's before we start + // considering it to be behind for the sake of rebalancing. + newReplicaGracePeriod = 5 * time.Minute ) var ( @@ -338,7 +343,13 @@ func (rq *replicateQueue) processOneChange( if log.V(1) { log.Infof(ctx, "removing a replica") } - candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas) + lastReplAdded, lastAddedTime := repl.LastReplicaAdded() + if timeutil.Since(lastAddedTime) < newReplicaGracePeriod { + lastReplAdded = 0 + } + candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas, lastReplAdded) + log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal", + desc.Replicas, candidates) removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone.Constraints, candidates, rangeInfo) if err != nil { return false, err @@ -530,7 +541,7 @@ func (rq *replicateQueue) transferLease( checkTransferLeaseSource bool, checkCandidateFullness bool, ) (bool, error) { - candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas) + candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* lastAddedRepl */) if target := rq.allocator.TransferLeaseTarget( ctx, zone.Constraints,