diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 26c48c1563e5..490b11ab3e10 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -167,6 +167,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/protectedts/ptstorage:ptstorage_test", "//pkg/kv/kvserver/protectedts:protectedts_test", "//pkg/kv/kvserver/raftentry:raftentry_test", + "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed_test", "//pkg/kv/kvserver/rditer:rditer_test", "//pkg/kv/kvserver/replicastats:replicastats_test", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8286b6b304ab..87d796b3a683 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -135,6 +135,7 @@ go_library( "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel index 6b9b7537495f..1886ef4e08b4 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/constraint", "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/replicastats", "//pkg/roachpb", "//pkg/settings", diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 97c94cfeb75b..776171b394ef 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -1468,8 +1469,9 @@ func (a *Allocator) ValidLeaseTargets( conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { - RaftStatus() *raft.Status StoreID() roachpb.StoreID + RaftStatus() *raft.Status + GetFirstIndex() uint64 }, // excludeLeaseRepl dictates whether the result set can include the source // replica. @@ -1510,7 +1512,8 @@ func (a *Allocator) ValidLeaseTargets( // potentially transferring the lease to a replica that may be waiting for a // snapshot (which will wedge the range until the replica applies that // snapshot). - candidates = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), candidates) + candidates = excludeReplicasInNeedOfSnapshots( + ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), candidates) } // Determine which store(s) is preferred based on user-specified preferences. @@ -1532,8 +1535,9 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( ctx context.Context, conf roachpb.SpanConfig, leaseRepl interface { - RaftStatus() *raft.Status StoreID() roachpb.StoreID + RaftStatus() *raft.Status + GetFirstIndex() uint64 }, allExistingReplicas []roachpb.ReplicaDescriptor, ) bool { @@ -1557,7 +1561,8 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( // If there are any replicas that do match lease preferences, then we check if // the existing leaseholder is one of them. preferred := a.PreferredLeaseholders(conf, candidates) - preferred = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), preferred) + preferred = excludeReplicasInNeedOfSnapshots( + ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred) if len(preferred) == 0 { return false } @@ -1606,9 +1611,10 @@ func (a *Allocator) TransferLeaseTarget( conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { - RaftStatus() *raft.Status StoreID() roachpb.StoreID GetRangeID() roachpb.RangeID + RaftStatus() *raft.Status + GetFirstIndex() uint64 }, stats *replicastats.ReplicaStats, forceDecisionWithoutStats bool, @@ -1854,8 +1860,9 @@ func (a *Allocator) ShouldTransferLease( conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { - RaftStatus() *raft.Status StoreID() roachpb.StoreID + RaftStatus() *raft.Status + GetFirstIndex() uint64 }, stats *replicastats.ReplicaStats, ) bool { @@ -2193,73 +2200,27 @@ func computeQuorum(nodes int) int { // slice. A "behind" replica is one which is not at or past the quorum commit // index. func FilterBehindReplicas( - ctx context.Context, raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor, + ctx context.Context, st *raft.Status, replicas []roachpb.ReplicaDescriptor, ) []roachpb.ReplicaDescriptor { - if raftStatus == nil || len(raftStatus.Progress) == 0 { - // raftStatus.Progress is only populated on the Raft leader which means we - // won't be able to rebalance a lease away if the lease holder is not the - // Raft leader. This is rare enough not to matter. - return nil - } - candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas)) + var candidates []roachpb.ReplicaDescriptor for _, r := range replicas { - if !ReplicaIsBehind(raftStatus, r.ReplicaID) { + if !raftutil.ReplicaIsBehind(st, r.ReplicaID) { candidates = append(candidates, r) } } return candidates } -// ReplicaIsBehind returns whether the given replica ID is considered behind -// according to the raft log. -func ReplicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool { - if raftStatus == nil || len(raftStatus.Progress) == 0 { - return true - } - // NB: We use raftStatus.Commit instead of getQuorumIndex() because the - // latter can return a value that is less than the commit index. This is - // useful for Raft log truncation which sometimes wishes to keep those - // earlier indexes, but not appropriate for determining which nodes are - // behind the actual commit index of the range. - if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok { - if uint64(replicaID) == raftStatus.Lead || - (progress.State == tracker.StateReplicate && - progress.Match >= raftStatus.Commit) { - return false - } - } - return true -} - -// replicaMayNeedSnapshot determines whether the replica referred to by -// `replicaID` may be in need of a raft snapshot. If this function is called -// with an empty or nil `raftStatus` (as will be the case when its called by a -// replica that is not the raft leader), we pessimistically assume that -// `replicaID` may need a snapshot. -func replicaMayNeedSnapshot(raftStatus *raft.Status, replica roachpb.ReplicaDescriptor) bool { - if raftStatus == nil || len(raftStatus.Progress) == 0 { - return true - } - if progress, ok := raftStatus.Progress[uint64(replica.ReplicaID)]; ok { - // We can only reasonably assume that the follower replica is not in need of - // a snapshot iff it is in `StateReplicate`. However, even this is racey - // because we can still possibly have an ill-timed log truncation between - // when we make this determination and when we act on it. - return progress.State != tracker.StateReplicate - } - return true -} - // excludeReplicasInNeedOfSnapshots filters out the `replicas` that may be in // need of a raft snapshot. VOTER_INCOMING replicas are not filtered out. // Other replicas may be filtered out if this function is called with the // `raftStatus` of a non-raft leader replica. func excludeReplicasInNeedOfSnapshots( - ctx context.Context, raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor, + ctx context.Context, st *raft.Status, firstIndex uint64, replicas []roachpb.ReplicaDescriptor, ) []roachpb.ReplicaDescriptor { filled := 0 for _, repl := range replicas { - if replicaMayNeedSnapshot(raftStatus, repl) { + if raftutil.ReplicaMayNeedSnapshot(st, firstIndex, repl.ReplicaID) != raftutil.NoSnapshotNeeded { log.VEventf( ctx, 5, diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 2018a73623c1..fe0106903437 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -1664,6 +1664,7 @@ func (r *mockRepl) RaftStatus() *raft.Status { raftStatus := &raft.Status{ Progress: make(map[uint64]tracker.Progress), } + raftStatus.RaftState = raft.StateLeader for i := int32(1); i <= r.replicationFactor; i++ { state := tracker.StateReplicate if _, ok := r.replsInNeedOfSnapshot[roachpb.ReplicaID(i)]; ok { @@ -1674,6 +1675,10 @@ func (r *mockRepl) RaftStatus() *raft.Status { return raftStatus } +func (r *mockRepl) GetFirstIndex() uint64 { + return 0 +} + func (r *mockRepl) StoreID() roachpb.StoreID { return r.storeID } @@ -7138,6 +7143,7 @@ func TestFilterBehindReplicas(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = c.leader + status.RaftState = raft.StateLeader status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { @@ -7210,6 +7216,7 @@ func TestFilterUnremovableReplicas(t *testing.T) { // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. status.Lead = 99 + status.RaftState = raft.StateLeader status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { @@ -7267,6 +7274,7 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) { // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. status.Lead = 99 + status.RaftState = raft.StateLeader status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index b9ebe320450b..010282e593d0 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -171,6 +171,8 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status := &raft.Status{ Progress: make(map[uint64]tracker.Progress), } + status.Lead = 1 + status.RaftState = raft.StateLeader status.Commit = 10 for _, replica := range replicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ diff --git a/pkg/kv/kvserver/deprecated_store_rebalancer.go b/pkg/kv/kvserver/deprecated_store_rebalancer.go index e47dad57f8be..d65957d88899 100644 --- a/pkg/kv/kvserver/deprecated_store_rebalancer.go +++ b/pkg/kv/kvserver/deprecated_store_rebalancer.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -108,7 +109,7 @@ func (sr *StoreRebalancer) deprecatedChooseLeaseToTransfer( if raftStatus == nil { raftStatus = sr.getRaftStatusFn(replWithStats.repl) } - if allocatorimpl.ReplicaIsBehind(raftStatus, candidate.ReplicaID) { + if raftutil.ReplicaIsBehind(raftStatus, candidate.ReplicaID) { log.VEventf(ctx, 3, "%v is behind or this store isn't the raft leader for r%d; raftStatus: %v", candidate, desc.RangeID, raftStatus) continue @@ -297,7 +298,7 @@ func (sr *StoreRebalancer) deprecatedChooseRangeToRebalance( if raftStatus == nil { raftStatus = sr.getRaftStatusFn(replWithStats.repl) } - if allocatorimpl.ReplicaIsBehind(raftStatus, replica.ReplicaID) { + if raftutil.ReplicaIsBehind(raftStatus, replica.ReplicaID) { continue } } diff --git a/pkg/kv/kvserver/deprecated_store_rebalancer_test.go b/pkg/kv/kvserver/deprecated_store_rebalancer_test.go index aff73ed7754e..901d10134b89 100644 --- a/pkg/kv/kvserver/deprecated_store_rebalancer_test.go +++ b/pkg/kv/kvserver/deprecated_store_rebalancer_test.go @@ -105,6 +105,7 @@ func TestDeprecatedChooseLeaseToTransfer(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -219,6 +220,7 @@ func TestDeprecatedChooseRangeToRebalanceBalanceScore(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -290,6 +292,7 @@ func TestDeprecatedChooseRangeToRebalance(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -652,6 +655,7 @@ func TestDeprecatedNoLeaseTransferToBehindReplicas(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { match := uint64(1) diff --git a/pkg/kv/kvserver/raftutil/BUILD.bazel b/pkg/kv/kvserver/raftutil/BUILD.bazel new file mode 100644 index 000000000000..c3a2cdc3077d --- /dev/null +++ b/pkg/kv/kvserver/raftutil/BUILD.bazel @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "raftutil", + srcs = ["util.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "@io_etcd_go_etcd_raft_v3//:raft", + "@io_etcd_go_etcd_raft_v3//tracker", + ], +) + +go_test( + name = "raftutil_test", + srcs = ["util_test.go"], + embed = [":raftutil"], + deps = [ + "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_raft_v3//:raft", + "@io_etcd_go_etcd_raft_v3//tracker", + ], +) diff --git a/pkg/kv/kvserver/raftutil/util.go b/pkg/kv/kvserver/raftutil/util.go new file mode 100644 index 000000000000..029ae4d3e04e --- /dev/null +++ b/pkg/kv/kvserver/raftutil/util.go @@ -0,0 +1,185 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package raftutil + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/tracker" +) + +// ReplicaIsBehind returns whether the given peer replica is considered behind +// according to the raft log. If this function is called with a raft.Status that +// indicates that our local replica is not the raft leader, we pessimistically +// assume that replicaID is behind on its log. +func ReplicaIsBehind(st *raft.Status, replicaID roachpb.ReplicaID) bool { + if st == nil { + // Testing only. + return true + } + if st.RaftState != raft.StateLeader { + // If we aren't the Raft leader, we aren't tracking the replica's progress, + // so we can't be sure it's not behind. + return true + } + progress, ok := st.Progress[uint64(replicaID)] + if !ok { + return true + } + if uint64(replicaID) == st.Lead { + // If the replica is the leader, it cannot be behind on the log. + return false + } + if progress.State == tracker.StateReplicate && progress.Match >= st.Commit { + // If the replica has a matching log entry at or above the current commit + // index, it is caught up on its log. + return false + } + return true +} + +// ReplicaNeedsSnapshotStatus enumerates the possible states that a peer replica +// may be in when the local replica makes a determination of whether the peer +// may be in need of a Raft snapshot. NoSnapshotNeeded indicates that a Raft +// snapshot is definitely not needed. All other values indicate that for one +// reason or another the local replica cannot say with certainty that the peer +// does not need a snapshot, either because it knows that the peer does need a +// snapshot or because it does not know. +type ReplicaNeedsSnapshotStatus int + +const ( + // NoSnapshotNeeded means that the local replica knows for sure that the peer + // replica does not need a Raft snapshot. This status is only possible if the + // local replica is the Raft leader, because only the Raft leader tracks the + // progress of other replicas. + // + // There are two ways that a peer replica that is at some point seen to be in + // this state can later end up in need of a Raft snapshot: + // 1. a log truncation cuts the peer replica off from the log, preventing it + // from catching up using log entries. The Raft log queue attempts to avoid + // cutting any follower off from connectivity with the leader's log, but + // there are cases where it does so. + // 2. raft leadership is transferred to a replica whose log does not extend + // back as far as the current raft leader's log. This is possible because + // different replicas can have Raft logs with different starting points + // ("first indexes"). See discussion about #35701 in #81561. + NoSnapshotNeeded ReplicaNeedsSnapshotStatus = iota + + // LocalReplicaNotLeader means that the local replica is not the Raft leader, + // so it does not keep track of enough progress information about peers to + // determine whether they are in need of a Raft snapshot or not. + LocalReplicaNotLeader + + // ReplicaUnknown means that the peer replica is not known by the Raft leader + // and is not part of the Raft group. + ReplicaUnknown + + // ReplicaStateProbe means that the local Raft leader is still probing the + // peer replica to determine the index of matching tail of its log. + ReplicaStateProbe + + // ReplicaStateSnapshot means that the local Raft leader has determined that + // the peer replica needs a Raft snapshot. + ReplicaStateSnapshot + + // ReplicaMatchBelowLeadersFirstIndex means that the local Raft leader has + // determined that the peer replica's latest matching log index is below the + // leader's log's current first index. This can happen if a peer replica is + // initially connected to the Raft leader's log but gets disconnected due to a + // log truncation. etcd/raft will notice this state after sending the next + // MsgApp and move the peer to StateSnapshot. + ReplicaMatchBelowLeadersFirstIndex + + // NoRaftStatusAvailable is only possible in tests. + NoRaftStatusAvailable +) + +func (s ReplicaNeedsSnapshotStatus) String() string { + switch s { + case NoSnapshotNeeded: + return "no snapshot needed" + case LocalReplicaNotLeader: + return "local replica not raft leader" + case ReplicaUnknown: + return "replica unknown" + case ReplicaStateProbe: + return "replica in StateProbe" + case ReplicaStateSnapshot: + return "replica in StateSnapshot" + case ReplicaMatchBelowLeadersFirstIndex: + return "replica's match index below leader's first index" + case NoRaftStatusAvailable: + return "no raft status available" + default: + return "unknown ReplicaNeedsSnapshotStatus" + } +} + +// ReplicaMayNeedSnapshot determines whether the given peer replica may be in +// need of a raft snapshot. If this function is called with a raft.Status that +// indicates that our local replica is not the raft leader, we pessimistically +// assume that replicaID may need a snapshot. +func ReplicaMayNeedSnapshot( + st *raft.Status, firstIndex uint64, replicaID roachpb.ReplicaID, +) ReplicaNeedsSnapshotStatus { + if st == nil { + // Testing only. + return NoRaftStatusAvailable + } + if st.RaftState != raft.StateLeader { + // If we aren't the Raft leader, we aren't tracking the replica's progress, + // so we can't be sure it does not need a snapshot. + return LocalReplicaNotLeader + } + progress, ok := st.Progress[uint64(replicaID)] + if !ok { + // We don't know about the specified replica. + return ReplicaUnknown + } + switch progress.State { + case tracker.StateReplicate: + // We can only reasonably assume that the follower replica is not in need of + // a snapshot if it is in StateReplicate. + case tracker.StateProbe: + // If the follower is in StateProbe then we are still in the process of + // determining where our logs match. + return ReplicaStateProbe + case tracker.StateSnapshot: + // If the follower is in StateSnapshot then it needs a snapshot. + return ReplicaStateSnapshot + default: + panic("unknown tracker.StateType") + } + if progress.Match+1 < firstIndex { + // Even if the follower is in StateReplicate, it could have been cut off + // from the log by a recent log truncation that hasn't been recognized yet + // by raft. Confirm that this is not the case. + return ReplicaMatchBelowLeadersFirstIndex + } + // Even if we get here, this can still be racy because: + // 1. we may think we are the Raft leader but may have been or will be voted + // out without realizing. If another peer takes over as the Raft leader, it + // may commit additional log entries and then cut the peer off from the log. + // 2. we can still have an ill-timed log truncation between when we make this + // determination and when we act on it. + // + // In order to eliminate the potential for a race when acting on this + // information, we must ensure: + // 1. that any action we take is conditional on still being the Raft leader. + // In practice, this means that we should check this condition immediately + // before proposing a Raft command, so we can be sure that the command is + // not redirected through another Raft leader. That way, if we were + // replaced as Raft leader, the proposal will fail. + // 2. that we do not perform a log truncation between now and when our action + // goes into effect. In practice, this means serializing with Raft log + // truncation operations using latching. + return NoSnapshotNeeded +} diff --git a/pkg/kv/kvserver/raftutil/util_test.go b/pkg/kv/kvserver/raftutil/util_test.go new file mode 100644 index 000000000000..529d018f1336 --- /dev/null +++ b/pkg/kv/kvserver/raftutil/util_test.go @@ -0,0 +1,207 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package raftutil + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/tracker" +) + +func TestReplicaIsBehind(t *testing.T) { + const replicaID = 3 + makeStatus := func(f func(*raft.Status)) *raft.Status { + st := new(raft.Status) + st.Commit = 10 + st.Progress = make(map[uint64]tracker.Progress) + f(st) + return st + } + + tests := []struct { + name string + st *raft.Status + expect bool + }{ + { + name: "local follower", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateFollower + }), + expect: true, + }, + { + name: "local candidate", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateCandidate + }), + expect: true, + }, + { + name: "local leader, no progress for peer", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + }), + expect: true, + }, + { + name: "local leader, peer leader", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate} + st.Lead = replicaID + }), + expect: false, + }, + { + name: "local leader, peer state probe", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateProbe} + }), + expect: true, + }, + { + name: "local leader, peer state snapshot", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateSnapshot} + }), + expect: true, + }, + { + name: "local leader, peer state replicate, match < commit", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 9} + }), + expect: true, + }, + { + name: "local leader, peer state replicate, match == commit", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 10} + }), + expect: false, + }, + { + name: "local leader, peer state replicate, match > commit", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 11} + }), + expect: false, + }, + { + name: "nil raft status", + st: nil, + expect: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expect, ReplicaIsBehind(tt.st, replicaID)) + }) + } +} + +func TestReplicaMayNeedSnapshot(t *testing.T) { + const firstIndex = 10 + const replicaID = 3 + makeStatus := func(f func(*raft.Status)) *raft.Status { + st := new(raft.Status) + st.Commit = 10 + st.Progress = make(map[uint64]tracker.Progress) + f(st) + return st + } + + tests := []struct { + name string + st *raft.Status + expect ReplicaNeedsSnapshotStatus + }{ + { + name: "local follower", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateFollower + }), + expect: LocalReplicaNotLeader, + }, + { + name: "local candidate", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateCandidate + }), + expect: LocalReplicaNotLeader, + }, + { + name: "local leader, no progress for peer", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + }), + expect: ReplicaUnknown, + }, + { + name: "local leader, peer state probe", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateProbe} + }), + expect: ReplicaStateProbe, + }, + { + name: "local leader, peer state snapshot", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateSnapshot} + }), + expect: ReplicaStateSnapshot, + }, + { + name: "local leader, peer state replicate, match+1 < firstIndex", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 8} + }), + expect: ReplicaMatchBelowLeadersFirstIndex, + }, + { + name: "local leader, peer state replicate, match+1 == firstIndex", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 9} + }), + expect: NoSnapshotNeeded, + }, + { + name: "local leader, peer state replicate, match+1 == firstIndex", + st: makeStatus(func(st *raft.Status) { + st.RaftState = raft.StateLeader + st.Progress[replicaID] = tracker.Progress{State: tracker.StateReplicate, Match: 10} + }), + expect: NoSnapshotNeeded, + }, + { + name: "nil raft status", + st: nil, + expect: NoRaftStatusAvailable, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expect, ReplicaMayNeedSnapshot(tt.st, firstIndex, replicaID)) + }) + } +} diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 5ccc1bdc51a8..6abb6022e2e2 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -648,7 +649,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( if raftStatus == nil { raftStatus = sr.getRaftStatusFn(replWithStats.repl) } - if allocatorimpl.ReplicaIsBehind(raftStatus, replica.ReplicaID) { + if raftutil.ReplicaIsBehind(raftStatus, replica.ReplicaID) { continue } } diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 937e8e44b24d..80f9e25af08f 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -455,6 +455,7 @@ func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { Expiration: &hlc.MaxTimestamp, Replica: repl.mu.state.Desc.InternalReplicas[0], } + repl.mu.state.TruncatedState = &roachpb.RaftTruncatedState{} for _, storeID := range r.nonVoters { repl.mu.state.Desc.InternalReplicas = append(repl.mu.state.Desc.InternalReplicas, roachpb.ReplicaDescriptor{ NodeID: roachpb.NodeID(storeID), @@ -517,6 +518,7 @@ func TestChooseLeaseToTransfer(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -786,6 +788,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -1064,6 +1067,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -1312,6 +1316,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ @@ -1394,6 +1399,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) + status.RaftState = raft.StateLeader status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { match := uint64(1)