*: a new option for size-based compaction
This commit adds a new option, --snapshot-size, for specifying size-based
compaction. The option takes a number of bytes. If the accumulated size of
the in-memory log entries grows larger than the given size, etcd triggers
compaction.
mitake committed May 10, 2017
1 parent aac2292 commit c7b7a23
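
The new trigger can be set from the command line (for example, etcd --snapshot-size=67108864 for a 64 MB threshold) or through the embed package. The following is a minimal sketch, not part of this commit, that assumes the patch is applied; the data directory, the 10,000-entry count, and the 64 MB size threshold are arbitrary example values.

package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Snapshot when 10,000 entries have been applied since the last
	// snapshot, or when the in-memory raft log exceeds 64 MB,
	// whichever happens first.
	cfg.SnapCount = 10000
	cfg.SnapSize = 64 * 1024 * 1024

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("etcd is ready")
}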
Showing 6 changed files with 72 additions and 6 deletions.
2 changes: 2 additions & 0 deletions embed/config.go
@@ -77,6 +77,7 @@ type Config struct {
 	MaxWalFiles uint   `json:"max-wals"`
 	Name        string `json:"name"`
 	SnapCount   uint64 `json:"snapshot-count"`
+	SnapSize    uint64 `json:"snapshot-size"`
 	AutoCompactionRetention int `json:"auto-compaction-retention"`
 
 	// TickMs is the number of milliseconds between heartbeat ticks.
@@ -172,6 +173,7 @@ func NewConfig() *Config {
 		MaxWalFiles: DefaultMaxWALs,
 		Name:        DefaultName,
 		SnapCount:   etcdserver.DefaultSnapCount,
+		SnapSize:    etcdserver.DefaultSnapSize,
 		TickMs:      100,
 		ElectionMs:  1000,
 		LPUrls:      []url.URL{*lpurl},
1 change: 1 addition & 0 deletions embed/etcd.go
@@ -126,6 +126,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		DataDir:            cfg.Dir,
 		DedicatedWALDir:    cfg.WalDir,
 		SnapCount:          cfg.SnapCount,
+		SnapSize:           cfg.SnapSize,
 		MaxSnapFiles:       cfg.MaxSnapFiles,
 		MaxWALFiles:        cfg.MaxWalFiles,
 		InitialPeerURLsMap: urlsmap,
1 change: 1 addition & 0 deletions etcdmain/config.go
@@ -135,6 +135,7 @@ func newConfig() *config {
 	fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
 	fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.")
 	fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
+	fs.Uint64Var(&cfg.SnapSize, "snapshot-size", cfg.SnapSize, "Size of committed transactions to trigger a snapshot to disk.")
 	fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
2 changes: 2 additions & 0 deletions etcdserver/config.go
@@ -40,6 +40,7 @@ type ServerConfig struct {
 	// rather than the dataDir/member/wal.
 	DedicatedWALDir string
 	SnapCount       uint64
+	SnapSize        uint64
 	MaxSnapFiles    uint
 	MaxWALFiles     uint
 	InitialPeerURLsMap types.URLsMap
@@ -167,6 +168,7 @@ func (c *ServerConfig) print(initial bool) {
 	plog.Infof("heartbeat = %dms", c.TickMs)
 	plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
 	plog.Infof("snapshot count = %d", c.SnapCount)
+	plog.Infof("snapshot size = %d", c.SnapSize)
 	if len(c.DiscoveryURL) != 0 {
 		plog.Infof("discovery URL= %s", c.DiscoveryURL)
 		if len(c.DiscoveryProxy) != 0 {
37 changes: 31 additions & 6 deletions etcdserver/server.go
@@ -62,6 +62,7 @@ import (
 
 const (
 	DefaultSnapCount = 100000
+	DefaultSnapSize  = DefaultSnapCount * 1024 // assume each entry is 1KB
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
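
With the default count of 100000 entries and the 1 KB-per-entry assumption, DefaultSnapSize works out to 100000 × 1024 = 102,400,000 bytes, roughly 98 MiB.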
@@ -176,6 +177,7 @@ type EtcdServer struct {
 	r raftNode
 
 	snapCount uint64
+	snapSize  uint64
 
 	w wait.Wait
 
@@ -419,6 +421,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		readych:   make(chan struct{}),
 		Cfg:       cfg,
 		snapCount: cfg.SnapCount,
+		snapSize:  cfg.SnapSize,
 		errorc:    make(chan error, 1),
 		store:     st,
 		r: *newRaftNode(
@@ -534,6 +537,10 @@ func (s *EtcdServer) start() {
 		plog.Infof("set snapshot count to default %d", DefaultSnapCount)
 		s.snapCount = DefaultSnapCount
 	}
+	if s.snapSize == 0 {
+		plog.Infof("set snapshot size to default %d", DefaultSnapSize)
+		s.snapSize = DefaultSnapSize
+	}
 	s.w = wait.New()
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
@@ -913,8 +920,12 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 	}
 }
 
+func (s *EtcdServer) shouldTriggerSnapshot(ep *etcdProgress) bool {
+	return s.snapCount < ep.appliedi-ep.snapi || s.r.raftStorage.ShouldCompactBySize(s.snapSize)
+}
+
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if ep.appliedi-ep.snapi <= s.snapCount {
+	if !s.shouldTriggerSnapshot(ep) {
 		return
 	}
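
With this change, triggerSnapshot fires on whichever condition is met first: more than snapCount entries applied since the last snapshot, or the accumulated size of the in-memory log entries reaching snapSize bytes.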

@@ -1417,6 +1428,24 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
 	return false, nil
 }
 
+func (s *EtcdServer) getCompactIndex(snapi uint64) uint64 {
+	compacti := uint64(1)
+
+	if snapi > numberOfCatchUpEntries {
+		// keep some in memory log entries for slow followers.
+		compacti = snapi - numberOfCatchUpEntries
+	}
+
+	szCompacti := s.r.raftStorage.SizeBasedCompactIndex(s.snapSize)
+	if compacti < szCompacti {
+		if snapi < szCompacti {
+			return snapi
+		}
+		return szCompacti
+	}
+	return compacti
+}
+
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	clone := s.store.Clone()
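
To make the interplay of the two candidate indexes concrete, here is a standalone sketch, not part of the commit, that mirrors getCompactIndex but takes the size-based index as a parameter instead of querying raft storage; the catch-up window of 5000 (the value of numberOfCatchUpEntries in etcdserver at the time) and the sample indexes are illustrative.

package main

import "fmt"

// chooseCompactIndex mirrors getCompactIndex for illustration; catchUp
// stands in for numberOfCatchUpEntries and szCompacti for the result of
// SizeBasedCompactIndex.
func chooseCompactIndex(snapi, catchUp, szCompacti uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUp {
		// keep some in-memory log entries for slow followers.
		compacti = snapi - catchUp
	}
	if compacti < szCompacti {
		if snapi < szCompacti {
			return snapi // never compact past the snapshot index
		}
		return szCompacti // size pressure wins: compact more aggressively
	}
	return compacti
}

func main() {
	// Size limit reached at index 9500: compact up to 9500, not just 7000.
	fmt.Println(chooseCompactIndex(12000, 5000, 9500)) // 9500
	// Size-based index beyond the snapshot: clamp to snapi.
	fmt.Println(chooseCompactIndex(12000, 5000, 13000)) // 12000
	// Size limit not yet binding: fall back to the count-based index.
	fmt.Println(chooseCompactIndex(12000, 5000, 6000)) // 7000
}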
@@ -1461,11 +1490,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		return
 	}
 
-	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > numberOfCatchUpEntries {
-		compacti = snapi - numberOfCatchUpEntries
-	}
+	compacti := s.getCompactIndex(snapi)
 	err = s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
35 changes: 35 additions & 0 deletions raft/storage.go
@@ -81,6 +81,8 @@ type MemoryStorage struct {
 	snapshot pb.Snapshot
 	// ents[i] has raft log position i+snapshot.Metadata.Index
 	ents []pb.Entry
+	// entsSize is the total size in bytes of the Data of all ents
+	entsSize uint64
 }
 
 // NewMemoryStorage creates an empty MemoryStorage.
@@ -182,6 +184,7 @@ func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
 
 	ms.snapshot = snap
 	ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
+	ms.entsSize = 0
 	return nil
 }
 
@@ -230,6 +233,7 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error {
 	ents[0].Term = ms.ents[i].Term
 	ents = append(ents, ms.ents[i+1:]...)
 	ms.ents = ents
+	ms.entsSize = sizeOfEntries(ms.ents)
 	return nil
 }
 
@@ -261,11 +265,42 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 	case uint64(len(ms.ents)) > offset:
 		ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
 		ms.ents = append(ms.ents, entries...)
+		ms.entsSize = sizeOfEntries(ms.ents)
 	case uint64(len(ms.ents)) == offset:
 		ms.ents = append(ms.ents, entries...)
+		ms.entsSize += sizeOfEntries(entries)
 	default:
 		raftLogger.Panicf("missing log entry [last: %d, append at: %d]",
 			ms.lastIndex(), entries[0].Index)
 	}
 	return nil
 }
+
+func sizeOfEntries(entries []pb.Entry) (size uint64) {
+	for _, e := range entries {
+		size += uint64(len(e.Data))
+	}
+	return size
+}
+
+func (ms *MemoryStorage) ShouldCompactBySize(limitBytes uint64) bool {
+	return limitBytes <= ms.entsSize
+}
+
+func (ms *MemoryStorage) SizeBasedCompactIndex(limitBytes uint64) uint64 {
+	ms.Lock()
+	defer ms.Unlock()
+
+	compacti := ms.ents[0].Index - 1
+	sz := uint64(0)
+
+	for _, e := range ms.ents {
+		sz += uint64(len(e.Data))
+		compacti++
+		if limitBytes <= sz {
+			return compacti
+		}
+	}
+
+	return compacti
+}
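
As a usage sketch of the new storage helpers (standalone, assuming this patch is applied to the raft package), the program below appends ten 1 KB entries and queries both methods; the entry contents and the 4 KB limit are arbitrary example values.

package main

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	pb "github.com/coreos/etcd/raft/raftpb"
)

func main() {
	ms := raft.NewMemoryStorage()

	// Append ten 1 KB entries at indexes 1..10.
	ents := make([]pb.Entry, 10)
	for i := range ents {
		ents[i] = pb.Entry{Index: uint64(i + 1), Term: 1, Data: make([]byte, 1024)}
	}
	if err := ms.Append(ents); err != nil {
		panic(err)
	}

	limit := uint64(4 * 1024)
	// 10 KB of entry data is accumulated, so the 4 KB limit is exceeded.
	fmt.Println(ms.ShouldCompactBySize(limit)) // true
	// The accumulated size first reaches 4 KB at index 4, so entries up
	// to and including index 4 can be compacted away.
	fmt.Println(ms.SizeBasedCompactIndex(limit)) // 4
}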
