Skip to content

Commit

Permalink
Run a separate in memory snapshot to reduce number of entries stored …
Browse files Browse the repository at this point in the history
…in raft memory storage

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 29, 2024
1 parent 3de0018 commit 75def3f
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 33 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
c.lg.Debug(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
Expand Down
89 changes: 59 additions & 30 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 100
)

var (
Expand Down Expand Up @@ -291,9 +292,10 @@ type EtcdServer struct {
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}

Expand Down Expand Up @@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
confState raftpb.ConfState
diskSnapshotIndex uint64
memorySnapshotIndex uint64
appliedt uint64
appliedi uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand Down Expand Up @@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
s.r.start(rh)

ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

defer func() {
Expand Down Expand Up @@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand All @@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
if toApply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),

Check warning on line 1024 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L1024

Added line #L1024 was not covered by tests
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand Down Expand Up @@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

ep.appliedt = toApply.snapshot.Metadata.Term
ep.appliedi = toApply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
ep.confState = toApply.snapshot.Metadata.ConfState

// As backends and implementations like alarmsStore changed, we need
Expand Down Expand Up @@ -1188,31 +1193,37 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
}

func (s *EtcdServer) ForceSnapshot() {
s.forceSnapshot = true
s.forceDiskSnapshot = true

Check warning on line 1196 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L1196

Added line #L1196 was not covered by tests
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
if !s.shouldSnapshotToDisk(ep) {
if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
s.snapshotToMemory(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.memorySnapshotIndex = ep.appliedi
}
return
}
//TODO: Remove disk snapshot in v3.7
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceSnapshot = false
s.forceDiskSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
s.snapshotToDisk(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
Expand Down Expand Up @@ -2132,7 +2143,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
Expand Down Expand Up @@ -2169,11 +2180,30 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}

lg.Info(
"saved snapshot",
"saved snapshot to disk",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)

lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.

Check warning on line 2197 in server/etcdserver/server.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/server.go#L2197

Added line #L2197 was not covered by tests
if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
}

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
lg := s.Logger()

Expand All @@ -2188,11 +2218,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
if snapi <= s.Cfg.SnapshotCatchUpEntries {
return
}

compacti := snapi - s.Cfg.SnapshotCatchUpEntries
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand All @@ -2202,7 +2231,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
lg.Debug(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) {
}
}()

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// In v3.6 we no longer generates "compacted Raft logs" log as raft compaction happens independently to snapshot.
// For now let's use snapshot log which should be equivalent to compaction.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
Expand Down

0 comments on commit 75def3f

Please sign in to comment.