diff --git a/pkg/raft/node.go b/pkg/raft/node.go index fd7e820bfb9f..28904a91a420 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -377,13 +377,13 @@ func (n *node) run() { close(pm.result) } case m := <-n.recvc: - if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress[m.From] == nil { + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress(m.From) == nil { // Filter out response message from unknown From. break } r.Step(m) case cc := <-n.confc: - _, okBefore := r.trk.Progress[r.id] + okBefore := r.trk.Progress(r.id) != nil cs := r.applyConfChange(cc) // If the node was removed, block incoming proposals. Note that we // only do this if the node was in the config before. Nodes may be @@ -394,7 +394,7 @@ func (n *node) run() { // NB: propc is reset when the leader changes, which, if we learn // about it, sort of implies that we got readded, maybe? This isn't // very sound and likely has bugs. - if _, okAfter := r.trk.Progress[r.id]; okBefore && !okAfter { + if okAfter := r.trk.Progress(r.id) != nil; okBefore && !okAfter { var found bool for _, sl := range [][]pb.PeerID{cs.Voters, cs.VotersOutgoing} { for _, id := range sl { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index f425ea094874..0d2a7f4ffd6a 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -583,7 +583,7 @@ func (r *raft) send(m pb.Message) { // if the follower log and commit index are up-to-date, the flow is paused (for // reasons like in-flight limits), or the message could not be constructed. func (r *raft) maybeSendAppend(to pb.PeerID) bool { - pr := r.trk.Progress[to] + pr := r.trk.Progress(to) last, commit := r.raftLog.lastIndex(), r.raftLog.committed if !pr.ShouldSendMsgApp(last, commit) { @@ -652,7 +652,7 @@ func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to pb.PeerID) { - pr := r.trk.Progress[to] + pr := r.trk.Progress(to) // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader @@ -805,7 +805,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // On appending entries, the leader is effectively "sending" a MsgApp to its // local "acceptor". Since we don't actually send this self-MsgApp, update the // progress here as if it was sent. - r.trk.Progress[r.id].Next = app.lastIndex() + 1 + r.trk.Progress(r.id).Next = app.lastIndex() + 1 // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). This // response message will be added to msgsAfterAppend and delivered back to @@ -923,7 +923,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - pr := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) pr.BecomeReplicate() // The leader always has RecentActive == true; MsgCheckQuorum makes sure to // preserve this. @@ -1271,7 +1271,7 @@ func stepLeader(r *raft, m pb.Message) error { if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if r.trk.Progress[r.id] == nil { + if r.trk.Progress(r.id) == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -1342,7 +1342,7 @@ func stepLeader(r *raft, m pb.Message) error { } // All other message types require a progress for m.From (pr). - pr := r.trk.Progress[m.From] + pr := r.trk.Progress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil @@ -1934,7 +1934,7 @@ func (r *raft) restore(s snapshot) bool { // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { - pr := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) return pr != nil && !pr.IsLearner && !r.raftLog.hasNextOrInProgressSnapshot() } @@ -1942,7 +1942,7 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { cfg, progressMap, err := func() (quorum.Config, tracker.ProgressMap, error) { changer := confchange.Changer{ Config: r.config, - ProgressMap: r.trk.Progress, + ProgressMap: r.trk.MoveProgressMap(), MaxInflight: r.maxInflight, MaxInflightBytes: r.maxInflightBytes, LastIndex: r.raftLog.lastIndex(), @@ -1975,13 +1975,13 @@ func (r *raft) switchToConfig(cfg quorum.Config, progressMap tracker.ProgressMap r.logger.Infof("%x switched to configuration %s", r.id, r.config) cs := r.config.ConfState() - pr, ok := r.trk.Progress[r.id] + pr := r.trk.Progress(r.id) // Update whether the node itself is a learner, resetting to false when the // node is removed. - r.isLearner = ok && pr.IsLearner + r.isLearner = pr != nil && pr.IsLearner - if (!ok || r.isLearner) && r.state == StateLeader { + if (pr == nil || r.isLearner) && r.state == StateLeader { // This node is leader and was removed or demoted, step down if requested. // // We prevent demotions at the time writing but hypothetically we handle diff --git a/pkg/raft/raft_flow_control_test.go b/pkg/raft/raft_flow_control_test.go index acd6aa6c858f..b5aa80a0720c 100644 --- a/pkg/raft/raft_flow_control_test.go +++ b/pkg/raft/raft_flow_control_test.go @@ -32,7 +32,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window @@ -68,7 +68,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window @@ -113,7 +113,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.trk.Progress[2] + pr2 := r.trk.Progress(2) // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window diff --git a/pkg/raft/raft_snap_test.go b/pkg/raft/raft_snap_test.go index fdc0c4f76ff3..463773dc9c77 100644 --- a/pkg/raft/raft_snap_test.go +++ b/pkg/raft/raft_snap_test.go @@ -45,11 +45,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true}) - if sm.trk.Progress[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress(2).Next - 1, Reject: true}) + if sm.trk.Progress(2).PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress(2).PendingSnapshot) } } @@ -62,7 +62,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -80,18 +80,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } - if sm.trk.Progress[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 1 { + t.Fatalf("Next = %d, want 1", sm.trk.Progress(2).Next) } - if !sm.trk.Progress[2].MsgAppProbesPaused { - t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) + if !sm.trk.Progress(2).MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused) } } @@ -104,18 +104,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } - if sm.trk.Progress[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 12 { + t.Fatalf("Next = %d, want 12", sm.trk.Progress(2).Next) } - if !sm.trk.Progress[2].MsgAppProbesPaused { - t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) + if !sm.trk.Progress(2).MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused) } } @@ -128,23 +128,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].Next = 1 - sm.trk.Progress[2].BecomeSnapshot(11) + sm.trk.Progress(2).Next = 1 + sm.trk.Progress(2).BecomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.trk.Progress[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot) + if sm.trk.Progress(2).PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot) } // The follower entered StateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.trk.Progress[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.trk.Progress[2].Next) + if sm.trk.Progress(2).Next != 13 { + t.Fatalf("Next = %d, want 13", sm.trk.Progress(2).Next) } - if n := sm.trk.Progress[2].Inflights.Count(); n != 1 { + if n := sm.trk.Progress(2).Inflights.Count(); n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index a344bf38400b..ce798dbcad1b 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -98,7 +98,7 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, 5, 1, s) r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() // Send proposals to r1. The first 5 entries should be queued in the unstable log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} @@ -106,7 +106,7 @@ func TestProgressLeader(t *testing.T) { require.NoError(t, r.Step(propMsg), "#%d", i) } - require.Zero(t, r.trk.Progress[1].Match) + require.Zero(t, r.trk.Progress(1).Match) ents := r.raftLog.nextUnstableEnts() require.Len(t, ents, 6) @@ -115,8 +115,8 @@ func TestProgressLeader(t *testing.T) { r.advanceMessagesAfterAppend() - require.Equal(t, uint64(6), r.trk.Progress[1].Match) - require.Equal(t, uint64(7), r.trk.Progress[1].Next) + require.Equal(t, uint64(6), r.trk.Progress(1).Match) + require.Equal(t, uint64(7), r.trk.Progress(1).Next) } // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. @@ -125,16 +125,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppProbesPaused = true + r.trk.Progress(2).MsgAppProbesPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) - r.trk.Progress[2].BecomeReplicate() - assert.False(t, r.trk.Progress[2].MsgAppProbesPaused) - r.trk.Progress[2].MsgAppProbesPaused = true + r.trk.Progress(2).BecomeReplicate() + assert.False(t, r.trk.Progress(2).MsgAppProbesPaused) + r.trk.Progress(2).MsgAppProbesPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) } func TestProgressPaused(t *testing.T) { @@ -162,7 +162,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.trk.Progress[2].BecomeProbe() + r.trk.Progress(2).BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) large := []byte(strings.Repeat("b", 5000)) for i := 0; i < 22; i++ { @@ -238,8 +238,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[3].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() + r.trk.Progress(3).BecomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -649,7 +649,7 @@ func TestLearnerLogReplication(t *testing.T) { assert.Equal(t, nextCommitted, n1.raftLog.committed) assert.Equal(t, n1.raftLog.committed, n2.raftLog.committed) - match := n1.trk.Progress[2].Match + match := n1.trk.Progress(2).Match assert.Equal(t, n2.raftLog.committed, match) } @@ -1031,7 +1031,7 @@ func TestCommit(t *testing.T) { if id > 1 { sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2()) } - pr := sm.trk.Progress[id] + pr := sm.trk.Progress(id) pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1 } sm.maybeCommit() @@ -1927,7 +1927,7 @@ func TestLeaderAppResp(t *testing.T) { }, )) - p := sm.trk.Progress[2] + p := sm.trk.Progress(2) require.Equal(t, tt.wmatch, p.Match) require.Equal(t, tt.wnext, p.Next) @@ -1967,17 +1967,17 @@ func TestBcastBeat(t *testing.T) { sm.advanceMessagesAfterAppend() // slow follower - sm.trk.Progress[2].Match, sm.trk.Progress[2].Next = 5, 6 + sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6 // normal follower - sm.trk.Progress[3].Match, sm.trk.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() require.Len(t, msgs, 2) wantCommitMap := map[pb.PeerID]uint64{ - 2: min(sm.raftLog.committed, sm.trk.Progress[2].Match), - 3: min(sm.raftLog.committed, sm.trk.Progress[3].Match), + 2: min(sm.raftLog.committed, sm.trk.Progress(2).Match), + 3: min(sm.raftLog.committed, sm.trk.Progress(3).Match), } for i, m := range msgs { require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i) @@ -2050,11 +2050,11 @@ func TestLeaderIncreaseNext(t *testing.T) { require.True(t, sm.raftLog.append(init)) sm.becomeCandidate() sm.becomeLeader() - sm.trk.Progress[2].State = tt.state - sm.trk.Progress[2].Next = tt.next + sm.trk.Progress(2).State = tt.state + sm.trk.Progress(2).Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.trk.Progress[2] + p := sm.trk.Progress(2) assert.Equal(t, tt.wnext, p.Next, "#%d", i) } } @@ -2064,7 +2064,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + r.trk.Progress(2).BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2079,7 +2079,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { assert.Zero(t, msg[0].Index) } - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.maybeSendAppend(2) @@ -2090,7 +2090,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) // consume the heartbeat msg := r.readMessages() @@ -2103,7 +2103,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { msg := r.readMessages() assert.Len(t, msg, 1) assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) } func TestSendAppendForProgressReplicate(t *testing.T) { @@ -2111,7 +2111,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + r.trk.Progress(2).BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2126,7 +2126,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + r.trk.Progress(2).BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2146,15 +2146,15 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.trk.Progress[2].Match = 3 - r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].Next = 6 + r.trk.Progress(2).Match = 3 + r.trk.Progress(2).BecomeReplicate() + r.trk.Progress(2).Next = 6 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - assert.Equal(t, tracker.StateProbe, r.trk.Progress[2].State) - wnext := r.trk.Progress[2].Match + 1 - assert.Equal(t, wnext, r.trk.Progress[2].Next) + assert.Equal(t, tracker.StateProbe, r.trk.Progress(2).State) + wnext := r.trk.Progress(2).Match + 1 + assert.Equal(t, wnext, r.trk.Progress(2).Next) } func TestRestore(t *testing.T) { @@ -2205,10 +2205,10 @@ func TestRestoreWithLearner(t *testing.T) { assert.Len(t, lns, len(s.snap.Metadata.ConfState.Learners)) for _, n := range s.snap.Metadata.ConfState.Voters { - assert.False(t, sm.trk.Progress[n].IsLearner) + assert.False(t, sm.trk.Progress(n).IsLearner) } for _, n := range s.snap.Metadata.ConfState.Learners { - assert.True(t, sm.trk.Progress[n].IsLearner) + assert.True(t, sm.trk.Progress(n).IsLearner) } assert.False(t, sm.restore(s)) @@ -2368,8 +2368,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true}) + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress(2).Next - 1, Reject: true}) msgs := sm.readMessages() require.Len(t, msgs, 1) @@ -2397,8 +2397,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() - 1 - sm.trk.Progress[2].RecentActive = false + sm.trk.Progress(2).Next = sm.raftLog.firstIndex() - 1 + sm.trk.Progress(2).RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -2442,7 +2442,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.trk.Progress[3].RecentActive { + if lead.trk.Progress(3).RecentActive { break } } @@ -2526,20 +2526,20 @@ func TestAddLearner(t *testing.T) { require.False(t, r.isLearner) nodes := r.trk.LearnerNodes() assert.Equal(t, []pb.PeerID{2}, nodes) - require.True(t, r.trk.Progress[2].IsLearner) + require.True(t, r.trk.Progress(2).IsLearner) // Promote peer to voter. r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) - require.False(t, r.trk.Progress[2].IsLearner) + require.False(t, r.trk.Progress(2).IsLearner) // Demote r. r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2()) - require.True(t, r.trk.Progress[1].IsLearner) + require.True(t, r.trk.Progress(1).IsLearner) require.True(t, r.isLearner) // Promote r again. r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2()) - require.False(t, r.trk.Progress[1].IsLearner) + require.False(t, r.trk.Progress(1).IsLearner) require.False(t, r.isLearner) } @@ -2819,7 +2819,7 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - require.Equal(t, uint64(1), lead.trk.Progress[3].Match) + require.Equal(t, uint64(1), lead.trk.Progress(3).Match) // Transfer leadership to 3 when node 3 is lack of log. nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) @@ -2840,7 +2840,7 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - require.Equal(t, uint64(1), lead.trk.Progress[3].Match) + require.Equal(t, uint64(1), lead.trk.Progress(3).Match) filtered := pb.Message{} // Snapshot needs to be applied before sending MsgAppResp @@ -2932,7 +2932,7 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) require.Equal(t, ErrProposalDropped, err) - require.Equal(t, uint64(1), lead.trk.Progress[1].Match) + require.Equal(t, uint64(1), lead.trk.Progress(1).Match) } func TestLeaderTransferReceiveHigherTermVote(t *testing.T) { @@ -3655,7 +3655,7 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { r1.becomeCandidate() r1.becomeLeader() r1.readMessages() - r1.trk.Progress[2].BecomeReplicate() + r1.trk.Progress(2).BecomeReplicate() r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) @@ -3683,17 +3683,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { require.Equal(t, uint64(2), m.Index) r1.Step(m) m = expectOneMessage(t, r1) - require.Equal(t, uint64(2), r1.trk.Progress[2].Match) + require.Equal(t, uint64(2), r1.trk.Progress(2).Match) // r1 observes a transient network issue to r2, hence transits to probe state. r1.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - require.Equal(t, tracker.StateProbe, r1.trk.Progress[2].State) + require.Equal(t, tracker.StateProbe, r1.trk.Progress(2).State) // now r1 receives the delayed resp2. r1.Step(resp2) m = expectOneMessage(t, r1) // r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index. - require.Equal(t, r1.trk.Progress[2].Match, m.Index) + require.Equal(t, r1.trk.Progress(2).Match, m.Index) } func expectOneMessage(t *testing.T, r *raft) pb.Message { @@ -3762,7 +3762,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw } else { v.config.Voters[0][peerAddrs[i]] = struct{}{} } - v.trk.Progress[peerAddrs[i]] = pr + v.trk.TestingSetProgress(peerAddrs[i], pr) } v.reset(v.Term) npeers[id] = v diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index c0694538b11e..c23ea94f5e55 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -109,7 +109,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) && !IsLocalMsgTarget(m.From) { return ErrStepLocalMsg } - if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && rn.raft.trk.Progress[m.From] == nil { + if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && rn.raft.trk.Progress(m.From) == nil { return ErrStepPeerNotFound } return rn.raft.Step(m) diff --git a/pkg/raft/rawnode_test.go b/pkg/raft/rawnode_test.go index 6e345eabcb48..beaccb36cf0b 100644 --- a/pkg/raft/rawnode_test.go +++ b/pkg/raft/rawnode_test.go @@ -692,7 +692,7 @@ func TestRawNodeStatus(t *testing.T) { status := rn.Status() require.Equal(t, pb.PeerID(1), status.Lead) require.Equal(t, StateLeader, status.RaftState) - require.Equal(t, *rn.raft.trk.Progress[1], status.Progress[1]) + require.Equal(t, *rn.raft.trk.Progress(1), status.Progress[1]) expCfg := quorum.Config{Voters: quorum.JointConfig{ quorum.MajorityConfig{1: {}}, diff --git a/pkg/raft/status.go b/pkg/raft/status.go index 6acd9be0f9f1..2c96d9bf5f7e 100644 --- a/pkg/raft/status.go +++ b/pkg/raft/status.go @@ -81,7 +81,7 @@ func withProgress(r *raft, visitor func(id pb.PeerID, typ ProgressType, pr track } func getProgressCopy(r *raft) map[pb.PeerID]tracker.Progress { - m := make(map[pb.PeerID]tracker.Progress, len(r.trk.Progress)) + m := make(map[pb.PeerID]tracker.Progress, r.trk.Len()) r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) { p := *pr p.Inflights = pr.Inflights.Clone() @@ -142,7 +142,7 @@ func getSparseStatus(r *raft) SparseStatus { var s SparseStatus s.BasicStatus = getBasicStatus(r) if s.RaftState == StateLeader { - s.Progress = make(map[pb.PeerID]tracker.Progress, len(r.trk.Progress)) + s.Progress = make(map[pb.PeerID]tracker.Progress, r.trk.Len()) withProgress(r, func(id pb.PeerID, _ ProgressType, pr tracker.Progress) { s.Progress[id] = pr }) diff --git a/pkg/raft/tracker/progresstracker.go b/pkg/raft/tracker/progresstracker.go index c4e2fa5aaf88..0011f1bed7f4 100644 --- a/pkg/raft/tracker/progresstracker.go +++ b/pkg/raft/tracker/progresstracker.go @@ -31,18 +31,40 @@ import ( type ProgressTracker struct { config *quorum.Config - Progress ProgressMap + progress ProgressMap } // MakeProgressTracker initializes a ProgressTracker. func MakeProgressTracker(config *quorum.Config, progressMap ProgressMap) ProgressTracker { p := ProgressTracker{ config: config, - Progress: progressMap, + progress: progressMap, } return p } +// Progress returns the progress associated with the supplied ID. +func (p *ProgressTracker) Progress(id pb.PeerID) *Progress { + return p.progress[id] +} + +// MoveProgressMap transfers ownership of the progress map to the caller. It +// shouldn't be used by the progress tracker after this. +func (p *ProgressTracker) MoveProgressMap() ProgressMap { + progress := p.progress + p.progress = nil + return progress +} + +// Len returns the length of the progress map. +func (p *ProgressTracker) Len() int { + return len(p.progress) +} + +func (p *ProgressTracker) TestingSetProgress(id pb.PeerID, progress *Progress) { + p.progress[id] = progress +} + type matchAckIndexer map[pb.PeerID]*Progress var _ quorum.AckedIndexer = matchAckIndexer(nil) @@ -59,12 +81,12 @@ func (l matchAckIndexer) AckedIndex(id pb.PeerID) (quorum.Index, bool) { // Committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *ProgressTracker) Committed() uint64 { - return uint64(p.config.Voters.CommittedIndex(matchAckIndexer(p.Progress))) + return uint64(p.config.Voters.CommittedIndex(matchAckIndexer(p.progress))) } // Visit invokes the supplied closure for all tracked progresses in stable order. func (p *ProgressTracker) Visit(f func(id pb.PeerID, pr *Progress)) { - n := len(p.Progress) + n := len(p.progress) // We need to sort the IDs and don't want to allocate since this is hot code. // The optimization here mirrors that in `(MajorityConfig).CommittedIndex`, // see there for details. @@ -75,13 +97,13 @@ func (p *ProgressTracker) Visit(f func(id pb.PeerID, pr *Progress)) { } else { ids = make([]pb.PeerID, n) } - for id := range p.Progress { + for id := range p.progress { n-- ids[n] = id } slices.Sort(ids) for _, id := range ids { - f(id, p.Progress[id]) + f(id, p.progress[id]) } }