Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: remove special-case in FilterUnremovableReplicas for recently-added replicas #84160

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ go_test(
"replica_evaluate_test.go",
"replica_follower_read_test.go",
"replica_gc_queue_test.go",
"replica_init_test.go",
"replica_learner_test.go",
"replica_lease_renewal_test.go",
"replica_metrics_test.go",
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//:raft",
"@io_etcd_go_etcd_raft_v3//tracker",
],
)

Expand Down
49 changes: 5 additions & 44 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)

const (
Expand Down Expand Up @@ -1285,8 +1284,7 @@ func (a Allocator) RebalanceTarget(
// it's better to simulate the removal with the info that we do have than to
// assume that the rebalance is ok (#20241).
if targetType == VoterTarget && raftStatus != nil && raftStatus.Progress != nil {
replicaCandidates = simulateFilterUnremovableReplicas(
ctx, raftStatus, replicaCandidates, newReplica.ReplicaID)
replicaCandidates = FilterUnremovableReplicas(ctx, raftStatus, replicaCandidates)
}
if len(replicaCandidates) == 0 {
// No existing replicas are suitable to remove.
Expand Down Expand Up @@ -2237,35 +2235,11 @@ func excludeReplicasInNeedOfSnapshots(
return replicas[:filled]
}

// simulateFilterUnremovableReplicas removes any unremovable replicas from the
// supplied slice. Unlike FilterUnremovableReplicas, brandNewReplicaID is
// considered up-to-date (and thus can participate in quorum), but is not
// considered a candidate for removal.
func simulateFilterUnremovableReplicas(
ctx context.Context,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
status := *raftStatus
status.Progress[uint64(brandNewReplicaID)] = tracker.Progress{
State: tracker.StateReplicate,
Match: status.Commit,
}
return FilterUnremovableReplicas(ctx, &status, replicas, brandNewReplicaID)
}

// FilterUnremovableReplicas removes any unremovable replicas from the supplied
// slice. An unremovable replica is one which is a necessary part of the
// quorum that will result from removing 1 replica. We forgive brandNewReplicaID
// for being behind, since a new range can take a little while to catch up.
// This is important when we've just added a replica in order to rebalance to
// it (#17879).
// quorum that will result from removing 1 replica.
func FilterUnremovableReplicas(
ctx context.Context,
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
ctx context.Context, raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
) []roachpb.ReplicaDescriptor {
upToDateReplicas := FilterBehindReplicas(ctx, raftStatus, replicas)
oldQuorum := computeQuorum(len(replicas))
Expand All @@ -2280,27 +2254,14 @@ func FilterUnremovableReplicas(
newQuorum := computeQuorum(len(replicas) - 1)
if len(upToDateReplicas) > newQuorum {
// The number of up-to-date replicas is larger than the new quorum. Any
// replica can be removed, though we want to filter out brandNewReplicaID.
if brandNewReplicaID != 0 {
candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas)-len(upToDateReplicas))
for _, r := range replicas {
if r.ReplicaID != brandNewReplicaID {
candidates = append(candidates, r)
}
}
return candidates
}
// replica can be removed.
return replicas
}

// The number of up-to-date replicas is equal to the new quorum. Only allow
// removal of behind replicas (except for brandNewReplicaID which is given a
// free pass).
// removal of behind replicas.
candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas)-len(upToDateReplicas))
necessary := func(r roachpb.ReplicaDescriptor) bool {
if r.ReplicaID == brandNewReplicaID {
return true
}
for _, t := range upToDateReplicas {
if t == r {
return true
Expand Down
102 changes: 16 additions & 86 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7179,92 +7179,22 @@ func TestFilterUnremovableReplicas(t *testing.T) {
ctx := context.Background()

testCases := []struct {
commit uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{0, []uint64{0}, 0, nil},
{1, []uint64{1}, 0, nil},
{1, []uint64{0, 1}, 0, nil},
{1, []uint64{1, 2}, 0, []uint64{1, 2}},
{1, []uint64{1, 2, 3}, 0, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, 0, []uint64{1}},
{3, []uint64{1, 2, 3}, 0, nil},
{1, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, 0, nil},
{2, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2}},
{1, []uint64{1, 0}, 2, nil},
{1, []uint64{2, 1}, 2, []uint64{2}},
{1, []uint64{1, 0}, 1, nil},
{1, []uint64{2, 1}, 1, []uint64{1}},
{3, []uint64{3, 2, 1}, 3, nil},
{3, []uint64{3, 2, 0}, 3, nil},
{2, []uint64{4, 3, 2, 1}, 4, []uint64{4, 3, 2}},
{2, []uint64{4, 3, 2, 0}, 3, []uint64{4, 3, 0}},
{2, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2}},
{3, []uint64{4, 3, 2, 1}, 0, nil},
{3, []uint64{4, 3, 2, 1}, 4, nil},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
status := &raft.Status{
Progress: make(map[uint64]tracker.Progress),
}
// Use an invalid replica ID for the leader. TestFilterBehindReplicas covers
// valid replica IDs.
status.Lead = 99
status.RaftState = raft.StateLeader
status.Commit = c.commit
var replicas []roachpb.ReplicaDescriptor
for j, v := range c.progress {
p := tracker.Progress{
Match: v,
State: tracker.StateReplicate,
}
if v == 0 {
p.State = tracker.StateProbe
}
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}

candidates := FilterUnremovableReplicas(ctx, status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
}
if !reflect.DeepEqual(c.expected, ids) {
t.Fatalf("expected %d, but got %d", c.expected, ids)
}
})
}
}

func TestSimulateFilterUnremovableReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

testCases := []struct {
commit uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
commit uint64
progress []uint64
expected []uint64
}{
{1, []uint64{1, 0}, 2, []uint64{1}},
{1, []uint64{1, 0}, 1, nil},
{3, []uint64{3, 2, 1}, 3, []uint64{2}},
{3, []uint64{3, 2, 0}, 3, []uint64{2}},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{4, 3, 2}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{4, 3, 0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2}},
{0, []uint64{0}, nil},
{1, []uint64{1}, nil},
{1, []uint64{0, 1}, nil},
{1, []uint64{1, 2}, []uint64{1, 2}},
{1, []uint64{1, 2, 3}, []uint64{1, 2, 3}},
{2, []uint64{1, 2, 3}, []uint64{1}},
{3, []uint64{1, 2, 3}, nil},
{1, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{2, []uint64{1, 2, 3, 4}, []uint64{1, 2, 3, 4}},
{3, []uint64{1, 2, 3, 4}, nil},
{2, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, []uint64{1, 2}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand Down Expand Up @@ -7293,7 +7223,7 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) {
})
}

candidates := simulateFilterUnremovableReplicas(ctx, status, replicas, c.brandNewReplicaID)
candidates := FilterUnremovableReplicas(ctx, status, replicas)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand Down
29 changes: 0 additions & 29 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,6 @@ type Replica struct {
// The ID of the leader replica within the Raft group. Used to determine
// when the leadership changes.
leaderID roachpb.ReplicaID
// The most recently added replica for the range and when it was added.
// Used to determine whether a replica is new enough that we shouldn't
// penalize it for being slightly behind. These field gets cleared out once
// we know that the replica has caught up.
lastReplicaAdded roachpb.ReplicaID
lastReplicaAddedTime time.Time

// The most recently updated time for each follower of this range. This is updated
// every time a Raft message is received from a peer.
Expand Down Expand Up @@ -1055,29 +1049,6 @@ func (r *Replica) shouldIgnoreStrictGCEnforcementRLocked() (ret bool) {
return r.mu.conf.GCPolicy.IgnoreStrictEnforcement
}

// maxReplicaIDOfAny returns the maximum ReplicaID of any replica, including
// voters and learners.
func maxReplicaIDOfAny(desc *roachpb.RangeDescriptor) roachpb.ReplicaID {
if desc == nil || !desc.IsInitialized() {
return 0
}
var maxID roachpb.ReplicaID
for _, repl := range desc.Replicas().Descriptors() {
if repl.ReplicaID > maxID {
maxID = repl.ReplicaID
}
}
return maxID
}

// LastReplicaAdded returns the ID of the most recently added replica and the
// time at which it was added.
func (r *Replica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.lastReplicaAdded, r.mu.lastReplicaAddedTime
}

// GetReplicaDescriptor returns the replica for this range from the range
// descriptor. Returns a *RangeNotFoundError if the replica is not found.
// No other errors are returned.
Expand Down
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,19 +364,6 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
}
}

// Determine if a new replica was added. This is true if the new max replica
// ID is greater than the old max replica ID.
oldMaxID := maxReplicaIDOfAny(r.mu.state.Desc)
newMaxID := maxReplicaIDOfAny(desc)
if newMaxID > oldMaxID {
r.mu.lastReplicaAdded = newMaxID
r.mu.lastReplicaAddedTime = timeutil.Now()
} else if r.mu.lastReplicaAdded > newMaxID {
// The last replica added was removed.
r.mu.lastReplicaAdded = 0
r.mu.lastReplicaAddedTime = time.Time{}
}

r.rangeStr.store(r.replicaID, desc)
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnRangeDescUpdated(desc)
Expand Down
76 changes: 0 additions & 76 deletions pkg/kv/kvserver/replica_init_test.go

This file was deleted.

10 changes: 0 additions & 10 deletions pkg/kv/kvserver/replica_proposal_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

// Find the minimum index that active followers have acknowledged.
now := timeutil.Now()
// commitIndex is used to determine whether a newly added replica has fully
// caught up.
commitIndex := status.Commit
// Initialize minIndex to the currently applied index. The below progress
// checks will only decrease the minIndex. Given that the quotaReleaseQueue
// cannot correspond to values beyond the applied index there's no reason
Expand Down Expand Up @@ -210,13 +207,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
if progress.Match > 0 && progress.Match < minIndex {
minIndex = progress.Match
}
// If this is the most recently added replica and it has caught up, clear
// our state that was tracking it. This is unrelated to managing proposal
// quota, but this is a convenient place to do so.
if rep.ReplicaID == r.mu.lastReplicaAdded && progress.Match >= commitIndex {
r.mu.lastReplicaAdded = 0
r.mu.lastReplicaAddedTime = time.Time{}
}
})

if r.mu.proposalQuotaBaseIndex < minIndex {
Expand Down
Loading