From c921401097652a5864318ab8a6c068b451586d84 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Thu, 18 Jul 2024 20:05:40 -0400 Subject: [PATCH] raft: make ProgressMap private The ProgressMap doesn't need to be public. This will be in line with other fields on the ProgressTracker and other trackers we'll introduce shortly. This patch makes the field private and gates access to it behind methods. Informs https://github.com/cockroachdb/cockroach/issues/125265 Release note: None --- pkg/raft/node.go | 6 +- pkg/raft/raft.go | 22 +++--- pkg/raft/raft_flow_control_test.go | 6 +- pkg/raft/raft_snap_test.go | 56 +++++++-------- pkg/raft/raft_test.go | 108 ++++++++++++++-------------- pkg/raft/rawnode.go | 2 +- pkg/raft/rawnode_test.go | 2 +- pkg/raft/status.go | 4 +- pkg/raft/tracker/progresstracker.go | 34 +++++++-- 9 files changed, 131 insertions(+), 109 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index fd7e820bfb9f..28904a91a420 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -377,13 +377,13 @@ func (n *node) run() { close(pm.result) } case m := <-n.recvc: - if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress[m.From] == nil { + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress(m.From) == nil { // Filter out response message from unknown From. break } r.Step(m) case cc := <-n.confc: - _, okBefore := r.trk.Progress[r.id] + okBefore := r.trk.Progress(r.id) != nil cs := r.applyConfChange(cc) // If the node was removed, block incoming proposals. Note that we // only do this if the node was in the config before. Nodes may be @@ -394,7 +394,7 @@ func (n *node) run() { // NB: propc is reset when the leader changes, which, if we learn // about it, sort of implies that we got readded, maybe? This isn't // very sound and likely has bugs. - if _, okAfter := r.trk.Progress[r.id]; okBefore && !okAfter { + if okAfter := r.trk.Progress(r.id) != nil; okBefore && !okAfter { var found bool for _, sl := range [][]pb.PeerID{cs.Voters, cs.VotersOutgoing} { for _, id := range sl { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index f425ea094874..0d2a7f4ffd6a 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -583,7 +583,7 @@ func (r *raft) send(m pb.Message) { // if the follower log and commit index are up-to-date, the flow is paused (for // reasons like in-flight limits), or the message could not be constructed. func (r *raft) maybeSendAppend(to pb.PeerID) bool { - pr := r.trk.Progress[to] + pr := r.trk.Progress(to) last, commit := r.raftLog.lastIndex(), r.raftLog.committed if !pr.ShouldSendMsgApp(last, commit) { @@ -652,7 +652,7 @@ func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to pb.PeerID) { - pr := r.trk.Progress[to] + pr := r.trk.Progress(to) // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader @@ -805,7 +805,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // On appending entries, the leader is effectively "sending" a MsgApp to its // local "acceptor". Since we don't actually send this self-MsgApp, update the // progress here as if it was sent. - r.trk.Progress[r.id].Next = app.lastIndex() + 1 + r.trk.Progress(r.id).Next = app.lastIndex() + 1 // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). This // response message will be added to msgsAfterAppend and delivered back to @@ -923,7 +923,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - pr := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) pr.BecomeReplicate() // The leader always has RecentActive == true; MsgCheckQuorum makes sure to // preserve this. @@ -1271,7 +1271,7 @@ func stepLeader(r *raft, m pb.Message) error { if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if r.trk.Progress[r.id] == nil { + if r.trk.Progress(r.id) == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -1342,7 +1342,7 @@ func stepLeader(r *raft, m pb.Message) error { } // All other message types require a progress for m.From (pr). - pr := r.trk.Progress[m.From] + pr := r.trk.Progress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil @@ -1934,7 +1934,7 @@ func (r *raft) restore(s snapshot) bool { // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { - pr := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) return pr != nil && !pr.IsLearner && !r.raftLog.hasNextOrInProgressSnapshot() } @@ -1942,7 +1942,7 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { cfg, progressMap, err := func() (quorum.Config, tracker.ProgressMap, error) { changer := confchange.Changer{ Config: r.config, - ProgressMap: r.trk.Progress, + ProgressMap: r.trk.MoveProgressMap(), MaxInflight: r.maxInflight, MaxInflightBytes: r.maxInflightBytes, LastIndex: r.raftLog.lastIndex(), @@ -1975,13 +1975,13 @@ func (r *raft) switchToConfig(cfg quorum.Config, progressMap tracker.ProgressMap r.logger.Infof("%x switched to configuration %s", r.id, r.config) cs := r.config.ConfState() - pr, ok := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) // Update whether the node itself is a learner, resetting to false when the // node is removed. - r.isLearner = ok && pr.IsLearner + r.isLearner = pr != nil && pr.IsLearner - if (!ok || r.isLearner) && r.state == StateLeader { + if (pr == nil || r.isLearner) && r.state == StateLeader { // This node is leader and was removed or demoted, step down if requested. // // We prevent demotions at the time writing but hypothetically we handle diff --git a/pkg/raft/raft_flow_control_test.go b/pkg/raft/raft_flow_control_test.go index acd6aa6c858f..b5aa80a0720c 100644 --- a/pkg/raft/raft_flow_control_test.go +++ b/pkg/raft/raft_flow_control_test.go @@ -32,7 +32,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window @@ -68,7 +68,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window @@ -113,7 +113,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window diff --git a/pkg/raft/raft_snap_test.go b/pkg/raft/raft_snap_test.go index fdc0c4f76ff3..463773dc9c77 100644 --- a/pkg/raft/raft_snap_test.go +++ b/pkg/raft/raft_snap_test.go @@ -45,11 +45,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true}) - if sm.trk.Progress[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress(2).Next - 1, Reject: true}) + if sm.trk.Progress(2).PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress(2).PendingSnapshot) } } @@ -62,7 +62,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -80,18 +80,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } - if sm.trk.Progress[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 1 { + t.Fatalf("Next = %d, want 1", sm.trk.Progress(2).Next) } - if !sm.trk.Progress[2].MsgAppProbesPaused { - t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) + if !sm.trk.Progress(2).MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused) } } @@ -104,18 +104,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } - if sm.trk.Progress[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 12 { + t.Fatalf("Next = %d, want 12", sm.trk.Progress(2).Next) } - if !sm.trk.Progress[2].MsgAppProbesPaused { - t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) + if !sm.trk.Progress(2).MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused) } } @@ -128,23 +128,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } // The follower entered StateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.trk.Progress[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 13 { + t.Fatalf("Next = %d, want 13", sm.trk.Progress(2).Next) } - if n := sm.trk.Progress[2].Inflights.Count(); n != 1 { + if n := sm.trk.Progress(2).Inflights.Count(); n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index a344bf38400b..ce798dbcad1b 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -98,7 +98,7 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, 5, 1, s) r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() // Send proposals to r1. The first 5 entries should be queued in the unstable log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} @@ -106,7 +106,7 @@ func TestProgressLeader(t *testing.T) { require.NoError(t, r.Step(propMsg), "#%d", i) } - require.Zero(t, r.trk.Progress[1].Match) + require.Zero(t, r.trk.Progress(1).Match) ents := r.raftLog.nextUnstableEnts() require.Len(t, ents, 6) @@ -115,8 +115,8 @@ func TestProgressLeader(t *testing.T) { r.advanceMessagesAfterAppend() - require.Equal(t, uint64(6), r.trk.Progress[1].Match) - require.Equal(t, uint64(7), r.trk.Progress[1].Next) + require.Equal(t, uint64(6), r.trk.Progress(1).Match) + require.Equal(t, uint64(7), r.trk.Progress(1).Next) } // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. @@ -125,16 +125,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppProbesPaused = true + r.trk.Progress(2).MsgAppProbesPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) - r.trk.Progress[2].BecomeReplicate() - assert.False(t, r.trk.Progress[2].MsgAppProbesPaused) - r.trk.Progress[2].MsgAppProbesPaused = true + r.trk.Progress(2).BecomeReplicate() + assert.False(t, r.trk.Progress(2).MsgAppProbesPaused) + r.trk.Progress(2).MsgAppProbesPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) } func TestProgressPaused(t *testing.T) { @@ -162,7 +162,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.trk.Progress[2].BecomeProbe() + r.trk.Progress(2).BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) large := []byte(strings.Repeat("b", 5000)) for i := 0; i < 22; i++ { @@ -238,8 +238,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[3].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() + r.trk.Progress(3).BecomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -649,7 +649,7 @@ func TestLearnerLogReplication(t *testing.T) { assert.Equal(t, nextCommitted, n1.raftLog.committed) assert.Equal(t, n1.raftLog.committed, n2.raftLog.committed) - match := n1.trk.Progress[2].Match + match := n1.trk.Progress(2).Match assert.Equal(t, n2.raftLog.committed, match) } @@ -1031,7 +1031,7 @@ func TestCommit(t *testing.T) { if id > 1 { sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2()) } - pr := sm.trk.Progress[id] + pr := sm.trk.Progress(id) pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1 } sm.maybeCommit() @@ -1927,7 +1927,7 @@ func TestLeaderAppResp(t *testing.T) { }, )) - p := sm.trk.Progress[2] + p := sm.trk.Progress(2) require.Equal(t, tt.wmatch, p.Match) require.Equal(t, tt.wnext, p.Next) @@ -1967,17 +1967,17 @@ func TestBcastBeat(t *testing.T) { sm.advanceMessagesAfterAppend() // slow follower - sm.trk.Progress[2].Match, sm.trk.Progress[2].Next = 5, 6 + sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6 // normal follower - sm.trk.Progress[3].Match, sm.trk.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() require.Len(t, msgs, 2) wantCommitMap := map[pb.PeerID]uint64{ - 2: min(sm.raftLog.committed, sm.trk.Progress[2].Match), - 3: min(sm.raftLog.committed, sm.trk.Progress[3].Match), + 2: min(sm.raftLog.committed, sm.trk.Progress(2).Match), + 3: min(sm.raftLog.committed, sm.trk.Progress(3).Match), } for i, m := range msgs { require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i) @@ -2050,11 +2050,11 @@ func TestLeaderIncreaseNext(t *testing.T) { require.True(t, sm.raftLog.append(init)) sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].State = tt.state - sm.trk.Progress[2].Next = tt.next + sm.trk.Progress(2).State = tt.state + sm.trk.Progress(2).Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.trk.Progress[2] + p := sm.trk.Progress(2) assert.Equal(t, tt.wnext, p.Next, "#%d", i) } } @@ -2064,7 +2064,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + r.trk.Progress(2).BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2079,7 +2079,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { assert.Zero(t, msg[0].Index) } - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.maybeSendAppend(2) @@ -2090,7 +2090,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) // consume the heartbeat msg := r.readMessages() @@ -2103,7 +2103,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { msg := r.readMessages() assert.Len(t, msg, 1) assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) } func TestSendAppendForProgressReplicate(t *testing.T) { @@ -2111,7 +2111,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2126,7 +2126,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + r.trk.Progress(2).BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2146,15 +2146,15 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.trk.Progress[2].Match = 3 - r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].Next = 6 + r.trk.Progress(2).Match = 3 + r.trk.Progress(2).BecomeReplicate() + r.trk.Progress(2).Next = 6 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - assert.Equal(t, tracker.StateProbe, r.trk.Progress[2].State) - wnext := r.trk.Progress[2].Match + 1 - assert.Equal(t, wnext, r.trk.Progress[2].Next) + assert.Equal(t, tracker.StateProbe, r.trk.Progress(2).State) + wnext := r.trk.Progress(2).Match + 1 + assert.Equal(t, wnext, r.trk.Progress(2).Next) } func TestRestore(t *testing.T) { @@ -2205,10 +2205,10 @@ func TestRestoreWithLearner(t *testing.T) { assert.Len(t, lns, len(s.snap.Metadata.ConfState.Learners)) for _, n := range s.snap.Metadata.ConfState.Voters { - assert.False(t, sm.trk.Progress[n].IsLearner) + assert.False(t, sm.trk.Progress(n).IsLearner) } for _, n := range s.snap.Metadata.ConfState.Learners { - assert.True(t, sm.trk.Progress[n].IsLearner) + assert.True(t, sm.trk.Progress(n).IsLearner) } assert.False(t, sm.restore(s)) @@ -2368,8 +2368,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true}) + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress(2).Next - 1, Reject: true}) msgs := sm.readMessages() require.Len(t, msgs, 1) @@ -2397,8 +2397,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() - 1 - sm.trk.Progress[2].RecentActive = false + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() - 1 + sm.trk.Progress(2).RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -2442,7 +2442,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.trk.Progress[3].RecentActive { + if lead.trk.Progress(3).RecentActive { break } } @@ -2526,20 +2526,20 @@ func TestAddLearner(t *testing.T) { require.False(t, r.isLearner) nodes := r.trk.LearnerNodes() assert.Equal(t, []pb.PeerID{2}, nodes) - require.True(t, r.trk.Progress[2].IsLearner) + require.True(t, r.trk.Progress(2).IsLearner) // Promote peer to voter. r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) - require.False(t, r.trk.Progress[2].IsLearner) + require.False(t, r.trk.Progress(2).IsLearner) // Demote r. r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2()) - require.True(t, r.trk.Progress[1].IsLearner) + require.True(t, r.trk.Progress(1).IsLearner) require.True(t, r.isLearner) // Promote r again. r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2()) - require.False(t, r.trk.Progress[1].IsLearner) + require.False(t, r.trk.Progress(1).IsLearner) require.False(t, r.isLearner) } @@ -2819,7 +2819,7 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - require.Equal(t, uint64(1), lead.trk.Progress[3].Match) + require.Equal(t, uint64(1), lead.trk.Progress(3).Match) // Transfer leadership to 3 when node 3 is lack of log. nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) @@ -2840,7 +2840,7 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - require.Equal(t, uint64(1), lead.trk.Progress[3].Match) + require.Equal(t, uint64(1), lead.trk.Progress(3).Match) filtered := pb.Message{} // Snapshot needs to be applied before sending MsgAppResp @@ -2932,7 +2932,7 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) require.Equal(t, ErrProposalDropped, err) - require.Equal(t, uint64(1), lead.trk.Progress[1].Match) + require.Equal(t, uint64(1), lead.trk.Progress(1).Match) } func TestLeaderTransferReceiveHigherTermVote(t *testing.T) { @@ -3655,7 +3655,7 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { r1.becomeCandidate() r1.becomeLeader() r1.readMessages() - r1.trk.Progress[2].BecomeReplicate() + r1.trk.Progress(2).BecomeReplicate() r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) @@ -3683,17 +3683,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { require.Equal(t, uint64(2), m.Index) r1.Step(m) m = expectOneMessage(t, r1) - require.Equal(t, uint64(2), r1.trk.Progress[2].Match) + require.Equal(t, uint64(2), r1.trk.Progress(2).Match) // r1 observes a transient network issue to r2, hence transits to probe state. r1.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - require.Equal(t, tracker.StateProbe, r1.trk.Progress[2].State) + require.Equal(t, tracker.StateProbe, r1.trk.Progress(2).State) // now r1 receives the delayed resp2. r1.Step(resp2) m = expectOneMessage(t, r1) // r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index. - require.Equal(t, r1.trk.Progress[2].Match, m.Index) + require.Equal(t, r1.trk.Progress(2).Match, m.Index) } func expectOneMessage(t *testing.T, r *raft) pb.Message { @@ -3762,7 +3762,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw } else { v.config.Voters[0][peerAddrs[i]] = struct{}{} } - v.trk.Progress[peerAddrs[i]] = pr + v.trk.TestingSetProgress(peerAddrs[i], pr) } v.reset(v.Term) npeers[id] = v diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index c0694538b11e..c23ea94f5e55 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -109,7 +109,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) && !IsLocalMsgTarget(m.From) { return ErrStepLocalMsg } - if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && rn.raft.trk.Progress[m.From] == nil { + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && rn.raft.trk.Progress(m.From) == nil { return ErrStepPeerNotFound } return rn.raft.Step(m) diff --git a/pkg/raft/rawnode_test.go b/pkg/raft/rawnode_test.go index 6e345eabcb48..beaccb36cf0b 100644 --- a/pkg/raft/rawnode_test.go +++ b/pkg/raft/rawnode_test.go @@ -692,7 +692,7 @@ func TestRawNodeStatus(t *testing.T) { status := rn.Status() require.Equal(t, pb.PeerID(1), status.Lead) require.Equal(t, StateLeader, status.RaftState) - require.Equal(t, *rn.raft.trk.Progress[1], status.Progress[1]) + require.Equal(t, *rn.raft.trk.Progress(1), status.Progress[1]) expCfg := quorum.Config{Voters: quorum.JointConfig{ quorum.MajorityConfig{1: {}}, diff --git a/pkg/raft/status.go b/pkg/raft/status.go index 6acd9be0f9f1..2c96d9bf5f7e 100644 --- a/pkg/raft/status.go +++ b/pkg/raft/status.go @@ -81,7 +81,7 @@ func withProgress(r *raft, visitor func(id pb.PeerID, typ ProgressType, pr track } func getProgressCopy(r *raft) map[pb.PeerID]tracker.Progress { - m := make(map[pb.PeerID]tracker.Progress, len(r.trk.Progress)) + m := make(map[pb.PeerID]tracker.Progress, r.trk.Len()) r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) { p := *pr p.Inflights = pr.Inflights.Clone() @@ -142,7 +142,7 @@ func getSparseStatus(r *raft) SparseStatus { var s SparseStatus s.BasicStatus = getBasicStatus(r) if s.RaftState == StateLeader { - s.Progress = make(map[pb.PeerID]tracker.Progress, len(r.trk.Progress)) + s.Progress = make(map[pb.PeerID]tracker.Progress, r.trk.Len()) withProgress(r, func(id pb.PeerID, _ ProgressType, pr tracker.Progress) { s.Progress[id] = pr }) diff --git a/pkg/raft/tracker/progresstracker.go b/pkg/raft/tracker/progresstracker.go index c4e2fa5aaf88..0011f1bed7f4 100644 --- a/pkg/raft/tracker/progresstracker.go +++ b/pkg/raft/tracker/progresstracker.go @@ -31,18 +31,40 @@ import ( type ProgressTracker struct { config *quorum.Config - Progress ProgressMap + progress ProgressMap } // MakeProgressTracker initializes a ProgressTracker. func MakeProgressTracker(config *quorum.Config, progressMap ProgressMap) ProgressTracker { p := ProgressTracker{ config: config, - Progress: progressMap, + progress: progressMap, } return p } +// Progress returns the progress associated with the supplied ID. +func (p *ProgressTracker) Progress(id pb.PeerID) *Progress { + return p.progress[id] +} + +// MoveProgressMap transfers ownership of the progress map to the caller. It +// shouldn't be used by the progress tracker after this. +func (p *ProgressTracker) MoveProgressMap() ProgressMap { + progress := p.progress + p.progress = nil + return progress +} + +// Len returns the length of the progress map. +func (p *ProgressTracker) Len() int { + return len(p.progress) +} + +func (p *ProgressTracker) TestingSetProgress(id pb.PeerID, progress *Progress) { + p.progress[id] = progress +} + type matchAckIndexer map[pb.PeerID]*Progress var _ quorum.AckedIndexer = matchAckIndexer(nil) @@ -59,12 +81,12 @@ func (l matchAckIndexer) AckedIndex(id pb.PeerID) (quorum.Index, bool) { // Committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *ProgressTracker) Committed() uint64 { - return uint64(p.config.Voters.CommittedIndex(matchAckIndexer(p.Progress))) + return uint64(p.config.Voters.CommittedIndex(matchAckIndexer(p.progress))) } // Visit invokes the supplied closure for all tracked progresses in stable order. func (p *ProgressTracker) Visit(f func(id pb.PeerID, pr *Progress)) { - n := len(p.Progress) + n := len(p.progress) // We need to sort the IDs and don't want to allocate since this is hot code. // The optimization here mirrors that in `(MajorityConfig).CommittedIndex`, // see there for details. @@ -75,13 +97,13 @@ func (p *ProgressTracker) Visit(f func(id pb.PeerID, pr *Progress)) { } else { ids = make([]pb.PeerID, n) } - for id := range p.Progress { + for id := range p.progress { n-- ids[n] = id } slices.Sort(ids) for _, id := range ids { - f(id, p.Progress[id]) + f(id, p.progress[id]) } }