From dcc85aa2db19991541427e31aff05a2eb61d9a1d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 3 Jun 2019 13:16:28 +0200 Subject: [PATCH] raft: use membership sets in progress tracking Instead of having disjoint mappings of ID to *Progress for voters and learners, use a map[id]struct{} for each and share a map of *Progress among them. This is easier to handle when joint quorums are introduced, at which point a node may be a voting member of two quorums. --- raft/progress.go | 50 +++++++------- raft/raft_flow_control_test.go | 6 +- raft/raft_snap_test.go | 58 ++++++++--------- raft/raft_test.go | 116 +++++++++++++++++---------------- 4 files changed, 115 insertions(+), 115 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index fa4d63edfba..fc4afb2bb0a 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -291,8 +291,9 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. type progressTracker struct { - nodes map[uint64]*Progress - learners map[uint64]*Progress + nodes map[uint64]struct{} + learners map[uint64]struct{} + prs map[uint64]*Progress votes map[uint64]bool @@ -303,8 +304,9 @@ type progressTracker struct { func makePRS(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - nodes: map[uint64]*Progress{}, - learners: map[uint64]*Progress{}, + prs: map[uint64]*Progress{}, + nodes: map[uint64]struct{}{}, + learners: map[uint64]struct{}{}, votes: map[uint64]bool{}, } return p @@ -334,8 +336,8 @@ func (p *progressTracker) committed() uint64 { } p.matchBuf = p.matchBuf[:len(p.nodes)] idx := 0 - for _, pr := range p.nodes { - p.matchBuf[idx] = pr.Match + for id := range p.nodes { + p.matchBuf[idx] = p.prs[id].Match idx++ } sort.Sort(&p.matchBuf) @@ -343,50 +345,44 @@ func (p *progressTracker) committed() uint64 { } func (p *progressTracker) removeAny(id uint64) { - pN := p.nodes[id] - pL := p.learners[id] + _, okPR := p.prs[id] + _, okV := p.nodes[id] + _, okL := p.learners[id] - if pN == nil && pL == nil { + if !okPR { + panic("attempting to remove unknown peer %x") + } else if !okV && !okL { panic("attempting to remove unknown peer %x") - } else if pN != nil && pL != nil { + } else if okV && okL { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } delete(p.nodes, id) delete(p.learners, id) + delete(p.prs, id) } // initProgress initializes a new progress for the given node or learner. The // node may not exist yet in either form or a panic will ensue. func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { - if pr := p.nodes[id]; pr != nil { + if pr := p.prs[id]; pr != nil { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } - if pr := p.learners[id]; pr != nil { - panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr)) - } if !isLearner { - p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} - return + p.nodes[id] = struct{}{} + } else { + p.learners[id] = struct{}{} } - p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} + p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner} } func (p *progressTracker) getProgress(id uint64) *Progress { - if pr, ok := p.nodes[id]; ok { - return pr - } - - return p.learners[id] + return p.prs[id] } // visit invokes the supplied closure for all tracked progresses. func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.nodes { - f(id, pr) - } - - for id, pr := range p.learners { + for id, pr := range p.prs { f(id, pr) } } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 699bb5b0780..033e336921c 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.nodes[2] + pr2 := r.prs.prs[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 145473824c6..246ed07e207 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) + if sm.prs.prs[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot) } } @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next) } - if !sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) } } @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } - if sm.prs.nodes[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next) } - if !sm.prs.nodes[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) + if !sm.prs.prs[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) } } @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. - pr := n1.prs.nodes[2] + pr := n1.prs.prs[2] if pr.State != ProgressStateReplicate { t.Fatalf("unexpected state %v", pr) } @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].Next = 1 - sm.prs.nodes[2].becomeSnapshot(11) + sm.prs.prs[2].Next = 1 + sm.prs.prs[2].becomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs.nodes[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) + if sm.prs.prs[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) } // The follower entered ProgressStateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.prs.nodes[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next) + if sm.prs.prs[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next) } - if n := sm.prs.nodes[2].ins.count; n != 1 { + if n := sm.prs.prs[2].ins.count; n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index 13a59f72800..ab1037bea54 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() // Send proposals to r1. The first 5 entries should be appended to the log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.nodes[2].Paused = true + r.prs.prs[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused) + if r.prs.prs[2].Paused { + t.Errorf("paused = %v, want false", r.prs.prs[2].Paused) } } @@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[3].becomeReplicate() + r.prs.prs[2].becomeReplicate() + r.prs.prs[3].becomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -2632,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2679,9 +2679,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6 + sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6 // normal follower - sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2689,8 +2689,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs.nodes[2].Match), - 3: min(sm.raftLog.committed, sm.prs.nodes[3].Match), + 2: min(sm.raftLog.committed, sm.prs.prs[2].Match), + 3: min(sm.raftLog.committed, sm.prs.prs[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2776,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() - sm.prs.nodes[2].State = tt.state - sm.prs.nodes[2].Next = tt.next + sm.prs.prs[2].State = tt.state + sm.prs.prs[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs.nodes[2] + p := sm.prs.prs[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2792,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeProbe() + r.prs.prs[2].becomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2811,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2826,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } // consume the heartbeat @@ -2849,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.nodes[2].Paused { - t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) + if !r.prs.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) } } @@ -2859,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeReplicate() + r.prs.prs[2].becomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2876,7 +2876,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.nodes[2].becomeSnapshot(10) + r.prs.prs[2].becomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2897,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs.nodes[2].Match = 3 - r.prs.nodes[2].becomeReplicate() - r.prs.nodes[2].optimisticUpdate(5) + r.prs.prs[2].Match = 3 + r.prs.prs[2].becomeReplicate() + r.prs.prs[2].optimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs.nodes[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe) + if r.prs.prs[2].State != ProgressStateProbe { + t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe) } - if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext) + if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext) } } @@ -2973,13 +2973,13 @@ func TestRestoreWithLearner(t *testing.T) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs.nodes[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false) + if sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.prs.learners[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true) + if !sm.prs.prs[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true) } } @@ -3121,8 +3121,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) + sm.prs.prs[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3152,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs.nodes[2].RecentActive = false + sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.prs[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3201,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs.nodes[3].RecentActive { + if lead.prs.prs[3].RecentActive { break } } @@ -3304,8 +3304,8 @@ func TestAddLearner(t *testing.T) { if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.prs.learners[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true) + if !r.prs.prs[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true) } } @@ -3619,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. @@ -3642,8 +3642,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs.nodes[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) + if lead.prs.prs[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3722,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs.nodes[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1) + if lead.prs.prs[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1) } } @@ -4300,14 +4300,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]*Progress) - v.prs.learners = make(map[uint64]*Progress) + v.prs.nodes = make(map[uint64]struct{}) + v.prs.learners = make(map[uint64]struct{}) + v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { + pr := &Progress{} if _, ok := learners[peerAddrs[i]]; ok { - v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true} + pr.IsLearner = true + v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = &Progress{} + v.prs.nodes[peerAddrs[i]] = struct{}{} } + v.prs.prs[peerAddrs[i]] = pr } v.reset(v.Term) npeers[id] = v