Skip to content

Commit

Permalink
prepare for AppendEntries RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
UndefinedSy committed Dec 5, 2020
1 parent 4223c2f commit dbb9e84
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 122 deletions.
1 change: 0 additions & 1 deletion src/raft/append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package raft

import (
"time"
"fmt"
)
//-------------------AppendEntries RPC-------------------//

Expand Down
34 changes: 24 additions & 10 deletions src/raft/appendentries_thread.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
package raft

import (
"fmt"
"time"
)

const HeartBeatTimeout = 50 // Milliseconds

func AppendEntriesHandler(rf *Raft, peerIndex int, appendEntriesArgs *AppendEntriesArgs) {
func AppendEntriesProcessor(rf *Raft, peerIndex int) {
rf.mu.Lock()
appendEntriesArgs := &AppendEntriesArgs {
Term: rf.currentTerm,
LeaderId: rf.me,

prevLogIndex: rf.nextIndex[peerIndex] - 1

LeaderCommit: rf.commitIndex,
}

if appendEntriesArgs.prevLogIndex == 0 {
appendEntriesArgs.prevLogTerm = 0
Entries = rf.log[0:]
} else {
appendEntriesArgs.prevLogTerm = rf.log[appendEntriesArgs.prevLogIndex - 1].Term
Entries = rf.log[(appendEntriesArgs.prevLogIndex - 1):]
}

rf.mu.Unlock()

reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peerIndex, appendEntriesArgs, reply)
// DPrintf("Raft[%d] - AppendEntriesHandler - to peer[%d] finished:%t\n",
Expand All @@ -27,11 +46,11 @@ func AppendEntriesHandler(rf *Raft, peerIndex int, appendEntriesArgs *AppendEntr
}

func AppendEntriesThread(rf *Raft) {
for {
for !rf.killed() {
time.Sleep(10 * time.Millisecond) // here may need a condition_variable.wait_for
rf.condLeader.L.Lock()
for rf.currentRole != Leader {
/rf.condLeader.Wait()
rf.condLeader.Wait()
}

// rf.mu.Lock() // is this still necessary
Expand All @@ -50,11 +69,6 @@ func AppendEntriesThread(rf *Raft) {

// begin heartbeat

appendEntriesArgs := &AppendEntriesArgs {
Term: rf.currentTerm,
LeaderId: rf.me,
}

rf.lastHeartbeat = time.Now()

rf.condLeader.L.Unlock()
Expand All @@ -63,7 +77,7 @@ func AppendEntriesThread(rf *Raft) {
if rf.me == i {
continue
}
go AppendEntriesHandler(rf, i, appendEntriesArgs)
go AppendEntriesProcessor(rf, i)
}
}
}
93 changes: 4 additions & 89 deletions src/raft/election_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) {
}

func ElectionThread(rf *Raft) {
for {
for !rf.killed() {
time.Sleep(electionTimeoutBase * time.Millisecond)

rf.mu.Lock()
Expand All @@ -80,16 +80,9 @@ func ElectionThread(rf *Raft) {

rf.BecomeCandidate()

logLength := len(rf.log)
var localLastLogIndex, localLastLogTerm int
if logLength == 0 {
localLastLogIndex = 0
localLastLogTerm = 0
} else {
localLastLogIndex = rf.log[logLength - 1].Index
localLastLogTerm = rf.log[logLength - 1].Term
}

localLastLogIndex := GetLastLogIndex(rf)
localLastLogTerm := GetLastLogTerm(rf)

requestVoteArgs := &RequestVoteArgs {
Term: rf.currentTerm,
CandidateId: rf.me,
Expand Down Expand Up @@ -122,83 +115,5 @@ func ElectionThread(rf *Raft) {

rf.CollectVotes(requestVoteResultChan)

//*

// var me int
// var lastActivity time.Time
// {
// rf.mu.Lock()
// // defer rf.mu.Unlock()
// me = rf.me
// lastActivity = rf.lastActivity
// rf.mu.Unlock()
// }

// electionTimeout := time.Duration(electionTimeoutBase + rand.Intn(100))
// DPrintf("Raft[%d]'s electionTimeout in this round is %d microseconds...\n", me, electionTimeout)

// if time.Now().Sub(lastActivity) > (electionTimeout * time.Microsecond) {
// DPrintf("test timeoutThread 0")
// {
// rf.mu.Lock()
// // defer rf.mu.Unlock()
// if (rf.votedFor != -1) {
// DPrintf("Raft[%d] has voted to Raft[%d], will skip this round", rf.me, rf.votedFor)
// continue
// }
// rf.BecomeCandidate()
// }

// DPrintf("test timeoutThread 1")

// var localLastLogIndex, localLastLogTerm int
// var logLength int
// var requestVoteArgs *RequestVoteArgs
// var peersNum int

// {
// // defer rf.mu.Unlock()
// logLength = len(rf.log)
// if logLength == 0 {
// localLastLogIndex = 0
// localLastLogTerm = 0
// } else {
// localLastLogIndex = rf.log[logLength - 1].Index
// localLastLogTerm = rf.log[logLength - 1].Term
// }

// requestVoteArgs = &RequestVoteArgs {
// Term: rf.currentTerm,
// CandidateId: rf.me,
// LastLogIndex: localLastLogIndex,
// LastLogTerm: localLastLogTerm,
// }
// peersNum = len(rf.peers)
// rf.mu.Unlock()
// }
// DPrintf("test timeoutThread 2")

// requestVoteResultChan := make(chan *RequestVoteReply, peersNum - 1)
// for i := 0; i < peersNum; i++ {
// if me == i {
// continue
// }
// go func(peerIndex int) {
// reply := &RequestVoteReply{}
// ok := rf.sendRequestVote(peerIndex, requestVoteArgs, reply)
// DPrintf("Vote Request from [%d] to [%d] has returned [%t], with reply: %p",
// me, peerIndex, ok, reply)
// if ok {
// requestVoteResultChan<- reply
// } else {
// requestVoteResultChan<- nil
// }

// }(i)
// }

// rf.CollectVotes(requestVoteResultChan)

// }
}
}
29 changes: 17 additions & 12 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,29 @@ type Raft struct {
}

func (rf *Raft) BecomeLeader() {
rf.currentRole = Leader
rf.currentRole = Leader
rf.currentLeader = rf.me

rf.nextIndex = make([]int, len(rf.peers), GetLastLogIndex(rf) + 1)
rf.matchIndex = make([]int, len(rf.peers))

DPrintf("Raft[%d] became Leader term[%d]",
rf.me, rf.currentTerm)
// need to boardcast empty AppendEntry()
}

func (rf *Raft) ReInitFollower(term int) {
rf.currentTerm = term
rf.currentRole = Follower
rf.votedFor = -1
rf.currentTerm = term
rf.currentRole = Follower
rf.votedFor = -1
DPrintf("Raft[%d] became Follower term[%d]\n",
rf.me, rf.currentTerm)
}

func (rf *Raft) BecomeCandidate() {
rf.currentRole = Candidate
rf.currentTerm++
rf.votedFor = rf.me
rf.currentRole = Candidate
rf.votedFor = rf.me
rf.lastActivity = time.Now()
DPrintf("Raft[%d] became Candidate term[%d], votedFor[%d] lastActivity[%s]\n",
rf.me, rf.currentTerm, rf.votedFor, rf.lastActivity.Format(time.RFC3339))
Expand Down Expand Up @@ -218,12 +222,13 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
} else {
index = len(rf.log) + 1
term = rf.currentTerm
// newEntry := &Entry {
// Index: index,
// Term: term,
// Command: command,
// }
// append(rf.log, newEntry)
newEntry := &Entry {
Index: index,
Term: term,
Command: command,
}
fmt.Printf("Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]", rf.me, index, term, command, command)
rf.log = append(rf.log, newEntry)
}
// Your code here (2B).

Expand Down
13 changes: 3 additions & 10 deletions src/raft/request_vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,9 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
return
}

var localLastLogIndex, localLastLogTerm int
localLogLength := len(rf.log)
if localLogLength == 0 {
localLastLogIndex = 0
localLastLogTerm = 0
} else {
localLastLogIndex = rf.log[localLogLength - 1].Index
localLastLogTerm = rf.log[localLogLength - 1].Term
}

localLastLogIndex := GetLastLogIndex(rf)
localLastLogTerm := GetLastLogTerm(rf)

if (localLastLogTerm <= args.LastLogTerm) {
if (localLastLogIndex <= args.LastLogIndex) {
rf.votedFor = args.CandidateId
Expand Down
17 changes: 17 additions & 0 deletions src/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,20 @@ func DPrintf(format string, a ...interface{}) (n int, err error) {
}
return
}

func GetLastLogIndex(rf *Raft) int {
if len(rf.log) == 0 {
return 0
} else {
return rf.log[len(rf.log) - 1].Index
}
}


func GetLastLogTerm(rf *Raft) int {
if len(rf.log) == 0 {
return 0
} else {
return rf.log[len(rf.log) - 1].Term
}
}

0 comments on commit dbb9e84

Please sign in to comment.