Skip to content

Commit

Permalink
Merge pull request #10258 from ajwerner/ajwerner/raft_committed_entri…
Browse files Browse the repository at this point in the history
…es_size

raft: separate MaxCommittedSizePerReady config from MaxSizePerMsg
  • Loading branch information
xiang90 authored Nov 15, 2018
2 parents ee9dcbc + e4af2be commit fa92397
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
14 changes: 8 additions & 6 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type raftLog struct {

logger Logger

maxMsgSize uint64
// maxNextEntsSize is the maximum number aggregate byte size of the messages
// returned from calls to nextEnts.
maxNextEntsSize uint64
}

// newLog returns log using the given storage and default options. It
Expand All @@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog {

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
storage: storage,
logger: logger,
maxNextEntsSize: maxNextEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
2 changes: 1 addition & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ func TestAppendPagination(t *testing.T) {
func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxSizePerMsg = 2048
cfg.MaxCommittedSizePerReady = 2048
r := newRaft(cfg)
n := newNode()
go n.run(r)
Expand Down
11 changes: 10 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type Config struct {
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
// 0 for at most one entry per message.
MaxSizePerMsg uint64
// MaxCommittedSizePerReady limits the size of the committed entries which
// can be applied.
MaxCommittedSizePerReady uint64
// MaxUncommittedEntriesSize limits the aggregate byte size of the
// uncommitted entries that may be appended to a leader's log. Once this
// limit is exceeded, proposals will begin to return ErrProposalDropped
Expand Down Expand Up @@ -224,6 +227,12 @@ func (c *Config) validate() error {
c.MaxUncommittedEntriesSize = noLimit
}

// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
// previously the same parameter.
if c.MaxCommittedSizePerReady == 0 {
c.MaxCommittedSizePerReady = c.MaxSizePerMsg
}

if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
Expand Down Expand Up @@ -316,7 +325,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down

0 comments on commit fa92397

Please sign in to comment.