Skip to content

Commit

Permalink
*: a new option for size based compaction
Browse files Browse the repository at this point in the history
This commit adds a new option --snapshot-size for enabling size-based
compaction. The option takes a number of bytes. If the
accumulated size of in-memory log entries becomes larger than the
given size, etcd triggers compaction.
  • Loading branch information
mitake committed Aug 1, 2017
1 parent 2951faf commit df37a38
Show file tree
Hide file tree
Showing 20 changed files with 257 additions and 170 deletions.
2 changes: 1 addition & 1 deletion contrib/raftexample/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (rc *raftNode) replayWAL() *wal.WAL {
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
rc.raftStorage = raft.NewMemoryStorage(0)
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
Expand Down
2 changes: 2 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Config struct {
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
SnapSize uint64 `json:"snapshot-size"`
AutoCompactionRetention int `json:"auto-compaction-retention"`
AutoCompactionMode string `json:"auto-compaction-mode"`

Expand Down Expand Up @@ -182,6 +183,7 @@ func NewConfig() *Config {
SnapCount: etcdserver.DefaultSnapCount,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
SnapSize: etcdserver.DefaultSnapSize,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapCount: cfg.SnapCount,
SnapSize: cfg.SnapSize,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func newConfig() *config {
fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.")
fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.Uint64Var(&cfg.SnapSize, "snapshot-size", cfg.SnapSize, "Size of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ServerConfig struct {
// rather than the dataDir/member/wal.
DedicatedWALDir string
SnapCount uint64
SnapSize uint64
MaxSnapFiles uint
MaxWALFiles uint
InitialPeerURLsMap types.URLsMap
Expand Down Expand Up @@ -202,6 +203,7 @@ func (c *ServerConfig) print(initial bool) {
plog.Infof("heartbeat = %dms", c.TickMs)
plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
plog.Infof("snapshot count = %d", c.SnapCount)
plog.Infof("snapshot size = %d", c.SnapSize)
if len(c.DiscoveryURL) != 0 {
plog.Infof("discovery URL= %s", c.DiscoveryURL)
if len(c.DiscoveryProxy) != 0 {
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
}
id = member.ID
plog.Infof("starting member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
s = raft.NewMemoryStorage(cfg.SnapSize)
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
Expand Down Expand Up @@ -429,7 +429,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(cfg.SnapSize)
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
Expand Down Expand Up @@ -485,7 +485,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(cfg.SnapSize)
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
Expand All @@ -183,7 +183,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{r: *r}
Expand Down
40 changes: 34 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (

const (
DefaultSnapCount = 100000
DefaultSnapSize = 0 // 0 means no size based compaction

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"
Expand Down Expand Up @@ -175,6 +176,9 @@ type EtcdServer struct {
readych chan struct{}
Cfg ServerConfig

snapCount uint64
snapSize uint64

w wait.Wait

readMu sync.RWMutex
Expand Down Expand Up @@ -406,6 +410,8 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
snapSize: cfg.SnapSize,
errorc: make(chan error, 1),
store: st,
snapshotter: ss,
Expand Down Expand Up @@ -534,6 +540,10 @@ func (s *EtcdServer) start() {
plog.Infof("set snapshot count to default %d", DefaultSnapCount)
s.Cfg.SnapCount = DefaultSnapCount
}
if s.snapSize == 0 {
plog.Infof("set snapshot size to default %d", DefaultSnapSize)
s.snapSize = DefaultSnapSize
}
s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
Expand Down Expand Up @@ -914,8 +924,12 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}
}

// shouldTriggerSnapshot reports whether a snapshot should be taken now.
//
// A snapshot is due when more entries have been applied since the last
// snapshot (ep.appliedi - ep.snapi) than the configured snapshot count, or
// when the raft storage reports that it should be compacted by size.
//
// s.snapCount is copied from cfg.SnapCount when the server is constructed,
// but start() applies the DefaultSnapCount fallback to s.Cfg.SnapCount only.
// A zero s.snapCount therefore falls back to s.Cfg.SnapCount here; otherwise a
// configured count of 0 would make the count condition true for every applied
// entry instead of using the default.
func (s *EtcdServer) shouldTriggerSnapshot(ep *etcdProgress) bool {
	threshold := s.snapCount
	if threshold == 0 {
		threshold = s.Cfg.SnapCount
	}
	if threshold < ep.appliedi-ep.snapi {
		return true
	}
	// NOTE(review): presumably this is always false when no snapshot size is
	// configured (DefaultSnapSize is 0) — confirm against the raft storage.
	return s.r.raftStorage.ShouldCompactBySize()
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
if !s.shouldTriggerSnapshot(ep) {
return
}

Expand Down Expand Up @@ -1416,6 +1430,24 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
return false, nil
}

// getCompactIndex chooses the raft log index to compact up to after a
// snapshot taken at snapi.
//
// The count-based candidate trails snapi by numberOfCatchUpEntries (or is 1
// when snapi is not larger than that) so that slow followers can still catch
// up from the in-memory log. If the storage's size-based compact index is
// larger than that candidate, it is used instead, capped at snapi.
func (s *EtcdServer) getCompactIndex(snapi uint64) uint64 {
	var countIdx uint64 = 1
	if snapi > numberOfCatchUpEntries {
		// keep some in memory log entries for slow followers.
		countIdx = snapi - numberOfCatchUpEntries
	}

	sizeIdx := s.r.raftStorage.SizeBasedCompactIndex()
	switch {
	case sizeIdx <= countIdx:
		// The size-based index does not ask for more than the count-based one.
		return countIdx
	case sizeIdx > snapi:
		// Cap the size-based index at the snapshot index.
		return snapi
	default:
		return sizeIdx
	}
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.store.Clone()
Expand Down Expand Up @@ -1460,11 +1492,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > numberOfCatchUpEntries {
compacti = snapi - numberOfCatchUpEntries
}
compacti := s.getCompactIndex(snapi)
err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand Down
20 changes: 10 additions & 10 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestApplyRepeat(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -679,7 +679,7 @@ func TestDoProposal(t *testing.T) {
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
storage: mockstorage.NewStorageRecorder(""),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
})
srv := &EtcdServer{
Expand Down Expand Up @@ -847,7 +847,7 @@ func TestSyncTrigger(t *testing.T) {
tk := &time.Ticker{C: st}
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
transport: rafthttp.NewNopTransporter(),
storage: mockstorage.NewStorageRecorder(""),
})
Expand Down Expand Up @@ -901,7 +901,7 @@ func TestSnapshot(t *testing.T) {
os.RemoveAll(tmpPath)
}()

s := raft.NewMemoryStorage()
s := raft.NewMemoryStorage(0)
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorderStream()
p := mockstorage.NewStorageRecorderStream("")
Expand Down Expand Up @@ -970,7 +970,7 @@ func TestSnapshotOrdering(t *testing.T) {
t.Fatalf("couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
rs := raft.NewMemoryStorage(0)
p := mockstorage.NewStorageRecorderStream(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
r := newRaftNode(raftNodeConfig{
Expand Down Expand Up @@ -1037,7 +1037,7 @@ func TestTriggerSnap(t *testing.T) {
p := mockstorage.NewStorageRecorderStream("")
r := newRaftNode(raftNodeConfig{
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: p,
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1096,7 +1096,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
t.Fatalf("Couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
rs := raft.NewMemoryStorage(0)
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestAddMember(t *testing.T) {
cl.SetStore(st)
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1226,7 +1226,7 @@ func TestRemoveMember(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down Expand Up @@ -1266,7 +1266,7 @@ func TestUpdateMember(t *testing.T) {
cl.AddMember(&membership.Member{ID: 1234})
r := newRaftNode(raftNodeConfig{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: raft.NewMemoryStorage(0),
storage: mockstorage.NewStorageRecorder(""),
transport: rafthttp.NewNopTransporter(),
})
Expand Down
Loading

0 comments on commit df37a38

Please sign in to comment.