From 2a10049e47c4451f69e66eead66732a33f453b13 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 30 Aug 2022 10:24:51 +0800 Subject: [PATCH] fix the potential data loss for clusters with only one member For a cluster with only one member, the raft always send identical unstable entries and committed entries to etcdserver, and etcd responds to the client once it finishes (actually partially) the applying workflow. When the client receives the response, it doesn't mean etcd has already successfully saved the data, including BoltDB and WAL, because: 1. etcd commits the boltDB transaction periodically instead of on each request; 2. etcd saves WAL entries in parallel with applying the committed entries. Accordingly, it may run into a situation of data loss when the etcd crashes immediately after responding to the client and before the boltDB and WAL successfully save the data to disk. Note that this issue can only happen for clusters with only one member. For clusters with multiple members, it isn't an issue, because etcd will not commit & apply the data before it being replicated to majority members. When the client receives the response, it means the data must have been applied. It further means the data must have been committed. Note: for clusters with multiple members, the raft will never send identical unstable entries and committed entries to etcdserver. Signed-off-by: Benjamin Wang --- server/etcdserver/raft.go | 52 +++++++++++++++++++++-- server/etcdserver/raft_test.go | 77 ++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 704e45a7aac..64327a63329 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -209,6 +209,14 @@ func (r *raftNode) start(rh *raftReadyHandler) { updateCommittedIndex(&ap, rh) + waitWALSync := shouldWaitWALSync(rd) + if waitWALSync { + // gofail: var raftBeforeSaveWaitWalSync struct{} + if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) + } + } + select { case r.applyc <- ap: case <-r.stopped: @@ -233,9 +241,11 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterSaveSnap struct{} } - // gofail: var raftBeforeSave struct{} - if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { - r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) + if !waitWALSync { + // gofail: var raftBeforeSave struct{} + if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) + } } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) @@ -314,6 +324,42 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } +// For a cluster with only one member, the raft may send both the +// unstable entries and committed entries to etcdserver, and there +// may have overlapped log entries between them. +// +// etcd responds to the client once it finishes (actually partially) +// the applying workflow. But when the client receives the response, +// it doesn't mean etcd has already successfully saved the data, +// including BoltDB and WAL, because: +// 1. etcd commits the boltDB transaction periodically instead of on each request; +// 2. etcd saves WAL entries in parallel with applying the committed entries. +// Accordingly, it might run into a situation of data loss when the etcd crashes +// immediately after responding to the client and before the boltDB and WAL +// successfully save the data to disk. +// Note that this issue can only happen for clusters with only one member. +// +// For clusters with multiple members, it isn't an issue, because etcd will +// not commit & apply the data before it being replicated to majority members. +// When the client receives the response, it means the data must have been applied. +// It further means the data must have been committed. +// Note: for clusters with multiple members, the raft will never send identical +// unstable entries and committed entries to etcdserver. +// +// Refer to https://github.com/etcd-io/etcd/issues/14370. +func shouldWaitWALSync(rd raft.Ready) bool { + if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 { + return false + } + + // Check if there is overlap between unstable and committed entries + // assuming that their index and term are only incrementing. + lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1] + firstUnstableEntry := rd.Entries[0] + return lastCommittedEntry.Term > firstUnstableEntry.Term || + (lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index) +} + func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) { var ci uint64 if len(ap.entries) != 0 { diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 8e5585ad0c9..87f852f31f7 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3" @@ -317,3 +318,79 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) { } } } + +func TestShouldWaitWALSync(t *testing.T) { + testcases := []struct { + name string + unstableEntries []raftpb.Entry + commitedEntries []raftpb.Entry + expectedResult bool + }{ + { + name: "both entries are nil", + unstableEntries: nil, + commitedEntries: nil, + expectedResult: false, + }, + { + name: "both entries are empty slices", + unstableEntries: []raftpb.Entry{}, + commitedEntries: []raftpb.Entry{}, + expectedResult: false, + }, + { + name: "one nil and the other empty", + unstableEntries: nil, + commitedEntries: []raftpb.Entry{}, + expectedResult: false, + }, + { + name: "one nil and the other has data", + unstableEntries: nil, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "one empty and the other has data", + unstableEntries: []raftpb.Entry{}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "has different term and index", + unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: false, + }, + { + name: "has identical data", + unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}}, + expectedResult: true, + }, + { + name: "has overlapped entry", + unstableEntries: []raftpb.Entry{ + {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}, + {Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}}, + {Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}}, + }, + commitedEntries: []raftpb.Entry{ + {Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}}, + {Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}}, + {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}, + }, + expectedResult: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + shouldWALSync := shouldWaitWALSync(raft.Ready{ + Entries: tc.unstableEntries, + CommittedEntries: tc.commitedEntries, + }) + assert.Equal(t, tc.expectedResult, shouldWALSync) + }) + } +}