From 9162cd613d231be60a83e3543064d943fca5afaa Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Thu, 27 Dec 2018 15:38:38 +0000 Subject: [PATCH 1/8] etcdserver/*, wal/*: changes to snapshots and wal logic to fix #10219 --- build | 2 +- etcdserver/api/snap/snapshotter.go | 18 ++++++++ etcdserver/raft.go | 25 +++++++---- etcdserver/server.go | 33 ++++++++++++--- etcdserver/storage.go | 43 +++++++++++++++++++ wal/wal.go | 66 ++++++++++++++++++++++++++++++ 6 files changed, 173 insertions(+), 14 deletions(-) diff --git a/build b/build index be8de43f130..b683a945d1b 100755 --- a/build +++ b/build @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - gofail "$mode" etcdserver/ mvcc/backend/ + gofail "$mode" etcdserver/ mvcc/backend/ wal/ elif [[ "$mode" != "disable" ]]; then echo "FAILPOINTS set but gofail not found" exit 1 diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index eb96650f37d..227c92f481e 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -38,6 +38,7 @@ const snapSuffix = ".snap" var ( ErrNoSnapshot = errors.New("snap: no available snapshot") + ErrSnapshotIndex = errors.New("snap: no available snapshot index") ErrEmptySnapshot = errors.New("snap: empty snapshot") ErrCRCMismatch = errors.New("snap: crc mismatch") crcTable = crc32.MakeTable(crc32.Castagnoli) @@ -119,6 +120,23 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { return snap, nil } +func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) { + names, err := s.snapNames() + if err != nil { + return nil, err + } + + if len(names) == 0 { + return nil, ErrNoSnapshot + } + + if i >= uint64(len(names)) { + return nil, ErrSnapshotIndex + } + + return loadSnap(s.lg, s.dir, names[i]) +} + func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) { fpath := filepath.Join(dir, name) snap, err := Read(lg, fpath) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 872015dfb07..329a89a4cca 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -227,27 +227,36 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } - // gofail: var raftBeforeSave struct{} - if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { + if !raft.IsEmptySnap(rd.Snapshot) { + // gofail: var raftBeforeSaveSnap struct{} + if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil { + r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) + } + // gofail: var raftAfterSaveSnap struct{} + } + + // gofail: var raftBeforeSaveAll struct{} + if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSave struct{} + // gofail: var raftAfterSaveAll struct{} if !raft.IsEmptySnap(rd.Snapshot) { - // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnap(rd.Snapshot); err != nil { - r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) - } // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} - // gofail: var raftAfterSaveSnap struct{} + // gofail: var raftBeforeApplySnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index)) // gofail: var raftAfterApplySnap struct{} + + if err := r.storage.Release(rd.Snapshot); err != nil { + log.Fatal(err) + } + // gofail: var raftAfterWALRelease struct{} } r.raftStorage.Append(rd.Entries) diff --git a/etcdserver/server.go b/etcdserver/server.go index ec928194475..8388739d081 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -67,14 +67,14 @@ import ( ) const ( - DefaultSnapshotCount = 100000 + DefaultSnapshotCount = 10 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower // to catch-up after compacting the raft storage entries. // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - DefaultSnapshotCatchUpEntries uint64 = 5000 + DefaultSnapshotCatchUpEntries uint64 = 10 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -414,10 +414,33 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { zap.String("wal-dir", cfg.WALDir()), ) } - snapshot, err = ss.Load() - if err != nil && err != snap.ErrNoSnapshot { - return nil, err + + // Find a snapshot to start/restart a raft node + var ( + snapshot *raftpb.Snapshot + err error + ) + + for i := uint64(0); ; i++ { + snapshot, err = ss.LoadIndex(i) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err + } + + if err == snap.ErrNoSnapshot { + break + } + + if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) { + break + } + + cfg.Logger.Info( + "skip snapshot", + zap.Uint64("index", i), + ) } + if snapshot != nil { if err = st.Recovery(snapshot.Data); err != nil { cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index f05070d5b25..6d4ed7a1d86 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,6 +36,14 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error + + // SaveSnapshot function saves only snapshot to the underlying stable storage. + SaveSnapshot(snap raftpb.Snapshot) error + // SaveAll function saves ents, snapshot and state to the underlying stable storage. + // SaveAll MUST block until st and ents are on stable storage. + SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error + // Release release release the locked wal files since they will not be used. + Release(snap raftpb.Snapshot) error } type storage struct { @@ -65,6 +73,41 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return st.WAL.ReleaseLockTo(snap.Metadata.Index) } +// SaveSnapshot saves the snapshot to disk. +func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error { + return st.Snapshotter.SaveSnap(snap) +} + +func (st *storage) Release(snap raftpb.Snapshot) error { + return st.WAL.ReleaseLockTo(snap.Metadata.Index) +} + +func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool { + if snapshot == nil { + lg.Fatal("checkWALSnap: snapshot is empty") + } + + walsnap := walpb.Snapshot{ + Index: snapshot.Metadata.Index, + Term: snapshot.Metadata.Term, + } + + w, _, _, st, _ := readWAL(lg, waldir, walsnap) + defer w.Close() + + lg.Info( + "checkWALSnap: snapshot and hardstate data", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.Uint64("st-commit", st.Commit), + ) + + if snapshot.Metadata.Index > st.Commit { + return false + } + + return true +} + func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var ( err error diff --git a/wal/wal.go b/wal/wal.go index d0521529131..687e1ecebb3 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -870,6 +870,72 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { return w.sync() } +func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + + // short cut, do not call sync + if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) { + return nil + } + + mustSync := raft.MustSync(st, w.state, len(ents)) + + if !raft.IsEmptySnap(snap) { + mustSync = true + } + + // 1. Save entries + // TODO(xiangli): no more reference operator + for i := range ents { + if err := w.saveEntry(&ents[i]); err != nil { + return err + } + // gofail: var raftAfterSaveWALFirstEntry struct{} + } + // gofail: var raftAfterSaveWALEntries struct{} + + // 2. Save snapshot + if !raft.IsEmptySnap(snap) { + e := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + + b := pbutil.MustMarshal(&e) + + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + // gofail: var raftAfterSaveWALSnap struct{} + } + + // 3. Save HardState + if err := w.saveState(&st); err != nil { + return err + } + // gofail: var raftAfterSaveWALState struct{} + + curOff, err := w.tail().Seek(0, io.SeekCurrent) + if err != nil { + return err + } + if curOff < SegmentSizeBytes { + if mustSync { + return w.sync() + } + return nil + } + + return w.cut() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } From 3d2b565f98407037d1ce4a74f42a356b9a9d4d3c Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Fri, 4 Jan 2019 17:45:02 +0000 Subject: [PATCH 2/8] etcdserver/*: changes to snapshots and wal logic to fix #10219 --- build | 2 +- etcdserver/raft.go | 8 +++--- etcdserver/server.go | 5 ++++ etcdserver/storage.go | 14 --------- wal/wal.go | 66 ------------------------------------------- 5 files changed, 10 insertions(+), 85 deletions(-) diff --git a/build b/build index b683a945d1b..be8de43f130 100755 --- a/build +++ b/build @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - gofail "$mode" etcdserver/ mvcc/backend/ wal/ + gofail "$mode" etcdserver/ mvcc/backend/ elif [[ "$mode" != "disable" ]]; then echo "FAILPOINTS set but gofail not found" exit 1 diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 329a89a4cca..6c411f327ab 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -229,20 +229,20 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} - if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil { + if err := r.storage.SaveSnap(rd.Snapshot); err != nil { r.lg.Fatal("failed to save Raft snapshot", zap.Error(err)) } // gofail: var raftAfterSaveSnap struct{} } - // gofail: var raftBeforeSaveAll struct{} - if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil { + // gofail: var raftBeforeSave struct{} + if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err)) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } - // gofail: var raftAfterSaveAll struct{} + // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { // etcdserver now claim the snapshot has been persisted onto the disk diff --git a/etcdserver/server.go b/etcdserver/server.go index 8388739d081..6fcf209d321 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -2178,6 +2178,11 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { if err = s.r.storage.SaveSnap(snap); err != nil { lg.Panic("failed to save snapshot", zap.Error(err)) } + + if err = s.r.storage.Release(snap); err != nil { + lg.Panic("failed to release wal", zap.Error(err)) + } + lg.Info( "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 6d4ed7a1d86..0e8390b1450 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,12 +36,6 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error - - // SaveSnapshot function saves only snapshot to the underlying stable storage. - SaveSnapshot(snap raftpb.Snapshot) error - // SaveAll function saves ents, snapshot and state to the underlying stable storage. - // SaveAll MUST block until st and ents are on stable storage. - SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error // Release release release the locked wal files since they will not be used. Release(snap raftpb.Snapshot) error } @@ -66,15 +60,7 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { if err != nil { return err } - err = st.Snapshotter.SaveSnap(snap) - if err != nil { - return err - } - return st.WAL.ReleaseLockTo(snap.Metadata.Index) -} -// SaveSnapshot saves the snapshot to disk. -func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error { return st.Snapshotter.SaveSnap(snap) } diff --git a/wal/wal.go b/wal/wal.go index 687e1ecebb3..d0521529131 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -870,72 +870,6 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { return w.sync() } -func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error { - w.mu.Lock() - defer w.mu.Unlock() - - // short cut, do not call sync - if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) { - return nil - } - - mustSync := raft.MustSync(st, w.state, len(ents)) - - if !raft.IsEmptySnap(snap) { - mustSync = true - } - - // 1. Save entries - // TODO(xiangli): no more reference operator - for i := range ents { - if err := w.saveEntry(&ents[i]); err != nil { - return err - } - // gofail: var raftAfterSaveWALFirstEntry struct{} - } - // gofail: var raftAfterSaveWALEntries struct{} - - // 2. Save snapshot - if !raft.IsEmptySnap(snap) { - e := walpb.Snapshot{ - Index: snap.Metadata.Index, - Term: snap.Metadata.Term, - } - - b := pbutil.MustMarshal(&e) - - rec := &walpb.Record{Type: snapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { - return err - } - - // update enti only when snapshot is ahead of last index - if w.enti < e.Index { - w.enti = e.Index - } - // gofail: var raftAfterSaveWALSnap struct{} - } - - // 3. Save HardState - if err := w.saveState(&st); err != nil { - return err - } - // gofail: var raftAfterSaveWALState struct{} - - curOff, err := w.tail().Seek(0, io.SeekCurrent) - if err != nil { - return err - } - if curOff < SegmentSizeBytes { - if mustSync { - return w.sync() - } - return nil - } - - return w.cut() -} - func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } From 5435e7686a9a7e27d3c79aea79bca0b80ea1d770 Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Mon, 7 Jan 2019 10:48:16 +0000 Subject: [PATCH 3/8] etcdserver/*: fix tests --- etcdserver/server_test.go | 38 +++++++++++++++++------- pkg/mock/mockstorage/storage_recorder.go | 7 +++++ 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 85e474a698c..21116c65930 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -998,12 +998,16 @@ func TestSnapshot(t *testing.T) { gaction, _ := p.Wait(1) defer func() { ch <- struct{}{} }() - if len(gaction) != 1 { - t.Errorf("len(action) = %d, want 1", len(gaction)) + if len(gaction) != 2 { + t.Fatalf("len(action) = %d, want 2", len(gaction)) } if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[0]) } + + if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) { + t.Errorf("action = %s, want Release", gaction[1]) + } }() go func() { @@ -1087,20 +1091,28 @@ func TestSnapshotOrdering(t *testing.T) { n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot} }() - if ac := <-p.Chan(); ac.Name != "Save" { + ac := <-p.Chan() + if ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } + + if ac := <-p.Chan(); ac.Name != "SaveSnap" { + t.Fatalf("expected SaveSnap, got %+v", ac) + } + if ac := <-p.Chan(); ac.Name != "Save" { t.Fatalf("expected Save, got %+v", ac) } + // confirm snapshot file still present before calling SaveSnap snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1)) if !fileutil.Exist(snapPath) { t.Fatalf("expected file %q, got missing", snapPath) } + // unblock SaveSnapshot, etcdserver now permitted to move snapshot file - if ac := <-p.Chan(); ac.Name != "SaveSnap" { - t.Fatalf("expected SaveSnap, got %+v", ac) + if ac := <-p.Chan(); ac.Name != "Release" { + t.Fatalf("expected Release, got %+v", ac) } } @@ -1140,16 +1152,22 @@ func TestTriggerSnap(t *testing.T) { donec := make(chan struct{}) go func() { - wcnt := 2 + snapc + wcnt := 3 + snapc gaction, _ := p.Wait(wcnt) // each operation is recorded as a Save - // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release if len(gaction) != wcnt { - t.Errorf("len(action) = %d, want %d", len(gaction), wcnt) + fmt.Println("gaction", gaction) + t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) + } + + if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2]) } - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) + + if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) { + t.Errorf("action = %s, want Release", gaction[wcnt-1]) } close(donec) }() diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index e1fa7f9fdb5..d1e59a5b87e 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ b/pkg/mock/mockstorage/storage_recorder.go @@ -45,4 +45,11 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { return nil } +func (p *storageRecorder) Release(st raftpb.Snapshot) error { + if !raft.IsEmptySnap(st) { + p.Record(testutil.Action{Name: "Release"}) + } + return nil +} + func (p *storageRecorder) Close() error { return nil } From 91efa67cb1d2639514ec4d095468830bbfcb4d0b Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Mon, 7 Jan 2019 14:27:19 +0000 Subject: [PATCH 4/8] etcdserver/*: rollback default settings --- etcdserver/server.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 6fcf209d321..26419d0106b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -67,14 +67,14 @@ import ( ) const ( - DefaultSnapshotCount = 10 + DefaultSnapshotCount = 100000 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower // to catch-up after compacting the raft storage entries. // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - DefaultSnapshotCatchUpEntries uint64 = 10 + DefaultSnapshotCatchUpEntries uint64 = 5000 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -416,11 +416,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } // Find a snapshot to start/restart a raft node - var ( - snapshot *raftpb.Snapshot - err error - ) - for i := uint64(0); ; i++ { snapshot, err = ss.LoadIndex(i) if err != nil && err != snap.ErrNoSnapshot { From 50517039ae1d8819cec182317f67e1349ccab925 Mon Sep 17 00:00:00 2001 From: Viacheslav Biriukov Date: Tue, 8 Jan 2019 17:15:30 +0000 Subject: [PATCH 5/8] etcdserver/*, wal/*: add Sync method --- etcdserver/raft.go | 7 +++++++ etcdserver/server_test.go | 6 +++++- etcdserver/storage.go | 2 ++ pkg/mock/mockstorage/storage_recorder.go | 5 +++++ wal/wal.go | 4 ++++ 5 files changed, 23 insertions(+), 1 deletion(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 6c411f327ab..1361f59c469 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -245,6 +245,13 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { + // Force WAL to fsync its hard state before Release() releases + // old data from the WAL. Otherwise could get an error like: + // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost? + if err := r.storage.Sync(); err != nil { + log.Fatal(err) + } + // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 21116c65930..1f3bf349b9e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -995,7 +995,7 @@ func TestSnapshot(t *testing.T) { ch := make(chan struct{}, 2) go func() { - gaction, _ := p.Wait(1) + gaction, _ := p.Wait(2) defer func() { ch <- struct{}{} }() if len(gaction) != 2 { @@ -1111,6 +1111,10 @@ func TestSnapshotOrdering(t *testing.T) { } // unblock SaveSnapshot, etcdserver now permitted to move snapshot file + if ac := <-p.Chan(); ac.Name != "Sync" { + t.Fatalf("expected Sync, got %+v", ac) + } + if ac := <-p.Chan(); ac.Name != "Release" { t.Fatalf("expected Release, got %+v", ac) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 0e8390b1450..3376a825a95 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -38,6 +38,8 @@ type Storage interface { Close() error // Release release release the locked wal files since they will not be used. Release(snap raftpb.Snapshot) error + // Sync WAL + Sync() error } type storage struct { diff --git a/pkg/mock/mockstorage/storage_recorder.go b/pkg/mock/mockstorage/storage_recorder.go index d1e59a5b87e..d3048668712 100644 --- a/pkg/mock/mockstorage/storage_recorder.go +++ b/pkg/mock/mockstorage/storage_recorder.go @@ -52,4 +52,9 @@ func (p *storageRecorder) Release(st raftpb.Snapshot) error { return nil } +func (p *storageRecorder) Sync() error { + p.Record(testutil.Action{Name: "Sync"}) + return nil +} + func (p *storageRecorder) Close() error { return nil } diff --git a/wal/wal.go b/wal/wal.go index d0521529131..17df41e2665 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -723,6 +723,10 @@ func (w *WAL) sync() error { return err } +func (w *WAL) Sync() error { + return w.sync() +} + // ReleaseLockTo releases the locks, which has smaller index than the given index // except the largest one among them. // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release From bd16071846b716cc625c69cd502c953e5f825121 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Wed, 13 May 2020 21:39:55 -0700 Subject: [PATCH 6/8] etcdserver/*, wal/*: find valid snapshots by cross checking snap files and wal snap entries --- etcdserver/api/snap/snapshotter.go | 47 ++++++++++---------- etcdserver/api/snap/snapshotter_test.go | 47 +++++++++++++++++--- etcdserver/raft.go | 2 + etcdserver/server.go | 41 +++++++----------- etcdserver/storage.go | 43 ++++++------------- wal/wal.go | 57 +++++++++++++++++++++++++ wal/wal_test.go | 57 +++++++++++++++++++++++++ 7 files changed, 208 insertions(+), 86 deletions(-) diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index 227c92f481e..7a577dd84af 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/v3/pkg/pbutil" "go.etcd.io/etcd/v3/raft" "go.etcd.io/etcd/v3/raft/raftpb" + "go.etcd.io/etcd/v3/wal/walpb" "go.uber.org/zap" ) @@ -38,7 +39,6 @@ const snapSuffix = ".snap" var ( ErrNoSnapshot = errors.New("snap: no available snapshot") - ErrSnapshotIndex = errors.New("snap: no available snapshot index") ErrEmptySnapshot = errors.New("snap: empty snapshot") ErrCRCMismatch = errors.New("snap: crc mismatch") crcTable = crc32.MakeTable(crc32.Castagnoli) @@ -103,38 +103,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { return nil } +// Load returns the newest snapshot. func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { - names, err := s.snapNames() - if err != nil { - return nil, err - } - var snap *raftpb.Snapshot - for _, name := range names { - if snap, err = loadSnap(s.lg, s.dir, name); err == nil { - break + return s.loadMatching(func(*raftpb.Snapshot) bool { return true }) +} + +// LoadNewestAvailable loads the newest snapshot available that is in walSnaps. +func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) { + return s.loadMatching(func(snapshot *raftpb.Snapshot) bool { + m := snapshot.Metadata + for i := len(walSnaps) - 1; i >= 0; i-- { + if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index { + return true + } } - } - if err != nil { - return nil, ErrNoSnapshot - } - return snap, nil + return false + }) } -func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) { +// loadMatching returns the newest snapshot where matchFn returns true. +func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) { names, err := s.snapNames() if err != nil { return nil, err } - - if len(names) == 0 { - return nil, ErrNoSnapshot - } - - if i >= uint64(len(names)) { - return nil, ErrSnapshotIndex + var snap *raftpb.Snapshot + for _, name := range names { + if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) { + return snap, nil + } } - - return loadSnap(s.lg, s.dir, names[i]) + return nil, ErrNoSnapshot } func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) { diff --git a/etcdserver/api/snap/snapshotter_test.go b/etcdserver/api/snap/snapshotter_test.go index 7f7ac7c7e31..648fdadfd9f 100644 --- a/etcdserver/api/snap/snapshotter_test.go +++ b/etcdserver/api/snap/snapshotter_test.go @@ -24,6 +24,8 @@ import ( "testing" "go.etcd.io/etcd/v3/raft/raftpb" + "go.etcd.io/etcd/v3/wal/walpb" + "go.uber.org/zap" ) @@ -166,12 +168,47 @@ func TestLoadNewestSnap(t *testing.T) { t.Fatal(err) } - g, err := ss.Load() - if err != nil { - t.Errorf("err = %v, want nil", err) + cases := []struct { + name string + availableWalSnaps []walpb.Snapshot + expected *raftpb.Snapshot + }{ + { + name: "load-newest", + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-newest-unsorted", + availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}}, + expected: &newSnap, + }, + { + name: "loadnewestavailable-previous", + availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}}, + expected: testSnap, + }, } - if !reflect.DeepEqual(g, &newSnap) { - t.Errorf("snap = %#v, want %#v", g, &newSnap) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var err error + var g *raftpb.Snapshot + if tc.availableWalSnaps != nil { + g, err = ss.LoadNewestAvailable(tc.availableWalSnaps) + } else { + g, err = ss.Load() + } + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if !reflect.DeepEqual(g, tc.expected) { + t.Errorf("snap = %#v, want %#v", g, tc.expected) + } + }) } } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 1361f59c469..e28b033b76c 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -227,6 +227,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(r.processMessages(rd.Messages)) } + // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to + // ensure that recovery after a snapshot restore is possible. if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} if err := r.storage.SaveSnap(rd.Snapshot); err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index 26419d0106b..5e1925ce090 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -29,6 +29,11 @@ import ( "sync/atomic" "time" + "github.com/coreos/go-semver/semver" + humanize "github.com/dustin/go-humanize" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver/api" "go.etcd.io/etcd/v3/etcdserver/api/membership" @@ -59,11 +64,6 @@ import ( "go.etcd.io/etcd/v3/raft/raftpb" "go.etcd.io/etcd/v3/version" "go.etcd.io/etcd/v3/wal" - - "github.com/coreos/go-semver/semver" - humanize "github.com/dustin/go-humanize" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" ) const ( @@ -416,24 +416,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } // Find a snapshot to start/restart a raft node - for i := uint64(0); ; i++ { - snapshot, err = ss.LoadIndex(i) - if err != nil && err != snap.ErrNoSnapshot { - return nil, err - } - - if err == snap.ErrNoSnapshot { - break - } - - if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) { - break - } - - cfg.Logger.Info( - "skip snapshot", - zap.Uint64("index", i), - ) + walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) + if err != nil { + return nil, err + } + // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding + // wal log entries + snapshot, err := ss.LoadNewestAvailable(walSnaps) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err } if snapshot != nil { @@ -2168,12 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } lg.Panic("failed to create snapshot", zap.Error(err)) } - // SaveSnap saves the snapshot and releases the locked wal files - // to the snapshot index. + // SaveSnap saves the snapshot to file and appends the corresponding WAL entry. if err = s.r.storage.SaveSnap(snap); err != nil { lg.Panic("failed to save snapshot", zap.Error(err)) } - if err = s.r.storage.Release(snap); err != nil { lg.Panic("failed to release wal", zap.Error(err)) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 3376a825a95..15367566df3 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -36,7 +36,7 @@ type Storage interface { SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error - // Release release release the locked wal files since they will not be used. + // Release releases the locked wal files older than the provided snapshot. Release(snap raftpb.Snapshot) error // Sync WAL Sync() error @@ -51,51 +51,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { return &storage{w, s} } -// SaveSnap saves the snapshot to disk and release the locked -// wal files since they will not be used. +// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - err := st.WAL.SaveSnapshot(walsnap) + // save the snapshot file before writing the snapshot to the wal. + // This makes it possible for the snapshot file to become orphaned, but prevents + // a WAL snapshot entry from having no corresponding snapshot file. + err := st.Snapshotter.SaveSnap(snap) if err != nil { return err } + // gofail: var raftBeforeWALSaveSnaphot struct{} - return st.Snapshotter.SaveSnap(snap) + return st.WAL.SaveSnapshot(walsnap) } +// Release release the locks to the wal files that are older than the provided wal for the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { return st.WAL.ReleaseLockTo(snap.Metadata.Index) } -func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool { - if snapshot == nil { - lg.Fatal("checkWALSnap: snapshot is empty") - } - - walsnap := walpb.Snapshot{ - Index: snapshot.Metadata.Index, - Term: snapshot.Metadata.Term, - } - - w, _, _, st, _ := readWAL(lg, waldir, walsnap) - defer w.Close() - - lg.Info( - "checkWALSnap: snapshot and hardstate data", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - zap.Uint64("st-commit", st.Commit), - ) - - if snapshot.Metadata.Index > st.Commit { - return false - } - - return true -} - +// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +// after the position of the given snap in the WAL. +// The snap must have been previously saved to the WAL, or this call will panic. func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var ( err error diff --git a/wal/wal.go b/wal/wal.go index 17df41e2665..d038b1f5866 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -531,6 +531,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory. +// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate. +func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) { + var snaps []walpb.Snapshot + var state raftpb.HardState + var err error + + rec := &walpb.Record{} + names, err := readWALNames(lg, walDir) + if err != nil { + return nil, err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false) + if err != nil { + return nil, err + } + defer func() { + if closer != nil { + closer() + } + }() + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + snaps = append(snaps, loadedSnap) + case stateType: + state = mustUnmarshalState(rec.Data) + } + } + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return nil, err + } + + // filter out any snaps that are newer than the committed hardstate + n := 0 + for _, s := range snaps { + if s.Index <= state.Commit { + snaps[n] = s + n++ + } + } + snaps = snaps[:n] + + return snaps, nil +} + // Verify reads through the given WAL and verifies that it is not corrupted. // It creates a new decoder to read through the records of the given WAL. // It does not conflict with any open WAL, but it is recommended not to diff --git a/wal/wal_test.go b/wal/wal_test.go index bc9fa1e6344..07fb49feac1 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -1000,3 +1000,60 @@ func TestReadAllFail(t *testing.T) { t.Fatalf("err = %v, want ErrDecoderNotFound", err) } } + +// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting +// for hardstate +func TestValidSnapshotEntries(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + snap0 := walpb.Snapshot{Index: 0, Term: 0} + snap1 := walpb.Snapshot{Index: 1, Term: 1} + snap2 := walpb.Snapshot{Index: 2, Term: 1} + snap3 := walpb.Snapshot{Index: 3, Term: 2} + snap4 := walpb.Snapshot{Index: 4, Term: 2} + func() { + w, err := Create(zap.NewExample(), p, nil) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + // snap0 is implicitly created at index 0, term 0 + if err = w.SaveSnapshot(snap1); err != nil { + t.Fatal(err) + } + state := raftpb.HardState{Commit: 1, Term: 1} + if err = w.Save(state, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap2); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap3); err != nil { + t.Fatal(err) + } + state2 := raftpb.HardState{Commit: 3, Term: 2} + if err = w.Save(state2, nil); err != nil { + t.Fatal(err) + } + if err = w.SaveSnapshot(snap4); err != nil { + t.Fatal(err) + } + }() + walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p) + if err != nil { + t.Fatal(err) + } + expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} + if len(walSnaps) != len(expected) { + t.Fatalf("expected 4 walSnaps, got %d", len(expected)) + } + for i := 0; i < len(expected); i++ { + if walSnaps[i].Index != expected[i].Index || walSnaps[i].Term != expected[i].Term { + t.Errorf("expected walSnaps %+v at index %d, got %+v", expected[i], i, walSnaps[i]) + } + } +} From b68eea236e729a662e359ba805bbeb2a71ba41c1 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 14 May 2020 13:34:55 -0700 Subject: [PATCH 7/8] etcdserver/*, wal/*:Add comments, clean up error messages and tests --- etcdserver/raft.go | 5 +++-- etcdserver/server_test.go | 2 +- wal/wal.go | 2 +- wal/wal_test.go | 17 ++++++----------- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index e28b033b76c..b8e455b3f87 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -250,8 +250,9 @@ func (r *raftNode) start(rh *raftReadyHandler) { // Force WAL to fsync its hard state before Release() releases // old data from the WAL. Otherwise could get an error like: // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost? + // See https://github.com/etcd-io/etcd/issues/10219 for more details. if err := r.storage.Sync(); err != nil { - log.Fatal(err) + r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err)) } // etcdserver now claim the snapshot has been persisted onto the disk @@ -263,7 +264,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftAfterApplySnap struct{} if err := r.storage.Release(rd.Snapshot); err != nil { - log.Fatal(err) + r.lg.Fatal("failed to release Raft wal", zap.Error(err)) } // gofail: var raftAfterWALRelease struct{} } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1f3bf349b9e..d09ccc30b57 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1162,7 +1162,7 @@ func TestTriggerSnap(t *testing.T) { // each operation is recorded as a Save // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release if len(gaction) != wcnt { - fmt.Println("gaction", gaction) + t.Logf("gaction: %v", gaction) t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } diff --git a/wal/wal.go b/wal/wal.go index d038b1f5866..5d0be5585a6 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -583,7 +583,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro n++ } } - snaps = snaps[:n] + snaps = snaps[:n:n] return snaps, nil } diff --git a/wal/wal_test.go b/wal/wal_test.go index 07fb49feac1..068b05334b2 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -1011,9 +1011,11 @@ func TestValidSnapshotEntries(t *testing.T) { defer os.RemoveAll(p) snap0 := walpb.Snapshot{Index: 0, Term: 0} snap1 := walpb.Snapshot{Index: 1, Term: 1} + state1 := raftpb.HardState{Commit: 1, Term: 1} snap2 := walpb.Snapshot{Index: 2, Term: 1} snap3 := walpb.Snapshot{Index: 3, Term: 2} - snap4 := walpb.Snapshot{Index: 4, Term: 2} + state2 := raftpb.HardState{Commit: 3, Term: 2} + snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3 func() { w, err := Create(zap.NewExample(), p, nil) if err != nil { @@ -1025,8 +1027,7 @@ func TestValidSnapshotEntries(t *testing.T) { if err = w.SaveSnapshot(snap1); err != nil { t.Fatal(err) } - state := raftpb.HardState{Commit: 1, Term: 1} - if err = w.Save(state, nil); err != nil { + if err = w.Save(state1, nil); err != nil { t.Fatal(err) } if err = w.SaveSnapshot(snap2); err != nil { @@ -1035,7 +1036,6 @@ func TestValidSnapshotEntries(t *testing.T) { if err = w.SaveSnapshot(snap3); err != nil { t.Fatal(err) } - state2 := raftpb.HardState{Commit: 3, Term: 2} if err = w.Save(state2, nil); err != nil { t.Fatal(err) } @@ -1048,12 +1048,7 @@ func TestValidSnapshotEntries(t *testing.T) { t.Fatal(err) } expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} - if len(walSnaps) != len(expected) { - t.Fatalf("expected 4 walSnaps, got %d", len(expected)) - } - for i := 0; i < len(expected); i++ { - if walSnaps[i].Index != expected[i].Index || walSnaps[i].Term != expected[i].Term { - t.Errorf("expected walSnaps %+v at index %d, got %+v", expected[i], i, walSnaps[i]) - } + if !reflect.DeepEqual(walSnaps, expected) { + t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps) } } From 743e6e92cbc02e4320b256b56ac644e1f1ef8f30 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 14 May 2020 15:06:15 -0700 Subject: [PATCH 8/8] etcdserver/*, wal/*: Remove orphaned .snap.db files during Release --- etcdserver/api/snap/snapshotter.go | 29 ++++++++++++++++++ etcdserver/api/snap/snapshotter_test.go | 40 +++++++++++++++++++++++++ etcdserver/storage.go | 9 ++++-- 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/etcdserver/api/snap/snapshotter.go b/etcdserver/api/snap/snapshotter.go index 7a577dd84af..86475bd0914 100644 --- a/etcdserver/api/snap/snapshotter.go +++ b/etcdserver/api/snap/snapshotter.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "time" @@ -265,3 +266,31 @@ func (s *Snapshotter) cleanupSnapdir(filenames []string) error { } return nil } + +func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error { + dir, err := os.Open(s.dir) + if err != nil { + return err + } + defer dir.Close() + filenames, err := dir.Readdirnames(-1) + if err != nil { + return err + } + for _, filename := range filenames { + if strings.HasSuffix(filename, ".snap.db") { + hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db") + index, err := strconv.ParseUint(hexIndex, 16, 64) + if err != nil { + return fmt.Errorf("failed to parse index from .snap.db filename '%s': %w", filename, err) + } + if index < snap.Metadata.Index { + s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename)) + if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { + return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr) + } + } + } + } + return nil +} diff --git a/etcdserver/api/snap/snapshotter_test.go b/etcdserver/api/snap/snapshotter_test.go index 648fdadfd9f..542305b5698 100644 --- a/etcdserver/api/snap/snapshotter_test.go +++ b/etcdserver/api/snap/snapshotter_test.go @@ -16,6 +16,7 @@ package snap import ( "fmt" + "go.etcd.io/etcd/v3/pkg/fileutil" "hash/crc32" "io/ioutil" "os" @@ -266,3 +267,42 @@ func TestAllSnapshotBroken(t *testing.T) { t.Errorf("err = %v, want %v", err, ErrNoSnapshot) } } + +func TestReleaseSnapDBs(t *testing.T) { + dir := filepath.Join(os.TempDir(), "snapshot") + err := os.Mkdir(dir, 0700) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + snapIndices := []uint64{100, 200, 300, 400} + for _, index := range snapIndices { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil { + t.Fatal(err) + } + } + + ss := New(zap.NewExample(), dir) + + if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil { + t.Fatal(err) + } + + deleted := []uint64{100, 200} + for _, index := range deleted { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if fileutil.Exist(filename) { + t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index) + } + } + + retained := []uint64{300, 400} + for _, index := range retained { + filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index)) + if !fileutil.Exist(filename) { + t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index) + } + } +} diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 15367566df3..796718b89c0 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -69,9 +69,14 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return st.WAL.SaveSnapshot(walsnap) } -// Release release the locks to the wal files that are older than the provided wal for the given snap. +// Release releases resources older than the given snap and are no longer needed: +// - releases the locks to the wal files that are older than the provided wal for the given snap. +// - deletes any .snap.db files that are older than the given snap. func (st *storage) Release(snap raftpb.Snapshot) error { - return st.WAL.ReleaseLockTo(snap.Metadata.Index) + if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil { + return err + } + return st.Snapshotter.ReleaseSnapDBs(snap) } // readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear