Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127619: raft: make ProgressMap private r=nvanbenschoten a=arulajmani

The ProgressMap doesn't need to be public. This will be in line with other fields on the ProgressTracker and other trackers we'll introduce shortly. This patch makes the field private and gates access to it behind methods.

Informs cockroachdb#125265
Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Jul 29, 2024
2 parents cc46011 + c921401 commit d6381b7
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 109 deletions.
6 changes: 3 additions & 3 deletions pkg/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,13 @@ func (n *node) run() {
close(pm.result)
}
case m := <-n.recvc:
if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress[m.From] == nil {
if IsResponseMsg(m.Type) && !IsLocalMsgTarget(m.From) && r.trk.Progress(m.From) == nil {
// Filter out response message from unknown From.
break
}
r.Step(m)
case cc := <-n.confc:
_, okBefore := r.trk.Progress[r.id]
okBefore := r.trk.Progress(r.id) != nil
cs := r.applyConfChange(cc)
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
Expand All @@ -394,7 +394,7 @@ func (n *node) run() {
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.trk.Progress[r.id]; okBefore && !okAfter {
if okAfter := r.trk.Progress(r.id) != nil; okBefore && !okAfter {
var found bool
for _, sl := range [][]pb.PeerID{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
Expand Down
22 changes: 11 additions & 11 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (r *raft) send(m pb.Message) {
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to pb.PeerID) bool {
pr := r.trk.Progress[to]
pr := r.trk.Progress(to)

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
Expand Down Expand Up @@ -652,7 +652,7 @@ func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to pb.PeerID) {
pr := r.trk.Progress[to]
pr := r.trk.Progress(to)
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
Expand Down Expand Up @@ -805,7 +805,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
// On appending entries, the leader is effectively "sending" a MsgApp to its
// local "acceptor". Since we don't actually send this self-MsgApp, update the
// progress here as if it was sent.
r.trk.Progress[r.id].Next = app.lastIndex() + 1
r.trk.Progress(r.id).Next = app.lastIndex() + 1
// The leader needs to self-ack the entries just appended once they have
// been durably persisted (since it doesn't send an MsgApp to itself). This
// response message will be added to msgsAfterAppend and delivered back to
Expand Down Expand Up @@ -923,7 +923,7 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
pr := r.trk.Progress[r.id]
pr := r.trk.Progress(r.id)
pr.BecomeReplicate()
// The leader always has RecentActive == true; MsgCheckQuorum makes sure to
// preserve this.
Expand Down Expand Up @@ -1271,7 +1271,7 @@ func stepLeader(r *raft, m pb.Message) error {
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.trk.Progress[r.id] == nil {
if r.trk.Progress(r.id) == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
Expand Down Expand Up @@ -1342,7 +1342,7 @@ func stepLeader(r *raft, m pb.Message) error {
}

// All other message types require a progress for m.From (pr).
pr := r.trk.Progress[m.From]
pr := r.trk.Progress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
Expand Down Expand Up @@ -1934,15 +1934,15 @@ func (r *raft) restore(s snapshot) bool {
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.trk.Progress[r.id]
pr := r.trk.Progress(r.id)
return pr != nil && !pr.IsLearner && !r.raftLog.hasNextOrInProgressSnapshot()
}

func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
cfg, progressMap, err := func() (quorum.Config, tracker.ProgressMap, error) {
changer := confchange.Changer{
Config: r.config,
ProgressMap: r.trk.Progress,
ProgressMap: r.trk.MoveProgressMap(),
MaxInflight: r.maxInflight,
MaxInflightBytes: r.maxInflightBytes,
LastIndex: r.raftLog.lastIndex(),
Expand Down Expand Up @@ -1975,13 +1975,13 @@ func (r *raft) switchToConfig(cfg quorum.Config, progressMap tracker.ProgressMap

r.logger.Infof("%x switched to configuration %s", r.id, r.config)
cs := r.config.ConfState()
pr, ok := r.trk.Progress[r.id]
pr := r.trk.Progress(r.id)

// Update whether the node itself is a learner, resetting to false when the
// node is removed.
r.isLearner = ok && pr.IsLearner
r.isLearner = pr != nil && pr.IsLearner

if (!ok || r.isLearner) && r.state == StateLeader {
if (pr == nil || r.isLearner) && r.state == StateLeader {
// This node is leader and was removed or demoted, step down if requested.
//
// We prevent demotions at the time writing but hypothetically we handle
Expand Down
6 changes: 3 additions & 3 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress[2]
pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress[2]
pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.trk.Progress[2]
pr2 := r.trk.Progress(2)
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
Expand Down
56 changes: 28 additions & 28 deletions pkg/raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {

// force set the next of node 2, so that
// node 2 needs a snapshot
sm.trk.Progress[2].Next = sm.raftLog.firstIndex()
sm.trk.Progress(2).Next = sm.raftLog.firstIndex()

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true})
if sm.trk.Progress[2].PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress[2].PendingSnapshot)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress(2).Next - 1, Reject: true})
if sm.trk.Progress(2).PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.trk.Progress(2).PendingSnapshot)
}
}

Expand All @@ -62,7 +62,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].BecomeSnapshot(11)
sm.trk.Progress(2).BecomeSnapshot(11)

sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
Expand All @@ -80,18 +80,18 @@ func TestSnapshotFailure(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
sm.trk.Progress(2).Next = 1
sm.trk.Progress(2).BecomeSnapshot(11)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
if sm.trk.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot)
if sm.trk.Progress(2).PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot)
}
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
if sm.trk.Progress(2).Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress(2).Next)
}
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
if !sm.trk.Progress(2).MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused)
}
}

Expand All @@ -104,18 +104,18 @@ func TestSnapshotSucceed(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
sm.trk.Progress(2).Next = 1
sm.trk.Progress(2).BecomeSnapshot(11)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
if sm.trk.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot)
if sm.trk.Progress(2).PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot)
}
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
if sm.trk.Progress(2).Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress(2).Next)
}
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
if !sm.trk.Progress(2).MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress(2).MsgAppProbesPaused)
}
}

Expand All @@ -128,23 +128,23 @@ func TestSnapshotAbort(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
sm.trk.Progress(2).Next = 1
sm.trk.Progress(2).BecomeSnapshot(11)

// A successful msgAppResp that has a higher/equal index than the
// pending snapshot should abort the pending snapshot.
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
if sm.trk.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress[2].PendingSnapshot)
if sm.trk.Progress(2).PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.trk.Progress(2).PendingSnapshot)
}
// The follower entered StateReplicate and the leader send an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
if sm.trk.Progress[2].Next != 13 {
t.Fatalf("Next = %d, want 13", sm.trk.Progress[2].Next)
if sm.trk.Progress(2).Next != 13 {
t.Fatalf("Next = %d, want 13", sm.trk.Progress(2).Next)
}
if n := sm.trk.Progress[2].Inflights.Count(); n != 1 {
if n := sm.trk.Progress(2).Inflights.Count(); n != 1 {
t.Fatalf("expected an inflight message, got %d", n)
}
}
Loading

0 comments on commit d6381b7

Please sign in to comment.