diff --git a/raft/node.go b/raft/node.go index 19ba37600fa..5b2600fde26 100644 --- a/raft/node.go +++ b/raft/node.go @@ -353,15 +353,15 @@ func (n *node) run(r *raft) { } case m := <-n.recvc: // filter out response message from unknown From. - if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } case cc := <-n.confc: if cc.NodeID == None { select { case n.confstatec <- pb.ConfState{ - Nodes: r.nodes(), - Learners: r.learnerNodes()}: + Nodes: r.prs.voterNodes(), + Learners: r.prs.learnerNodes()}: case <-n.done: } break @@ -384,8 +384,8 @@ func (n *node) run(r *raft) { } select { case n.confstatec <- pb.ConfState{ - Nodes: r.nodes(), - Learners: r.learnerNodes()}: + Nodes: r.prs.voterNodes(), + Learners: r.prs.learnerNodes()}: case <-n.done: } case <-n.tickc: diff --git a/raft/progress.go b/raft/progress.go index 54e0b21b01a..fa4d63edfba 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -14,7 +14,10 @@ package raft -import "fmt" +import ( + "fmt" + "sort" +) const ( ProgressStateProbe ProgressStateType = iota @@ -283,3 +286,175 @@ func (in *inflights) reset() { in.count = 0 in.start = 0 } + +// progressTracker tracks the currently active configuration and the information +// known about the nodes and learners in it. In particular, it tracks the match +// index for each peer which in turn allows reasoning about the committed index. +type progressTracker struct { + nodes map[uint64]*Progress + learners map[uint64]*Progress + + votes map[uint64]bool + + maxInflight int + matchBuf uint64Slice +} + +func makePRS(maxInflight int) progressTracker { + p := progressTracker{ + maxInflight: maxInflight, + nodes: map[uint64]*Progress{}, + learners: map[uint64]*Progress{}, + votes: map[uint64]bool{}, + } + return p +} + +// isSingleton returns true if (and only if) there is only one voting member +// (i.e. the leader) in the current configuration. +func (p *progressTracker) isSingleton() bool { + return len(p.nodes) == 1 +} + +func (p *progressTracker) quorum() int { + return len(p.nodes)/2 + 1 +} + +func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { + return len(m) >= p.quorum() +} + +// committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. +func (p *progressTracker) committed() uint64 { + // Preserving matchBuf across calls is an optimization + // used to avoid allocating a new slice on each call. + if cap(p.matchBuf) < len(p.nodes) { + p.matchBuf = make(uint64Slice, len(p.nodes)) + } + p.matchBuf = p.matchBuf[:len(p.nodes)] + idx := 0 + for _, pr := range p.nodes { + p.matchBuf[idx] = pr.Match + idx++ + } + sort.Sort(&p.matchBuf) + return p.matchBuf[len(p.matchBuf)-p.quorum()] +} + +func (p *progressTracker) removeAny(id uint64) { + pN := p.nodes[id] + pL := p.learners[id] + + if pN == nil && pL == nil { + panic("attempting to remove unknown peer %x") + } else if pN != nil && pL != nil { + panic(fmt.Sprintf("peer %x is both voter and learner", id)) + } + + delete(p.nodes, id) + delete(p.learners, id) +} + +// initProgress initializes a new progress for the given node or learner. The +// node may not exist yet in either form or a panic will ensue. +func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { + if pr := p.nodes[id]; pr != nil { + panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) + } + if pr := p.learners[id]; pr != nil { + panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr)) + } + if !isLearner { + p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} + return + } + p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} +} + +func (p *progressTracker) getProgress(id uint64) *Progress { + if pr, ok := p.nodes[id]; ok { + return pr + } + + return p.learners[id] +} + +// visit invokes the supplied closure for all tracked progresses. +func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { + for id, pr := range p.nodes { + f(id, pr) + } + + for id, pr := range p.learners { + f(id, pr) + } +} + +// checkQuorumActive returns true if the quorum is active from +// the view of the local raft state machine. Otherwise, it returns +// false. +func (p *progressTracker) quorumActive() bool { + var act int + p.visit(func(id uint64, pr *Progress) { + if pr.RecentActive && !pr.IsLearner { + act++ + } + }) + + return act >= p.quorum() +} + +func (p *progressTracker) voterNodes() []uint64 { + nodes := make([]uint64, 0, len(p.nodes)) + for id := range p.nodes { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +func (p *progressTracker) learnerNodes() []uint64 { + nodes := make([]uint64, 0, len(p.learners)) + for id := range p.learners { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +// resetVotes prepares for a new round of vote counting via recordVote. +func (p *progressTracker) resetVotes() { + p.votes = map[uint64]bool{} +} + +// recordVote records that the node with the given id voted for this Raft +// instance if v == true (and declined it otherwise). +func (p *progressTracker) recordVote(id uint64, v bool) { + _, ok := p.votes[id] + if !ok { + p.votes[id] = v + } +} + +// tallyVotes returns the number of granted and rejected votes, and whether the +// election outcome is known. +func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { + for _, v := range p.votes { + if v { + granted++ + } else { + rejected++ + } + } + + q := p.quorum() + + result = electionIndeterminate + if granted >= q { + result = electionWon + } else if rejected >= q { + result = electionLost + } + return granted, rejected, result +} diff --git a/raft/raft.go b/raft/raft.go index 76f95f3cf39..24a6c01784b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "math/rand" - "sort" "strings" "sync" "time" @@ -261,18 +260,13 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 - maxInflight int - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + prs progressTracker state StateType // isLearner is true if the local raft node is a learner. isLearner bool - votes map[uint64]bool - msgs []pb.Message // the leader id @@ -348,10 +342,8 @@ func newRaft(c *Config) *raft { isLearner: false, raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, - maxInflight: c.MaxInflightMsgs, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: make(map[uint64]*Progress), - learnerPrs: make(map[uint64]*Progress), + prs: makePRS(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -361,13 +353,13 @@ func newRaft(c *Config) *raft { disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { - r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} + // Add node to active config. + r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */) } for _, p := range learners { - if _, ok := r.prs[p]; ok { - panic(fmt.Sprintf("node %x is in both learner and peer list", p)) - } - r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + // Add learner to active config. + r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */) + if r.id == p { r.isLearner = true } @@ -382,7 +374,7 @@ func newRaft(c *Config) *raft { r.becomeFollower(r.Term, None) var nodesStrs []string - for _, n := range r.nodes() { + for _, n := range r.prs.voterNodes() { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } @@ -403,26 +395,6 @@ func (r *raft) hardState() pb.HardState { } } -func (r *raft) quorum() int { return len(r.prs)/2 + 1 } - -func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)) - for id := range r.prs { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - -func (r *raft) learnerNodes() []uint64 { - nodes := make([]uint64, 0, len(r.learnerPrs)) - for id := range r.learnerPrs { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - // send persists state to stable storage and then sends to its mailbox. func (r *raft) send(m pb.Message) { m.From = r.id @@ -457,14 +429,6 @@ func (r *raft) send(m pb.Message) { r.msgs = append(r.msgs, m) } -func (r *raft) getProgress(id uint64) *Progress { - if pr, ok := r.prs[id]; ok { - return pr - } - - return r.learnerPrs[id] -} - // sendAppend sends an append RPC with new entries (if any) and the // current commit index to the given peer. func (r *raft) sendAppend(to uint64) { @@ -477,7 +441,7 @@ func (r *raft) sendAppend(to uint64) { // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.getProgress(to) + pr := r.prs.getProgress(to) if pr.IsPaused() { return false } @@ -546,7 +510,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.getProgress(to).Match, r.raftLog.committed) + commit := min(r.prs.getProgress(to).Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, @@ -557,20 +521,10 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { r.send(m) } -func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { - for id, pr := range r.prs { - f(id, pr) - } - - for id, pr := range r.learnerPrs { - f(id, pr) - } -} - // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { - r.forEachProgress(func(id uint64, _ *Progress) { + r.prs.visit(func(id uint64, _ *Progress) { if id == r.id { return } @@ -590,7 +544,7 @@ func (r *raft) bcastHeartbeat() { } func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { - r.forEachProgress(func(id uint64, _ *Progress) { + r.prs.visit(func(id uint64, _ *Progress) { if id == r.id { return } @@ -602,19 +556,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { // the commit index changed (in which case the caller should call // r.bcastAppend). func (r *raft) maybeCommit() bool { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. - if cap(r.matchBuf) < len(r.prs) { - r.matchBuf = make(uint64Slice, len(r.prs)) - } - r.matchBuf = r.matchBuf[:len(r.prs)] - idx := 0 - for _, p := range r.prs { - r.matchBuf[idx] = p.Match - idx++ - } - sort.Sort(&r.matchBuf) - mci := r.matchBuf[len(r.matchBuf)-r.quorum()] + mci := r.prs.committed() return r.raftLog.maybeCommit(mci, r.Term) } @@ -631,9 +573,14 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() - r.votes = make(map[uint64]bool) - r.forEachProgress(func(id uint64, pr *Progress) { - *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner} + r.prs.resetVotes() + r.prs.visit(func(id uint64, pr *Progress) { + *pr = Progress{ + Match: 0, + Next: r.raftLog.lastIndex() + 1, + ins: newInflights(r.prs.maxInflight), + IsLearner: pr.IsLearner, + } if id == r.id { pr.Match = r.raftLog.lastIndex() } @@ -661,7 +608,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { } // use latest "last" index after truncate/append li = r.raftLog.append(es...) - r.getProgress(r.id).maybeUpdate(li) + r.prs.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() return true @@ -734,7 +681,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate - r.votes = make(map[uint64]bool) + r.prs.resetVotes() r.tick = r.tickElection r.lead = None r.state = StatePreCandidate @@ -755,7 +702,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs[r.id].becomeReplicate() + r.prs.getProgress(r.id).becomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -790,7 +737,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { + if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -800,7 +747,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs { + for id := range r.prs.nodes { if id == r.id { continue } @@ -815,21 +762,22 @@ func (r *raft) campaign(t CampaignType) { } } -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) { +type electionResult byte + +const ( + electionIndeterminate electionResult = iota + electionLost + electionWon +) + +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) } - if _, ok := r.votes[id]; !ok { - r.votes[id] = v - } - for _, vv := range r.votes { - if vv { - granted++ - } - } - return granted + r.prs.recordVote(id, v) + return r.prs.tallyVotes() } func (r *raft) Step(m pb.Message) error { @@ -985,16 +933,32 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastHeartbeat() return nil case pb.MsgCheckQuorum: - if !r.checkQuorumActive() { + // The leader should always see itself as active. As a precaution, handle + // the case in which the leader isn't in the configuration any more (for + // example if it just removed itself). + // + // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the + // leader steps down when removing itself. I might be missing something. + if pr := r.prs.getProgress(r.id); pr != nil { + pr.RecentActive = true + } + if !r.prs.quorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } + // Mark everyone (but ourselves) as inactive in preparation for the next + // CheckQuorum. + r.prs.visit(func(id uint64, pr *Progress) { + if id != r.id { + pr.RecentActive = false + } + }) return nil case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if _, ok := r.prs[r.id]; !ok { + if r.prs.getProgress(r.id) == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -1024,7 +988,7 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if r.quorum() > 1 { + if !r.prs.isSingleton() { // more than one voting member in cluster if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1036,6 +1000,8 @@ func stepLeader(r *raft, m pb.Message) error { switch r.readOnly.option { case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) + // The local node automatically acks the request. + r.readOnly.recvAck(r.id, m.Entries[0].Data) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: ri := r.raftLog.committed @@ -1045,7 +1011,7 @@ func stepLeader(r *raft, m pb.Message) error { r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) } } - } else { // there is only one voting member (the leader) in the cluster + } else { // only one voting member (the leader) in the cluster if m.From == None || m.From == r.id { // from leader itself r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } else { // from learner member @@ -1057,7 +1023,7 @@ func stepLeader(r *raft, m pb.Message) error { } // All other message types require a progress for m.From (pr). - pr := r.getProgress(m.From) + pr := r.prs.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil @@ -1133,8 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - ackCount := r.readOnly.recvAck(m) - if ackCount < r.quorum() { + if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { return nil } @@ -1231,17 +1196,17 @@ func stepCandidate(r *raft, m pb.Message) error { r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: - gr := r.poll(m.From, m.Type, !m.Reject) - r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr) - switch r.quorum() { - case gr: + gr, rj, res := r.poll(m.From, m.Type, !m.Reject) + r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) + switch res { + case electionWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case len(r.votes) - gr: + case electionLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1370,8 +1335,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs = make(map[uint64]*Progress) - r.learnerPrs = make(map[uint64]*Progress) + r.prs = makePRS(r.prs.maxInflight) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true @@ -1384,16 +1348,16 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) { match = next - 1 r.isLearner = isLearner } - r.setProgress(n, match, next, isLearner) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n)) + r.prs.initProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n)) } } // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { - _, ok := r.prs[r.id] - return ok + pr := r.prs.getProgress(r.id) + return pr != nil && !pr.IsLearner } func (r *raft) addNode(id uint64) { @@ -1405,12 +1369,12 @@ func (r *raft) addLearner(id uint64) { } func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.getProgress(id) + pr := r.prs.getProgress(id) if pr == nil { - r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) } else { if isLearner && !pr.IsLearner { - // can only change Learner to Voter + // Can only change Learner to Voter. r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) return } @@ -1421,10 +1385,11 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { return } - // change Learner to Voter, use origin Learner progress - delete(r.learnerPrs, id) + // Change Learner to Voter, use origin Learner progress. + r.prs.removeAny(id) + r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) pr.IsLearner = false - r.prs[id] = pr + *r.prs.getProgress(id) = *pr } if r.id == id { @@ -1434,18 +1399,20 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked // before the added node has a chance to communicate with us. - pr = r.getProgress(id) - pr.RecentActive = true + r.prs.getProgress(id).RecentActive = true } func (r *raft) removeNode(id uint64) { - r.delProgress(id) + r.prs.removeAny(id) - // do not try to commit or abort transferring if there is no nodes in the cluster. - if len(r.prs) == 0 && len(r.learnerPrs) == 0 { + // Do not try to commit or abort transferring if the cluster is now empty. + if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { return } + // TODO(tbg): won't bad (or at least unfortunate) things happen if the + // leader just removed itself? + // The quorum size is now smaller, so see if any pending entries can // be committed. if r.maybeCommit() { @@ -1457,24 +1424,6 @@ func (r *raft) removeNode(id uint64) { } } -func (r *raft) setProgress(id, match, next uint64, isLearner bool) { - if !isLearner { - delete(r.learnerPrs, id) - r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} - return - } - - if _, ok := r.prs[id]; ok { - panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) - } - r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} -} - -func (r *raft) delProgress(id uint64) { - delete(r.prs, id) - delete(r.learnerPrs, id) -} - func (r *raft) loadState(state pb.HardState) { if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) @@ -1495,29 +1444,6 @@ func (r *raft) resetRandomizedElectionTimeout() { r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) } -// checkQuorumActive returns true if the quorum is active from -// the view of the local raft state machine. Otherwise, it returns -// false. -// checkQuorumActive also resets all RecentActive to false. -func (r *raft) checkQuorumActive() bool { - var act int - - r.forEachProgress(func(id uint64, pr *Progress) { - if id == r.id { // self is always active - act++ - return - } - - if pr.RecentActive && !pr.IsLearner { - act++ - } - - pr.RecentActive = false - }) - - return act >= r.quorum() -} - func (r *raft) sendTimeoutNow(to uint64) { r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 070ed8d9551..01d4dd7a66f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,11 +29,11 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 1 { @@ -65,18 +65,18 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } // 1 is noop, 2 is the first proposal we just sent. // so we start with 2. - for tt := 2; tt < r.maxInflight; tt++ { + for tt := 2; tt < r.prs.maxInflight; tt++ { // move forward the window r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)}) r.readMessages() @@ -110,11 +110,11 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index c40f279c20a..3b767b4fa0b 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) { if r.state != StateCandidate { t.Errorf("state = %s, want %s", r.state, StateCandidate) } - if !r.votes[r.id] { + if !r.prs.votes[r.id] { t.Errorf("vote for self = false, want true") } msgs := r.readMessages() diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index ed545b4b51c..358c85b8e29 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs[2].Next = sm.raftLog.firstIndex() + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true}) - if sm.prs[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) + if sm.prs.nodes[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot) } } @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } - if sm.prs[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next) } - if !sm.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs[2].Paused) + if !sm.prs.nodes[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) } } @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } - if sm.prs[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next) } - if !sm.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs[2].Paused) + if !sm.prs.nodes[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) } } @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. - pr := n1.prs[2] + pr := n1.prs.nodes[2] if pr.State != ProgressStateReplicate { t.Fatalf("unexpected state %v", pr) } @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } // The follower entered ProgressStateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.prs[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next) } - if n := sm.prs[2].ins.count; n != 1 { + if n := sm.prs.nodes[2].ins.count; n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index df808eb6f1c..93c61735cba 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() // Send proposals to r1. The first 5 entries should be appended to the log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs[2].Paused = true + r.prs.nodes[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs[2].Paused { - t.Errorf("paused = %v, want false", r.prs[2].Paused) + if r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused) } } @@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.prs[2].becomeProbe() + r.prs.nodes[2].becomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs[2].becomeReplicate() - r.prs[3].becomeReplicate() + r.prs.nodes[2].becomeReplicate() + r.prs.nodes[3].becomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -889,7 +889,7 @@ func TestLearnerLogReplication(t *testing.T) { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } - match := n1.getProgress(2).Match + match := n1.prs.getProgress(2).Match if match != n2.raftLog.committed { t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match) } @@ -1351,8 +1351,9 @@ func TestCommit(t *testing.T) { storage.hardState = pb.HardState{Term: tt.smTerm} sm := newTestRaft(1, []uint64{1}, 10, 2, storage) + sm.prs.removeAny(1) for j := 0; j < len(tt.matches); j++ { - sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) + sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) } sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { @@ -2137,7 +2138,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { nt := newNetwork(a, b) setRandomizedElectionTimeout(b, b.electionTimeout+1) // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states - b.delProgress(2) + b.prs.removeAny(2) if b.promotable() { t.Fatalf("promotable = %v, want false", b.promotable()) @@ -2631,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) - p := sm.prs[2] + p := sm.prs.nodes[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2678,9 +2679,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs[2].Match, sm.prs[2].Next = 5, 6 + sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6 // normal follower - sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2688,8 +2689,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs[2].Match), - 3: min(sm.raftLog.committed, sm.prs[3].Match), + 2: min(sm.raftLog.committed, sm.prs.nodes[2].Match), + 3: min(sm.raftLog.committed, sm.prs.nodes[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2775,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].State = tt.state - sm.prs[2].Next = tt.next + sm.prs.nodes[2].State = tt.state + sm.prs.nodes[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs[2] + p := sm.prs.nodes[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2791,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeProbe() + r.prs.nodes[2].becomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2810,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2825,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } // consume the heartbeat @@ -2848,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } } @@ -2858,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2875,7 +2876,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeSnapshot(10) + r.prs.nodes[2].becomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2896,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs[2].Match = 3 - r.prs[2].becomeReplicate() - r.prs[2].optimisticUpdate(5) + r.prs.nodes[2].Match = 3 + r.prs.nodes[2].becomeReplicate() + r.prs.nodes[2].optimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe) + if r.prs.nodes[2].State != ProgressStateProbe { + t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe) } - if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs[2].Next, wnext) + if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext) } } @@ -2931,7 +2932,7 @@ func TestRestore(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.nodes() + sg := sm.prs.voterNodes() if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes) } @@ -2963,22 +2964,22 @@ func TestRestoreWithLearner(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.nodes() + sg := sm.prs.voterNodes() if len(sg) != len(s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes) } - lns := sm.learnerNodes() + lns := sm.prs.learnerNodes() if len(lns) != len(s.Metadata.ConfState.Learners) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false) + if sm.prs.nodes[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.learnerPrs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true) + if !sm.prs.learners[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true) } } @@ -3120,8 +3121,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true}) + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3151,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs[2].RecentActive = false + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.nodes[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3192,7 +3193,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() @@ -3200,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs[3].RecentActive { + if lead.prs.nodes[3].RecentActive { break } } @@ -3287,7 +3288,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addNode(2) - nodes := r.nodes() + nodes := r.prs.voterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -3298,13 +3299,13 @@ func TestAddNode(t *testing.T) { func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addLearner(2) - nodes := r.learnerNodes() + nodes := r.prs.learnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.learnerPrs[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs[2].IsLearner, true) + if !r.prs.learners[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true) } } @@ -3348,14 +3349,14 @@ func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) w = []uint64{} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3366,18 +3367,18 @@ func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } w = []uint64{} - if g := r.learnerNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3416,8 +3417,8 @@ func TestRaftNodes(t *testing.T) { } for i, tt := range tests { r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) - if !reflect.DeepEqual(r.nodes(), tt.wids) { - t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) + if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) { + t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids) } } } @@ -3618,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + if lead.prs.nodes[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. @@ -3637,12 +3638,12 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + if lead.prs.nodes[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3721,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) + if lead.prs.nodes[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1) } } @@ -4294,18 +4295,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw sm := newRaft(cfg) npeers[id] = sm case *raft: - learners := make(map[uint64]bool, len(v.learnerPrs)) - for i := range v.learnerPrs { + learners := make(map[uint64]bool, len(v.prs.learners)) + for i := range v.prs.learners { learners[i] = true } v.id = id - v.prs = make(map[uint64]*Progress) - v.learnerPrs = make(map[uint64]*Progress) + v.prs.nodes = make(map[uint64]*Progress) + v.prs.learners = make(map[uint64]*Progress) for i := 0; i < size; i++ { if _, ok := learners[peerAddrs[i]]; ok { - v.learnerPrs[peerAddrs[i]] = &Progress{IsLearner: true} + v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true} } else { - v.prs[peerAddrs[i]] = &Progress{} + v.prs.nodes[peerAddrs[i]] = &Progress{} } } v.reset(v.Term) diff --git a/raft/rawnode.go b/raft/rawnode.go index 7ab42ca1c25..139a084211c 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -166,7 +166,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} } switch cc.Type { case pb.ConfChangeAddNode: @@ -179,7 +179,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { default: panic("unexpected conf type") } - return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} } // Step advances the state machine using the given message. @@ -188,7 +188,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) { return ErrStepLocalMsg } - if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { return rn.raft.Step(m) } return ErrStepPeerNotFound @@ -257,16 +257,15 @@ const ( // WithProgress is a helper to introspect the Progress for this node and its // peers. func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { - for id, pr := range rn.raft.prs { - pr := *pr - pr.ins = nil - visitor(id, ProgressTypePeer, pr) - } - for id, pr := range rn.raft.learnerPrs { - pr := *pr - pr.ins = nil - visitor(id, ProgressTypeLearner, pr) - } + rn.raft.prs.visit(func(id uint64, pr *Progress) { + typ := ProgressTypePeer + if pr.IsLearner { + typ = ProgressTypeLearner + } + p := *pr + p.ins = nil + visitor(id, typ, p) + }) } // ReportUnreachable reports the given node is not reachable for the last send. diff --git a/raft/read_only.go b/raft/read_only.go index 38053383186..955fe798669 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -50,26 +50,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly { // the read only request. // `m` is the original read only request message from the local or remote node. func (ro *readOnly) addRequest(index uint64, m pb.Message) { - ctx := string(m.Entries[0].Data) - if _, ok := ro.pendingReadIndex[ctx]; ok { + s := string(m.Entries[0].Data) + if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} - ro.readIndexQueue = append(ro.readIndexQueue, ctx) + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(m pb.Message) int { - rs, ok := ro.pendingReadIndex[string(m.Context)] +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { + rs, ok := ro.pendingReadIndex[string(context)] if !ok { - return 0 + return nil } - rs.acks[m.From] = struct{}{} - // add one to include an ack from local node - return len(rs.acks) + 1 + rs.acks[id] = struct{}{} + return rs.acks } // advance advances the read only request queue kept by the readonly struct. diff --git a/raft/status.go b/raft/status.go index 68a9b1a4487..f894716e190 100644 --- a/raft/status.go +++ b/raft/status.go @@ -33,15 +33,18 @@ type Status struct { } func getProgressCopy(r *raft) map[uint64]Progress { - prs := make(map[uint64]Progress) - for id, p := range r.prs { - prs[id] = *p - } + m := make(map[uint64]Progress) + r.prs.visit(func(id uint64, pr *Progress) { + var p Progress + p, pr = *pr, nil /* avoid accidental reuse below */ - for id, p := range r.learnerPrs { - prs[id] = *p - } - return prs + // The inflight buffer is tricky to copy and besides, it isn't exposed + // to the client, so pretend it's nil. + p.ins = nil + + m[id] = p + }) + return m } func getStatusWithoutProgress(r *raft) Status {