Skip to content

Commit

Permalink
storage: fix filterUnremovableReplicas badness
Browse files Browse the repository at this point in the history
`filterUnremovableReplicas` was allowing replicas that were a necessary
part of quorum to be removed. This occurred because
`filterBehindReplicas` treated the "brand new replica" as
up-to-date even when we had no evidence that it was.
`filterBehindReplicas` needs to return an accurate picture
of the up-to-date replicas. Rather than push this work into
`filterBehindReplicas`, `filterUnremovableReplicas` has been changed to
perform the filtering of the "brand new replica" from the removable
candidates.

See cockroachdb#34122

Release note: None
  • Loading branch information
petermattis committed Jan 21, 2019
1 parent 3ccf0d4 commit 0d6bd16
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 54 deletions.
36 changes: 27 additions & 9 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,12 +1157,9 @@ func computeQuorum(nodes int) int {

// filterBehindReplicas removes any "behind" replicas from the supplied
// slice. A "behind" replica is one which is not at or past the quorum commit
// index. We forgive brandNewReplicaID for being behind, since a new range can
// take a little while to fully catch up.
// index.
func filterBehindReplicas(
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
) []roachpb.ReplicaDescriptor {
if raftStatus == nil || len(raftStatus.Progress) == 0 {
// raftStatus.Progress is only populated on the Raft leader which means we
Expand All @@ -1172,7 +1169,7 @@ func filterBehindReplicas(
}
candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas))
for _, r := range replicas {
if !replicaIsBehind(raftStatus, r.ReplicaID) || r.ReplicaID == brandNewReplicaID {
if !replicaIsBehind(raftStatus, r.ReplicaID) {
candidates = append(candidates, r)
}
}
Expand All @@ -1198,13 +1195,20 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool
return true
}

// simulateFilterUnremovableReplicas removes any unremovable replicas from the
// supplied slice. Unlike filterUnremovableReplicas, brandNewReplicaID is
// considered up-to-date (and thus can participate in quorum), but is not
// considered a candidate for removal.
//
// It does so by recording a progress entry for brandNewReplicaID that is in
// ProgressStateReplicate with Match equal to the commit index, and then
// delegating to filterUnremovableReplicas with that status.
//
// raftStatus must be non-nil (it is dereferenced unconditionally) and its
// Progress map must be non-nil (it is written to).
func simulateFilterUnremovableReplicas(
raftStatus *raft.Status,
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
// This copies only the Status struct. The Progress map is shared with
// *raftStatus, so the entry written below is also visible through the
// caller's raftStatus.
status := *raftStatus
// NOTE(review): in this diff view the next line appears to be the pre-change
// assignment; as written it is overwritten by the assignment that follows.
status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0}
// Pretend the brand new replica is replicating and caught up to the commit
// index.
status.Progress[uint64(brandNewReplicaID)] = raft.Progress{
State: raft.ProgressStateReplicate,
Match: status.Commit,
}
return filterUnremovableReplicas(&status, replicas, brandNewReplicaID)
}

Expand All @@ -1219,20 +1223,34 @@ func filterUnremovableReplicas(
replicas []roachpb.ReplicaDescriptor,
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
upToDateReplicas := filterBehindReplicas(raftStatus, replicas, brandNewReplicaID)
upToDateReplicas := filterBehindReplicas(raftStatus, replicas)
quorum := computeQuorum(len(replicas) - 1)
if len(upToDateReplicas) < quorum {
// The number of up-to-date replicas is less than quorum. No replicas can
// be removed.
return nil
}

if len(upToDateReplicas) > quorum {
// The number of up-to-date replicas is larger than quorum. Any replica can
// be removed.
// be removed, though we want to filter out brandNewReplicaID
if brandNewReplicaID != 0 {
candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas)-len(upToDateReplicas))
for _, r := range replicas {
if r.ReplicaID != brandNewReplicaID {
candidates = append(candidates, r)
}
}
return candidates
}
return replicas
}

candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas)-len(upToDateReplicas))
necessary := func(r roachpb.ReplicaDescriptor) bool {
if r.ReplicaID == brandNewReplicaID {
return true
}
for _, t := range upToDateReplicas {
if t == r {
return true
Expand Down
130 changes: 88 additions & 42 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5155,43 +5155,34 @@ func TestFilterBehindReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
leader uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
commit uint64
leader uint64
progress []uint64
expected []uint64
}{
{0, 99, []uint64{0}, 0, nil},
{1, 99, []uint64{1}, 0, []uint64{1}},
{2, 99, []uint64{2}, 0, []uint64{2}},
{1, 99, []uint64{0, 1}, 0, []uint64{1}},
{1, 99, []uint64{1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, 0, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, 0, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, 0, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, 0, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, 0, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, 0, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, 0, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, 0, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, 0, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, 0, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, 0, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, 0, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 0, []uint64{6, 5, 4}},
{0, 1, []uint64{0}, 0, []uint64{0}},
{0, 1, []uint64{0, 0, 0}, 0, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 0, []uint64{2, 1}},
{1, 2, []uint64{0, 2, 1}, 0, []uint64{2, 1}},
{1, 99, []uint64{0, 1}, 1, []uint64{0, 1}},
{1, 99, []uint64{0, 1}, 2, []uint64{1}},
{9, 99, []uint64{0, 9}, 1, []uint64{0, 9}},
{9, 99, []uint64{0, 1}, 1, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, 2, []uint64{2, 0, 1}},
{1, 1, []uint64{2, 0, 1}, 3, []uint64{2, 1}},
{4, 99, []uint64{6, 5, 4, 3, 2}, 5, []uint64{6, 5, 4, 2}},
{4, 99, []uint64{6, 5, 4, 3, 0}, 5, []uint64{6, 5, 4, 0}},
{0, 99, []uint64{0}, nil},
{1, 99, []uint64{1}, []uint64{1}},
{2, 99, []uint64{2}, []uint64{2}},
{1, 99, []uint64{0, 1}, []uint64{1}},
{1, 99, []uint64{1, 2}, []uint64{1, 2}},
{2, 99, []uint64{3, 2}, []uint64{3, 2}},
{1, 99, []uint64{0, 0, 1}, []uint64{1}},
{1, 99, []uint64{0, 1, 2}, []uint64{1, 2}},
{2, 99, []uint64{1, 2, 3}, []uint64{2, 3}},
{3, 99, []uint64{4, 3, 2}, []uint64{4, 3}},
{1, 99, []uint64{1, 1, 1}, []uint64{1, 1, 1}},
{1, 99, []uint64{1, 1, 2}, []uint64{1, 1, 2}},
{2, 99, []uint64{1, 2, 2}, []uint64{2, 2}},
{2, 99, []uint64{0, 1, 2, 3}, []uint64{2, 3}},
{2, 99, []uint64{1, 2, 3, 4}, []uint64{2, 3, 4}},
{3, 99, []uint64{5, 4, 3, 2}, []uint64{5, 4, 3}},
{3, 99, []uint64{1, 2, 3, 4, 5}, []uint64{3, 4, 5}},
{4, 99, []uint64{6, 5, 4, 3, 2}, []uint64{6, 5, 4}},
{4, 99, []uint64{6, 5, 4, 3, 2}, []uint64{6, 5, 4}},
{0, 1, []uint64{0}, []uint64{0}},
{0, 1, []uint64{0, 0, 0}, []uint64{0}},
{1, 1, []uint64{2, 0, 1}, []uint64{2, 1}},
{1, 2, []uint64{0, 2, 1}, []uint64{2, 1}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand All @@ -5216,7 +5207,7 @@ func TestFilterBehindReplicas(t *testing.T) {
StoreID: roachpb.StoreID(v),
})
}
candidates := filterBehindReplicas(status, replicas, c.brandNewReplicaID)
candidates := filterBehindReplicas(status, replicas)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand Down Expand Up @@ -5248,13 +5239,68 @@ func TestFilterUnremovableReplicas(t *testing.T) {
{3, []uint64{1, 2, 3, 4}, 0, []uint64{1, 2}},
{2, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2, 3, 4, 5}},
{3, []uint64{1, 2, 3, 4, 5}, 0, []uint64{1, 2}},
{1, []uint64{1, 0}, 2, []uint64{1, 0}},
{1, []uint64{1, 0}, 2, nil},
{1, []uint64{1, 0}, 1, []uint64{0}},
{3, []uint64{3, 2, 1}, 3, nil},
{3, []uint64{3, 2, 0}, 3, nil},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{2}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{2}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
// Use an invalid replica ID for the leader. TestFilterBehindReplicas covers
// valid replica IDs.
status.Lead = 99
status.Commit = c.commit
var replicas []roachpb.ReplicaDescriptor
for j, v := range c.progress {
p := raft.Progress{
Match: v,
State: raft.ProgressStateReplicate,
}
if v == 0 {
p.State = raft.ProgressStateProbe
}
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
replicas = append(replicas, roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
StoreID: roachpb.StoreID(v),
})
}

candidates := filterUnremovableReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
}
if !reflect.DeepEqual(c.expected, ids) {
t.Fatalf("expected %d, but got %d", c.expected, ids)
}
})
}
}

func TestSimulateFilterUnremovableReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
brandNewReplicaID roachpb.ReplicaID
expected []uint64
}{
{1, []uint64{1, 0}, 2, []uint64{1}},
{1, []uint64{1, 0}, 1, []uint64{0}},
{3, []uint64{3, 2, 1}, 3, []uint64{2}},
{3, []uint64{3, 2, 0}, 3, []uint64{2}},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{4, 3, 2, 1}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{4, 3, 2, 0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2, 0}},
{3, []uint64{4, 3, 2, 1}, 4, []uint64{4, 3, 2}},
{3, []uint64{4, 3, 2, 0}, 3, []uint64{4, 3, 0}},
{3, []uint64{4, 3, 2, 0}, 4, []uint64{4, 3, 2}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
Expand Down Expand Up @@ -5282,7 +5328,7 @@ func TestFilterUnremovableReplicas(t *testing.T) {
})
}

candidates := filterUnremovableReplicas(status, replicas, c.brandNewReplicaID)
candidates := simulateFilterUnremovableReplicas(status, replicas, c.brandNewReplicaID)
var ids []uint64
for _, c := range candidates {
ids = append(ids, uint64(c.StoreID))
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func (rq *replicateQueue) processOneChange(
lastReplAdded = 0
}
candidates := filterUnremovableReplicas(repl.RaftStatus(), desc.Replicas, lastReplAdded)
log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal",
desc.Replicas, candidates)
log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal: %s",
desc.Replicas, candidates, rangeRaftProgress(repl.RaftStatus(), desc.Replicas))
if len(candidates) == 0 {
// After rapid upreplication, the candidates for removal could still be catching up.
// Mark this error as benign so it doesn't create confusion in the logs.
Expand Down Expand Up @@ -568,7 +568,7 @@ func (rq *replicateQueue) findTargetAndTransferLease(
zone *config.ZoneConfig,
opts transferLeaseOptions,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */)
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas)
target := rq.allocator.TransferLeaseTarget(
ctx,
zone,
Expand Down

0 comments on commit 0d6bd16

Please sign in to comment.