Skip to content

Commit

Permalink
Finish AppendEntries Part
Browse files Browse the repository at this point in the history
currently can only pass the first 2 cases in test 2B
  • Loading branch information
UndefinedSy committed Dec 6, 2020
1 parent dbb9e84 commit 8bf27c8
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 70 deletions.
36 changes: 32 additions & 4 deletions src/raft/append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ type AppendEntriesReply struct {
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("Raft[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] currentRole[%d] currentLeader[%d] lastActivity[%s]\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, rf.currentLeader, rf.lastActivity.Format(time.RFC3339))
DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] currentRole[%d] currentLeader[%d] currentLog{%+v} PrevLogIndex[%d], PrevLogTerm[%d] leaderCommit[%d], EntriesLength[%d]\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, rf.currentLeader, rf.log, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit, len(args.Entries))
defer func() {
DPrintf("Raft[%d] Return AppendEntries, LeaderId[%d] Term[%d] currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] lastActivity[%s]\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.lastActivity.Format(time.RFC3339))
DPrintf(LOG_INFO, "Raft[%d] Return AppendEntries, LeaderId[%d] Term[%d] currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] commitIndex[%d] log:{%v}\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.commitIndex, rf.log)
}()

reply.Term = rf.currentTerm
Expand All @@ -43,6 +43,34 @@ func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply
rf.currentLeader = args.LeaderId
rf.lastActivity = time.Now()

if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
DPrintf(LOG_DEBUG, "Raft[%d] Handle AppendEntries, input PrevLogTerm[%d] conlicts with the existing one[%d], return false to go back.",
rf.me, args.PrevLogTerm, rf.log[args.PrevLogIndex].Term)
// 可以返回冲突的 term 及该 term 的第一个 index,使 leader 可以直接回退 nextIndex 到合适位置。(到哪没想好)
return
}

rf.log = rf.log[:args.PrevLogIndex + 1] // 删除掉之后的 slice
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - Remove logs after index[%d], current log is: {%v}.", rf.me, args.PrevLogIndex, rf.log)

for _, Entry := range(args.Entries) {
rf.log = append(rf.log, Entry)
}

if args.LeaderCommit > rf.commitIndex {
tmpDPrintfCommitIndex := rf.commitIndex
rf.commitIndex = rf.log[len(rf.log) - 1].Index
if rf.commitIndex > args.LeaderCommit {
rf.commitIndex = args.LeaderCommit
}

for i := tmpDPrintfCommitIndex + 1; i <= rf.commitIndex; i++ {
rf.NotifyApplyChannel(true, rf.log[i].Command, i)
}
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - commitIndex will change from[%d] to [%d]",
rf.me, tmpDPrintfCommitIndex, rf.commitIndex)
}

reply.Success = true
}

Expand Down
78 changes: 60 additions & 18 deletions src/raft/appendentries_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,87 @@ package raft

import (
"time"
"sort"
)

const HeartBeatTimeout = 50 // Milliseconds

func (rf *Raft) LeaderTryUpdateCommitIndex() {
rf.mu.Lock()
defer rf.mu.Unlock()

matchIndexCopy := make([]int, len(rf.peers))
copy(matchIndexCopy, rf.matchIndex)
matchIndexCopy[rf.me] = GetLastLogIndex(rf)

sort.Ints(matchIndexCopy)
matchIndexWithQuorum := matchIndexCopy[len(rf.peers) / 2]
DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - matchIndexWithQuorum is: [%d], commitIndex is:[%d], logEntry in N is:{%+v} currentTerm is:[%d]",
rf.me, matchIndexWithQuorum, rf.commitIndex, rf.log[matchIndexWithQuorum], rf.currentTerm)
if matchIndexWithQuorum > rf.commitIndex && rf.log[matchIndexWithQuorum].Term == rf.currentTerm {
for i := rf.commitIndex + 1; i <= matchIndexWithQuorum; i++ {
rf.NotifyApplyChannel(true, rf.log[i].Command, i)
}
rf.commitIndex = matchIndexWithQuorum
DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - commitIndex has been updated to:[%d]",
rf.me, rf.commitIndex)
}
}

