From 19843430e2c00975c6369738fb4c574231f8c566 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Tue, 16 Jul 2019 14:59:01 -0700 Subject: [PATCH] deal with raft upgrade Release note: None --- pkg/server/status.go | 2 +- pkg/storage/allocator.go | 7 +-- pkg/storage/allocator_test.go | 29 +++++------ pkg/storage/raft_log_queue.go | 11 ++-- pkg/storage/raft_log_queue_test.go | 69 +++++++++++++++----------- pkg/storage/raft_snapshot_queue.go | 6 +-- pkg/storage/replica_command.go | 7 +-- pkg/storage/replica_proposal_quota.go | 3 +- pkg/storage/replica_raft.go | 9 ++-- pkg/storage/replica_raft_test.go | 16 +++--- pkg/storage/replica_test.go | 25 +++++----- pkg/storage/split_delay_helper.go | 9 +--- pkg/storage/split_delay_helper_test.go | 33 ++++++------ pkg/storage/store_rebalancer_test.go | 19 +++---- 14 files changed, 131 insertions(+), 114 deletions(-) diff --git a/pkg/server/status.go b/pkg/server/status.go index 87ee58abe7cb..cb13ad75085f 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1205,7 +1205,7 @@ func (s *statusServer) Ranges( state.Progress[id] = serverpb.RaftState_Progress{ Match: progress.Match, Next: progress.Next, - Paused: progress.Paused, + Paused: progress.IsPaused(), PendingSnapshot: progress.PendingSnapshot, State: progress.State.String(), } diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 39fe69142a18..e900955c5811 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -1196,7 +1197,7 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool // behind the actual commit index of the range. if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok { if uint64(replicaID) == raftStatus.Lead || - (progress.State == raft.ProgressStateReplicate && + (progress.State == tracker.StateReplicate && progress.Match >= raftStatus.Commit) { return false } @@ -1214,8 +1215,8 @@ func simulateFilterUnremovableReplicas( brandNewReplicaID roachpb.ReplicaID, ) []roachpb.ReplicaDescriptor { status := *raftStatus - status.Progress[uint64(brandNewReplicaID)] = raft.Progress{ - State: raft.ProgressStateReplicate, + status.Progress[uint64(brandNewReplicaID)] = tracker.Progress{ + State: tracker.StateReplicate, Match: status.Commit, } return filterUnremovableReplicas(&status, replicas, brandNewReplicaID) diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index ccdc1089dac6..c75440510402 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -44,6 +44,7 @@ import ( "github.com/olekukonko/tablewriter" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const firstRange = roachpb.RangeID(1) @@ -825,10 +826,10 @@ func TestAllocatorRebalanceTarget(t *testing.T) { rangeInfo := rangeInfoForRepl(repl, desc) status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for _, replica := range replicas { - status.Progress[uint64(replica.NodeID)] = raft.Progress{ + status.Progress[uint64(replica.NodeID)] = tracker.Progress{ Match: 10, } } @@ -5154,18 +5155,18 @@ func TestFilterBehindReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = c.leader status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p @@ -5222,7 +5223,7 @@ func TestFilterUnremovableReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. @@ -5230,12 +5231,12 @@ func TestFilterUnremovableReplicas(t *testing.T) { status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p @@ -5277,7 +5278,7 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } // Use an invalid replica ID for the leader. TestFilterBehindReplicas covers // valid replica IDs. @@ -5285,12 +5286,12 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) { status.Commit = c.commit var replicas []roachpb.ReplicaDescriptor for j, v := range c.progress { - p := raft.Progress{ + p := tracker.Progress{ Match: v, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } if v == 0 { - p.State = raft.ProgressStateProbe + p.State = tracker.StateProbe } replicaID := uint64(j + 1) status.Progress[replicaID] = p diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 3632d399698e..1519fb1fccc7 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -209,7 +210,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err if pr, ok := raftStatus.Progress[raftStatus.Lead]; ok { // TODO(tschottdorf): remove this line once we have picked up // https://github.com/etcd-io/etcd/pull/10279 - pr.State = raft.ProgressStateReplicate + pr.State = tracker.StateReplicate raftStatus.Progress[raftStatus.Lead] = pr } @@ -229,7 +230,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err func updateRaftProgressFromActivity( ctx context.Context, - prs map[uint64]raft.Progress, + prs map[uint64]tracker.Progress, replicas []roachpb.ReplicaDescriptor, lastUpdate lastUpdateTimesMap, now time.Time, @@ -285,7 +286,7 @@ type truncateDecision struct { func (td *truncateDecision) raftSnapshotsForIndex(index uint64) int { var n int for _, p := range td.Input.RaftStatus.Progress { - if p.State != raft.ProgressStateReplicate { + if p.State != tracker.StateReplicate { // If the follower isn't replicating, we can't trust its Match in // the first place. But note that this shouldn't matter in practice // as we already take care to not cut off these followers when @@ -427,7 +428,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { // overlapping bounds that put significant stress on the Raft snapshot // queue. if progress.RecentActive { - if progress.State == raft.ProgressStateProbe { + if progress.State == tracker.StateProbe { decision.ProtectIndex(decision.Input.FirstIndex, truncatableIndexChosenViaProbingFollower) } else { decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers) @@ -475,7 +476,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { func getQuorumIndex(raftStatus *raft.Status) uint64 { match := make([]uint64, 0, len(raftStatus.Progress)) for _, progress := range raftStatus.Progress { - if progress.State == raft.ProgressStateReplicate { + if progress.State == tracker.StateReplicate { match = append(match, progress.Match) } else { match = append(match, 0) diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 0b188b1152bf..56edc99b2a62 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) func TestShouldTruncate(t *testing.T) { @@ -85,10 +86,13 @@ func TestGetQuorumIndex(t *testing.T) { } for i, c := range testCases { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range c.progress { - status.Progress[uint64(j)] = raft.Progress{State: raft.ProgressStateReplicate, Match: v} + status.Progress[uint64(j)] = tracker.Progress{ + State: tracker.StateReplicate, + Match: v, + } } quorumMatchedIndex := getQuorumIndex(status) if c.expected != quorumMatchedIndex { @@ -99,10 +103,10 @@ func TestGetQuorumIndex(t *testing.T) { // Verify that only replicating followers are taken into account (i.e. others // are treated as Match == 0). status := &raft.Status{ - Progress: map[uint64]raft.Progress{ - 1: {State: raft.ProgressStateReplicate, Match: 100}, - 2: {State: raft.ProgressStateSnapshot, Match: 100}, - 3: {State: raft.ProgressStateReplicate, Match: 90}, + Progress: map[uint64]tracker.Progress{ + 1: {State: tracker.StateReplicate, Match: 100}, + 2: {State: tracker.StateSnapshot, Match: 100}, + 3: {State: tracker.StateReplicate, Match: 90}, }, } assert.Equal(t, uint64(90), getQuorumIndex(status)) @@ -211,10 +215,15 @@ func TestComputeTruncateDecision(t *testing.T) { for i, c := range testCases { t.Run("", func(t *testing.T) { status := raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range c.progress { - status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1} + status.Progress[uint64(j)] = tracker.Progress{ + RecentActive: true, + State: tracker.StateReplicate, + Match: v, + Next: v + 1, + } } input := truncateDecisionInput{ RaftStatus: status, @@ -277,26 +286,26 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) { testutils.RunTrueAndFalse(t, "tooLarge", func(t *testing.T, tooLarge bool) { testutils.RunTrueAndFalse(t, "active", func(t *testing.T, active bool) { status := raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } for j, v := range []uint64{100, 200, 300, 400, 500} { - var pr raft.Progress + var pr tracker.Progress if v == 100 { // A probing follower is probed with some index (Next) but // it has a zero Match (i.e. no idea how much of its log // agrees with ours). - pr = raft.Progress{ + pr = tracker.Progress{ RecentActive: active, - State: raft.ProgressStateProbe, + State: tracker.StateProbe, Match: 0, Next: v, } } else { // everyone else - pr = raft.Progress{ + pr = tracker.Progress{ Match: v, Next: v + 1, RecentActive: true, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } status.Progress[uint64(j)] = pr @@ -335,15 +344,15 @@ func TestTruncateDecisionNumSnapshots(t *testing.T) { defer leaktest.AfterTest(t)() status := raft.Status{ - Progress: map[uint64]raft.Progress{ + Progress: map[uint64]tracker.Progress{ // Fully caught up. - 5: {State: raft.ProgressStateReplicate, Match: 11, Next: 12}, + 5: {State: tracker.StateReplicate, Match: 11, Next: 12}, // Behind. - 6: {State: raft.ProgressStateReplicate, Match: 10, Next: 11}, + 6: {State: tracker.StateReplicate, Match: 10, Next: 11}, // Last MsgApp in flight, so basically caught up. - 7: {State: raft.ProgressStateReplicate, Match: 10, Next: 12}, - 8: {State: raft.ProgressStateProbe}, // irrelevant - 9: {State: raft.ProgressStateSnapshot}, // irrelevant + 7: {State: tracker.StateReplicate, Match: 10, Next: 12}, + 8: {State: tracker.StateProbe}, // irrelevant + 9: {State: tracker.StateSnapshot}, // irrelevant }, } @@ -374,12 +383,12 @@ func TestUpdateRaftStatusActivity(t *testing.T) { defer leaktest.AfterTest(t)() type testCase struct { - prs []raft.Progress + prs []tracker.Progress replicas []roachpb.ReplicaDescriptor lastUpdate lastUpdateTimesMap now time.Time - exp []raft.Progress + exp []tracker.Progress } now := timeutil.Now() @@ -388,20 +397,20 @@ func TestUpdateRaftStatusActivity(t *testing.T) { // No data, no crash. {}, // No knowledge = no update. - {prs: []raft.Progress{{RecentActive: true}}, exp: []raft.Progress{{RecentActive: true}}}, - {prs: []raft.Progress{{RecentActive: false}}, exp: []raft.Progress{{RecentActive: false}}}, + {prs: []tracker.Progress{{RecentActive: true}}, exp: []tracker.Progress{{RecentActive: true}}}, + {prs: []tracker.Progress{{RecentActive: false}}, exp: []tracker.Progress{{RecentActive: false}}}, // See replica in descriptor but then don't find it in the map. Assumes the follower is not // active. { replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}}, - prs: []raft.Progress{{RecentActive: true}}, - exp: []raft.Progress{{RecentActive: false}}, + prs: []tracker.Progress{{RecentActive: true}}, + exp: []tracker.Progress{{RecentActive: false}}, }, // Three replicas in descriptor. The first one responded recently, the second didn't, // the third did but it doesn't have a Progress. { replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}}, - prs: []raft.Progress{{RecentActive: false}, {RecentActive: true}}, + prs: []tracker.Progress{{RecentActive: false}, {RecentActive: true}}, lastUpdate: map[roachpb.ReplicaID]time.Time{ 1: now.Add(-1 * MaxQuotaReplicaLivenessDuration / 2), 2: now.Add(-1 - MaxQuotaReplicaLivenessDuration), @@ -409,7 +418,7 @@ func TestUpdateRaftStatusActivity(t *testing.T) { }, now: now, - exp: []raft.Progress{{RecentActive: true}, {RecentActive: false}}, + exp: []tracker.Progress{{RecentActive: true}, {RecentActive: false}}, }, } @@ -417,11 +426,11 @@ func TestUpdateRaftStatusActivity(t *testing.T) { for _, tc := range tcs { t.Run("", func(t *testing.T) { - prs := make(map[uint64]raft.Progress) + prs := make(map[uint64]tracker.Progress) for i, pr := range tc.prs { prs[uint64(i+1)] = pr } - expPRs := make(map[uint64]raft.Progress) + expPRs := make(map[uint64]tracker.Progress) for i, pr := range tc.exp { expPRs[uint64(i+1)] = pr } diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index 22b6db067f1c..32b6d2aa9328 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -20,7 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" - "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) const ( @@ -66,7 +66,7 @@ func (rq *raftSnapshotQueue) shouldQueue( if status := repl.RaftStatus(); status != nil { // raft.Status.Progress is only populated on the Raft group leader. for _, p := range status.Progress { - if p.State == raft.ProgressStateSnapshot { + if p.State == tracker.StateSnapshot { if log.V(2) { log.Infof(ctx, "raft snapshot needed, enqueuing") } @@ -84,7 +84,7 @@ func (rq *raftSnapshotQueue) process( if status := repl.RaftStatus(); status != nil { // raft.Status.Progress is only populated on the Raft group leader. for id, p := range status.Progress { - if p.State == raft.ProgressStateSnapshot { + if p.State == tracker.StateSnapshot { if log.V(1) { log.Infof(ctx, "sending raft snapshot") } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 96bc69780982..5e045f2ee110 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -38,6 +38,7 @@ import ( "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // AdminSplit divides the range into into two ranges using args.SplitKey. @@ -80,17 +81,17 @@ func splitSnapshotWarningStr(rangeID roachpb.RangeID, status *raft.Status) strin // https://github.com/etcd-io/etcd/pull/10279 continue } - if pr.State == raft.ProgressStateReplicate { + if pr.State == tracker.StateReplicate { // This follower is in good working order. continue } s += fmt.Sprintf("; r%d/%d is ", rangeID, replicaID) switch pr.State { - case raft.ProgressStateSnapshot: + case tracker.StateSnapshot: // If the Raft snapshot queue is backed up, replicas can spend // minutes or worse until they are caught up. s += "waiting for a Raft snapshot" - case raft.ProgressStateProbe: + case tracker.StateProbe: // Assuming the split has already been delayed for a little bit, // seeing a follower that is probing hints at some problem with // Raft or Raft message delivery. (Of course it's possible that diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go index 4272bdbea534..4ccdfe05629c 100644 --- a/pkg/storage/replica_proposal_quota.go +++ b/pkg/storage/replica_proposal_quota.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) // MaxQuotaReplicaLivenessDuration is the maximum duration that a replica @@ -146,7 +147,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( now := timeutil.Now() status := r.mu.internalRaftGroup.StatusWithoutProgress() commitIndex, minIndex := status.Commit, status.Commit - r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress raft.Progress) { + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) { rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) if !ok { return diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index c3bc51c7c424..612e1694f6df 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -37,6 +37,7 @@ import ( "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) func makeIDKey() storagebase.CmdIDKey { @@ -1059,8 +1060,8 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { // below for more context: _ = maybeDropMsgApp // NB: this code is allocation free. - r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) { - if id == msg.To && pr.State == raft.ProgressStateProbe { + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) { + if id == msg.To && pr.State == tracker.StateProbe { // It is moderately expensive to attach a full key to the message, but note that // a probing follower will only be appended to once per heartbeat interval (i.e. // on the order of seconds). See: @@ -1367,10 +1368,10 @@ func (m lastUpdateTimesMap) update(replicaID roachpb.ReplicaID, now time.Time) { // a suitable pattern of quiesce and unquiesce operations (and this in turn // can interfere with Raft log truncations). func (m lastUpdateTimesMap) updateOnUnquiesce( - descs []roachpb.ReplicaDescriptor, prs map[uint64]raft.Progress, now time.Time, + descs []roachpb.ReplicaDescriptor, prs map[uint64]tracker.Progress, now time.Time, ) { for _, desc := range descs { - if prs[uint64(desc.ReplicaID)].State == raft.ProgressStateReplicate { + if prs[uint64(desc.ReplicaID)].State == tracker.StateReplicate { m.update(desc.ReplicaID, now) } } diff --git a/pkg/storage/replica_raft_test.go b/pkg/storage/replica_raft_test.go index 6206a9f94503..d1604d1fd545 100644 --- a/pkg/storage/replica_raft_test.go +++ b/pkg/storage/replica_raft_test.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) func TestLastUpdateTimesMap(t *testing.T) { @@ -37,14 +37,14 @@ func TestLastUpdateTimesMap(t *testing.T) { t4 := t3.Add(time.Second) descs = append(descs, []roachpb.ReplicaDescriptor{{ReplicaID: 5}, {ReplicaID: 6}}...) - prs := map[uint64]raft.Progress{ - 1: {State: raft.ProgressStateReplicate}, // should be updated + prs := map[uint64]tracker.Progress{ + 1: {State: tracker.StateReplicate}, // should be updated // 2 is missing because why not - 3: {State: raft.ProgressStateProbe}, // should be ignored - 4: {State: raft.ProgressStateSnapshot}, // should be ignored - 5: {State: raft.ProgressStateProbe}, // should be ignored - 6: {State: raft.ProgressStateReplicate}, // should be added - 7: {State: raft.ProgressStateReplicate}, // ignored, not in descs + 3: {State: tracker.StateProbe}, // should be ignored + 4: {State: tracker.StateSnapshot}, // should be ignored + 5: {State: tracker.StateProbe}, // should be ignored + 6: {State: tracker.StateReplicate}, // should be added + 7: {State: tracker.StateReplicate}, // ignored, not in descs } m.updateOnUnquiesce(descs, prs, t4) assert.EqualValues(t, map[roachpb.ReplicaID]time.Time{ diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 4901063b0138..f005b5c72c9d 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -63,6 +63,7 @@ import ( "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // allSpans is a SpanSet that covers *everything* for use in tests that don't @@ -133,10 +134,10 @@ func leaseExpiry(repl *Replica) int64 { // Create a Raft status that shows everyone fully up to date. func upToDateRaftStatus(repls []roachpb.ReplicaDescriptor) *raft.Status { - prs := make(map[uint64]raft.Progress) + prs := make(map[uint64]tracker.Progress) for _, repl := range repls { - prs[uint64(repl.ReplicaID)] = raft.Progress{ - State: raft.ProgressStateReplicate, + prs[uint64(repl.ReplicaID)] = tracker.Progress{ + State: tracker.StateReplicate, Match: 100, } } @@ -8142,14 +8143,14 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { func TestReplicaMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - progress := func(vals ...uint64) map[uint64]raft.Progress { - m := make(map[uint64]raft.Progress) + progress := func(vals ...uint64) map[uint64]tracker.Progress { + m := make(map[uint64]tracker.Progress) for i, v := range vals { - m[uint64(i+1)] = raft.Progress{Match: v} + m[uint64(i+1)] = tracker.Progress{Match: v} } return m } - status := func(lead uint64, progress map[uint64]raft.Progress) *raft.Status { + status := func(lead uint64, progress map[uint64]tracker.Progress) *raft.Status { status := &raft.Status{ Progress: progress, } @@ -9083,7 +9084,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { RaftState: raft.StateLeader, }, Applied: logIndex, - Progress: map[uint64]raft.Progress{ + Progress: map[uint64]tracker.Progress{ 1: {Match: logIndex}, 2: {Match: logIndex}, 3: {Match: logIndex}, @@ -9152,7 +9153,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { }) for _, i := range []uint64{1, 2, 3} { test(false, func(q *testQuiescer) *testQuiescer { - q.status.Progress[i] = raft.Progress{Match: invalidIndex} + q.status.Progress[i] = tracker.Progress{Match: invalidIndex} return q }) } @@ -9186,7 +9187,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { for _, i := range []uint64{1, 2, 3} { test(true, func(q *testQuiescer) *testQuiescer { q.livenessMap[roachpb.NodeID(i)] = IsLiveMapEntry{IsLive: false} - q.status.Progress[i] = raft.Progress{Match: invalidIndex} + q.status.Progress[i] = tracker.Progress{Match: invalidIndex} return q }) } @@ -11454,7 +11455,7 @@ func TestSplitSnapshotWarningStr(t *testing.T) { assert.Equal(t, "", splitSnapshotWarningStr(12, status)) pr := status.Progress[2] - pr.State = raft.ProgressStateProbe + pr.State = tracker.StateProbe status.Progress[2] = pr assert.Equal( @@ -11463,7 +11464,7 @@ func TestSplitSnapshotWarningStr(t *testing.T) { splitSnapshotWarningStr(12, status), ) - pr.State = raft.ProgressStateSnapshot + pr.State = tracker.StateSnapshot assert.Equal( t, diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index 2a451d232f62..de180f82cce2 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) type splitDelayHelperI interface { @@ -106,13 +107,7 @@ func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) done := true for replicaID, pr := range raftStatus.Progress { - if replicaID == raftStatus.Lead { - // TODO(tschottdorf): remove this once we have picked up - // https://github.com/etcd-io/etcd/pull/10279 - continue - } - - if pr.State != raft.ProgressStateReplicate { + if pr.State != tracker.StateReplicate { if !pr.RecentActive { if ticks == 0 { // Having set done = false, we make sure we're not exiting early. diff --git a/pkg/storage/split_delay_helper_test.go b/pkg/storage/split_delay_helper_test.go index e628af04ccab..8e13bd061dd3 100644 --- a/pkg/storage/split_delay_helper_test.go +++ b/pkg/storage/split_delay_helper_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) type testSplitDelayHelper struct { @@ -84,8 +85,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateProbe}, + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateProbe}, }, }, } @@ -96,23 +97,27 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { assert.Equal(t, 1, h.emptyProposed) }) - for _, state := range []raft.ProgressStateType{raft.ProgressStateProbe, raft.ProgressStateSnapshot} { + for _, state := range []tracker.StateType{tracker.StateProbe, tracker.StateSnapshot} { t.Run(state.String(), func(t *testing.T) { h := &testSplitDelayHelper{ numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: state, RecentActive: true, Paused: true /* unifies string output below */}, + Progress: map[uint64]tracker.Progress{ + 2: { + State: state, + RecentActive: true, + ProbeSent: true, // Unifies string output below. + Inflights: &tracker.Inflights{}, + }, // Healthy follower just for kicks. - 3: {State: raft.ProgressStateReplicate}, + 3: {State: tracker.StateReplicate}, }, }, } s := maybeDelaySplitToAvoidSnapshot(ctx, h) - assert.Equal(t, "; replica r1/2 not caught up: next = 0, match = 0, state = "+ - state.String()+ - ", waiting = true, pendingSnapshot = 0; delayed split for 5.0s to avoid Raft snapshot (without success)", s) + assert.Equal(t, "; replica r1/2 not caught up: "+state.String()+ + " match=0 next=0 paused; delayed split for 5.0s to avoid Raft snapshot (without success)", s) assert.Equal(t, 5, h.slept) assert.Equal(t, 5, h.emptyProposed) }) @@ -123,8 +128,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateReplicate}, // intentionally not recently active + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateReplicate}, // intentionally not recently active }, }, } @@ -139,8 +144,8 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { numAttempts: 5, rangeID: 1, raftStatus: &raft.Status{ - Progress: map[uint64]raft.Progress{ - 2: {State: raft.ProgressStateProbe, RecentActive: true}, + Progress: map[uint64]tracker.Progress{ + 2: {State: tracker.StateProbe, RecentActive: true, Inflights: &tracker.Inflights{}}, }, }, } @@ -148,7 +153,7 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { h.sleep = func() { if h.slept == 2 { pr := h.raftStatus.Progress[2] - pr.State = raft.ProgressStateReplicate + pr.State = tracker.StateReplicate h.raftStatus.Progress[2] = pr } } diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go index a4b9eeef543a..affacafc26e6 100644 --- a/pkg/storage/store_rebalancer_test.go +++ b/pkg/storage/store_rebalancer_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/gogo/protobuf/proto" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/tracker" ) var ( @@ -135,14 +136,14 @@ func TestChooseLeaseToTransfer(t *testing.T) { // raft status with one that always returns all replicas as up to date. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: 1, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status @@ -218,14 +219,14 @@ func TestChooseReplicaToRebalance(t *testing.T) { // raft status with one that always returns all replicas as up to date. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 for _, replica := range r.Desc().InternalReplicas { - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: 1, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status @@ -331,7 +332,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // are caught up). We thus shouldn't transfer a lease to s5. sr.getRaftStatusFn = func(r *Replica) *raft.Status { status := &raft.Status{ - Progress: make(map[uint64]raft.Progress), + Progress: make(map[uint64]tracker.Progress), } status.Lead = uint64(r.ReplicaID()) status.Commit = 1 @@ -340,9 +341,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { if replica.StoreID == roachpb.StoreID(5) { match = 0 } - status.Progress[uint64(replica.ReplicaID)] = raft.Progress{ + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ Match: match, - State: raft.ProgressStateReplicate, + State: tracker.StateReplicate, } } return status