Skip to content

Commit

Permalink
slog & 2A, 2B passed
Browse files Browse the repository at this point in the history
slog as a commen log tools
  • Loading branch information
UndefinedSy committed Dec 9, 2020
1 parent b07e86d commit 0e43e44
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 198 deletions.
65 changes: 39 additions & 26 deletions src/raft/append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package raft

import (
"time"

"../slog"
)
//-------------------AppendEntries RPC-------------------//

Expand All @@ -19,21 +21,44 @@ type AppendEntriesReply struct {
Success bool
}

func (rf *Raft) FollowerTryUpdateCommitIndex(leaderCommit int) {
if leaderCommit > rf.commitIndex { // will update commitIndex
// commitIndex = min(leaderCommit, index of last new entry)
tmpDPrintfCommitIndex := rf.commitIndex
rf.commitIndex = GetLastLogIndex(rf)
if rf.commitIndex > leaderCommit {
rf.commitIndex = leaderCommit
}

// notify the client.
// ??? I'm not sure if this is necessary fot a follower ???
for i := tmpDPrintfCommitIndex + 1; i <= rf.commitIndex; i++ {
rf.NotifyApplyChannel(true, rf.log[i].Command, i)
}
slog.Log(slog.LOG_DEBUG, "Raft[%d] commitIndex has been change from[%d] to [%d]",
rf.me, tmpDPrintfCommitIndex, rf.commitIndex)
}
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - will try to lock its mutex.", rf.me)
slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me)
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] currentRole[%d] currentLeader[%d] currentLog{%+v} PrevLogIndex[%d], PrevLogTerm[%d] leaderCommit[%d], EntriesLength[%d]\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, rf.currentLeader, rf.log, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit, len(args.Entries))

slog.Log(slog.LOG_INFO, `Raft[%d] Handle AppendEntries, CurrentTerm[%d] currentRole[%d] currentLeader[%d]; args: LeaderId[%d] Term[%d] PrevLogIndex[%d], PrevLogTerm[%d] leaderCommit[%d], EntriesLength[%d]`,
rf.me, rf.currentTerm, rf.currentRole, rf.currentLeader,
args.LeaderId, args.Term, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit, len(args.Entries))
defer func() {
DPrintf(LOG_INFO, "Raft[%d] Return AppendEntries, LeaderId[%d] Term[%d] currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] commitIndex[%d] log:{%v}\n",
rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.commitIndex, rf.log)
slog.Log(slog.LOG_INFO, `Raft[%d] Return AppendEntries, currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] commitIndex[%d]`,
rf.me, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.commitIndex)
}()

// init reply
reply.Term = rf.currentTerm
reply.Success = false

if args.Term < rf.currentTerm {
slog.Log(slog.LOG_INFO, "The request from Raft[%d] is stale, will return False.", args.LeaderId)
return
}

Expand All @@ -45,50 +70,38 @@ func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply
rf.lastActivity = time.Now()

if args.PrevLogIndex < 0 {
DPrintf(LOG_ERR, "Raft[%d] - AppendEntries - Index out of range: args.PrevLogIndex[%d] length of rf.log[%d]",
slog.Log(slog.LOG_ERR, "Raft[%d] Index out of range: args.PrevLogIndex[%d] length of rf.log[%d]",
rf.me, args.PrevLogIndex, len(rf.log))
}

if args.PrevLogIndex > GetLastLogIndex(rf) {
DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, input PrevLogIndex[%d] exceed the lastLogIndex[%d], return false to go back.",
rf.me, args.PrevLogIndex, GetLastLogIndex(rf))
slog.Log(slog.LOG_INFO, "The args PrevLogIndex[%d] exceed the Raft[%d]'s lastLogIndex[%d], return false to go back.",
args.PrevLogIndex, rf.me, GetLastLogIndex(rf))
// TODO: will use a smart way to go back
return
}

if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, input PrevLogTerm[%d] conlicts with the existing one[%d], return false to go back.",
rf.me, args.PrevLogTerm, rf.log[args.PrevLogIndex].Term)
slog.Log(slog.LOG_INFO, "The args PrevLogTerm[%d] conlicts with Raft[%d]'s existing one[%d], return false to go back.",
args.PrevLogTerm, rf.me, rf.log[args.PrevLogIndex].Term)
// 可以返回冲突的 term 及该 term 的第一个 index,使 leader 可以直接回退 nextIndex 到合适位置。(到哪没想好)
return
}

rf.log = rf.log[:args.PrevLogIndex + 1] // 删除掉之后的 slice
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - Remove logs after index[%d], current log is: {%v}.", rf.me, args.PrevLogIndex, rf.log)
slog.Log(slog.LOG_DEBUG, "Raft[%d] removed logs after index[%d], current log is: {%v}.",
rf.me, args.PrevLogIndex, rf.log)

for _, Entry := range(args.Entries) {
rf.log = append(rf.log, Entry)
}

if args.LeaderCommit > rf.commitIndex {
tmpDPrintfCommitIndex := rf.commitIndex
rf.commitIndex = rf.log[len(rf.log) - 1].Index
if rf.commitIndex > args.LeaderCommit {
rf.commitIndex = args.LeaderCommit
}

for i := tmpDPrintfCommitIndex + 1; i <= rf.commitIndex; i++ {
rf.NotifyApplyChannel(true, rf.log[i].Command, i)
}
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - commitIndex will change from[%d] to [%d]",
rf.me, tmpDPrintfCommitIndex, rf.commitIndex)
}
rf.FollowerTryUpdateCommitIndex(args.LeaderCommit)

reply.Success = true
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
DPrintf(LOG_DEBUG, "Raft[%d] - sendAppendEntries - will send AppendEntries Request to [%d]",
rf.me, server)
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
Expand Down
76 changes: 39 additions & 37 deletions src/raft/appendentries_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,104 +3,109 @@ package raft
import (
"time"
"sort"
"sync"

"../slog"
)

const HeartBeatTimeout = 50 // Milliseconds

func (rf *Raft) LeaderTryUpdateCommitIndex() {
DPrintf(LOG_DEBUG, "Raft[%d] - LeaderTryUpdateCommitIndex - will try to lock its mutex.", rf.me)
slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me)
rf.mu.Lock()
defer rf.mu.Unlock()

// deep copy the matchIndex array from the leader
matchIndexCopy := make([]int, len(rf.peers))
copy(matchIndexCopy, rf.matchIndex)
matchIndexCopy[rf.me] = GetLastLogIndex(rf)

// Get the marjority matchedIndex
sort.Ints(matchIndexCopy)
matchIndexWithQuorum := matchIndexCopy[len(rf.peers) / 2]
DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - matchIndexWithQuorum is: [%d], commitIndex is:[%d], logEntry in N is:{%+v} currentTerm is:[%d]",
rf.me, matchIndexWithQuorum, rf.commitIndex, rf.log[matchIndexWithQuorum], rf.currentTerm)
slog.Log(slog.LOG_DEBUG, "Raft[%d] matchIndexWithQuorum is: [%d], currentCommitIndex is:[%d], logEntry in majority is:{%+v} currentTerm is:[%d]",
rf.me, matchIndexWithQuorum, rf.commitIndex, rf.log[matchIndexWithQuorum], rf.currentTerm)

if matchIndexWithQuorum > rf.commitIndex && rf.log[matchIndexWithQuorum].Term == rf.currentTerm {
for i := rf.commitIndex + 1; i <= matchIndexWithQuorum; i++ {
rf.NotifyApplyChannel(true, rf.log[i].Command, i)
}
rf.commitIndex = matchIndexWithQuorum
DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - commitIndex has been updated to:[%d]",
rf.me, rf.commitIndex)
slog.Log(slog.LOG_DEBUG, "Raft[%d] commitIndex has been updated to:[%d]",
rf.me, rf.commitIndex)
}
}