func AppendEntriesProcessor(rf *Raft, peerIndex int) {
rf.mu.Lock()
if rf.killed() || rf.currentRole != ROLE_LEADER {
rf.mu.Unlock()
return
}
appendEntriesArgs := &AppendEntriesArgs {
Term: rf.currentTerm,
LeaderId: rf.me,

prevLogIndex: rf.nextIndex[peerIndex] - 1
PrevLogIndex: rf.nextIndex[peerIndex] - 1,

LeaderCommit: rf.commitIndex,
}

if appendEntriesArgs.prevLogIndex == 0 {
appendEntriesArgs.prevLogTerm = 0
Entries = rf.log[0:]
} else {
appendEntriesArgs.prevLogTerm = rf.log[appendEntriesArgs.prevLogIndex - 1].Term
Entries = rf.log[(appendEntriesArgs.prevLogIndex - 1):]
}
// if appendEntriesArgs.PrevLogIndex == 0 {
// appendEntriesArgs.PrevLogTerm = 0
// appendEntriesArgs.Entries = rf.log[0:]
// } else {
// appendEntriesArgs.PrevLogTerm = rf.log[appendEntriesArgs.PrevLogIndex - 1].Term
// appendEntriesArgs.Entries = rf.log[(appendEntriesArgs.PrevLogIndex - 1):]
// }
appendEntriesArgs.PrevLogTerm = rf.log[appendEntriesArgs.PrevLogIndex].Term
appendEntriesArgs.Entries = rf.log[appendEntriesArgs.PrevLogIndex + 1:]
DPrintf(LOG_INFO, "Raft[%d], peerIndex[%d], current PrevLogIndex is:[%d], current log is:{%+v}, Entries is:{%+v}",
rf.me, peerIndex, appendEntriesArgs.PrevLogIndex, rf.log, appendEntriesArgs.Entries)


rf.mu.Unlock()

reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peerIndex, appendEntriesArgs, reply)
// DPrintf("Raft[%d] - AppendEntriesHandler - to peer[%d] finished:%t\n",
// rf.me, peerIndex, ok)
if ok {
DPrintf(LOG_DEBUG, "Raft[%d] currentTerm[%d] reply.Term[%d] reply success[%t]",
rf.me, rf.currentTerm, reply.Term, reply.Success)
if reply.Term > rf.currentTerm {
rf.ReInitFollower(reply.Term)
return
}

if reply.Success {
DPrintf("Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t\n",
rf.me, peerIndex, reply.Success)
rf.nextIndex[peerIndex] = appendEntriesArgs.PrevLogIndex + len(appendEntriesArgs.Entries) + 1
rf.matchIndex[peerIndex] = rf.nextIndex[peerIndex] - 1
rf.LeaderTryUpdateCommitIndex()
DPrintf(LOG_INFO, "Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t, nextIndex[%d] now is:[%d]\n",
rf.me, peerIndex, reply.Success, peerIndex, rf.nextIndex[peerIndex])
} else {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.nextIndex[peerIndex]--
go AppendEntriesProcessor(rf, peerIndex)
}
} else {
go AppendEntriesProcessor(rf, peerIndex)
// might need retry or something
}
}
Expand All @@ -49,17 +91,17 @@ func AppendEntriesThread(rf *Raft) {
for !rf.killed() {
time.Sleep(10 * time.Millisecond) // here may need a condition_variable.wait_for
rf.condLeader.L.Lock()
for rf.currentRole != Leader {
for rf.currentRole != ROLE_LEADER {
rf.condLeader.Wait()
}

// rf.mu.Lock() // is this still necessary

if rf.currentRole != Leader { // here should be a condition variable
// rf.mu.Unlock()
rf.condLeader.L.Unlock()
continue
}
// if rf.currentRole != ROLE_LEADER { // here should be a condition variable
// // rf.mu.Unlock()
// rf.condLeader.L.Unlock()
// continue
// }

if time.Now().Sub(rf.lastHeartbeat) < (HeartBeatTimeout * time.Millisecond) {
// rf.mu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions src/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ func (cfg *config) nCommitted(index int) (int, interface{}) {

cfg.mu.Lock()
cmd1, ok := cfg.logs[i][index]
DPrintf(LOG_INFO, "TEST_config - nCommitted - cfg.logs[raft-%d][index-%d] cmd1[%+v], ok[%t] logs{%+v}",
i, index, cmd1, ok, cfg.logs[i])
cfg.mu.Unlock()

if ok {
Expand All @@ -381,6 +383,8 @@ func (cfg *config) nCommitted(index int) (int, interface{}) {
cmd = cmd1
}
}
DPrintf(LOG_INFO, "TEST_config - nCommitted - returns count[%d], cmd[%+v]",
count, cmd)
return count, cmd
}

Expand Down
18 changes: 9 additions & 9 deletions src/raft/election_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) {
if requestVoteResult != nil {
if requestVoteResult.VoteGranted {
votesObtained++
DPrintf("Raft[%d] - CollectVotes - has got 1 vote. Currently have [%d] votes.\n", me, votesObtained)
DPrintf(LOG_DEBUG, "Raft[%d] - CollectVotes - has got 1 vote. Currently have [%d] votes.\n", me, votesObtained)
if votesObtained > (participantsNum / 2) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("Raft[%d] - CollectVotes - has got majority[%d] votes, will become a leader | currentRole is: [%d].\n",
DPrintf(LOG_INFO, "Raft[%d] - CollectVotes - has got majority[%d] votes, will become a leader | currentRole is: [%d].\n",
me, votesObtained, rf.currentRole)
if (rf.currentRole == Candidate) {
if (rf.currentRole == ROLE_CANDIDATE) {
rf.BecomeLeader()
// rf.lastHeartbeat = time.Unix(0, 0)
rf.condLeader.Signal()
Expand All @@ -40,18 +40,18 @@ func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) {
}

if requestVoteResult.Term > rf.currentTerm {
DPrintf("Raft[%d] - CollectVotes - has met a peer with higher Term[%d], will return to a follower.\n", me, requestVoteResult.Term)
DPrintf(LOG_INFO, "Raft[%d] - CollectVotes - has met a peer with higher Term[%d], will return to a follower.\n", me, requestVoteResult.Term)
rf.mu.Lock()
defer rf.mu.Unlock()
rf.ReInitFollower(requestVoteResult.Term)
// give up requesting vote.
return
}
} else {
DPrintf("Raft[%d] - CollectVotes - there is an error in return value of the sendRequestVote.\n", me)
DPrintf(LOG_DEBUG, "Raft[%d] - CollectVotes - there is an error in return value of the sendRequestVote.\n", me)
}
}
DPrintf("Raft[%d] - CollectVotes - obtained [%d] votes and did not become leader, will go back to follower", me, votesObtained)
DPrintf(LOG_DEBUG, "Raft[%d] - CollectVotes - obtained [%d] votes and did not become leader, will go back to follower", me, votesObtained)
rf.mu.Lock()
defer rf.mu.Unlock()
rf.ReInitFollower(rf.currentTerm)
Expand All @@ -74,7 +74,7 @@ func ElectionThread(rf *Raft) {

if (rf.votedFor != -1) {
rf.mu.Unlock()
DPrintf("Raft[%d] - ElectionThread - has voted to Raft[%d], will give up this round", rf.me, rf.votedFor)
DPrintf(LOG_DEBUG, "Raft[%d] - ElectionThread - has voted to Raft[%d], will give up this round", rf.me, rf.votedFor)
continue
}

Expand Down Expand Up @@ -102,8 +102,8 @@ func ElectionThread(rf *Raft) {
go func(peerIndex int) {
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(peerIndex, requestVoteArgs, reply)
DPrintf("Raft[%d] - ElectionThread - sendRequestVote to [%d] has returned [%t], with reply: %p",
rf.me, peerIndex, ok, reply)
DPrintf(LOG_DEBUG, "Raft[%d] - ElectionThread - sendRequestVote to [%d] has returned [%t], with reply: %p",
rf.me, peerIndex, ok, reply)
if ok {
requestVoteResultChan<- reply
} else {
Expand Down
55 changes: 34 additions & 21 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,15 @@ type Entry struct {
Command interface{}
}

func (entry *Entry) String() string {
return fmt.Sprintf("{Term[%d], Index[%d], Command[%v]}", entry.Term, entry.Index, entry.Command)
}

type Role int
const (
Follower Role = 0
Candidate Role = 1
Leader Role = 2
ROLE_FOLLOWER Role = 0
ROLE_CANDIDATE Role = 1
ROLE_LEADER Role = 2
)

//
Expand Down Expand Up @@ -97,6 +101,7 @@ type Raft struct {
lastHeartbeat time.Time
// Self-defined state

applyCh chan ApplyMsg

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
Expand All @@ -105,32 +110,36 @@ type Raft struct {
}

func (rf *Raft) BecomeLeader() {
rf.currentRole = Leader
rf.currentRole = ROLE_LEADER
rf.currentLeader = rf.me

rf.nextIndex = make([]int, len(rf.peers), GetLastLogIndex(rf) + 1)
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

DPrintf("Raft[%d] became Leader term[%d]",
rf.me, rf.currentTerm)
for i := range(rf.nextIndex) {
rf.nextIndex[i] = GetLastLogIndex(rf) + 1
}

DPrintf(LOG_INFO, "Raft[%d] became Leader term[%d]",
rf.me, rf.currentTerm)
// need to boardcast empty AppendEntry()
}

func (rf *Raft) ReInitFollower(term int) {
rf.currentTerm = term
rf.currentRole = Follower
rf.currentRole = ROLE_FOLLOWER
rf.votedFor = -1
DPrintf("Raft[%d] became Follower term[%d]\n",
rf.me, rf.currentTerm)
DPrintf(LOG_DEBUG, "Raft[%d] became Follower term[%d]\n",
rf.me, rf.currentTerm)
}

func (rf *Raft) BecomeCandidate() {
rf.currentTerm++
rf.currentRole = Candidate
rf.currentRole = ROLE_CANDIDATE
rf.votedFor = rf.me
rf.lastActivity = time.Now()
DPrintf("Raft[%d] became Candidate term[%d], votedFor[%d] lastActivity[%s]\n",
rf.me, rf.currentTerm, rf.votedFor, rf.lastActivity.Format(time.RFC3339))
DPrintf(LOG_DEBUG, "Raft[%d] became Candidate term[%d], votedFor[%d] lastActivity[%s]\n",
rf.me, rf.currentTerm, rf.votedFor, rf.lastActivity.Format(time.RFC3339))
}

// return currentTerm and whether this server
Expand All @@ -144,12 +153,11 @@ func (rf *Raft) GetState() (int, bool) {
defer rf.mu.Unlock()

term = rf.currentTerm
if rf.currentRole == Leader {
if rf.currentRole == ROLE_LEADER {
isleader = true
}
DPrintf("GetState of Raft[%d]: term[%d], isLeader[%t]", rf.me, rf.currentTerm, isleader)

fmt.Printf("GetState of Raft[%d]: term[%d], isLeader[%t]\n", rf.me, rf.currentTerm, isleader)
DPrintf(LOG_INFO, "GetState of Raft[%d]: term[%d], isLeader[%t]",
rf.me, rf.currentTerm, isleader)

// Your code here (2A).
return term, isleader
Expand Down Expand Up @@ -217,18 +225,19 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()

if rf.currentRole != Leader {
if rf.currentRole != ROLE_LEADER {
isLeader = false
} else {
index = len(rf.log) + 1
index = len(rf.log)
term = rf.currentTerm
newEntry := &Entry {
Index: index,
Term: term,
Command: command,
}
fmt.Printf("Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]", rf.me, index, term, command, command)
rf.log = append(rf.log, newEntry)
DPrintf(LOG_INFO, "Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]\n",
rf.me, index, term, command, command)
}
// Your code here (2B).

Expand All @@ -247,6 +256,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
DPrintf(LOG_INFO, "Raft[%d] has been killed", rf.me)
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
Expand Down Expand Up @@ -280,16 +290,19 @@ func Make(peers []*labrpc.ClientEnd, me int,

// persistent state
rf.votedFor = -1
rf.log = append(rf.log, &Entry{})
// persistent state on all servers

// volatile state
rf.currentRole = Follower
rf.currentRole = ROLE_FOLLOWER
// Your initialization code here (2A, 2B, 2C).

// volatile state on leader

// volatile state on leader

rf.applyCh = applyCh

rand.Seed(int64(me))
go ElectionThread(rf)
go AppendEntriesThread(rf)
Expand Down
Loading

0 comments on commit 8bf27c8

Please sign in to comment.