Skip to content

Commit

Permalink
raft: fix committed entry pagination with async storage writes
Browse files Browse the repository at this point in the history
This commit fixes the interactions between committed entry pagination and async
storage writes. The pagination now properly applies across multiple Ready
structs, acting as a limit on outstanding committed entries that have yet to be
acked through a MsgStorageApplyResp message.

The commit also resolves an abuse of the LogTerm field in MsgStorageApply{Resp}.

Signed-off-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
nvanbenschoten committed Dec 21, 2022
1 parent 2c2960c commit 2e0653d
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 184 deletions.
96 changes: 73 additions & 23 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,30 @@ type raftLog struct {
// been instructed to apply to its state machine. Some of these
// entries may be in the process of applying and have not yet
// reached applied.
// Use: The field is incremented when accepting a Ready struct.
// Invariant: applied <= applying && applying <= committed
applying uint64
// applied is the highest log position that the application has
// successfully applied to its state machine.
// Use: The field is incremented when advancing after the committed
// entries in a Ready struct have been applied (either synchronously
// or asynchronously).
// Invariant: applied <= committed
applied uint64

logger Logger

// maxNextCommittedEntsSize is the maximum aggregate byte size of the
// messages returned from calls to nextCommittedEnts.
maxNextCommittedEntsSize uint64
// maxApplyingEntsSize limits the outstanding byte size of the messages
// returned from calls to nextCommittedEnts that have not been acknowledged
// by a call to appliedTo.
maxApplyingEntsSize entryEncodingSize
// applyingEntsSize is the current outstanding byte size of the messages
// returned from calls to nextCommittedEnts that have not been acknowledged
// by a call to appliedTo.
applyingEntsSize entryEncodingSize
// applyingEntsPaused is true when entry application has been paused until
// enough progress is acknowledged.
applyingEntsPaused bool
}

// newLog returns log using the given storage and default options. It
Expand All @@ -59,14 +71,14 @@ func newLog(storage Storage, logger Logger) *raftLog {

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uint64) *raftLog {
func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
maxNextCommittedEntsSize: maxNextCommittedEntsSize,
storage: storage,
logger: logger,
maxApplyingEntsSize: maxApplyingEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand All @@ -81,8 +93,8 @@ func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uin
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
log.applying = firstIndex - 1
log.applied = firstIndex - 1

return log
}
Expand Down Expand Up @@ -204,20 +216,25 @@ func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
// entries from the unstable log may be returned; otherwise, only entries known
// to reside locally on stable storage will be returned.
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
if l.applyingEntsPaused {
// Entry application outstanding size limit reached.
return nil
}
if l.hasNextOrInProgressSnapshot() {
// See comment in hasNextCommittedEnts.
return nil
}
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
if !allowUnstable {
hi = min(hi, l.unstable.offset)
}
lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
if lo >= hi {
// Nothing to apply.
return nil
}
// TODO: handle pagination correctly.
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
maxSize := l.maxApplyingEntsSize - l.applyingEntsSize
if maxSize <= 0 {
l.logger.Panicf("applying entry size (%d-%d)=%d not positive",
l.maxApplyingEntsSize, l.applyingEntsSize, maxSize)
}
ents, err := l.slice(lo, hi, maxSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand All @@ -227,17 +244,30 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
// hasNextCommittedEnts reports whether there are any committed entries
// available for execution. It is a fast check that avoids the heavier
// raftLog.slice() call made by nextCommittedEnts().
//
// It returns false when entry application is paused (applyingEntsPaused) or
// when hasNextOrInProgressSnapshot() reports true. Otherwise it reports whether
// the half-open index range [lo, hi) is non-empty, where lo is applying+1.
//
// NOTE(review): this span comes from a rendered diff, so both the previous and
// the updated "lo, hi :=" lines appear below; the updated one derives hi from
// maxAppliableIndex(allowUnstable).
func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool {
if l.applyingEntsPaused {
// Entry application outstanding size limit reached.
return false
}
if l.hasNextOrInProgressSnapshot() {
// If we have a snapshot to apply, don't also return any committed
// entries. Doing so raises questions about what should be applied
// first.
return false
}
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
return lo < hi
}

// maxAppliableIndex returns the maximum committed index that can be applied.
// If allowUnstable is true, committed entries from the unstable log can be
// applied; otherwise, only entries known to reside locally on stable storage
// can be applied.
//
// The result starts at l.committed and, when allowUnstable is false, is capped
// by a bound derived from l.unstable.offset.
//
// NOTE(review): this span comes from a rendered diff, so previous and updated
// lines are interleaved below (two "hi = min(...)" lines and two return
// statements); the updated form caps at unstable.offset-1 and returns hi.
func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 {
hi := l.committed
if !allowUnstable {
hi = min(hi, l.unstable.offset)
// presumably unstable.offset is the index of the first unstable entry, so
// offset-1 is the last entry on stable storage — confirm against the
// unstable type.
hi = min(hi, l.unstable.offset-1)
}
return lo < hi
return hi
}

// nextUnstableSnapshot returns the snapshot, if present, that is available to
Expand Down Expand Up @@ -297,19 +327,39 @@ func (l *raftLog) commitTo(tocommit uint64) {
}
}

