Skip to content

Commit

Permalink
Run a separate in memory snapshot to reduce number of entries stored …
Browse files Browse the repository at this point in the history
…in raft memory storage

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 29, 2024
1 parent 414f75b commit 45fc4c3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 44 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
c.lg.Debug(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
Expand Down
90 changes: 51 additions & 39 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 100
)

var (
Expand Down Expand Up @@ -293,6 +294,7 @@ type EtcdServer struct {
*AccessController
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}
Expand Down Expand Up @@ -1195,17 +1197,24 @@ func (s *EtcdServer) ForceSnapshot() {
}

func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
//TODO: Remove disk snapshot in v3.7
shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
return
}
s.snapshot(ep)
s.snapshot(ep, shouldSnapshotToDisk)
s.compactRaftLog(ep.appliedi)
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
}
Expand Down Expand Up @@ -2119,28 +2128,30 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(ep *etcdProgress) {
func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceDiskSnapshot = false
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
d := GetMembershipInfoInV2Format(lg, s.cluster)
if toDisk {
s.Logger().Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceDiskSnapshot = false
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
}

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
Expand All @@ -2152,23 +2163,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
ep.memorySnapshotIndex = ep.appliedi

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
if toDisk {
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
ep.diskSnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot to disk",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
Expand All @@ -2189,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand All @@ -2199,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
lg.Debug(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
Expand Down
64 changes: 61 additions & 3 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
}
}

// TestSnapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
// TestSnapshotDisk should save the snapshot to disk and release old snapshots
func TestSnapshotDisk(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

Expand Down Expand Up @@ -680,7 +680,7 @@ func TestSnapshot(t *testing.T) {
}
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep)
srv.snapshot(&ep, true)
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
Expand All @@ -693,6 +693,64 @@ func TestSnapshot(t *testing.T) {
}
}

func TestSnapshotMemory(t *testing.T) {
revertFunc := verify.DisableVerifications()
defer revertFunc()

be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: newNodeNop(),
raftStorage: s,
storage: p,
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
r: *r,
v2store: st,
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer func() {
assert.NoError(t, srv.kv.Close())
}()
srv.be = be

cl := membership.NewCluster(zaptest.NewLogger(t))
srv.cluster = cl

ch := make(chan struct{}, 1)

go func() {
gaction, _ := p.Wait(1)
defer func() { ch <- struct{}{} }()

if len(gaction) != 0 {
t.Errorf("len(action) = %d, want 0", len(gaction))
return
}
}()
ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
srv.snapshot(&ep, false)
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
}
if ep.diskSnapshotIndex != 0 {
t.Errorf("ep.diskSnapshotIndex = %d, want 0", ep.diskSnapshotIndex)
}
if ep.memorySnapshotIndex != 1 {
t.Errorf("ep.memorySnapshotIndex = %d, want 1", ep.memorySnapshotIndex)
}
}

// TestSnapshotOrdering ensures raft persists snapshot onto disk before
// snapshot db is applied.
func TestSnapshotOrdering(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// In v3.6 we no longer generates "compacted Raft logs" log as raft compaction happens independently to snapshot.
// For now let's use snapshot log which should be equivalent to compaction.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
Expand Down

0 comments on commit 45fc4c3

Please sign in to comment.