From a1de5fc2349cfd4d22637d8dc3ee4ae37fb4f25b Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 30 Aug 2022 20:32:53 +0200 Subject: [PATCH 1/2] RAFT considerd leaders entries 'persisted' on ready advance. --- raft/node.go | 2 ++ raft/raft.go | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/raft/node.go b/raft/node.go index d374b6c0c21..4be0c2b5550 100644 --- a/raft/node.go +++ b/raft/node.go @@ -165,6 +165,8 @@ type Node interface { // commands. For example. when the last Ready contains a snapshot, the application might take // a long time to apply the snapshot data. To continue receiving Ready without blocking raft // progress, it can call Advance before finishing applying the last ready. + // Advance must not be called if Entries has not been stored to the stable log storage (WAL). + // Advance() // ApplyConfChange applies a config change (previously passed to // ProposeConfChange) to the node. This must be called whenever a config diff --git a/raft/raft.go b/raft/raft.go index 14b5b76dda1..48053b92b46 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -573,6 +573,8 @@ func (r *raft) advance(rd Ready) { if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] r.raftLog.stableTo(e.Index, e.Term) + r.prs.Progress[r.id].MaybeUpdate(e.Index) + r.maybeCommit() } if !IsEmptySnap(rd.Snapshot) { r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) @@ -633,9 +635,6 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // Drop the proposal. return false } - // use latest "last" index after truncate/append - li = r.raftLog.append(es...) - r.prs.Progress[r.id].MaybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() return true From 9c8d9740d746d6836ea9240b56651f8fb6d4a4c3 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Wed, 31 Aug 2022 00:10:16 +0200 Subject: [PATCH 2/2] Tempary Logging for test-fixes --- raft/log.go | 7 +++++-- raft/node_test.go | 26 +++++++++++++++++++++----- raft/raft.go | 8 ++++++-- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/raft/log.go b/raft/log.go index 82cf54aa278..cb3f8d117a3 100644 --- a/raft/log.go +++ b/raft/log.go @@ -272,11 +272,13 @@ func (l *raftLog) term(i uint64) (uint64, error) { // TODO: return an error instead? return 0, nil } - + log.Printf("UNSTABLE=%+v", l.unstable.entries) if t, ok := l.unstable.maybeTerm(i); ok { + log.Printf("Found term !!!") return t, nil } - + li, _ := l.storage.LastIndex() + log.Printf("STABLE=%v", li) t, err := l.storage.Term(i) if err == nil { return t, nil @@ -326,6 +328,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { + log.Println("maxIndex=", maxIndex) if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true diff --git a/raft/node_test.go b/raft/node_test.go index 77895a0eed4..3b9901fa9a0 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -373,9 +373,9 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { n.Tick() case rd := <-n.Ready(): s.Append(rd.Entries) + rdyEntries = append(rdyEntries, rd.Entries...) applied := false - for _, e := range rd.Entries { - rdyEntries = append(rdyEntries, e) + for _, e := range rd.CommittedEntries { switch e.Type { case raftpb.EntryNormal: case raftpb.EntryConfChange: @@ -385,8 +385,11 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { applied = true } } + t.Logf("PreA !!!") n.Advance() + t.Logf("PostA !!!") if applied { + t.Logf("Applied !!!") applyConfChan <- struct{}{} } } @@ -600,11 +603,17 @@ func TestNodeStart(t *testing.T) { MustSync: true, }, { - HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}}, MustSync: true, }, + { + HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + Entries: nil, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + MustSync: false, + }, } storage := NewMemoryStorage() c := &Config{ @@ -640,6 +649,13 @@ func TestNodeStart(t *testing.T) { n.Advance() } + if g3 := <-n.Ready(); !reflect.DeepEqual(g3, wants[2]) { + t.Errorf("#%d!: g = %+v,\n w = %+v", 3, g3, wants[2]) + } else { + storage.Append(g3.Entries) + n.Advance() + } + select { case rd := <-n.Ready(): t.Errorf("unexpected Ready: %+v", rd) @@ -759,7 +775,7 @@ func TestNodeAdvance(t *testing.T) { n.Advance() n.Campaign(ctx) - <-n.Ready() + rd = <-n.Ready() n.Propose(ctx, []byte("foo")) select { diff --git a/raft/raft.go b/raft/raft.go index 48053b92b46..9431adfe1dc 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -573,8 +573,11 @@ func (r *raft) advance(rd Ready) { if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] r.raftLog.stableTo(e.Index, e.Term) - r.prs.Progress[r.id].MaybeUpdate(e.Index) - r.maybeCommit() + if r.prs.Progress[r.id] != nil { + r.logger.Infof("Updating progress: %v", e.Index) + r.prs.Progress[r.id].MaybeUpdate(e.Index) + } + r.maybeCommit() // TODO: Consider moving to stepLeader loop... } if !IsEmptySnap(rd.Snapshot) { r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) @@ -635,6 +638,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // Drop the proposal. return false } + r.raftLog.append(es...) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() return true