Skip to content

Commit

Permalink
fix the potential data loss for clusters with only one member
Browse files Browse the repository at this point in the history
For a cluster with only one member, the raft always send identical
unstable entries and committed entries to etcdserver, and etcd
responds to the client once it finishes (actually partially) the
applying workflow.

When the client receives the response, it doesn't mean etcd has already
successfully saved the data, including BoltDB and WAL, because:
   1. etcd commits the boltDB transaction periodically instead of on each request;
   2. etcd saves WAL entries in parallel with applying the committed entries.
Accordingly, it may run into a situation of data loss when the etcd crashes
immediately after responding to the client and before the boltDB and WAL
successfully save the data to disk.
Note that this issue can only happen for clusters with only one member.

For clusters with multiple members, it isn't an issue, because etcd will
not commit & apply the data before it being replicated to majority members.
When the client receives the response, it means the data must have been applied.
It further means the data must have been committed.
Note: for clusters with multiple members, the raft will never send identical
unstable entries and committed entries to etcdserver.

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Sep 1, 2022
1 parent 5707147 commit d1957fe
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 14 deletions.
28 changes: 22 additions & 6 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type node struct {
status chan chan Status

rn *RawNode
rd Ready
}

func newNode(rn *RawNode) node {
Expand Down Expand Up @@ -304,7 +305,6 @@ func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready

r := n.rn.raft

Expand All @@ -322,7 +322,7 @@ func (n *node) run() {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
n.rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}

Expand Down Expand Up @@ -393,12 +393,12 @@ func (n *node) run() {
}
case <-n.tickc:
n.rn.Tick()
case readyc <- rd:
n.rn.acceptReady(rd)
case readyc <- n.rd:
n.rn.acceptReady(n.rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
n.rn.Advance(n.rd)
n.rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
Expand Down Expand Up @@ -503,6 +503,17 @@ func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool)
func (n *node) Ready() <-chan Ready { return n.readyc }

func (n *node) Advance() {
ents := n.rd.Entries

// Provide a feedback that the `Entries` has already been successfully
// saved by the user. Note that only leader needs to do this, because
// a follower will not send the feedback to leader until it finishes
// saving the `Entries`.
if n.isLeader() && (len(ents) > 1 || (len(ents) == 1 && len(ents[0].Data) > 0)) {
e := ents[len(ents)-1]
n.Step(context.TODO(), pb.Message{From: n.Status().ID, Type: pb.MsgAppResp, Index: e.Index})
}

select {
case n.advancec <- struct{}{}:
case <-n.done:
Expand Down Expand Up @@ -561,6 +572,11 @@ func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

func (n *node) isLeader() bool {
sts := n.Status()
return sts.ID == sts.Lead
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
Expand Down
22 changes: 17 additions & 5 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
case rd := <-n.Ready():
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
for _, e := range rd.CommittedEntries {
rdyEntries = append(rdyEntries, e)
switch e.Type {
case raftpb.EntryNormal:
Expand Down Expand Up @@ -600,11 +600,17 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
HardState: raftpb.HardState{},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: nil,
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
},
}
storage := NewMemoryStorage()
c := &Config{
Expand Down Expand Up @@ -642,8 +648,11 @@ func TestNodeStart(t *testing.T) {

select {
case rd := <-n.Ready():
t.Errorf("unexpected Ready: %+v", rd)
case <-time.After(time.Millisecond):
if !reflect.DeepEqual(rd, wants[2]) {
t.Errorf("Unexpected ready: got = %+v,\n wanted = %+v", rd, wants[2])
}
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for the ready data")
}
}

Expand Down Expand Up @@ -932,6 +941,9 @@ func TestCommitPagination(t *testing.T) {
t.Fatal(err)
}
}
rd = readyWithTimeout(&n)
s.Append(rd.Entries)
n.Advance()

// The 3 proposals will commit in two batches.
rd = readyWithTimeout(&n)
Expand Down
9 changes: 8 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ func newRaft(c *Config) *raft {

func (r *raft) hasLeader() bool { return r.lead != None }

func (r *raft) isLeader() bool { return r.id == r.lead }

func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }

func (r *raft) hardState() pb.HardState {
Expand Down Expand Up @@ -635,7 +637,12 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.Progress[r.id].MaybeUpdate(li)
// The entries haven't been confirmed to be saved, so we can't
// update `pr.Match` for now.
// It's only called by `advance` and `becomeLeader` here.
if len(es) == 1 && es[0].Data == nil {
r.prs.Progress[r.id].MaybeUpdate(li)
}
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
Expand Down
1 change: 1 addition & 0 deletions raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
commitNoopEntry(r, s)
li := r.raftLog.lastIndex()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})

for _, m := range r.readMessages() {
if tt.acceptors[m.To] {
Expand Down
14 changes: 14 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func TestProgressLeader(t *testing.T) {
if err := r.Step(propMsg); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}
if err := r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}); err != nil {
t.Fatalf("reporting feedback resulted in error: %v", err)
}
}
}

Expand Down Expand Up @@ -3388,6 +3391,9 @@ func TestCommitAfterRemoveNode(t *testing.T) {
{Type: pb.EntryConfChange, Data: ccData},
},
})
// Node 1 acknowledges the config change to its local raft.
r.Step(pb.Message{Type: pb.MsgAppResp, From: 1, Index: r.raftLog.lastIndex()})

