diff --git a/src/raft/append_entries.go b/src/raft/append_entries.go
index bf8665e..a1ca095 100644
--- a/src/raft/append_entries.go
+++ b/src/raft/append_entries.go
@@ -2,6 +2,8 @@ package raft
 
 import (
 	"time"
+
+	"../slog"
 )
 
 //-------------------AppendEntries RPC-------------------//
@@ -19,21 +21,44 @@ type AppendEntriesReply struct {
 	Success bool
 }
 
+func (rf *Raft) FollowerTryUpdateCommitIndex(leaderCommit int) {
+	if leaderCommit > rf.commitIndex { // will update commitIndex
+		// commitIndex = min(leaderCommit, index of last new entry)
+		tmpDPrintfCommitIndex := rf.commitIndex
+		rf.commitIndex = GetLastLogIndex(rf)
+		if rf.commitIndex > leaderCommit {
+			rf.commitIndex = leaderCommit
+		}
+
+		// notify the client.
+		// ??? I'm not sure if this is necessary for a follower ???
+		for i := tmpDPrintfCommitIndex + 1; i <= rf.commitIndex; i++ {
+			rf.NotifyApplyChannel(true, rf.log[i].Command, i)
+		}
+		slog.Log(slog.LOG_DEBUG, "Raft[%d] commitIndex has been changed from [%d] to [%d]",
+			rf.me, tmpDPrintfCommitIndex, rf.commitIndex)
+	}
+}
+
 func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
-	DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - will try to lock its mutex.", rf.me)
+	slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me)
 	rf.mu.Lock()
 	defer rf.mu.Unlock()
-	DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] currentRole[%d] currentLeader[%d] currentLog{%+v} PrevLogIndex[%d], PrevLogTerm[%d] leaderCommit[%d], EntriesLength[%d]\n",
-		rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, rf.currentLeader, rf.log, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit, len(args.Entries))
+
+	slog.Log(slog.LOG_INFO, `Raft[%d] Handle AppendEntries, CurrentTerm[%d] currentRole[%d] currentLeader[%d]; args: LeaderId[%d] Term[%d] PrevLogIndex[%d], PrevLogTerm[%d] leaderCommit[%d], EntriesLength[%d]`,
+		rf.me, rf.currentTerm, rf.currentRole, rf.currentLeader,
+		args.LeaderId, args.Term, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit, len(args.Entries))
 
 	defer func() {
-		DPrintf(LOG_INFO, "Raft[%d] Return AppendEntries, LeaderId[%d] Term[%d] currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] commitIndex[%d] log:{%v}\n",
-			rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.commitIndex, rf.log)
+		slog.Log(slog.LOG_INFO, `Raft[%d] Return AppendEntries, currentTerm[%d] currentRole[%d] success:%t currentLeader[%d] commitIndex[%d]`,
+			rf.me, rf.currentTerm, rf.currentRole, reply.Success, rf.currentLeader, rf.commitIndex)
 	}()
 
+	// init reply
 	reply.Term = rf.currentTerm
 	reply.Success = false
 
 	if args.Term < rf.currentTerm {
+		slog.Log(slog.LOG_INFO, "The request from Raft[%d] is stale, will return false.", args.LeaderId)
 		return
 	}
 
@@ -45,50 +70,38 @@ func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply
 	rf.lastActivity = time.Now()
 
 	if args.PrevLogIndex < 0 {
-		DPrintf(LOG_ERR, "Raft[%d] - AppendEntries - Index out of range: args.PrevLogIndex[%d] length of rf.log[%d]",
+		slog.Log(slog.LOG_ERR, "Raft[%d] Index out of range: args.PrevLogIndex[%d] length of rf.log[%d]",
 			rf.me, args.PrevLogIndex, len(rf.log))
 	}
 
 	if args.PrevLogIndex > GetLastLogIndex(rf) {
-		DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, input PrevLogIndex[%d] exceed the lastLogIndex[%d], return false to go back.",
-			rf.me, args.PrevLogIndex, GetLastLogIndex(rf))
+		slog.Log(slog.LOG_INFO, "The args PrevLogIndex[%d] exceeds Raft[%d]'s lastLogIndex[%d], return false to go back.",
return false to go back.", + args.PrevLogIndex, rf.me, GetLastLogIndex(rf)) + // TODO: will use a smart way to go back return } if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { - DPrintf(LOG_INFO, "Raft[%d] Handle AppendEntries, input PrevLogTerm[%d] conlicts with the existing one[%d], return false to go back.", - rf.me, args.PrevLogTerm, rf.log[args.PrevLogIndex].Term) + slog.Log(slog.LOG_INFO, "The args PrevLogTerm[%d] conlicts with Raft[%d]'s existing one[%d], return false to go back.", + args.PrevLogTerm, rf.me, rf.log[args.PrevLogIndex].Term) // 可以返回冲突的 term 及该 term 的第一个 index,使 leader 可以直接回退 nextIndex 到合适位置。(到哪没想好) return } rf.log = rf.log[:args.PrevLogIndex + 1] // 删除掉之后的 slice - DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - Remove logs after index[%d], current log is: {%v}.", rf.me, args.PrevLogIndex, rf.log) + slog.Log(slog.LOG_DEBUG, "Raft[%d] removed logs after index[%d], current log is: {%v}.", + rf.me, args.PrevLogIndex, rf.log) for _, Entry := range(args.Entries) { rf.log = append(rf.log, Entry) } - if args.LeaderCommit > rf.commitIndex { - tmpDPrintfCommitIndex := rf.commitIndex - rf.commitIndex = rf.log[len(rf.log) - 1].Index - if rf.commitIndex > args.LeaderCommit { - rf.commitIndex = args.LeaderCommit - } - - for i := tmpDPrintfCommitIndex + 1; i <= rf.commitIndex; i++ { - rf.NotifyApplyChannel(true, rf.log[i].Command, i) - } - DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntries - commitIndex will change from[%d] to [%d]", - rf.me, tmpDPrintfCommitIndex, rf.commitIndex) - } + rf.FollowerTryUpdateCommitIndex(args.LeaderCommit) reply.Success = true } func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { - DPrintf(LOG_DEBUG, "Raft[%d] - sendAppendEntries - will send AppendEntries Request to [%d]", - rf.me, server) ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) return ok } diff --git a/src/raft/appendentries_thread.go b/src/raft/appendentries_thread.go index a138b80..879f3ba 100644 --- a/src/raft/appendentries_thread.go +++ b/src/raft/appendentries_thread.go @@ -3,58 +3,60 @@ package raft import ( "time" "sort" - "sync" + + "../slog" ) const HeartBeatTimeout = 50 // Milliseconds func (rf *Raft) LeaderTryUpdateCommitIndex() { - DPrintf(LOG_DEBUG, "Raft[%d] - LeaderTryUpdateCommitIndex - will try to lock its mutex.", rf.me) + slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me) rf.mu.Lock() defer rf.mu.Unlock() + // deep copy the matchIndex array from the leader matchIndexCopy := make([]int, len(rf.peers)) copy(matchIndexCopy, rf.matchIndex) matchIndexCopy[rf.me] = GetLastLogIndex(rf) + // Get the marjority matchedIndex sort.Ints(matchIndexCopy) matchIndexWithQuorum := matchIndexCopy[len(rf.peers) / 2] - DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - matchIndexWithQuorum is: [%d], commitIndex is:[%d], logEntry in N is:{%+v} currentTerm is:[%d]", - rf.me, matchIndexWithQuorum, rf.commitIndex, rf.log[matchIndexWithQuorum], rf.currentTerm) + slog.Log(slog.LOG_DEBUG, "Raft[%d] matchIndexWithQuorum is: [%d], currentCommitIndex is:[%d], logEntry in majority is:{%+v} currentTerm is:[%d]", + rf.me, matchIndexWithQuorum, rf.commitIndex, rf.log[matchIndexWithQuorum], rf.currentTerm) + if matchIndexWithQuorum > rf.commitIndex && rf.log[matchIndexWithQuorum].Term == rf.currentTerm { for i := rf.commitIndex + 1; i <= matchIndexWithQuorum; i++ { rf.NotifyApplyChannel(true, rf.log[i].Command, i) } rf.commitIndex = matchIndexWithQuorum - DPrintf(LOG_INFO, "Raft[%d] - LeaderTryUpdateCommitIndex - 
commitIndex has been updated to:[%d]", - rf.me, rf.commitIndex) + slog.Log(slog.LOG_DEBUG, "Raft[%d] commitIndex has been updated to:[%d]", + rf.me, rf.commitIndex) } } -func (rf *Raft) GenerateAppendEntriesArgs(targetIndex int, args *AppendEntriesArgs) { +func (rf *Raft) GenerateAppendEntriesArgs(peerIndex int, args *AppendEntriesArgs) { args.Term = rf.currentTerm args.LeaderId = rf.me - args.PrevLogIndex = rf.nextIndex[targetIndex] - 1 + args.PrevLogIndex = rf.nextIndex[peerIndex] - 1 args.PrevLogTerm = rf.log[args.PrevLogIndex].Term args.Entries = rf.log[args.PrevLogIndex + 1:] args.LeaderCommit = rf.commitIndex - DPrintf(LOG_INFO, "Raft[%d], peerIndex[%d], current PrevLogIndex is:[%d], Entries is:{%+v}", - rf.me, targetIndex, args.PrevLogIndex, args.Entries) - + slog.Log(slog.LOG_DEBUG, "Raft[%d], peerIndex[%d], PrevLogIndex:[%d], PrevLogTerm:[%d] Entries:{%+v}", + rf.me, peerIndex, args.PrevLogIndex, args.PrevLogTerm, args.Entries) } -func AppendEntriesProcessor(rf *Raft, peerIndex int, wg *sync.WaitGroup) { - defer wg.Done() - - DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor - will try to lock its mutex.", rf.me) +func AppendEntriesProcessor(rf *Raft, peerIndex int) { + slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me) rf.mu.Lock() + + // Check if still need to process AppendEntries if rf.killed() || rf.currentRole != ROLE_LEADER { rf.mu.Unlock() return } - // Check if still need to process AppendEntries appendEntriesArgs := &AppendEntriesArgs{} rf.GenerateAppendEntriesArgs(peerIndex, appendEntriesArgs) @@ -62,37 +64,40 @@ func AppendEntriesProcessor(rf *Raft, peerIndex int, wg *sync.WaitGroup) { rf.mu.Unlock() reply := &AppendEntriesReply{} - ok := rf.sendAppendEntries(peerIndex, appendEntriesArgs, reply) - DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor - sendAppendEntries to [%d] has returned ok: [%t], with reply: {%+v}", - rf.me, peerIndex, ok, reply) if ok { + slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me) + rf.mu.Lock() + if reply.Term > rf.currentTerm { - DPrintf(LOG_INFO, "Raft[%d] will return to follower because currentTerm[%d] replyRaftIndex[%d], reply.Term[%d]", - rf.me, rf.currentTerm, peerIndex, reply.Term) + slog.Log(slog.LOG_INFO, `Raft[%d] will return to follower because currentTerm[%d] | + reply from Raft[%d] reply.Term[%d]`, + rf.me, rf.currentTerm, peerIndex, reply.Term) rf.ReInitFollower(reply.Term) + rf.mu.Unlock() return } if reply.Success { rf.nextIndex[peerIndex] = appendEntriesArgs.PrevLogIndex + len(appendEntriesArgs.Entries) + 1 rf.matchIndex[peerIndex] = rf.nextIndex[peerIndex] - 1 + rf.mu.Unlock() + rf.LeaderTryUpdateCommitIndex() - DPrintf(LOG_INFO, "Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t, nextIndex to Raft[%d] now is:[%d]\n", - rf.me, peerIndex, reply.Success, peerIndex, rf.nextIndex[peerIndex]) + slog.Log(slog.LOG_DEBUG, "Raft[%d] to peer[%d] success:%t, nextIndex to Raft[%d] now is:[%d]", + rf.me, peerIndex, reply.Success, peerIndex, rf.nextIndex[peerIndex]) + } else { - DPrintf(LOG_DEBUG, "Raft[%d] - AppendEntriesProcessor Success - will try to lock its mutex.", rf.me) - rf.mu.Lock() - defer rf.mu.Unlock() + // may need a smart way rf.nextIndex[peerIndex]-- - DPrintf(LOG_INFO, "Raft[%d] - AppendEntriesHandler - to peer[%d] success:%t, rf.nextIndex dec to:[%d]\n", - rf.me, peerIndex, reply.Success, rf.nextIndex[peerIndex]) - wg.Add(1) - go AppendEntriesProcessor(rf, peerIndex, wg) + slog.Log(slog.LOG_INFO, "Raft[%d] to peer[%d] success:%t, rf.nextIndex dec to:[%d]", + rf.me, 
peerIndex, reply.Success, rf.nextIndex[peerIndex]) + rf.mu.Unlock() + go AppendEntriesProcessor(rf, peerIndex) } } else { - DPrintf(LOG_ERR, "Raft[%d] - AppendEntriesProcessor - there is an error in sendAppendEntries to Raft[%d], will return.\n", - rf.me, peerIndex) + slog.Log(slog.LOG_ERR, "Raft[%d] there is an error in sendAppendEntries to Raft[%d], will return.", + rf.me, peerIndex) // go AppendEntriesProcessor(rf, peerIndex) // might need retry or something } @@ -100,7 +105,7 @@ func AppendEntriesProcessor(rf *Raft, peerIndex int, wg *sync.WaitGroup) { func AppendEntriesThread(rf *Raft) { for !rf.killed() { - time.Sleep(10 * time.Millisecond) // here may need a condition_variable.wait_for + time.Sleep(10 * time.Millisecond) rf.condLeader.L.Lock() for rf.currentRole != ROLE_LEADER { @@ -119,15 +124,12 @@ func AppendEntriesThread(rf *Raft) { rf.condLeader.L.Unlock() - // var AppendEntriesProcessorWG sync.WaitGroup for i := 0; i < len(rf.peers); i++ { if rf.me == i { continue } - // AppendEntriesProcessorWG.Add(1) - go AppendEntriesProcessor(rf, i) // , &AppendEntriesProcessorWG) + go AppendEntriesProcessor(rf, i) } - // AppendEntriesProcessorWG.Wait() } } diff --git a/src/raft/config.go b/src/raft/config.go index f3040b4..f582e51 100644 --- a/src/raft/config.go +++ b/src/raft/config.go @@ -370,8 +370,6 @@ func (cfg *config) nCommitted(index int) (int, interface{}) { cfg.mu.Lock() cmd1, ok := cfg.logs[i][index] - DPrintf(LOG_DEBUG, "TEST_config - nCommitted - cfg.logs[raft-%d][index-%d] cmd1[%+v], ok[%t] logs{%+v}", - i, index, cmd1, ok, cfg.logs[i]) cfg.mu.Unlock() if ok { @@ -383,8 +381,6 @@ func (cfg *config) nCommitted(index int) (int, interface{}) { cmd = cmd1 } } - DPrintf(LOG_DEBUG, "TEST_config - nCommitted - returns count[%d], cmd[%+v]", - count, cmd) return count, cmd } diff --git a/src/raft/election_thread.go b/src/raft/election_thread.go index f1d2279..6d504c8 100644 --- a/src/raft/election_thread.go +++ b/src/raft/election_thread.go @@ -3,55 +3,54 @@ package raft import ( "math/rand" "time" + + "../slog" ) const electionTimeoutBase = 200 // Milliseconds func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) { - var me, participantsNum int - { - rf.mu.Lock() - // defer rf.mu.Unlock() - me = rf.me - participantsNum = len(rf.peers) - rf.mu.Unlock() - } - votesObtained := 1 - for i := 0; i < participantsNum - 1; i++ { + participantsNum := cap(requestVoteResultChan) + for i := 0; i < participantsNum; i++ { requestVoteResult := <-requestVoteResultChan + if requestVoteResult != nil { if requestVoteResult.VoteGranted { votesObtained++ - DPrintf(LOG_DEBUG, "Raft[%d] - CollectVotes - has got 1 vote. Currently have [%d] votes.\n", me, votesObtained) + slog.Log(slog.LOG_DEBUG, "Raft[%d] got 1 vote. 
Currently have [%d] votes.", rf.me, votesObtained) + if votesObtained > (participantsNum / 2) { - rf.mu.Lock() - defer rf.mu.Unlock() - DPrintf(LOG_INFO, "Raft[%d] - CollectVotes - has got majority[%d] votes, will become a leader | currentRole is: [%d].\n", - me, votesObtained, rf.currentRole) + slog.Log(slog.LOG_INFO, "Raft[%d] has got majority[%d] votes, will become a leader | currentRole is: [%d].", + rf.me, votesObtained, rf.currentRole) if (rf.currentRole == ROLE_CANDIDATE) { + rf.mu.Lock() + rf.BecomeLeader() - // rf.lastHeartbeat = time.Unix(0, 0) - rf.condLeader.Signal() + rf.condLeader.Signal() // kick off AppendEntriesThread + + rf.mu.Unlock() } return } } if requestVoteResult.Term > rf.currentTerm { - DPrintf(LOG_INFO, "Raft[%d] - CollectVotes - has met a peer with higher Term[%d], will return to a follower.\n", me, requestVoteResult.Term) rf.mu.Lock() - defer rf.mu.Unlock() + + slog.Log(slog.LOG_INFO, "Raft[%d] has met a peer with higher Term[%d], will return to a follower.", rf.me, requestVoteResult.Term) rf.ReInitFollower(requestVoteResult.Term) + + rf.mu.Unlock() // give up requesting vote. return } } else { - DPrintf(LOG_ERR, "Raft[%d] - CollectVotes - there is an error in return value of the sendRequestVote.\n", me) + slog.Log(slog.LOG_ERR, "Raft[%d] there is an error in Call sendRequestVote.", rf.me) } } - DPrintf(LOG_DEBUG, "Raft[%d] - CollectVotes - obtained [%d] votes and did not become leader, will go back to follower", me, votesObtained) + slog.Log(slog.LOG_DEBUG, "Raft[%d] obtained [%d] votes and did not become leader, will go back to follower", rf.me, votesObtained) rf.mu.Lock() defer rf.mu.Unlock() rf.ReInitFollower(rf.currentTerm) @@ -60,13 +59,14 @@ func (rf *Raft) CollectVotes(requestVoteResultChan chan *RequestVoteReply) { func ElectionThread(rf *Raft) { for !rf.killed() { - time.Sleep(electionTimeoutBase * time.Millisecond) + time.Sleep(50 * time.Millisecond) - DPrintf(LOG_DEBUG, "Raft[%d] - ElectionThread - will try to lock its mutex.", rf.me) + slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me) rf.mu.Lock() elapse := time.Now().Sub(rf.lastActivity) electionTimeout := time.Duration(electionTimeoutBase + rand.Intn(150)) * time.Millisecond + slog.Log(slog.LOG_DEBUG, "Raft[%d] elapse: [%d] electionTimeout: [%d]", rf.me, elapse.Milliseconds(), electionTimeout.Milliseconds()) if elapse < electionTimeout { rf.mu.Unlock() continue @@ -79,16 +79,12 @@ func ElectionThread(rf *Raft) { rf.BecomeCandidate() // Prepare RequestVoteArgs - localLastLogIndex := GetLastLogIndex(rf) - localLastLogTerm := GetLastLogTerm(rf) - requestVoteArgs := &RequestVoteArgs { Term: rf.currentTerm, CandidateId: rf.me, - LastLogIndex: localLastLogIndex, - LastLogTerm: localLastLogTerm, + LastLogIndex: GetLastLogIndex(rf), + LastLogTerm: GetLastLogTerm(rf), } - // Prepare RequestVoteArgs peersNum := len(rf.peers) @@ -102,11 +98,7 @@ func ElectionThread(rf *Raft) { go func(peerIndex int) { reply := &RequestVoteReply{} - DPrintf(LOG_DEBUG, "Raft[%d] - ElectionThread - will send Vote Request to [%d]", - rf.me, peerIndex) ok := rf.sendRequestVote(peerIndex, requestVoteArgs, reply) - DPrintf(LOG_DEBUG, "Raft[%d] - ElectionThread - sendRequestVote to [%d] has returned ok: [%t], with reply: {%+v}", - rf.me, peerIndex, ok, reply) if ok { requestVoteResultChan<- reply } else { @@ -117,6 +109,5 @@ func ElectionThread(rf *Raft) { } rf.CollectVotes(requestVoteResultChan) - } } \ No newline at end of file diff --git a/src/raft/raft.go b/src/raft/raft.go index 40d8a47..3b8beb8 100644 
--- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -25,6 +25,8 @@ import ( "../labrpc" "fmt" + + "../slog" ) // import "bytes" // import "../labgob" @@ -121,16 +123,16 @@ func (rf *Raft) BecomeLeader() { rf.nextIndex[i] = GetLastLogIndex(rf) + 1 } - DPrintf(LOG_INFO, "Raft[%d] became Leader term[%d]", - rf.me, rf.currentTerm) + slog.Log(slog.LOG_INFO, "Raft[%d] became Leader term[%d]", + rf.me, rf.currentTerm) } func (rf *Raft) ReInitFollower(term int) { rf.currentTerm = term rf.currentRole = ROLE_FOLLOWER rf.votedFor = -1 - DPrintf(LOG_DEBUG, "Raft[%d] became Follower term[%d]\n", - rf.me, rf.currentTerm) + slog.Log(slog.LOG_DEBUG, "Raft[%d] became Follower term[%d]", + rf.me, rf.currentTerm) } func (rf *Raft) BecomeCandidate() { @@ -138,8 +140,8 @@ func (rf *Raft) BecomeCandidate() { rf.currentRole = ROLE_CANDIDATE rf.votedFor = rf.me rf.lastActivity = time.Now() - DPrintf(LOG_DEBUG, "Raft[%d] became Candidate term[%d], votedFor[%d] lastActivity[%s]\n", - rf.me, rf.currentTerm, rf.votedFor, rf.lastActivity.Format(time.RFC3339)) + slog.Log(slog.LOG_DEBUG, "Raft[%d] became Candidate term[%d], votedFor[%d]", + rf.me, rf.currentTerm, rf.votedFor) } // return currentTerm and whether this server @@ -156,8 +158,8 @@ func (rf *Raft) GetState() (int, bool) { if rf.currentRole == ROLE_LEADER { isleader = true } - DPrintf(LOG_INFO, "GetState of Raft[%d]: term[%d], isLeader[%t]", - rf.me, rf.currentTerm, isleader) + slog.Log(slog.LOG_INFO, "GetState of Raft[%d]: term[%d], isLeader[%t]", + rf.me, rf.currentTerm, isleader) // Your code here (2A). return term, isleader @@ -236,8 +238,8 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) { Command: command, } rf.log = append(rf.log, newEntry) - DPrintf(LOG_INFO, "Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]\n", - rf.me, index, term, command, command) + slog.Log(slog.LOG_DEBUG, "Raft[%d] will append new entry{Index: [%d], Term: [%d], Command:[%T | %v]", + rf.me, index, term, command, command) } // Your code here (2B). @@ -256,7 +258,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) { // should call killed() to check whether it should stop. // func (rf *Raft) Kill() { - DPrintf(LOG_INFO, "Raft[%d] has been killed", rf.me) + slog.Log(slog.LOG_INFO, "Raft[%d] has been killed", rf.me) atomic.StoreInt32(&rf.dead, 1) // Your code here, if desired. } diff --git a/src/raft/request_vote.go b/src/raft/request_vote.go index e2e266a..0eaa588 100644 --- a/src/raft/request_vote.go +++ b/src/raft/request_vote.go @@ -1,6 +1,9 @@ package raft -import "time" +import ( + "time" + "../slog" +) type RequestVoteArgs struct { Term int @@ -21,57 +24,51 @@ type RequestVoteReply struct { // func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). 
+ slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", rf.me) rf.mu.Lock() - DPrintf(LOG_DEBUG, "Raft[%d] has locked its mutex in RequestVote.\n", rf.me) defer rf.mu.Unlock() - reply.Term = rf.currentTerm - reply.VoteGranted = false - - DPrintf(LOG_INFO, "Raft[%d] Handle RequestVote, CandidatesId[%d] Term[%d] CurrentTerm[%d] LastLogIndex[%d] LastLogTerm[%d] votedFor[%d]\n", - rf.me, args.CandidateId, args.Term, rf.currentTerm, args.LastLogIndex, args.LastLogTerm, rf.votedFor) + slog.Log(slog.LOG_INFO, `Raft[%d] Handle RequestVote, currentVotedFor[%d], CurrentTerm[%d]; args: CandidatesId[%d] Term[%d] LastLogIndex[%d] LastLogTerm[%d]`, + rf.me, rf.votedFor, rf.currentTerm, + args.CandidateId, args.Term, args.LastLogIndex, args.LastLogTerm) defer func() { - DPrintf(LOG_INFO, "Raft[%d] Return RequestVote, CandidatesId[%d] Term[%d] currentTerm[%d] localLastLogIndex[%d] localLastLogTerm[%d] VoteGranted[%v]\n", - rf.me, args.CandidateId, args.Term, rf.currentTerm, GetLastLogIndex(rf), GetLastLogTerm(rf), reply.VoteGranted) + slog.Log(slog.LOG_INFO, "Raft[%d] Return RequestVote, CandidatesId[%d] Term[%d] currentTerm[%d] localLastLogIndex[%d] localLastLogTerm[%d] VoteGranted[%v]", + rf.me, args.CandidateId, args.Term, rf.currentTerm, GetLastLogIndex(rf), GetLastLogTerm(rf), reply.VoteGranted) }() + // init reply + reply.Term = rf.currentTerm + reply.VoteGranted = false + if args.Term < rf.currentTerm { - DPrintf(LOG_DEBUG, "requester(%d)'s Term[%d] < local(%d) term[%d]\n", - args.CandidateId, args.Term, rf.me, rf.currentTerm) + slog.Log(slog.LOG_INFO, "requester[%d]'s Term[%d] < local[%d] term[%d]", + args.CandidateId, args.Term, rf.me, rf.currentTerm) return } if args.Term > rf.currentTerm { - DPrintf(LOG_INFO, "Raft[%d] - RequestVote - The Term in Vote Request[%d] from Raft[%d] is higher than currentTerm[%d]", - rf.me, args.Term, args.CandidateId, rf.currentTerm) + slog.Log(slog.LOG_INFO, "Raft[%d] goes to Follower because Term in Vote Request[%d] from Raft[%d] is higher than currentTerm[%d]", + rf.me, args.Term, args.CandidateId, rf.currentTerm) rf.ReInitFollower(args.Term) reply.Term = rf.currentTerm } if rf.votedFor != -1 && rf.votedFor != args.CandidateId { - DPrintf(LOG_DEBUG, "Raft[%d] has already voted other candidate(%d)\n", - rf.me, rf.votedFor) + slog.Log(slog.LOG_DEBUG, "Raft[%d] has already voted to candidate(%d)", + rf.me, rf.votedFor) return } localLastLogIndex := GetLastLogIndex(rf) localLastLogTerm := GetLastLogTerm(rf) - if args.LastLogTerm > localLastLogTerm || (args.LastLogTerm == localLastLogTerm && args.LastLogIndex >= localLastLogIndex) { + slog.Log(slog.LOG_DEBUG, "Raft[%d] will vote candidate(%d)", + rf.me, args.CandidateId) rf.votedFor = args.CandidateId reply.VoteGranted = true rf.lastActivity = time.Now() } - // if (localLastLogTerm <= args.LastLogTerm) { - // if (localLastLogIndex <= args.LastLogIndex) { - // rf.votedFor = args.CandidateId - // reply.VoteGranted = true - // rf.lastActivity = time.Now() // is this necessary? - // } - // } - - return // Your code here (2A, 2B). 
} @@ -108,6 +105,7 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) + slog.Log(slog.LOG_DEBUG, "Raft[%d] has received response from Raft[%d]", rf.me, server) return ok } diff --git a/src/raft/test_test.go b/src/raft/test_test.go index 135937b..6696e8f 100644 --- a/src/raft/test_test.go +++ b/src/raft/test_test.go @@ -60,17 +60,14 @@ func TestReElection2A(t *testing.T) { leader1 := cfg.checkOneLeader() // if the leader disconnects, a new one should be elected. - DPrintf(LOG_INFO, "---------- test_test: will disconnect leader[%d] ----------\n", leader1) cfg.disconnect(leader1) cfg.checkOneLeader() - DPrintf(LOG_INFO, "---------- test_test: disconnected node[%d] rejoin ----------\n", leader1) // if the old leader rejoins, that shouldn't // disturb the new leader. cfg.connect(leader1) leader2 := cfg.checkOneLeader() - DPrintf(LOG_INFO, "---------- test_test: lost quorum[%d, %d] ----------\n", leader2, (leader2 + 1) % servers) // if there's no quorum, no leader should // be elected. cfg.disconnect(leader2) @@ -78,12 +75,10 @@ func TestReElection2A(t *testing.T) { time.Sleep(2 * RaftElectionTimeout) cfg.checkNoLeader() - DPrintf(LOG_INFO, "---------- test_test: regain quorum[%d] ----------\n", (leader2 + 1) % servers) // if a quorum arises, it should elect a leader. cfg.connect((leader2 + 1) % servers) cfg.checkOneLeader() - DPrintf(LOG_INFO, "---------- test_test: rejoin all nodes[%d] ----------\n", leader2) // re-join of last node shouldn't prevent leader from existing. cfg.connect(leader2) cfg.checkOneLeader() @@ -160,9 +155,7 @@ func TestFailAgree2B(t *testing.T) { // disconnect one follower from the network. leader := cfg.checkOneLeader() - DPrintf(LOG_INFO, "TestFailAgree2B - checkOneLeader - currentLeader is: [%d]", leader) cfg.disconnect((leader + 1) % servers) - DPrintf(LOG_INFO, "TestFailAgree2B - disconnect a follower node[%d]", (leader + 1) % servers) // the leader and remaining follower should be // able to agree despite the disconnected follower. 
@@ -174,7 +167,6 @@ func TestFailAgree2B(t *testing.T) { // re-connect cfg.connect((leader + 1) % servers) - DPrintf(LOG_INFO, "TestFailAgree2B - reconnect the down node[%d]", (leader + 1) % servers) // the full set of servers should preserve // previous agreements, and be able to agree @@ -350,7 +342,6 @@ func TestRejoin2B(t *testing.T) { // leader network failure leader1 := cfg.checkOneLeader() cfg.disconnect(leader1) - DPrintf(LOG_INFO, "TestRejoin2B - disconnect Raft[%d] and send requests to the disconnected node.", leader1) // make old leader try to agree on some entries cfg.rafts[leader1].Start(102) @@ -359,22 +350,18 @@ func TestRejoin2B(t *testing.T) { // new leader commits, also for index=2 cfg.one(103, 2, true) - DPrintf(LOG_INFO, "TestRejoin2B - new leader commits new request 103 with index = 2") // new leader network failure leader2 := cfg.checkOneLeader() cfg.disconnect(leader2) - DPrintf(LOG_INFO, "TestRejoin2B - leader 2 Raft[%d] has been connected", leader2) // old leader connected again cfg.connect(leader1) - DPrintf(LOG_INFO, "TestRejoin2B - Previous disconnected leader 1 Raft[%d] has been re-connected", leader1) cfg.one(104, 2, true) // all together now cfg.connect(leader2) - DPrintf(LOG_INFO, "TestRejoin2B - Previous disconnected leader 2 Raft[%d] has been re-connected", leader2) cfg.one(105, servers, true) @@ -392,40 +379,29 @@ func TestBackup2B(t *testing.T) { // put leader and one follower in a partition leader1 := cfg.checkOneLeader() - DPrintf(LOG_INFO, "TestBackup2B - leader 1 Raft[%d] has been selected and checked", leader1) cfg.disconnect((leader1 + 2) % servers) cfg.disconnect((leader1 + 3) % servers) cfg.disconnect((leader1 + 4) % servers) - DPrintf(LOG_INFO, "TestBackup2B - 3 nodes[%d, %d, %d] offline, lost quorum", - (leader1 + 2) % servers, (leader1 + 3) % servers, (leader1 + 4) % servers) // submit lots of commands that won't commit for i := 0; i < 50; i++ { cfg.rafts[leader1].Start(rand.Int()) } - DPrintf(LOG_INFO, "TestBackup2B - submit 50 commands to leader 1 Raft[%d] but these commands won't commit", - leader1) - time.Sleep(RaftElectionTimeout / 2) cfg.disconnect((leader1 + 0) % servers) cfg.disconnect((leader1 + 1) % servers) - DPrintf(LOG_INFO, "TestBackup2B - the remaining 2 nodes[%d, %d] offline", - (leader1 + 0) % servers, (leader1 + 1) % servers) // allow other partition to recover cfg.connect((leader1 + 2) % servers) cfg.connect((leader1 + 3) % servers) cfg.connect((leader1 + 4) % servers) - DPrintf(LOG_INFO, "TestBackup2B - previous offlined 3 nodes[%d, %d, %d] go live, regain quorum", - (leader1 + 2) % servers, (leader1 + 3) % servers, (leader1 + 4) % servers) // lots of successful commands to new group. 
for i := 0; i < 50; i++ { cfg.one(rand.Int(), 3, true) } - DPrintf(LOG_INFO, "TestBackup2B - submit 50 commands to new quorum group, and these commands will commit") // now another partitioned leader and one follower leader2 := cfg.checkOneLeader() @@ -434,14 +410,11 @@ func TestBackup2B(t *testing.T) { other = (leader2 + 1) % servers } cfg.disconnect(other) - DPrintf(LOG_INFO, "TestBackup2B - leader 2 Raft[%d] has been checked and another node Raft[%d] goes down", - leader2, other) // lots more commands that won't commit for i := 0; i < 50; i++ { cfg.rafts[leader2].Start(rand.Int()) } - DPrintf(LOG_INFO, "TestBackup2B - submit 50 commands to leader 2 Raft[%d] and these commands won't commit", leader2) time.Sleep(RaftElectionTimeout / 2) @@ -452,19 +425,16 @@ func TestBackup2B(t *testing.T) { cfg.connect((leader1 + 0) % servers) cfg.connect((leader1 + 1) % servers) cfg.connect(other) - DPrintf(LOG_INFO, "TestBackup2B - bring original leader back to life, current alive nodes[%d, %d, %d]", - (leader1 + 0) % servers, (leader1 + 1) % servers, other) // lots of successful commands to new group. for i := 0; i < 50; i++ { cfg.one(rand.Int(), 3, true) } - + // now everyone for i := 0; i < servers; i++ { cfg.connect(i) } - DPrintf(LOG_INFO, "TestBackup2B - now everyone goes live") cfg.one(rand.Int(), servers, true) cfg.end() diff --git a/src/raft/util.go b/src/raft/util.go index dd30b5f..b2a425c 100644 --- a/src/raft/util.go +++ b/src/raft/util.go @@ -1,41 +1,15 @@ package raft -import "log" - -// Debugging -type LogLevel int -const ( - LOG_ERR LogLevel = 0 - LOG_INFO LogLevel = 1 - LOG_DEBUG LogLevel = 2 - LOG_TRACE LogLevel = 3 +import ( + "../slog" ) -const Debug = 2 - -func DPrintf(logLevel LogLevel, format string, a ...interface{}) (n int, err error) { - if Debug >= logLevel { - log.Printf(format, a...) 
-	}
-	return
-}
-
 func GetLastLogIndex(rf *Raft) int {
-	// if len(rf.log) == 0 {
-	// 	return 0
-	// } else {
-	// 	return rf.log[len(rf.log) - 1].Index
-	// }
 	return rf.log[len(rf.log) - 1].Index
 }
 
 func GetLastLogTerm(rf *Raft) int {
-	// if len(rf.log) == 0 {
-	// 	return 0
-	// } else {
-	// 	return rf.log[len(rf.log) - 1].Term
-	// }
 	return rf.log[len(rf.log) - 1].Term
 }
 
@@ -45,7 +19,7 @@ func (rf *Raft) NotifyApplyChannel(commandValid bool, command interface{}, comma
 		Command	: command,
 		CommandIndex: commandIndex,
 	}
-	DPrintf(LOG_DEBUG, "Raft[%d] notify applyMsg{%+v}",
-		rf.me, applyMsg)
+	slog.Log(slog.LOG_DEBUG, "Raft[%d] notify applyMsg{%+v}",
+		rf.me, applyMsg)
 	rf.applyCh<- applyMsg
 }
\ No newline at end of file
diff --git a/src/slog/slog.go b/src/slog/slog.go
new file mode 100644
index 0000000..288bb59
--- /dev/null
+++ b/src/slog/slog.go
@@ -0,0 +1,48 @@
+package slog
+
+import (
+	"fmt"
+	"log"
+	"strings"
+	"runtime"
+	"path/filepath"
+)
+
+type LogLevel int
+const (
+	LOG_ERR		LogLevel = 0
+	LOG_INFO	LogLevel = 10
+	LOG_DEBUG	LogLevel = 20
+)
+
+const currentLogLevel = LOG_ERR
+
+func Log(logLevel LogLevel, formatting string, args ...interface{}) {
+	var logLevelString string
+	switch logLevel {
+	case LOG_ERR:
+		logLevelString = "ERROR"
+	case LOG_INFO:
+		logLevelString = "INFO"
+	case LOG_DEBUG:
+		logLevelString = "DEBUG"
+	default:
+		logLevelString = "UNKNOWN"
+	}
+
+	if logLevel <= currentLogLevel {
+		var funcname string
+		pc, filename, line, ok := runtime.Caller(1)
+		if ok {
+			funcname = runtime.FuncForPC(pc).Name()      // main.(*MyStruct).foo
+			funcname = filepath.Ext(funcname)            // .foo
+			funcname = strings.TrimPrefix(funcname, ".") // foo
+
+			filename = filepath.Base(filename) // /full/path/basename.go => basename.go
+
+			log.Printf("%s - %s:%d(%s): %s\n", logLevelString, filename, line, funcname, fmt.Sprintf(formatting, args...))
+		} else {
+			log.Printf(formatting, args...)
+		}
+	}
+}
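
Since `currentLogLevel` is set to `LOG_ERR`, only `LOG_ERR` messages are actually emitted by this change; the new `LOG_INFO`/`LOG_DEBUG` call sites stay silent until that constant is raised. A minimal usage sketch of the new package (the import path mirrors the GOPATH-relative style used above, and the exact prefix depends on the standard `log` flags):

```go
package main

import (
	"../slog" // GOPATH-style relative import, matching the raft package above
)

func main() {
	// Emitted, because LOG_ERR <= currentLogLevel. Output looks roughly like:
	//   2021/03/05 10:11:12 ERROR - main.go:12(main): Raft[2] lost contact with Raft[0]
	slog.Log(slog.LOG_ERR, "Raft[%d] lost contact with Raft[%d]", 2, 0)

	// Filtered out at the current level: LOG_DEBUG (20) > currentLogLevel (LOG_ERR = 0).
	slog.Log(slog.LOG_DEBUG, "Raft[%d] will try to lock its mutex.", 2)
}
```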
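On the `// ??? I'm not sure if this is necessary for a follower ???` question in `FollowerTryUpdateCommitIndex`: it is — every server, not only the leader, has to push newly committed entries onto its own apply channel, otherwise followers never deliver those commands to the service (and the harness in config.go tracks what each peer applies).

The follower-side `// TODO: will use a smart way to go back` and the leader-side `// may need a smart way` both point at the same follow-up: instead of decrementing `nextIndex` by one per rejected AppendEntries, the follower can report the conflicting term and the first index it stores for that term, and the leader can jump back in one step. The sketch below is illustrative only — the `logEntry` stand-in type, the helper names, and the conflict values they would carry in `AppendEntriesReply` are assumptions, not part of this diff:

```go
package main

import "fmt"

// logEntry is a stand-in for the log entry type used in the diff (field names assumed).
type logEntry struct {
	Term    int
	Command interface{}
}

// followerConflictInfo is what AppendEntries could return alongside Success=false:
// the term of the entry at PrevLogIndex and the first index holding that term,
// or (-1, len(log)) when the follower's log is shorter than PrevLogIndex.
func followerConflictInfo(log []logEntry, prevLogIndex int) (conflictTerm, conflictIndex int) {
	if prevLogIndex >= len(log) {
		return -1, len(log) // log too short: leader should retry from the end of this log
	}
	conflictTerm = log[prevLogIndex].Term
	conflictIndex = prevLogIndex
	for conflictIndex > 1 && log[conflictIndex-1].Term == conflictTerm {
		conflictIndex-- // walk back to the first entry of the conflicting term
	}
	return conflictTerm, conflictIndex
}

// leaderNextIndex moves nextIndex back in one round instead of the current
// rf.nextIndex[peerIndex]-- retry loop in AppendEntriesProcessor.
func leaderNextIndex(leaderLog []logEntry, conflictTerm, conflictIndex int) int {
	if conflictTerm == -1 {
		return conflictIndex // follower's log is short: resend from its end
	}
	// If the leader also has entries from conflictTerm, resend from just past its
	// last entry of that term; otherwise skip the follower's conflicting term entirely.
	for i := len(leaderLog) - 1; i >= 1; i-- {
		if leaderLog[i].Term == conflictTerm {
			return i + 1
		}
	}
	return conflictIndex
}

func main() {
	// Index 0 is a placeholder entry, matching the 1-based indexing implied by GetLastLogIndex.
	leader := []logEntry{{0, nil}, {1, "a"}, {1, "b"}, {4, "c"}, {4, "d"}}
	follower := []logEntry{{0, nil}, {1, "a"}, {1, "b"}, {2, "x"}, {2, "y"}, {2, "z"}}

	// The leader probes with PrevLogIndex=4, PrevLogTerm=4; the follower holds term 2 there.
	ct, ci := followerConflictInfo(follower, 4)
	fmt.Println(ct, ci)                          // 2 3
	fmt.Println(leaderNextIndex(leader, ct, ci)) // 3: the leader has no term-2 entries, so jump to 3
}
```

With something like this in place, the failure branch in `AppendEntriesProcessor` would set `rf.nextIndex[peerIndex]` from the reply once and retry, rather than re-issuing the RPC one entry at a time.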