diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index b212dcb7c00..e20e7bd96f1 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -231,7 +231,7 @@ func (rc *raftNode) replayWAL() *wal.WAL { if err != nil { log.Fatalf("raftexample: failed to read WAL (%v)", err) } - rc.raftStorage = raft.NewMemoryStorage() + rc.raftStorage = raft.NewMemoryStorage(0) if snapshot != nil { rc.raftStorage.ApplySnapshot(*snapshot) } diff --git a/embed/config.go b/embed/config.go index 2fb2a3280e7..4100e15d702 100644 --- a/embed/config.go +++ b/embed/config.go @@ -80,6 +80,7 @@ type Config struct { MaxWalFiles uint `json:"max-wals"` Name string `json:"name"` SnapCount uint64 `json:"snapshot-count"` + SnapSize uint64 `json:"snapshot-size"` AutoCompactionRetention int `json:"auto-compaction-retention"` AutoCompactionMode string `json:"auto-compaction-mode"` @@ -182,6 +183,7 @@ func NewConfig() *Config { SnapCount: etcdserver.DefaultSnapCount, MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + SnapSize: etcdserver.DefaultSnapSize, TickMs: 100, ElectionMs: 1000, LPUrls: []url.URL{*lpurl}, diff --git a/embed/etcd.go b/embed/etcd.go index 90179f462a7..03334b87079 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -127,6 +127,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { DataDir: cfg.Dir, DedicatedWALDir: cfg.WalDir, SnapCount: cfg.SnapCount, + SnapSize: cfg.SnapSize, MaxSnapFiles: cfg.MaxSnapFiles, MaxWALFiles: cfg.MaxWalFiles, InitialPeerURLsMap: urlsmap, diff --git a/etcdmain/config.go b/etcdmain/config.go index 4bc900bc1ed..18f5ad7dd3d 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -138,6 +138,7 @@ func newConfig() *config { fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).") fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.") fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.") + fs.Uint64Var(&cfg.SnapSize, "snapshot-size", cfg.SnapSize, "Size of committed transactions to trigger a snapshot to disk.") fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.") fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.") fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") diff --git a/etcdserver/config.go b/etcdserver/config.go index f6ed1f1bae7..4009c475bec 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -40,6 +40,7 @@ type ServerConfig struct { // rather than the dataDir/member/wal. DedicatedWALDir string SnapCount uint64 + SnapSize uint64 MaxSnapFiles uint MaxWALFiles uint InitialPeerURLsMap types.URLsMap @@ -202,6 +203,7 @@ func (c *ServerConfig) print(initial bool) { plog.Infof("heartbeat = %dms", c.TickMs) plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs)) plog.Infof("snapshot count = %d", c.SnapCount) + plog.Infof("snapshot size = %d", c.SnapSize) if len(c.DiscoveryURL) != 0 { plog.Infof("discovery URL= %s", c.DiscoveryURL) if len(c.DiscoveryProxy) != 0 { diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 8e9070149bf..46ca82fbd38 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -400,7 +400,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id } id = member.ID plog.Infof("starting member %s in cluster %s", id, cl.ID()) - s = raft.NewMemoryStorage() + s = raft.NewMemoryStorage(cfg.SnapSize) c := &raft.Config{ ID: uint64(id), ElectionTick: cfg.ElectionTicks, @@ -429,7 +429,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := membership.NewCluster("") cl.SetID(cid) - s := raft.NewMemoryStorage() + s := raft.NewMemoryStorage(cfg.SnapSize) if snapshot != nil { s.ApplySnapshot(*snapshot) } @@ -485,7 +485,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := membership.NewCluster("") cl.SetID(cid) - s := raft.NewMemoryStorage() + s := raft.NewMemoryStorage(cfg.SnapSize) if snapshot != nil { s.ApplySnapshot(*snapshot) } diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 757826cc9e1..27948f95235 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -156,7 +156,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { r := newRaftNode(raftNodeConfig{ Node: n, storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), transport: rafthttp.NewNopTransporter(), }) srv := &EtcdServer{r: *r} @@ -183,7 +183,7 @@ func TestConfgChangeBlocksApply(t *testing.T) { r := newRaftNode(raftNodeConfig{ Node: n, storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), transport: rafthttp.NewNopTransporter(), }) srv := &EtcdServer{r: *r} diff --git a/etcdserver/server.go b/etcdserver/server.go index 8fb830c1a8e..a7686f61b3a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -61,6 +61,7 @@ import ( const ( DefaultSnapCount = 100000 + DefaultSnapSize = 0 // 0 means no size based compaction StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -175,6 +176,9 @@ type EtcdServer struct { readych chan struct{} Cfg ServerConfig + snapCount uint64 + snapSize uint64 + w wait.Wait readMu sync.RWMutex @@ -406,6 +410,8 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { srv = &EtcdServer{ readych: make(chan struct{}), Cfg: cfg, + snapCount: cfg.SnapCount, + snapSize: cfg.SnapSize, errorc: make(chan error, 1), store: st, snapshotter: ss, @@ -534,6 +540,10 @@ func (s *EtcdServer) start() { plog.Infof("set snapshot count to default %d", DefaultSnapCount) s.Cfg.SnapCount = DefaultSnapCount } + if s.snapSize == 0 { + plog.Infof("set snapshot size to default %d", DefaultSnapSize) + s.snapSize = DefaultSnapSize + } s.w = wait.New() s.applyWait = wait.NewTimeList() s.done = make(chan struct{}) @@ -914,8 +924,12 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { } } +func (s *EtcdServer) shouldTriggerSnapshot(ep *etcdProgress) bool { + return s.snapCount < ep.appliedi-ep.snapi || s.r.raftStorage.ShouldCompactBySize() +} + func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { - if ep.appliedi-ep.snapi <= s.Cfg.SnapCount { + if !s.shouldTriggerSnapshot(ep) { return } @@ -1416,6 +1430,24 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con return false, nil } +func (s *EtcdServer) getCompactIndex(snapi uint64) uint64 { + compacti := uint64(1) + + if snapi > numberOfCatchUpEntries { + // keep some in memory log entries for slow followers. + compacti = snapi - numberOfCatchUpEntries + } + + szCompacti := s.r.raftStorage.SizeBasedCompactIndex() + if compacti < szCompacti { + if snapi < szCompacti { + return snapi + } + return szCompacti + } + return compacti +} + // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.store.Clone() @@ -1460,11 +1492,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { return } - // keep some in memory log entries for slow followers. - compacti := uint64(1) - if snapi > numberOfCatchUpEntries { - compacti = snapi - numberOfCatchUpEntries - } + compacti := s.getCompactIndex(snapi) err = s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 66c99247bb5..4fef1036dee 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -172,7 +172,7 @@ func TestApplyRepeat(t *testing.T) { cl.AddMember(&membership.Member{ID: 1234}) r := newRaftNode(raftNodeConfig{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }) @@ -679,7 +679,7 @@ func TestDoProposal(t *testing.T) { r := newRaftNode(raftNodeConfig{ Node: newNodeCommitter(), storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), transport: rafthttp.NewNopTransporter(), }) srv := &EtcdServer{ @@ -847,7 +847,7 @@ func TestSyncTrigger(t *testing.T) { tk := &time.Ticker{C: st} r := newRaftNode(raftNodeConfig{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), transport: rafthttp.NewNopTransporter(), storage: mockstorage.NewStorageRecorder(""), }) @@ -901,7 +901,7 @@ func TestSnapshot(t *testing.T) { os.RemoveAll(tmpPath) }() - s := raft.NewMemoryStorage() + s := raft.NewMemoryStorage(0) s.Append([]raftpb.Entry{{Index: 1}}) st := mockstore.NewRecorderStream() p := mockstorage.NewStorageRecorderStream("") @@ -970,7 +970,7 @@ func TestSnapshotOrdering(t *testing.T) { t.Fatalf("couldn't make snap dir (%v)", err) } - rs := raft.NewMemoryStorage() + rs := raft.NewMemoryStorage(0) p := mockstorage.NewStorageRecorderStream(testdir) tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir) r := newRaftNode(raftNodeConfig{ @@ -1037,7 +1037,7 @@ func TestTriggerSnap(t *testing.T) { p := mockstorage.NewStorageRecorderStream("") r := newRaftNode(raftNodeConfig{ Node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), storage: p, transport: rafthttp.NewNopTransporter(), }) @@ -1096,7 +1096,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { t.Fatalf("Couldn't make snap dir (%v)", err) } - rs := raft.NewMemoryStorage() + rs := raft.NewMemoryStorage(0) tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) r := newRaftNode(raftNodeConfig{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, @@ -1185,7 +1185,7 @@ func TestAddMember(t *testing.T) { cl.SetStore(st) r := newRaftNode(raftNodeConfig{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }) @@ -1226,7 +1226,7 @@ func TestRemoveMember(t *testing.T) { cl.AddMember(&membership.Member{ID: 1234}) r := newRaftNode(raftNodeConfig{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }) @@ -1266,7 +1266,7 @@ func TestUpdateMember(t *testing.T) { cl.AddMember(&membership.Member{ID: 1234}) r := newRaftNode(raftNodeConfig{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: raft.NewMemoryStorage(0), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }) diff --git a/raft/log_test.go b/raft/log_test.go index f80e41ce2ab..5dad679fa4f 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -45,7 +45,7 @@ func TestFindConflict(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(0), raftLogger) raftLog.append(previousEnts...) gconflict := raftLog.findConflict(tt.ents) @@ -57,7 +57,7 @@ func TestFindConflict(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(0), raftLogger) raftLog.append(previousEnts...) tests := []struct { lastIndex uint64 @@ -123,7 +123,7 @@ func TestAppend(t *testing.T) { } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(previousEnts) raftLog := newLog(storage, raftLogger) @@ -236,7 +236,7 @@ func TestLogMaybeAppend(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(0), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit func() { @@ -280,7 +280,7 @@ func TestCompactionSideEffects(t *testing.T) { lastIndex := uint64(1000) unstableIndex := uint64(750) lastTerm := lastIndex - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) for i = 1; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}}) } @@ -356,7 +356,7 @@ func TestHasNextEnts(t *testing.T) { {5, false}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(snap) raftLog := newLog(storage, raftLogger) raftLog.append(ents...) @@ -389,7 +389,7 @@ func TestNextEnts(t *testing.T) { {5, nil}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(snap) raftLog := newLog(storage, raftLogger) raftLog.append(ents...) @@ -417,7 +417,7 @@ func TestUnstableEnts(t *testing.T) { for i, tt := range tests { // append stable entries to storage - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(previousEnts[:tt.unstable-1]) // append unstable entries to raftlog @@ -459,7 +459,7 @@ func TestCommitTo(t *testing.T) { } } }() - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(0), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit raftLog.commitTo(tt.commit) @@ -482,7 +482,7 @@ func TestStableTo(t *testing.T) { {3, 1, 1}, // bad index } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(0), raftLogger) raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { @@ -517,7 +517,7 @@ func TestStableToWithSnap(t *testing.T) { {snapi - 1, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, } for i, tt := range tests { - s := NewMemoryStorage() + s := NewMemoryStorage(0) s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}) raftLog := newLog(s, raftLogger) raftLog.append(tt.newEnts...) @@ -553,7 +553,7 @@ func TestCompaction(t *testing.T) { } }() - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) for i := uint64(1); i <= tt.lastIndex; i++ { storage.Append([]pb.Entry{{Index: i}}) } @@ -581,7 +581,7 @@ func TestLogRestore(t *testing.T) { index := uint64(1000) term := uint64(1000) snap := pb.SnapshotMetadata{Index: index, Term: term} - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(pb.Snapshot{Metadata: snap}) raftLog := newLog(storage, raftLogger) @@ -605,7 +605,7 @@ func TestLogRestore(t *testing.T) { func TestIsOutOfBounds(t *testing.T) { offset := uint64(100) num := uint64(100) - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { @@ -688,7 +688,7 @@ func TestTerm(t *testing.T) { offset := uint64(100) num := uint64(100) - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i = 1; i < num; i++ { @@ -718,7 +718,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) { storagesnapi := uint64(100) unstablesnapi := storagesnapi + 5 - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}}) l := newLog(storage, raftLogger) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) @@ -752,7 +752,7 @@ func TestSlice(t *testing.T) { half := offset + num/2 halfe := pb.Entry{Index: half, Term: half} - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) for i = 1; i < num/2; i++ { storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}}) diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 4e60634a223..f24fb3164d7 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -26,7 +26,7 @@ func BenchmarkOneNode(b *testing.B) { defer cancel() n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) diff --git a/raft/node_test.go b/raft/node_test.go index f4c726ea869..558140580e4 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -114,7 +114,7 @@ func TestNodePropose(t *testing.T) { } n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) n.Campaign(context.TODO()) @@ -153,7 +153,7 @@ func TestNodeReadIndex(t *testing.T) { wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) r.readStates = wrs @@ -225,9 +225,9 @@ func TestDisableProposalForwarding(t *testing.T) { // TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader // gets forwarded to the new leader and 'send' method does not attach its term. func TestNodeReadIndexToOldLeader(t *testing.T) { - r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) nt := newNetwork(r1, r2, r3) @@ -289,7 +289,7 @@ func TestNodeProposeConfig(t *testing.T) { } n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) n.Campaign(context.TODO()) @@ -327,7 +327,7 @@ func TestNodeProposeConfig(t *testing.T) { // not affect the later propose to add new node. func TestNodeProposeAddDuplicateNode(t *testing.T) { n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) n.Campaign(context.TODO()) @@ -399,7 +399,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) go n.run(r) defer n.Stop() @@ -430,7 +430,7 @@ func TestBlockProposal(t *testing.T) { // elapsed of the underlying raft state machine. func TestNodeTick(t *testing.T) { n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) elapsed := r.electionElapsed @@ -450,7 +450,7 @@ func TestNodeTick(t *testing.T) { // processing, and that it is idempotent func TestNodeStop(t *testing.T) { n := newNode() - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1}, 10, 1, s) donec := make(chan struct{}) @@ -533,7 +533,7 @@ func TestNodeStart(t *testing.T) { MustSync: true, }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) c := &Config{ ID: 1, ElectionTick: 10, @@ -586,7 +586,7 @@ func TestNodeRestart(t *testing.T) { MustSync: true, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.SetHardState(st) storage.Append(entries) c := &Config{ @@ -631,7 +631,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) { MustSync: true, } - s := NewMemoryStorage() + s := NewMemoryStorage(0) s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) @@ -662,7 +662,7 @@ func TestNodeAdvance(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) c := &Config{ ID: 1, ElectionTick: 10, diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index c745050f8b4..777935a0b53 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -24,7 +24,7 @@ import ( // 1. msgApp can fill the sending window until full // 2. when the window is full, no more msgApp can be sent. func TestMsgAppFlowControlFull(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() @@ -60,7 +60,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { // 1. valid msgAppResp.index moves the windows to pass all smaller or equal index. // 2. out-of-dated msgAppResp has no effect on the sliding window. func TestMsgAppFlowControlMoveForward(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() @@ -105,7 +105,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response // frees one slot if the window is full. func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 2911e8aa333..897b006e87c 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -52,7 +52,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) { // it immediately reverts to follower state. // Reference: section 5.1 func testUpdateTermFromMessage(t *testing.T, state StateType) { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -82,7 +82,7 @@ func TestRejectStaleTermMessage(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.step = fakeStep r.loadState(pb.HardState{Term: 2}) @@ -96,7 +96,7 @@ func TestRejectStaleTermMessage(t *testing.T) { // TestStartAsFollower tests that when servers start up, they begin as followers. // Reference: section 5.2 func TestStartAsFollower(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) if r.state != StateFollower { t.Errorf("state = %s, want %s", r.state, StateFollower) } @@ -109,7 +109,7 @@ func TestStartAsFollower(t *testing.T) { func TestLeaderBcastBeat(t *testing.T) { // heartbeat interval hi := 1 - r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() for i := 0; i < 10; i++ { @@ -151,7 +151,7 @@ func TestCandidateStartNewElection(t *testing.T) { func testNonleaderStartElection(t *testing.T, state StateType) { // election timeout et := 10 - r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(0)) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -215,7 +215,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) { {5, map[uint64]bool{}, StateCandidate}, } for i, tt := range tests { - r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage()) + r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(0)) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) for id, vote := range tt.votes { @@ -248,7 +248,7 @@ func TestFollowerVote(t *testing.T) { {2, 1, true}, } for i, tt := range tests { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.loadState(pb.HardState{Term: 1, Vote: tt.vote}) r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote}) @@ -274,7 +274,7 @@ func TestCandidateFallback(t *testing.T) { {From: 2, To: 1, Term: 2, Type: pb.MsgApp}, } for i, tt := range tests { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) if r.state != StateCandidate { t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate) @@ -307,7 +307,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) { // Reference: section 5.2 func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) { et := 10 - r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(0)) timeouts := make(map[int]bool) for round := 0; round < 50*et; round++ { switch state { @@ -353,7 +353,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { rs := make([]*raft, size) ids := idsBySize(size) for k := range rs { - rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage()) + rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage(0)) } conflicts := 0 for round := 0; round < 1000; round++ { @@ -395,7 +395,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { // Also, it writes the new entry into stable storage. // Reference: section 5.3 func TestLeaderStartReplication(t *testing.T) { - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s) r.becomeCandidate() r.becomeLeader() @@ -434,7 +434,7 @@ func TestLeaderStartReplication(t *testing.T) { // servers eventually find out. // Reference: section 5.3 func TestLeaderCommitEntry(t *testing.T) { - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s) r.becomeCandidate() r.becomeLeader() @@ -488,7 +488,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true}, } for i, tt := range tests { - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, idsBySize(tt.size), 10, 1, s) r.becomeCandidate() r.becomeLeader() @@ -521,7 +521,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { {{Term: 1, Index: 1}}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(tt) r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) @@ -578,7 +578,7 @@ func TestFollowerCommitEntry(t *testing.T) { }, } for i, tt := range tests { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.becomeFollower(1, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit}) @@ -619,7 +619,7 @@ func TestFollowerCheckMsgApp(t *testing.T) { {ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(ents) r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Commit: 1}) @@ -675,7 +675,7 @@ func TestFollowerAppendEntries(t *testing.T) { }, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}) r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.becomeFollower(2, 2) @@ -744,11 +744,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) { }, } for i, tt := range tests { - leadStorage := NewMemoryStorage() + leadStorage := NewMemoryStorage(0) leadStorage.Append(ents) lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage) lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term}) - followerStorage := NewMemoryStorage() + followerStorage := NewMemoryStorage(0) followerStorage.Append(tt) follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage) follower.loadState(pb.HardState{Term: term - 1}) @@ -781,7 +781,7 @@ func TestVoteRequest(t *testing.T) { {[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3}, } for j, tt := range tests { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.Step(pb.Message{ From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents, }) @@ -842,7 +842,7 @@ func TestVoter(t *testing.T) { {[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(tt.ents) r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) @@ -878,7 +878,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { {3, 3}, } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(ents) r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 3908d581233..1be0af81935 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -31,7 +31,7 @@ var ( ) func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.restore(testingSnap) @@ -49,7 +49,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { } func TestPendingSnapshotPauseReplication(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) @@ -66,7 +66,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { } func TestSnapshotFailure(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) @@ -89,7 +89,7 @@ func TestSnapshotFailure(t *testing.T) { } func TestSnapshotSucceed(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) @@ -112,7 +112,7 @@ func TestSnapshotSucceed(t *testing.T) { } func TestSnapshotAbort(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) diff --git a/raft/raft_test.go b/raft/raft_test.go index 1fa28544471..425bfc7cf14 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -262,7 +262,7 @@ func TestProgressResume(t *testing.T) { // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. func TestProgressResumeByHeartbeatResp(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.prs[2].Paused = true @@ -280,7 +280,7 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { } func TestProgressPaused(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -469,7 +469,7 @@ func TestPreVoteFromAnyState(t *testing.T) { func testVoteFromAnyState(t *testing.T, vt pb.MessageType) { for st := StateType(0); st < numStates; st++ { - r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) r.Term = 1 switch st { @@ -690,9 +690,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -732,7 +732,7 @@ func TestDuelingCandidates(t *testing.T) { }{ {a, StateFollower, 2, wlog}, {b, StateFollower, 2, wlog}, - {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, + {c, StateFollower, 2, newLog(NewMemoryStorage(0), raftLogger)}, } for i, tt := range tests { @@ -755,9 +755,9 @@ func TestDuelingCandidates(t *testing.T) { } func TestDuelingPreCandidates(t *testing.T) { - cfgA := newTestConfig(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - cfgB := newTestConfig(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - cfgC := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + cfgA := newTestConfig(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + cfgB := newTestConfig(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + cfgC := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) cfgA.PreVote = true cfgB.PreVote = true cfgC.PreVote = true @@ -802,7 +802,7 @@ func TestDuelingPreCandidates(t *testing.T) { }{ {a, StateLeader, 1, wlog}, {b, StateFollower, 1, wlog}, - {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)}, + {c, StateFollower, 1, newLog(NewMemoryStorage(0), raftLogger)}, } for i, tt := range tests { @@ -958,7 +958,7 @@ func TestProposal(t *testing.T) { send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) - wantLog := newLog(NewMemoryStorage(), raftLogger) + wantLog := newLog(NewMemoryStorage(0), raftLogger) if tt.success { wantLog = &raftLog{ storage: &MemoryStorage{ @@ -1052,7 +1052,7 @@ func TestCommit(t *testing.T) { } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append(tt.logs) storage.hardState = pb.HardState{Term: tt.smTerm} @@ -1082,7 +1082,7 @@ func TestPastElectionTimeout(t *testing.T) { } for i, tt := range tests { - sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) sm.electionElapsed = tt.elapse c := 0 for j := 0; j < 10000; j++ { @@ -1108,7 +1108,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) sm.step = fakeStep sm.Term = 2 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) @@ -1148,7 +1148,7 @@ func TestHandleMsgApp(t *testing.T) { } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.becomeFollower(2, None) @@ -1182,7 +1182,7 @@ func TestHandleHeartbeat(t *testing.T) { } for i, tt := range tests { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) sm.becomeFollower(2, 2) @@ -1203,7 +1203,7 @@ func TestHandleHeartbeat(t *testing.T) { // TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response. func TestHandleHeartbeatResp(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) sm.becomeCandidate() @@ -1250,7 +1250,7 @@ func TestHandleHeartbeatResp(t *testing.T) { // readOnly readIndexQueue and pendingReadIndex map. // related issue: https://github.com/coreos/etcd/issues/7571 func TestRaftFreesReadOnlyMem(t *testing.T) { - sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(0)) sm.becomeCandidate() sm.becomeLeader() sm.raftLog.commitTo(sm.raftLog.lastIndex()) @@ -1298,7 +1298,7 @@ func TestRaftFreesReadOnlyMem(t *testing.T) { // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { - sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(0)) sm.becomeCandidate() sm.becomeLeader() @@ -1404,7 +1404,7 @@ func testRecvMsgVote(t *testing.T, msgType pb.MessageType) { } for i, tt := range tests { - sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) sm.state = tt.state switch tt.state { case StateFollower: @@ -1485,7 +1485,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) sm.state = tt.from switch tt.to { @@ -1527,7 +1527,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := uint64(3) for i, tt := range tests { - sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) switch tt.state { case StateFollower: sm.becomeFollower(1, None) @@ -1567,7 +1567,7 @@ func TestAllServerStepdown(t *testing.T) { } func TestLeaderStepdownWhenQuorumActive(t *testing.T) { - sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(0)) sm.checkQuorum = true @@ -1585,7 +1585,7 @@ func TestLeaderStepdownWhenQuorumActive(t *testing.T) { } func TestLeaderStepdownWhenQuorumLost(t *testing.T) { - sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(0)) sm.checkQuorum = true @@ -1602,9 +1602,9 @@ func TestLeaderStepdownWhenQuorumLost(t *testing.T) { } func TestLeaderSupersedingWithCheckQuorum(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) a.checkQuorum = true b.checkQuorum = true @@ -1645,9 +1645,9 @@ func TestLeaderSupersedingWithCheckQuorum(t *testing.T) { } func TestLeaderElectionWithCheckQuorum(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) a.checkQuorum = true b.checkQuorum = true @@ -1694,9 +1694,9 @@ func TestLeaderElectionWithCheckQuorum(t *testing.T) { // can disrupt the leader even if the leader still "officially" holds the lease, The // leader is expected to step down and adopt the candidate's term func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) a.checkQuorum = true b.checkQuorum = true @@ -1754,8 +1754,8 @@ func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) { } func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage(0)) a.checkQuorum = true b.checkQuorum = true @@ -1788,9 +1788,9 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { } func TestReadOnlyOptionSafe(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) nt := newNetwork(a, b, c) setRandomizedElectionTimeout(b, b.electionTimeout+1) @@ -1842,9 +1842,9 @@ func TestReadOnlyOptionSafe(t *testing.T) { } func TestReadOnlyOptionLease(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) a.readOnly.option = ReadOnlyLeaseBased b.readOnly.option = ReadOnlyLeaseBased c.readOnly.option = ReadOnlyLeaseBased @@ -1899,9 +1899,9 @@ func TestReadOnlyOptionLease(t *testing.T) { } func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) { - a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) a.readOnly.option = ReadOnlyLeaseBased b.readOnly.option = ReadOnlyLeaseBased c.readOnly.option = ReadOnlyLeaseBased @@ -1937,7 +1937,7 @@ func TestReadOnlyForNewLeader(t *testing.T) { } peers := make([]stateMachine, 0) for _, c := range nodeConfigs { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}) storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed}) if c.compact_index != 0 { @@ -2019,7 +2019,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) sm.raftLog = &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, unstable: unstable{offset: 3}, @@ -2065,7 +2065,7 @@ func TestBcastBeat(t *testing.T) { ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.ApplySnapshot(s) sm := newTestRaft(1, nil, 10, 1, storage) sm.Term = 1 @@ -2126,7 +2126,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(0)) sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} sm.Term = 1 sm.state = tt.state @@ -2169,7 +2169,7 @@ func TestLeaderIncreaseNext(t *testing.T) { } for i, tt := range tests { - sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() @@ -2185,7 +2185,7 @@ func TestLeaderIncreaseNext(t *testing.T) { } func TestSendAppendForProgressProbe(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -2252,7 +2252,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } func TestSendAppendForProgressReplicate(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -2269,7 +2269,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { } func TestSendAppendForProgressSnapshot(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -2287,7 +2287,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { func TestRecvMsgUnreachable(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} - s := NewMemoryStorage() + s := NewMemoryStorage(0) s.Append(previousEnts) r := newTestRaft(1, []uint64{1, 2}, 10, 1, s) r.becomeCandidate() @@ -2317,7 +2317,7 @@ func TestRestore(t *testing.T) { }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) if ok := sm.restore(s); !ok { t.Fatal("restore fail, want succeed") @@ -2342,7 +2342,7 @@ func TestRestore(t *testing.T) { func TestRestoreIgnoreSnapshot(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} commit := uint64(1) - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.raftLog.append(previousEnts...) sm.raftLog.commitTo(commit) @@ -2382,7 +2382,7 @@ func TestProvideSnap(t *testing.T) { ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.restore(s) @@ -2412,7 +2412,7 @@ func TestIgnoreProvidingSnap(t *testing.T) { ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.restore(s) @@ -2442,7 +2442,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { } m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} - sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) sm.Step(m) if sm.lead != uint64(1) { @@ -2491,7 +2491,7 @@ func TestSlowNodeRestore(t *testing.T) { // it appends the entry to log and sets pendingConf to be true. func TestStepConfig(t *testing.T) { // a raft that cannot make progress - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() @@ -2509,7 +2509,7 @@ func TestStepConfig(t *testing.T) { // the proposal to noop and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) @@ -2540,7 +2540,7 @@ func TestRecoverPendingConfig(t *testing.T) { {pb.EntryConfChange, true}, } for i, tt := range tests { - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.appendEntry(pb.Entry{Type: tt.entType}) r.becomeCandidate() r.becomeLeader() @@ -2559,7 +2559,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { t.Errorf("expect panic, but nothing happens") } }() - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.becomeCandidate() @@ -2569,7 +2569,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { // TestAddNode tests that addNode could update pendingConf and nodes correctly. func TestAddNode(t *testing.T) { - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) r.pendingConf = true r.addNode(2) if r.pendingConf { @@ -2585,7 +2585,7 @@ func TestAddNode(t *testing.T) { // TestAddNodeCheckQuorum tests that addNode does not trigger a leader election // immediately when checkQuorum is set. func TestAddNodeCheckQuorum(t *testing.T) { - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(0)) r.pendingConf = true r.checkQuorum = true @@ -2620,7 +2620,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { // TestRemoveNode tests that removeNode could update pendingConf, nodes and // and removed list correctly. func TestRemoveNode(t *testing.T) { - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(0)) r.pendingConf = true r.removeNode(2) if r.pendingConf { @@ -2651,7 +2651,7 @@ func TestPromotable(t *testing.T) { {[]uint64{2, 3}, false}, } for i, tt := range tests { - r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage()) + r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage(0)) if g := r.promotable(); g != tt.wp { t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp) } @@ -2673,7 +2673,7 @@ func TestRaftNodes(t *testing.T) { }, } for i, tt := range tests { - r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) + r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage(0)) if !reflect.DeepEqual(r.nodes(), tt.wids) { t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) } @@ -2689,7 +2689,7 @@ func TestPreCampaignWhileLeader(t *testing.T) { } func testCampaignWhileLeader(t *testing.T, preVote bool) { - cfg := newTestConfig(1, []uint64{1}, 5, 1, NewMemoryStorage()) + cfg := newTestConfig(1, []uint64{1}, 5, 1, NewMemoryStorage(0)) cfg.PreVote = preVote r := newRaft(cfg) if r.state != StateFollower { @@ -2715,7 +2715,7 @@ func testCampaignWhileLeader(t *testing.T, preVote bool) { // committed when a config change reduces the quorum requirements. func TestCommitAfterRemoveNode(t *testing.T) { // Create a cluster with two nodes. - s := NewMemoryStorage() + s := NewMemoryStorage(0) r := newTestRaft(1, []uint64{1, 2}, 5, 1, s) r.becomeCandidate() r.becomeLeader() @@ -3101,7 +3101,7 @@ func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint6 // (previously, if the node also got votes, it would panic as it // transitioned to StateLeader) func TestTransferNonMember(t *testing.T) { - r := newTestRaft(1, []uint64{2, 3, 4}, 5, 1, NewMemoryStorage()) + r := newTestRaft(1, []uint64{2, 3, 4}, 5, 1, NewMemoryStorage(0)) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgTimeoutNow}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVoteResp}) @@ -3210,7 +3210,7 @@ func TestNodeWithSmallerTermCanCompleteElection(t *testing.T) { } func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) for i, term := range terms { storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}}) } @@ -3227,7 +3227,7 @@ func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft { // to the given value but no log entries (indicating that it voted in // the given term but has not received any logs). func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.SetHardState(pb.HardState{Vote: vote, Term: term}) cfg := newTestConfig(1, []uint64{}, 5, 1, storage) if configFunc != nil { @@ -3266,7 +3266,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw id := peerAddrs[j] switch v := p.(type) { case nil: - nstorage[id] = NewMemoryStorage() + nstorage[id] = NewMemoryStorage(0) cfg := newTestConfig(id, peerAddrs, 10, 1, nstorage[id]) if configFunc != nil { configFunc(cfg) diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index a37a16839ce..b680b9bf934 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -39,7 +39,7 @@ type node struct { } func startNode(id uint64, peers []raft.Peer, iface iface) *node { - st := raft.NewMemoryStorage() + st := raft.NewMemoryStorage(0) c := &raft.Config{ ID: id, ElectionTick: 10, diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 4ccf72de45a..12244baf095 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -25,7 +25,7 @@ import ( // TestRawNodeStep ensures that RawNode.Step ignore local message. func TestRawNodeStep(t *testing.T) { for i, msgn := range raftpb.MessageType_name { - s := NewMemoryStorage() + s := NewMemoryStorage(0) rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) if err != nil { t.Fatal(err) @@ -47,7 +47,7 @@ func TestRawNodeStep(t *testing.T) { // TestRawNodeProposeAndConfChange ensures that RawNode.Propose and RawNode.ProposeConfChange // send the given proposal and ConfChange to the underlying raft. func TestRawNodeProposeAndConfChange(t *testing.T) { - s := NewMemoryStorage() + s := NewMemoryStorage(0) var err error rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) if err != nil { @@ -113,7 +113,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should // not affect the later propose to add new node. func TestRawNodeProposeAddDuplicateNode(t *testing.T) { - s := NewMemoryStorage() + s := NewMemoryStorage(0) rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) if err != nil { t.Fatal(err) @@ -195,7 +195,7 @@ func TestRawNodeReadIndex(t *testing.T) { } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - s := NewMemoryStorage() + s := NewMemoryStorage(0) c := newTestConfig(1, nil, 10, 1, s) rawNode, err := NewRawNode(c, []Peer{{ID: 1}}) if err != nil { @@ -283,7 +283,7 @@ func TestRawNodeStart(t *testing.T) { }, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}}) if err != nil { t.Fatal(err) @@ -331,7 +331,7 @@ func TestRawNodeRestart(t *testing.T) { MustSync: true, } - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) storage.SetHardState(st) storage.Append(entries) rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil) @@ -368,7 +368,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { MustSync: true, } - s := NewMemoryStorage() + s := NewMemoryStorage(0) s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) @@ -390,7 +390,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { // no dependency check between Ready() and Advance() func TestRawNodeStatus(t *testing.T) { - storage := NewMemoryStorage() + storage := NewMemoryStorage(0) rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}}) if err != nil { t.Fatal(err) diff --git a/raft/storage.go b/raft/storage.go index 69c3a7d9033..c33371c638d 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -81,13 +81,28 @@ type MemoryStorage struct { snapshot pb.Snapshot // ents[i] has raft log position i+snapshot.Metadata.Index ents []pb.Entry + // entsSize has a total size of Data of ents + entsSize uint64 + + // snapSize holds a number of bytes that should be a limit of ents + // Note that the limit can be violated in some extreme cases e.g. + // a single entry can be larger than snapSize, so we cannot have + // an invariant that is always true like this: size of ents <= limit + snapSize uint64 + + // snapSizeCompactIdx is an upper bound index that should be compacted + // based on snapSize. If it is -1, size based compaction shouldn't be + // triggered. + snapSizeCompactIdx int64 } // NewMemoryStorage creates an empty MemoryStorage. -func NewMemoryStorage() *MemoryStorage { +func NewMemoryStorage(snapSize uint64) *MemoryStorage { return &MemoryStorage{ // When starting from scratch populate the list with a dummy entry at term zero. - ents: make([]pb.Entry, 1), + ents: make([]pb.Entry, 1), + snapSize: snapSize, + snapSizeCompactIdx: -1, } } @@ -182,6 +197,8 @@ func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error { ms.snapshot = snap ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}} + ms.entsSize = 0 + ms.snapSizeCompactIdx = -1 return nil } @@ -230,6 +247,12 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error { ents[0].Term = ms.ents[i].Term ents = append(ents, ms.ents[i+1:]...) ms.ents = ents + ms.entsSize = sizeOfEntries(ms.ents) + if ms.snapSize < ms.entsSize { + ms.snapSizeCompactIdx = int64(ms.ents[len(ms.ents)-1].Index) + } else { + ms.snapSizeCompactIdx = -1 + } return nil } @@ -261,11 +284,41 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error { case uint64(len(ms.ents)) > offset: ms.ents = append([]pb.Entry{}, ms.ents[:offset]...) ms.ents = append(ms.ents, entries...) + ms.entsSize = sizeOfEntries(ms.ents) case uint64(len(ms.ents)) == offset: ms.ents = append(ms.ents, entries...) + ms.entsSize += sizeOfEntries(entries) default: raftLogger.Panicf("missing log entry [last: %d, append at: %d]", ms.lastIndex(), entries[0].Index) } + + if ms.snapSizeCompactIdx == -1 && ms.snapSize < ms.entsSize { + // this calculation of snapSizeCompactIdx isn't accurate but + // calculating the exact minimum index that satisfies the + // below condition would be expensive + ms.snapSizeCompactIdx = int64(ms.ents[len(ms.ents)-1].Index) + } + return nil } + +func sizeOfEntries(entries []pb.Entry) (size uint64) { + for _, e := range entries { + size += uint64(len(e.Data)) + } + return size +} + +func (ms *MemoryStorage) ShouldCompactBySize() bool { + ms.Lock() + defer ms.Unlock() + return ms.snapSize != 0 && ms.snapSizeCompactIdx != -1 +} + +func (ms *MemoryStorage) SizeBasedCompactIndex() uint64 { + ms.Lock() + defer ms.Unlock() + // when ms.snapSizeCompactIdx == -1, this function must not be called + return uint64(ms.snapSizeCompactIdx) +} diff --git a/raft/storage_test.go b/raft/storage_test.go index 71d50b4c946..88ac1dd33d1 100644 --- a/raft/storage_test.go +++ b/raft/storage_test.go @@ -265,7 +265,7 @@ func TestStorageApplySnapshot(t *testing.T) { {Data: data, Metadata: pb.SnapshotMetadata{Index: 3, Term: 3, ConfState: *cs}}, } - s := NewMemoryStorage() + s := NewMemoryStorage(0) //Apply Snapshot successful i := 0