// appliedTo records that the application has finished applying entries up to
// and including index i, and releases size bytes from the outstanding
// applying-entries accounting.
//
// If i is greater than committed or less than the current applied index, the
// violation is reported via logger.Panicf. Otherwise applied is set to i,
// applying is raised to at least i, size is subtracted from applyingEntsSize
// (clamping at zero), and applyingEntsPaused is recomputed against
// maxApplyingEntsSize.
//
// NOTE(review): this span comes from a rendered diff, so both the previous and
// the updated signature lines appear below.
func (l *raftLog) appliedTo(i uint64) {
func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) {
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
// applying never moves backwards here; it is only raised to i if it was
// behind.
l.applying = max(l.applying, i)
if l.applyingEntsSize > size {
l.applyingEntsSize -= size
} else {
// Defense against underflow.
l.applyingEntsSize = 0
}
// Stay paused only while the remaining outstanding size is still at or above
// the configured maximum.
l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize
}

// acceptApplying records that entries up to and including index i have been
// handed out for application, and adds size bytes to the outstanding
// applying-entries accounting. allowUnstable is forwarded to maxAppliableIndex
// when deciding whether to pause further entry application.
//
// If i is greater than committed, the violation is reported via
// logger.Panicf; no lower bound on i is checked here.
//
// NOTE(review): this span comes from a rendered diff, so both the previous and
// the updated signature lines appear below.
func (l *raftLog) acceptApplying(i uint64) {
func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) {
if l.committed < i {
l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed)
}
l.applying = i
l.applyingEntsSize += size
// Determine whether to pause entry application until some progress is
// acknowledged. We pause in two cases:
// 1. the outstanding entry size equals or exceeds the maximum size.
// 2. the outstanding entry size does not equal or exceed the maximum size,
// but we determine that the next entry in the log will push us over the
// limit. We determine this by comparing the last entry returned from
// raftLog.nextCommittedEnts to the maximum entry that the method was
// allowed to return had there been no size limit. If these indexes are
// not equal, then the returned entries slice must have been truncated to
// adhere to the memory limit.
l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize ||
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
Expand Down Expand Up @@ -355,11 +405,11 @@ func (l *raftLog) term(i uint64) (uint64, error) {
panic(err) // TODO(bdarnell)
}

// entries returns the log entries starting at index i and running through
// lastIndex(), with the size limit passed on to slice. It returns (nil, nil)
// when i is past lastIndex(); otherwise it returns whatever slice returns,
// including its error.
//
// NOTE(review): this span comes from a rendered diff, so both the previous and
// the updated signature and return lines appear below; the update renames
// maxsize to maxSize and changes its type to entryEncodingSize.
func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if i > l.lastIndex() {
return nil, nil
}
// slice takes a half-open range, hence lastIndex()+1 as the upper bound.
return l.slice(i, l.lastIndex()+1, maxsize)
return l.slice(i, l.lastIndex()+1, maxSize)
}

// allEntries returns all entries in the log.
Expand Down Expand Up @@ -408,7 +458,7 @@ func (l *raftLog) restore(s pb.Snapshot) {
}

// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
err := l.mustCheckOutOfBounds(lo, hi)
if err != nil {
return nil, err
Expand All @@ -418,7 +468,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
}
var ents []pb.Entry
if lo < l.unstable.offset {
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), uint64(maxSize))
if err == ErrCompacted {
return nil, err
} else if err == ErrUnavailable {
Expand Down
Loading

0 comments on commit 2e0653d

Please sign in to comment.