// Stabilize the log and make sure nothing is committed yet.
if ents := nextEnts(r, s); len(ents) > 0 {
t.Fatalf("unexpected committed entries: %v", ents)
Expand All @@ -3401,6 +3407,8 @@ func TestCommitAfterRemoveNode(t *testing.T) {
{Type: pb.EntryNormal, Data: []byte("hello")},
},
})
// Node 1 acknowledges the proposal to its local raft.
r.Step(pb.Message{Type: pb.MsgAppResp, From: 1, Index: r.raftLog.lastIndex()})

// Node 2 acknowledges the config change, committing it.
r.Step(pb.Message{
Expand Down Expand Up @@ -4714,6 +4722,12 @@ func (nw *network) send(msgs ...pb.Message) {
m := msgs[0]
p := nw.peers[m.To]
p.Step(m)
if sm, ok := p.(*raft); ok {
// The leader acknowledges the proposal to its local raft.
if sm.isLeader() && m.Type == pb.MsgProp && len(m.Entries) > 0 && m.Entries[0].Data != nil {
p.Step(pb.Message{From: sm.id, To: sm.id, Type: pb.MsgAppResp, Index: sm.raftLog.lastIndex()})
}
}
msgs = append(msgs[1:], nw.filter(p.readMessages())...)
}
}
Expand Down
12 changes: 10 additions & 2 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
for i, tc := range testCases {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
s := newTestMemoryStorage(withPeers(1))
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
if err != nil {
Expand Down Expand Up @@ -280,6 +280,9 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
}
rawNode.ProposeConfChange(ccv2)
}
if err = rawNode.Step(pb.Message{From: rawNode.raft.id, To: rawNode.raft.id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
proposed = true
}
}
Expand Down Expand Up @@ -433,6 +436,8 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
t.Fatal(err)
}
rawNode.ProposeConfChange(testCc)
r := rawNode.raft
rawNode.Step(pb.Message{From: r.id, To: r.id, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
proposed = true
}
}
Expand Down Expand Up @@ -743,6 +748,7 @@ func TestRawNodeStart(t *testing.T) {
}
rawNode.Campaign()
rawNode.Propose([]byte("foo"))
rawNode.Step(pb.Message{From: rawNode.raft.id, To: rawNode.raft.id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()})
if !rawNode.HasReady() {
t.Fatal("expected a Ready")
}
Expand Down Expand Up @@ -981,6 +987,8 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
// log to grow indefinitely.
for i := 0; i < 1024; i++ {
rawNode.Propose(data)
id := rawNode.raft.id
rawNode.Step(pb.Message{From: id, To: id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()})
}

// Check the size of leader's uncommitted log tail. It should not exceed the
Expand Down

0 comments on commit d1957fe

Please sign in to comment.