From 9162cd613d231be60a83e3543064d943fca5afaa Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Thu, 27 Dec 2018 15:38:38 +0000 Subject: [PATCH] etcdserver/*, wal/*: changes to snapshots and wal logic to fix #10219 --- build | 2 +- etcdserver/api/snap/snapshotter.go | 18 ++++++++ etcdserver/raft.go | 25 +++++++---- etcdserver/server.go | 33 ++++++++++++--- etcdserver/storage.go | 43 +++++++++++++++++++ wal/wal.go | 66 ++++++++++++++++++++++++++++++ 6 files changed, 173 insertions(+), 14 deletions(-) diff --git a/build b/build index be8de43f130..b683a945d1b 100755 --- a/build +++ b/build @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - gofail "$mode" etcdserver/ mvcc/backend/ + gofail "$mode" etcdserver/ mvcc/backend/ wal/ elif [[ "$mode" != "disable" ]]; then echo "FAILPOINTS set but gofail not found" exit 1 diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index eb96650f37d..227c92f481e 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -38,6 +38,7 @@ const snapSuffix = ".snap" var ( ErrNoSnapshot = errors.New("snap: no available snapshot") + ErrSnapshotIndex = errors.New("snap: no available snapshot index") ErrEmptySnapshot = errors.New("snap: empty snapshot") ErrCRCMismatch = errors.New("snap: crc mismatch") crcTable = crc32.MakeTable(crc32.Castagnoli) @@ -119,6 +120,23 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { return snap, nil } +func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) { + names, err := s.snapNames() + if err != nil { + return nil, err + } + + if len(names) == 0 { + return nil, ErrNoSnapshot + } + + if i >= uint64(len(names)) { + return nil, ErrSnapshotIndex + } + + return loadSnap(s.lg, s.dir, names[i]) +} + func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) { fpath := filepath.Join(dir, name) snap, err := Read(lg, fpath) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 872015dfb07..329a89a4cca 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -227,27 +227,36 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } - // gofail: var raftBeforeSave struct{} - if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + if !raft.IsEmptySnap(rd.Snapshot) { + // gofail: var raftBeforeSaveSnap struct{} + if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil { + r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) + } + // gofail: var raftAfterSaveSnap struct{} + } + + // gofail: var raftBeforeSaveAll struct{} + if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSave struct{} + // gofail: var raftAfterSaveAll struct{} if !raft.IsEmptySnap(rd.Snapshot) { - // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) - } // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} - // gofail: var raftAfterSaveSnap struct{} + // gofail: var raftBeforeApplySnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) // gofail: var raftAfterApplySnap struct{} + + if err := r.storage.Release(rd.Snapshot); err != nil { + log.Fatal(err) + } + // gofail: var raftAfterWALRelease struct{} } r.raftStorage.Append(rd.Entries) diff --git a/etcdserver/server.go b/etcdserver/server.go index ec928194475..8388739d081 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -67,14 +67,14 @@ import ( ) const ( - DefaultSnapshotCount = 100000 + DefaultSnapshotCount = 10 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower // to catch-up after compacting the raft storage entries. // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - DefaultSnapshotCatchUpEntries uint64 = 5000 + DefaultSnapshotCatchUpEntries uint64 = 10 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -414,10 +414,33 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { zap.String("wal-dir", cfg.WALDir()), ) } - snapshot, err = ss.Load() - if err != nil && err != snap.ErrNoSnapshot { - return nil, err + + // Find a snapshot to start/restart a raft node + var ( + snapshot *raftpb.Snapshot + err error + ) + + for i := uint64(0); ; i++ { + snapshot, err = ss.LoadIndex(i) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err + } + + if err == snap.ErrNoSnapshot { + break + } + + if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) { + break + } + + cfg.Logger.Info( + "skip snapshot", + zap.Uint64("index", i), + ) } + if snapshot != nil { if err = st.Recovery(snapshot.Data); err != nil { cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index f05070d5b25..6d4ed7a1d86 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,6 +36,14 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error + + // SaveSnapshot function saves only snapshot to the underlying stable storage. + SaveSnapshot(snap raftpb.Snapshot) error + // SaveAll function saves ents, snapshot and state to the underlying stable storage. + // SaveAll MUST block until st and ents are on stable storage. + SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error + // Release release release the locked wal files since they will not be used. + Release(snap raftpb.Snapshot) error } type storage struct { @@ -65,6 +73,41 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return st.WAL.ReleaseLockTo(snap.Metadata.Index) } +// SaveSnapshot saves the snapshot to disk. +func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error { + return st.Snapshotter.SaveSnap(snap) +} + +func (st *storage) Release(snap raftpb.Snapshot) error { + return st.WAL.ReleaseLockTo(snap.Metadata.Index) +} + +func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool { + if snapshot == nil { + lg.Fatal("checkWALSnap: snapshot is empty") + } + + walsnap := walpb.Snapshot{ + Index: snapshot.Metadata.Index, + Term: snapshot.Metadata.Term, + } + + w, _, _, st, _ := readWAL(lg, waldir, walsnap) + defer w.Close() + + lg.Info( + "checkWALSnap: snapshot and hardstate data", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.Uint64("st-commit", st.Commit), + ) + + if snapshot.Metadata.Index > st.Commit { + return false + } + + return true +} + func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var ( err error diff --git a/wal/wal.go b/wal/wal.go index d0521529131..687e1ecebb3 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -870,6 +870,72 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { return w.sync() } +func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + + // short cut, do not call sync + if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) { + return nil + } + + mustSync := raft.MustSync(st, w.state, len(ents)) + + if !raft.IsEmptySnap(snap) { + mustSync = true + } + + // 1. Save entries + // TODO(xiangli): no more reference operator + for i := range ents { + if err := w.saveEntry(&ents[i]); err != nil { + return err + } + // gofail: var raftAfterSaveWALFirstEntry struct{} + } + // gofail: var raftAfterSaveWALEntries struct{} + + // 2. Save snapshot + if !raft.IsEmptySnap(snap) { + e := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + + b := pbutil.MustMarshal(&e) + + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + // gofail: var raftAfterSaveWALSnap struct{} + } + + // 3. Save HardState + if err := w.saveState(&st); err != nil { + return err + } + // gofail: var raftAfterSaveWALState struct{} + + curOff, err := w.tail().Seek(0, io.SeekCurrent) + if err != nil { + return err + } + if curOff < SegmentSizeBytes { + if mustSync { + return w.sync() + } + return nil + } + + return w.cut() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) }