Skip to content

Commit

Permalink
Merge pull request etcd-io#18825 from serathius/snapshot-separate
Browse files Browse the repository at this point in the history
Run a separate in memory snapshot to reduce number of entries stored in raft memory storage
  • Loading branch information
serathius authored Nov 29, 2024
2 parents 6e8b3e3 + 4989834 commit 4068189
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 78 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
c.lg.Debug(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
Expand Down
133 changes: 73 additions & 60 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 100
)

var (
Expand Down Expand Up @@ -291,9 +292,10 @@ type EtcdServer struct {
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}

Expand Down Expand Up @@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
confState raftpb.ConfState
diskSnapshotIndex uint64
memorySnapshotIndex uint64
appliedt uint64
appliedi uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand Down Expand Up @@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
s.r.start(rh)

ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

defer func() {
Expand Down Expand Up @@ -979,7 +983,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
// storage, since the raft routine might be slower than toApply routine.
<-apply.notifyc

s.triggerSnapshot(ep)
s.snapshotIfNeededAndCompactRaftLog(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand All @@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand All @@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
if toApply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand Down Expand Up @@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

ep.appliedt = toApply.snapshot.Metadata.Term
ep.appliedi = toApply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
ep.confState = toApply.snapshot.Metadata.ConfState

// As backends and implementations like alarmsStore changed, we need
Expand Down Expand Up @@ -1188,31 +1193,26 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
}

func (s *EtcdServer) ForceSnapshot() {
s.forceSnapshot = true
s.forceDiskSnapshot = true
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
//TODO: Remove disk snapshot in v3.7
shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
return
}
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
)
s.forceSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
s.snapshot(ep, shouldSnapshotToDisk)
s.compactRaftLog(ep.appliedi)
ep.snapi = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
Expand Down Expand Up @@ -2128,23 +2128,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()

func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
lg := s.Logger()
d := GetMembershipInfoInV2Format(lg, s.cluster)
if toDisk {
s.Logger().Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceDiskSnapshot = false
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
}

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
Expand All @@ -2153,21 +2163,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
ep.memorySnapshotIndex = ep.appliedi

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
if toDisk {
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
ep.diskSnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot to disk",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
Expand All @@ -2188,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand All @@ -2198,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
lg.Debug(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
Expand Down
73 changes: 57 additions & 16 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
}
}

// TestSnapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
// TestSnapshotDisk should save the snapshot to disk and release old snapshots
func TestSnapshotDisk(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

Expand Down Expand Up @@ -667,24 +667,65 @@ func TestSnapshot(t *testing.T) {
gaction, _ := p.Wait(2)
defer func() { ch <- struct{}{} }()

if len(gaction) != 2 {
t.Errorf("len(action) = %d, want 2", len(gaction))
return
}
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[0])
}
assert.Len(t, gaction, 2)
assert.Equal(t, testutil.Action{Name: "SaveSnap"}, gaction[0])
assert.Equal(t, testutil.Action{Name: "Release"}, gaction[1])
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep, true)
<-ch
assert.Empty(t, st.Action())
assert.Equal(t, uint64(1), ep.diskSnapshotIndex)
assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
}

if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
t.Errorf("action = %s, want Release", gaction[1])
}
func TestSnapshotMemory(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: newNodeNop(),
raftStorage: s,
storage: p,
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
r: *r,
v2store: st,
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer func() {
assert.NoError(t, srv.kv.Close())
}()
srv.be = be

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

ch := make(chan struct{}, 1)

go func() {
gaction, _ := p.Wait(1)
defer func() { ch <- struct{}{} }()

assert.Empty(t, gaction)
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep, false)
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
assert.Empty(t, st.Action())
assert.Equal(t, uint64(0), ep.diskSnapshotIndex)
assert.Equal(t, uint64(1), ep.memorySnapshotIndex)
}

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// In v3.6 we no longer generates "compacted Raft logs" log as raft compaction happens independently to snapshot.
// For now let's use snapshot log which should be equivalent to compaction.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
Expand Down

0 comments on commit 4068189

Please sign in to comment.