From dbb9e84ca7421b21e9f2e71606729b0c3bf091ec Mon Sep 17 00:00:00 2001 From: Yao Shunyu Date: Sat, 5 Dec 2020 23:49:50 +0800 Subject: [PATCH] prepare for AppendEntries RPC --- src/raft/append_entries.go | 1 - src/raft/appendentries_thread.go | 34 ++++++++---- src/raft/election_thread.go | 93 ++------------------------------ src/raft/raft.go | 29 +++++----- src/raft/request_vote.go | 13 ++--- src/raft/util.go | 17 ++++++ 6 files changed, 65 insertions(+), 122 deletions(-) diff --git a/src/raft/append_entries.go b/src/raft/append_entries.go index 734fc2d..f20bf7d 100644 --- a/src/raft/append_entries.go +++ b/src/raft/append_entries.go @@ -2,7 +2,6 @@ package raft import ( "time" - "fmt" ) //-------------------AppendEntries RPC-------------------// diff --git a/src/raft/appendentries_thread.go b/src/raft/appendentries_thread.go index 0f4db25..3d96f7e 100644 --- a/src/raft/appendentries_thread.go +++ b/src/raft/appendentries_thread.go @@ -1,13 +1,32 @@ package raft import ( - "fmt" "time" ) const HeartBeatTimeout = 50 // Milliseconds -func AppendEntriesHandler(rf *Raft, peerIndex int, appendEntriesArgs *AppendEntriesArgs) { +func AppendEntriesProcessor(rf *Raft, peerIndex int) { + rf.mu.Lock() + appendEntriesArgs := &AppendEntriesArgs { + Term: rf.currentTerm, + LeaderId: rf.me, + + prevLogIndex: rf.nextIndex[peerIndex] - 1 + + LeaderCommit: rf.commitIndex, + } + + if appendEntriesArgs.prevLogIndex == 0 { + appendEntriesArgs.prevLogTerm = 0 + Entries = rf.log[0:] + } else { + appendEntriesArgs.prevLogTerm = rf.log[appendEntriesArgs.prevLogIndex - 1].Term + Entries = rf.log[(appendEntriesArgs.prevLogIndex - 1):] + } + + rf.mu.Unlock() + reply := &AppendEntriesReply{} ok := rf.sendAppendEntries(peerIndex, appendEntriesArgs, reply) // DPrintf("Raft[%d] - AppendEntriesHandler - to peer[%d] finished:%t\n", @@ -27,11 +46,11 @@ func AppendEntriesHandler(rf *Raft, peerIndex int, appendEntriesArgs *AppendEntr } func AppendEntriesThread(rf *Raft) { - for { + for !rf.killed() { time.Sleep(10 * time.Millisecond) // here may need a condition_variable.wait_for rf.condLeader.L.Lock() for rf.currentRole != Leader { - /rf.condLeader.Wait() + rf.condLeader.Wait() } // rf.mu.Lock() // is this still necessary @@ -50,11 +69,6 @@ func AppendEntriesThread(rf *Raft) { // begin heartbeat - appendEntriesArgs := &AppendEntriesArgs { - Term: rf.currentTerm, - LeaderId: rf.me, - } - rf.lastHeartbeat = time.Now() rf.condLeader.L.Unlock() @@ -63,7 +77,7 @@ func AppendEntriesThread(rf *Raft) { if rf.me == i { continue } - go AppendEntriesHandler(rf, i, appendEntriesArgs) + go AppendEntriesProcessor(rf, i) } } } diff --git a/src/raft/election_thread.go b/src/raft/election_thread.go index 26ca662..a3399d4 100644 --- a/src/raft/election_thread.go +++ b/src/raft/election_thread.go @@ -59,7 +59,7 @@ func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) { } func ElectionThread(rf *Raft) { - for { + for !rf.killed() { time.Sleep(electionTimeoutBase * time.Millisecond) rf.mu.Lock() @@ -80,16 +80,9 @@ func ElectionThread(rf *Raft) { rf.BecomeCandidate() - logLength := len(rf.log) - var localLastLogIndex, localLastLogTerm int - if logLength == 0 { - localLastLogIndex = 0 - localLastLogTerm = 0 - } else { - localLastLogIndex = rf.log[logLength - 1].Index - localLastLogTerm = rf.log[logLength - 1].Term - } - + localLastLogIndex := GetLastLogIndex(rf) + localLastLogTerm := GetLastLogTerm(rf) + requestVoteArgs := &RequestVoteArgs { Term: rf.currentTerm, CandidateId: rf.me, @@ -122,83 +115,5 @@ func ElectionThread(rf *Raft) { rf.CollectVotes(requestVoteResultChan) - //* - - // var me int - // var lastActivity time.Time - // { - // rf.mu.Lock() - // // defer rf.mu.Unlock() - // me = rf.me - // lastActivity = rf.lastActivity - // rf.mu.Unlock() - // } - - // electionTimeout := time.Duration(electionTimeoutBase + rand.Intn(100)) - // DPrintf("Raft[%d]'s electionTimeout in this round is %d microseconds...\n", me, electionTimeout) - - // if time.Now().Sub(lastActivity) > (electionTimeout * time.Microsecond) { - // DPrintf("test timeoutThread 0") - // { - // rf.mu.Lock() - // // defer rf.mu.Unlock() - // if (rf.votedFor != -1) { - // DPrintf("Raft[%d] has voted to Raft[%d], will skip this round", rf.me, rf.votedFor) - // continue - // } - // rf.BecomeCandidate() - // } - - // DPrintf("test timeoutThread 1") - - // var localLastLogIndex, localLastLogTerm int - // var logLength int - // var requestVoteArgs *RequestVoteArgs - // var peersNum int - - // { - // // defer rf.mu.Unlock() - // logLength = len(rf.log) - // if logLength == 0 { - // localLastLogIndex = 0 - // localLastLogTerm = 0 - // } else { - // localLastLogIndex = rf.log[logLength - 1].Index - // localLastLogTerm = rf.log[logLength - 1].Term - // } - - // requestVoteArgs = &RequestVoteArgs { - // Term: rf.currentTerm, - // CandidateId: rf.me, - // LastLogIndex: localLastLogIndex, - // LastLogTerm: localLastLogTerm, - // } - // peersNum = len(rf.peers) - // rf.mu.Unlock() - // } - // DPrintf("test timeoutThread 2") - - // requestVoteResultChan := make(chan *RequestVoteReply, peersNum - 1) - // for i := 0; i < peersNum; i++ { - // if me == i { - // continue - // } - // go func(peerIndex int) { - // reply := &RequestVoteReply{} - // ok := rf.sendRequestVote(peerIndex, requestVoteArgs, reply) - // DPrintf("Vote Request from [%d] to [%d] has returned [%t], with reply: %p", - // me, peerIndex, ok, reply) - // if ok { - // requestVoteResultChan<- reply - // } else { - // requestVoteResultChan<- nil - // } - - // }(i) - // } - - // rf.CollectVotes(requestVoteResultChan) - - // } } } \ No newline at end of file diff --git a/src/raft/raft.go b/src/raft/raft.go index 46ac13f..ccefd9d 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -105,25 +105,29 @@ type Raft struct { } func (rf *Raft) BecomeLeader() { - rf.currentRole = Leader + rf.currentRole = Leader rf.currentLeader = rf.me + + rf.nextIndex = make([]int, len(rf.peers), GetLastLogIndex(rf) + 1) + rf.matchIndex = make([]int, len(rf.peers)) + DPrintf("Raft[%d] became Leader term[%d]", rf.me, rf.currentTerm) // need to boardcast empty AppendEntry() } func (rf *Raft) ReInitFollower(term int) { - rf.currentTerm = term - rf.currentRole = Follower - rf.votedFor = -1 + rf.currentTerm = term + rf.currentRole = Follower + rf.votedFor = -1 DPrintf("Raft[%d] became Follower term[%d]\n", rf.me, rf.currentTerm) } func (rf *Raft) BecomeCandidate() { - rf.currentRole = Candidate rf.currentTerm++ - rf.votedFor = rf.me + rf.currentRole = Candidate + rf.votedFor = rf.me rf.lastActivity = time.Now() DPrintf("Raft[%d] became Candidate term[%d], votedFor[%d] lastActivity[%s]\n", rf.me, rf.currentTerm, rf.votedFor, rf.lastActivity.Format(time.RFC3339)) @@ -218,12 +222,13 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) { } else { index = len(rf.log) + 1 term = rf.currentTerm - // newEntry := &Entry { - // Index: index, - // Term: term, - // Command: command, - // } - // append(rf.log, newEntry) + newEntry := &Entry { + Index: index, + Term: term, + Command: command, + } + fmt.Printf("Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]", rf.me, index, term, command, command) + rf.log = append(rf.log, newEntry) } // Your code here (2B). diff --git a/src/raft/request_vote.go b/src/raft/request_vote.go index d9dfb97..c10f2be 100644 --- a/src/raft/request_vote.go +++ b/src/raft/request_vote.go @@ -51,16 +51,9 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { return } - var localLastLogIndex, localLastLogTerm int - localLogLength := len(rf.log) - if localLogLength == 0 { - localLastLogIndex = 0 - localLastLogTerm = 0 - } else { - localLastLogIndex = rf.log[localLogLength - 1].Index - localLastLogTerm = rf.log[localLogLength - 1].Term - } - + localLastLogIndex := GetLastLogIndex(rf) + localLastLogTerm := GetLastLogTerm(rf) + if (localLastLogTerm <= args.LastLogTerm) { if (localLastLogIndex <= args.LastLogIndex) { rf.votedFor = args.CandidateId diff --git a/src/raft/util.go b/src/raft/util.go index d77e71f..f58474b 100644 --- a/src/raft/util.go +++ b/src/raft/util.go @@ -11,3 +11,20 @@ func DPrintf(format string, a ...interface{}) (n int, err error) { } return } + +func GetLastLogIndex(rf *Raft) int { + if len(rf.log) == 0 { + return 0 + } else { + return rf.log[len(rf.log) - 1].Index + } +} + + +func GetLastLogTerm(rf *Raft) int { + if len(rf.log) == 0 { + return 0 + } else { + return rf.log[len(rf.log) - 1].Term + } +} \ No newline at end of file