func (rf *Raft) GenerateAppendEntriesArgs(targetIndex int, args *AppendEntriesArgs) {
func (rf *Raft) GenerateAppendEntriesArgs(peerIndex int, args *AppendEntriesArgs) {
args.Term = rf.currentTerm
args.LeaderId = rf.me
args.PrevLogIndex = rf.nextIndex[targetIndex] - 1
args.PrevLogIndex = rf.nextIndex[peerIndex] - 1
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
args.Entries = rf.log[args.PrevLogIndex + 1:]

args.LeaderCommit = rf.commitIndex

DPrintf(LOG_INFO, "Raft[%d], peerIndex[%d], current PrevLogIndex is:[%d], Entries is:{%+v}",
rf.me, targetIndex, args.PrevLogIndex, args.Entries)

slog.Log(slog.LOG_DEBUG, "Raft[%d], peerIndex[%d], PrevLogIndex:[%d], PrevLogTerm:[%d] Entries:{%+v}",
rf.me, peerIndex, args.PrevLogIndex, args.PrevLogTerm, args.Entries)
}

func AppendEntriesProcessor(rf *Raft, peerIndex int, wg *sync.WaitGroup) {
defer wg.Done()

DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor - will try to lock its mutex.", rf.me)
func AppendEntriesProcessor(rf *Raft, peerIndex int) {
slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me)
rf.mu.Lock()

// Check if still need to process AppendEntries
if rf.killed() || rf.currentRole != ROLE_LEADER {
rf.mu.Unlock()
return
}
// Check if still need to process AppendEntries

appendEntriesArgs := &AppendEntriesArgs{}
rf.GenerateAppendEntriesArgs(peerIndex, appendEntriesArgs)

rf.mu.Unlock()

reply := &AppendEntriesReply{}

ok := rf.sendAppendEntries(peerIndex, appendEntriesArgs, reply)
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor - sendAppendEntries to [%d] has returned ok: [%t], with reply: {%+v}",
rf.me, peerIndex, ok, reply)
if ok {
slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me)
rf.mu.Lock()

if reply.Term > rf.currentTerm {
DPrintf(LOG_INFO, "Raft[%d] will return to follower because currentTerm[%d] replyRaftIndex[%d], reply.Term[%d]",
rf.me, rf.currentTerm, peerIndex, reply.Term)
slog.Log(slog.LOG_INFO, `Raft[%d] will return to follower because currentTerm[%d] |
reply from Raft[%d] reply.Term[%d]`,
rf.me, rf.currentTerm, peerIndex, reply.Term)
rf.ReInitFollower(reply.Term)
rf.mu.Unlock()
return
}

if reply.Success {
rf.nextIndex[peerIndex] = appendEntriesArgs.PrevLogIndex + len(appendEntriesArgs.Entries) + 1
rf.matchIndex[peerIndex] = rf.nextIndex[peerIndex] - 1
rf.mu.Unlock()

rf.LeaderTryUpdateCommitIndex()
DPrintf(LOG_INFO, "Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t, nextIndex to Raft[%d] now is:[%d]\n",
rf.me, peerIndex, reply.Success, peerIndex, rf.nextIndex[peerIndex])
slog.Log(slog.LOG_DEBUG, "Raft[%d] to peer[%d] success:%t, nextIndex to Raft[%d] now is:[%d]",
rf.me, peerIndex, reply.Success, peerIndex, rf.nextIndex[peerIndex])

} else {
DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor Success - will try to lock its mutex.", rf.me)
rf.mu.Lock()
defer rf.mu.Unlock()
// may need a smart way
rf.nextIndex[peerIndex]--
DPrintf(LOG_INFO, "Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t, rf.nextIndex dec to:[%d]\n",
rf.me, peerIndex, reply.Success, rf.nextIndex[peerIndex])
wg.Add(1)
go AppendEntriesProcessor(rf, peerIndex, wg)
slog.Log(slog.LOG_INFO, "Raft[%d] to peer[%d] success:%t, rf.nextIndex dec to:[%d]",
rf.me, peerIndex, reply.Success, rf.nextIndex[peerIndex])
rf.mu.Unlock()
go AppendEntriesProcessor(rf, peerIndex)
}
} else {
DPrintf(LOG_ERR, "Raft[%d] - AppendEntriesProcessor - there is an error in sendAppendEntries to Raft[%d], will return.\n",
rf.me, peerIndex)
slog.Log(slog.LOG_ERR, "Raft[%d] there is an error in sendAppendEntries to Raft[%d], will return.",
rf.me, peerIndex)
// go AppendEntriesProcessor(rf, peerIndex)
// might need retry or something
}
}

func AppendEntriesThread(rf *Raft) {
for !rf.killed() {
time.Sleep(10 * time.Millisecond) // here may need a condition_variable.wait_for
time.Sleep(10 * time.Millisecond)

rf.condLeader.L.Lock()
for rf.currentRole != ROLE_LEADER {
Expand All @@ -119,15 +124,12 @@ func AppendEntriesThread(rf *Raft) {

rf.condLeader.L.Unlock()

// var AppendEntriesProcessorWG sync.WaitGroup
for i := 0; i < len(rf.peers); i++ {
if rf.me == i {
continue
}

// AppendEntriesProcessorWG.Add(1)
go AppendEntriesProcessor(rf, i) // , &AppendEntriesProcessorWG)
go AppendEntriesProcessor(rf, i)
}
// AppendEntriesProcessorWG.Wait()
}
}
4 changes: 0 additions & 4 deletions src/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,6 @@ func (cfg *config) nCommitted(index int) (int, interface{}) {

cfg.mu.Lock()
cmd1, ok := cfg.logs[i][index]
DPrintf(LOG_DEBUG, "TEST_config - nCommitted - cfg.logs[raft-%d][index-%d] cmd1[%+v], ok[%t] logs{%+v}",
i, index, cmd1, ok, cfg.logs[i])
cfg.mu.Unlock()

if ok {
Expand All @@ -383,8 +381,6 @@ func (cfg *config) nCommitted(index int) (int, interface{}) {
cmd = cmd1
}
}
DPrintf(LOG_DEBUG, "TEST_config - nCommitted - returns count[%d], cmd[%+v]",
count, cmd)
return count, cmd
}

Expand Down
Loading

0 comments on commit 0e43e44

Please sign in to comment.