diff --git a/log.go b/log.go index cd19e581a67d..daf7acef7e4d 100644 --- a/log.go +++ b/log.go @@ -36,18 +36,30 @@ type raftLog struct { // been instructed to apply to its state machine. Some of these // entries may be in the process of applying and have not yet // reached applied. + // Use: The field is incremented when accepting a Ready struct. // Invariant: applied <= applying && applying <= committed applying uint64 // applied is the highest log position that the application has // successfully applied to its state machine. + // Use: The field is incremented when advancing after the committed + // entries in a Ready struct have been applied (either synchronously + // or asynchronously). // Invariant: applied <= committed applied uint64 logger Logger - // maxNextCommittedEntsSize is the maximum number aggregate byte size of the - // messages returned from calls to nextCommittedEnts. - maxNextCommittedEntsSize uint64 + // maxApplyingEntsSize limits the outstanding byte size of the messages + // returned from calls to nextCommittedEnts that have not been acknowledged + // by a call to appliedTo. + maxApplyingEntsSize entryEncodingSize + // applyingEntsSize is the current outstanding byte size of the messages + // returned from calls to nextCommittedEnts that have not been acknowledged + // by a call to appliedTo. + applyingEntsSize entryEncodingSize + // applyingEntsPaused is true when entry application has been paused until + // enough progress is acknowledged. + applyingEntsPaused bool } // newLog returns log using the given storage and default options. It @@ -59,14 +71,14 @@ func newLog(storage Storage, logger Logger) *raftLog { // newLogWithSize returns a log using the given storage and max // message size. -func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uint64) *raftLog { +func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, - maxNextCommittedEntsSize: maxNextCommittedEntsSize, + storage: storage, + logger: logger, + maxApplyingEntsSize: maxApplyingEntsSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -81,8 +93,8 @@ func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uin log.unstable.logger = logger // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 - log.applied = firstIndex - 1 log.applying = firstIndex - 1 + log.applied = firstIndex - 1 return log } @@ -204,20 +216,25 @@ func (l *raftLog) hasNextOrInProgressUnstableEnts() bool { // entries from the unstable log may be returned; otherwise, only entries known // to reside locally on stable storage will be returned. func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) { + if l.applyingEntsPaused { + // Entry application outstanding size limit reached. + return nil + } if l.hasNextOrInProgressSnapshot() { // See comment in hasNextCommittedEnts. return nil } - lo, hi := l.applying+1, l.committed+1 // [lo, hi) - if !allowUnstable { - hi = min(hi, l.unstable.offset) - } + lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi) if lo >= hi { // Nothing to apply. return nil } - // TODO: handle pagination correctly. 
- ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize) + maxSize := l.maxApplyingEntsSize - l.applyingEntsSize + if maxSize <= 0 { + l.logger.Panicf("applying entry size (%d-%d)=%d not positive", + l.maxApplyingEntsSize, l.applyingEntsSize, maxSize) + } + ents, err := l.slice(lo, hi, maxSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } @@ -227,17 +244,30 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) { // hasNextCommittedEnts returns if there is any available entries for execution. // This is a fast check without heavy raftLog.slice() in nextCommittedEnts(). func (l *raftLog) hasNextCommittedEnts(allowUnstable bool) bool { + if l.applyingEntsPaused { + // Entry application outstanding size limit reached. + return false + } if l.hasNextOrInProgressSnapshot() { // If we have a snapshot to apply, don't also return any committed // entries. Doing so raises questions about what should be applied // first. return false } - lo, hi := l.applying+1, l.committed+1 // [lo, hi) + lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi) + return lo < hi +} + +// maxAppliableIndex returns the maximum committed index that can be applied. +// If allowUnstable is true, committed entries from the unstable log can be +// applied; otherwise, only entries known to reside locally on stable storage +// can be applied. +func (l *raftLog) maxAppliableIndex(allowUnstable bool) uint64 { + hi := l.committed if !allowUnstable { - hi = min(hi, l.unstable.offset) + hi = min(hi, l.unstable.offset-1) } - return lo < hi + return hi } // nextUnstableSnapshot returns the snapshot, if present, that is available to @@ -297,19 +327,39 @@ func (l *raftLog) commitTo(tocommit uint64) { } } -func (l *raftLog) appliedTo(i uint64) { +func (l *raftLog) appliedTo(i uint64, size entryEncodingSize) { if l.committed < i || i < l.applied { l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) } l.applied = i l.applying = max(l.applying, i) + if l.applyingEntsSize > size { + l.applyingEntsSize -= size + } else { + // Defense against underflow. + l.applyingEntsSize = 0 + } + l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize } -func (l *raftLog) acceptApplying(i uint64) { +func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable bool) { if l.committed < i { l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed) } l.applying = i + l.applyingEntsSize += size + // Determine whether to pause entry application until some progress is + // acknowledged. We pause in two cases: + // 1. the outstanding entry size equals or exceeds the maximum size. + // 2. the outstanding entry size does not equal or exceed the maximum size, + // but we determine that the next entry in the log will push us over the + // limit. We determine this by comparing the last entry returned from + // raftLog.nextCommittedEnts to the maximum entry that the method was + // allowed to return had there been no size limit. If these indexes are + // not equal, then the returned entries slice must have been truncated to + // adhere to the memory limit. 
+ l.applyingEntsPaused = l.applyingEntsSize >= l.maxApplyingEntsSize || + i < l.maxAppliableIndex(allowUnstable) } func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } @@ -355,11 +405,11 @@ func (l *raftLog) term(i uint64) (uint64, error) { panic(err) // TODO(bdarnell) } -func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { +func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { if i > l.lastIndex() { return nil, nil } - return l.slice(i, l.lastIndex()+1, maxsize) + return l.slice(i, l.lastIndex()+1, maxSize) } // allEntries returns all entries in the log. @@ -408,7 +458,7 @@ func (l *raftLog) restore(s pb.Snapshot) { } // slice returns a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { +func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { err := l.mustCheckOutOfBounds(lo, hi) if err != nil { return nil, err @@ -418,7 +468,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { } var ents []pb.Entry if lo < l.unstable.offset { - storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) + storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), uint64(maxSize)) if err == ErrCompacted { return nil, err } else if err == ErrUnavailable { diff --git a/log_test.go b/log_test.go index 3e03c7a2f62f..89d5827e2192 100644 --- a/log_test.go +++ b/log_test.go @@ -270,7 +270,7 @@ func TestCompactionSideEffects(t *testing.T) { } require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) - raftLog.appliedTo(raftLog.committed) + raftLog.appliedTo(raftLog.committed, 0 /* size */) offset := uint64(500) storage.Compact(offset) @@ -310,22 +310,25 @@ func TestHasNextCommittedEnts(t *testing.T) { applied uint64 applying uint64 allowUnstable bool + paused bool snap bool whasNext bool }{ - {applied: 3, applying: 3, allowUnstable: true, snap: false, whasNext: true}, - {applied: 3, applying: 4, allowUnstable: true, snap: false, whasNext: true}, - {applied: 3, applying: 5, allowUnstable: true, snap: false, whasNext: false}, - {applied: 4, applying: 4, allowUnstable: true, snap: false, whasNext: true}, - {applied: 4, applying: 5, allowUnstable: true, snap: false, whasNext: false}, - {applied: 5, applying: 5, allowUnstable: true, snap: false, whasNext: false}, + {applied: 3, applying: 3, allowUnstable: true, whasNext: true}, + {applied: 3, applying: 4, allowUnstable: true, whasNext: true}, + {applied: 3, applying: 5, allowUnstable: true, whasNext: false}, + {applied: 4, applying: 4, allowUnstable: true, whasNext: true}, + {applied: 4, applying: 5, allowUnstable: true, whasNext: false}, + {applied: 5, applying: 5, allowUnstable: true, whasNext: false}, // Don't allow unstable entries. 
- {applied: 3, applying: 3, allowUnstable: false, snap: false, whasNext: true}, - {applied: 3, applying: 4, allowUnstable: false, snap: false, whasNext: false}, - {applied: 3, applying: 5, allowUnstable: false, snap: false, whasNext: false}, - {applied: 4, applying: 4, allowUnstable: false, snap: false, whasNext: false}, - {applied: 4, applying: 5, allowUnstable: false, snap: false, whasNext: false}, - {applied: 5, applying: 5, allowUnstable: false, snap: false, whasNext: false}, + {applied: 3, applying: 3, allowUnstable: false, whasNext: true}, + {applied: 3, applying: 4, allowUnstable: false, whasNext: false}, + {applied: 3, applying: 5, allowUnstable: false, whasNext: false}, + {applied: 4, applying: 4, allowUnstable: false, whasNext: false}, + {applied: 4, applying: 5, allowUnstable: false, whasNext: false}, + {applied: 5, applying: 5, allowUnstable: false, whasNext: false}, + // Paused. + {applied: 3, applying: 3, allowUnstable: true, paused: true, whasNext: false}, // With snapshot. {applied: 3, applying: 3, allowUnstable: true, snap: true, whasNext: false}, } @@ -339,8 +342,9 @@ func TestHasNextCommittedEnts(t *testing.T) { raftLog.append(ents...) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) - raftLog.appliedTo(tt.applied) - raftLog.acceptApplying(tt.applying) + raftLog.appliedTo(tt.applied, 0 /* size */) + raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) + raftLog.applyingEntsPaused = tt.paused if tt.snap { newSnap := snap newSnap.Metadata.Index++ @@ -364,22 +368,25 @@ func TestNextCommittedEnts(t *testing.T) { applied uint64 applying uint64 allowUnstable bool + paused bool snap bool wents []pb.Entry }{ - {applied: 3, applying: 3, allowUnstable: true, snap: false, wents: ents[:2]}, - {applied: 3, applying: 4, allowUnstable: true, snap: false, wents: ents[1:2]}, - {applied: 3, applying: 5, allowUnstable: true, snap: false, wents: nil}, - {applied: 4, applying: 4, allowUnstable: true, snap: false, wents: ents[1:2]}, - {applied: 4, applying: 5, allowUnstable: true, snap: false, wents: nil}, - {applied: 5, applying: 5, allowUnstable: true, snap: false, wents: nil}, + {applied: 3, applying: 3, allowUnstable: true, wents: ents[:2]}, + {applied: 3, applying: 4, allowUnstable: true, wents: ents[1:2]}, + {applied: 3, applying: 5, allowUnstable: true, wents: nil}, + {applied: 4, applying: 4, allowUnstable: true, wents: ents[1:2]}, + {applied: 4, applying: 5, allowUnstable: true, wents: nil}, + {applied: 5, applying: 5, allowUnstable: true, wents: nil}, // Don't allow unstable entries. - {applied: 3, applying: 3, allowUnstable: false, snap: false, wents: ents[:1]}, - {applied: 3, applying: 4, allowUnstable: false, snap: false, wents: nil}, - {applied: 3, applying: 5, allowUnstable: false, snap: false, wents: nil}, - {applied: 4, applying: 4, allowUnstable: false, snap: false, wents: nil}, - {applied: 4, applying: 5, allowUnstable: false, snap: false, wents: nil}, - {applied: 5, applying: 5, allowUnstable: false, snap: false, wents: nil}, + {applied: 3, applying: 3, allowUnstable: false, wents: ents[:1]}, + {applied: 3, applying: 4, allowUnstable: false, wents: nil}, + {applied: 3, applying: 5, allowUnstable: false, wents: nil}, + {applied: 4, applying: 4, allowUnstable: false, wents: nil}, + {applied: 4, applying: 5, allowUnstable: false, wents: nil}, + {applied: 5, applying: 5, allowUnstable: false, wents: nil}, + // Paused. + {applied: 3, applying: 3, allowUnstable: true, paused: true, wents: nil}, // With snapshot. 
{applied: 3, applying: 3, allowUnstable: true, snap: true, wents: nil}, } @@ -393,8 +400,9 @@ func TestNextCommittedEnts(t *testing.T) { raftLog.append(ents...) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) - raftLog.appliedTo(tt.applied) - raftLog.acceptApplying(tt.applying) + raftLog.appliedTo(tt.applied, 0 /* size */) + raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) + raftLog.applyingEntsPaused = tt.paused if tt.snap { newSnap := snap newSnap.Metadata.Index++ @@ -405,6 +413,113 @@ func TestNextCommittedEnts(t *testing.T) { } } +func TestAcceptApplying(t *testing.T) { + maxSize := entryEncodingSize(100) + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{Term: 1, Index: 3}, + } + ents := []pb.Entry{ + {Term: 1, Index: 4}, + {Term: 1, Index: 5}, + {Term: 1, Index: 6}, + } + tests := []struct { + index uint64 + allowUnstable bool + size entryEncodingSize + wpaused bool + }{ + {index: 3, allowUnstable: true, size: maxSize - 1, wpaused: true}, + {index: 3, allowUnstable: true, size: maxSize, wpaused: true}, + {index: 3, allowUnstable: true, size: maxSize + 1, wpaused: true}, + {index: 4, allowUnstable: true, size: maxSize - 1, wpaused: true}, + {index: 4, allowUnstable: true, size: maxSize, wpaused: true}, + {index: 4, allowUnstable: true, size: maxSize + 1, wpaused: true}, + {index: 5, allowUnstable: true, size: maxSize - 1, wpaused: false}, + {index: 5, allowUnstable: true, size: maxSize, wpaused: true}, + {index: 5, allowUnstable: true, size: maxSize + 1, wpaused: true}, + // Don't allow unstable entries. + {index: 3, allowUnstable: false, size: maxSize - 1, wpaused: true}, + {index: 3, allowUnstable: false, size: maxSize, wpaused: true}, + {index: 3, allowUnstable: false, size: maxSize + 1, wpaused: true}, + {index: 4, allowUnstable: false, size: maxSize - 1, wpaused: false}, + {index: 4, allowUnstable: false, size: maxSize, wpaused: true}, + {index: 4, allowUnstable: false, size: maxSize + 1, wpaused: true}, + {index: 5, allowUnstable: false, size: maxSize - 1, wpaused: false}, + {index: 5, allowUnstable: false, size: maxSize, wpaused: true}, + {index: 5, allowUnstable: false, size: maxSize + 1, wpaused: true}, + } + for i, tt := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + storage := NewMemoryStorage() + require.NoError(t, storage.ApplySnapshot(snap)) + require.NoError(t, storage.Append(ents[:1])) + + raftLog := newLogWithSize(storage, raftLogger, maxSize) + raftLog.append(ents...) + raftLog.stableTo(4, 1) + raftLog.maybeCommit(5, 1) + raftLog.appliedTo(3, 0 /* size */) + + raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable) + require.Equal(t, tt.wpaused, raftLog.applyingEntsPaused) + }) + } +} + +func TestAppliedTo(t *testing.T) { + maxSize := entryEncodingSize(100) + overshoot := entryEncodingSize(5) + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{Term: 1, Index: 3}, + } + ents := []pb.Entry{ + {Term: 1, Index: 4}, + {Term: 1, Index: 5}, + {Term: 1, Index: 6}, + } + tests := []struct { + index uint64 + size entryEncodingSize + wapplyingSize entryEncodingSize + wpaused bool + }{ + // Apply some of in-progress entries (applying = 5 below). + {index: 4, size: overshoot - 1, wapplyingSize: maxSize + 1, wpaused: true}, + {index: 4, size: overshoot, wapplyingSize: maxSize, wpaused: true}, + {index: 4, size: overshoot + 1, wapplyingSize: maxSize - 1, wpaused: false}, + // Apply all of in-progress entries. 
+ {index: 5, size: overshoot - 1, wapplyingSize: maxSize + 1, wpaused: true}, + {index: 5, size: overshoot, wapplyingSize: maxSize, wpaused: true}, + {index: 5, size: overshoot + 1, wapplyingSize: maxSize - 1, wpaused: false}, + // Apply all of outstanding bytes. + {index: 4, size: maxSize + overshoot, wapplyingSize: 0, wpaused: false}, + // Apply more than outstanding bytes. + // Incorrect accounting doesn't underflow applyingSize. + {index: 4, size: maxSize + overshoot + 1, wapplyingSize: 0, wpaused: false}, + } + for i, tt := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + storage := NewMemoryStorage() + require.NoError(t, storage.ApplySnapshot(snap)) + require.NoError(t, storage.Append(ents[:1])) + + raftLog := newLogWithSize(storage, raftLogger, maxSize) + raftLog.append(ents...) + raftLog.stableTo(4, 1) + raftLog.maybeCommit(5, 1) + raftLog.appliedTo(3, 0 /* size */) + raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */) + + raftLog.appliedTo(tt.index, tt.size) + require.Equal(t, tt.index, raftLog.applied) + require.Equal(t, uint64(5), raftLog.applying) + require.Equal(t, tt.wapplyingSize, raftLog.applyingEntsSize) + require.Equal(t, tt.wpaused, raftLog.applyingEntsPaused) + }) + } +} + // TestNextUnstableEnts ensures unstableEntries returns the unstable part of the // entries correctly. func TestNextUnstableEnts(t *testing.T) { @@ -553,7 +668,7 @@ func TestCompaction(t *testing.T) { raftLog := newLog(storage, raftLogger) raftLog.maybeCommit(tt.lastIndex, 0) - raftLog.appliedTo(raftLog.committed) + raftLog.appliedTo(raftLog.committed, 0 /* size */) for j := 0; j < len(tt.compact); j++ { err := storage.Compact(tt.compact[j]) if err != nil { @@ -765,7 +880,7 @@ func TestSlice(t *testing.T) { require.True(t, tt.wpanic) } }() - g, err := l.slice(tt.from, tt.to, tt.limit) + g, err := l.slice(tt.from, tt.to, entryEncodingSize(tt.limit)) require.False(t, tt.from <= offset && err != ErrCompacted) require.False(t, tt.from > offset && err != nil) require.Equal(t, tt.w, g) diff --git a/node_test.go b/node_test.go index 4343c234e998..068002b85c4c 100644 --- a/node_test.go +++ b/node_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/raftpb" ) @@ -937,6 +939,153 @@ func TestCommitPagination(t *testing.T) { n.Advance() } +func TestCommitPaginationWithAsyncStorageWrites(t *testing.T) { + s := newTestMemoryStorage(withPeers(1)) + cfg := newTestConfig(1, 10, 1, s) + cfg.MaxCommittedSizePerReady = 2048 + cfg.AsyncStorageWrites = true + ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg) + defer cancel() + n.Campaign(ctx) + + // Persist vote. + rd := readyWithTimeout(n) + require.Len(t, rd.Messages, 1) + m := rd.Messages[0] + require.Equal(t, raftpb.MsgStorageAppend, m.Type) + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + // Append empty entry. + rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 1) + m = rd.Messages[0] + require.Equal(t, raftpb.MsgStorageAppend, m.Type) + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + // Apply empty entry. 
+ rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 2) + for _, m := range rd.Messages { + switch m.Type { + case raftpb.MsgStorageAppend: + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + case raftpb.MsgStorageApply: + if len(m.Entries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(m.Entries)) + } + require.Len(t, m.Responses, 1) + require.NoError(t, n.Step(ctx, m.Responses[0])) + default: + t.Fatalf("unexpected: %v", m) + } + } + + // Propose first entry. + blob := []byte(strings.Repeat("a", 1024)) + require.NoError(t, n.Propose(ctx, blob)) + + // Append first entry. + rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 1) + m = rd.Messages[0] + require.Equal(t, raftpb.MsgStorageAppend, m.Type) + require.Len(t, m.Entries, 1) + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + + // Propose second entry. + require.NoError(t, n.Propose(ctx, blob)) + + // Append second entry. Don't apply first entry yet. + rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 2) + var applyResps []raftpb.Message + for _, m := range rd.Messages { + switch m.Type { + case raftpb.MsgStorageAppend: + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + case raftpb.MsgStorageApply: + if len(m.Entries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(m.Entries)) + } + require.Len(t, m.Responses, 1) + applyResps = append(applyResps, m.Responses[0]) + default: + t.Fatalf("unexpected: %v", m) + } + } + + // Propose third entry. + require.NoError(t, n.Propose(ctx, blob)) + + // Append third entry. Don't apply second entry yet. + rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 2) + for _, m := range rd.Messages { + switch m.Type { + case raftpb.MsgStorageAppend: + require.NoError(t, s.Append(m.Entries)) + for _, resp := range m.Responses { + require.NoError(t, n.Step(ctx, resp)) + } + case raftpb.MsgStorageApply: + if len(m.Entries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(m.Entries)) + } + require.Len(t, m.Responses, 1) + applyResps = append(applyResps, m.Responses[0]) + default: + t.Fatalf("unexpected: %v", m) + } + } + + // Third entry should not be returned to be applied until first entry's + // application is acknowledged. + drain := true + for drain { + select { + case rd := <-n.Ready(): + for _, m := range rd.Messages { + if m.Type == raftpb.MsgStorageApply { + t.Fatalf("expected MsgStorageApply, %v", m) + } + } + case <-time.After(10 * time.Millisecond): + drain = false + } + } + + // Acknowledged first entry application. + require.NoError(t, n.Step(ctx, applyResps[0])) + applyResps = applyResps[1:] + + // Third entry now returned for application. + rd = readyWithTimeout(n) + require.Len(t, rd.Messages, 1) + m = rd.Messages[0] + require.Equal(t, raftpb.MsgStorageApply, m.Type) + require.Len(t, m.Entries, 1) + applyResps = append(applyResps, m.Responses[0]) + + // Acknowledged second and third entry application. + for _, resp := range applyResps { + require.NoError(t, n.Step(ctx, resp)) + } + applyResps = nil +} + type ignoreSizeHintMemStorage struct { *MemoryStorage } diff --git a/raft.go b/raft.go index 58eccb172084..d43e917bf9c9 100644 --- a/raft.go +++ b/raft.go @@ -194,7 +194,11 @@ type Config struct { // 0 for at most one entry per message. 
MaxSizePerMsg uint64 // MaxCommittedSizePerReady limits the size of the committed entries which - // can be applied. + // can be applying at the same time. + // + // Despite its name (preserved for compatibility), this quota applies across + // Ready structs to encompass all outstanding entries in unacknowledged + // MsgStorageApply messages when AsyncStorageWrites is enabled. MaxCommittedSizePerReady uint64 // MaxUncommittedEntriesSize limits the aggregate byte size of the // uncommitted entries that may be appended to a leader's log. Once this @@ -316,8 +320,8 @@ type raft struct { // the log raftLog *raftLog - maxMsgSize uint64 - maxUncommittedSize uint64 + maxMsgSize entryEncodingSize + maxUncommittedSize entryPayloadSize // TODO(tbg): rename to trk. prs tracker.ProgressTracker @@ -358,7 +362,7 @@ type raft struct { // an estimate of the size of the uncommitted tail of the Raft log. Used to // prevent unbounded log growth. Only maintained by the leader. Reset on // term changes. - uncommittedSize uint64 + uncommittedSize entryPayloadSize readOnly *readOnly @@ -399,7 +403,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) + raftlog := newLogWithSize(c.Storage, c.Logger, entryEncodingSize(c.MaxCommittedSizePerReady)) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) @@ -410,8 +414,8 @@ func newRaft(c *Config) *raft { lead: None, isLearner: false, raftLog: raftlog, - maxMsgSize: c.MaxSizePerMsg, - maxUncommittedSize: c.MaxUncommittedEntriesSize, + maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), + maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, @@ -435,7 +439,7 @@ func newRaft(c *Config) *raft { r.loadState(hs) } if c.Applied > 0 { - raftlog.appliedTo(c.Applied) + raftlog.appliedTo(c.Applied, 0 /* size */) } r.becomeFollower(r.Term, None) @@ -613,7 +617,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ @@ -676,10 +680,10 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { }) } -func (r *raft) appliedTo(index uint64) { +func (r *raft) appliedTo(index uint64, size entryEncodingSize) { oldApplied := r.raftLog.applied newApplied := max(index, oldApplied) - r.raftLog.appliedTo(newApplied) + r.raftLog.appliedTo(newApplied, size) if r.prs.Config.AutoLeave && newApplied >= r.pendingConfIndex && r.state == StateLeader { // If the current (and most recent, at least for this leader's term) @@ -708,7 +712,7 @@ func (r *raft) appliedTo(index uint64) { func (r *raft) appliedSnap(snap *pb.Snapshot) { index := snap.Metadata.Index r.raftLog.stableSnapTo(index) - r.appliedTo(index) + r.appliedTo(index, 0 /* size */) } // maybeCommit attempts to advance the commit index. Returns true if @@ -897,7 +901,7 @@ func (r *raft) becomeLeader() { // uncommitted log quota. This is because we want to preserve the // behavior of allowing one entry larger than quota if the current // usage is zero. 
- r.uncommittedSize = 0 + r.reduceUncommittedSize(payloadSize(emptyEnt)) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -1091,10 +1095,11 @@ func (r *raft) Step(m pb.Message) error { } case pb.MsgStorageApplyResp: - r.appliedTo(m.Index) - // NOTE: we abuse the LogTerm field to store the aggregate entry size so - // that we don't need to introduce a new field on Message. - r.reduceUncommittedSize(m.LogTerm) + if len(m.Entries) > 0 { + index := m.Entries[len(m.Entries)-1].Index + r.appliedTo(index, entsSize(m.Entries)) + r.reduceUncommittedSize(payloadsSize(m.Entries)) + } case pb.MsgVote, pb.MsgPreVote: // We can vote if this is a repeat of a vote we've already cast... @@ -1948,18 +1953,9 @@ func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { return true } -// getUncommittedSize computes the aggregate size of the provided entries. -func (r *raft) getUncommittedSize(ents []pb.Entry) uint64 { - if r.uncommittedSize == 0 { - // Fast-path for followers, who do not track or enforce the limit. - return 0 - } - return payloadsSize(ents) -} - // reduceUncommittedSize accounts for the newly committed entries by decreasing // the uncommitted entry size limit. -func (r *raft) reduceUncommittedSize(s uint64) { +func (r *raft) reduceUncommittedSize(s entryPayloadSize) { if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft // log tail but will never overestimate it. Saturate at 0 instead of @@ -1970,14 +1966,6 @@ func (r *raft) reduceUncommittedSize(s uint64) { } } -func payloadsSize(ents []pb.Entry) uint64 { - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - return s -} - func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/raft_paper_test.go b/raft_paper_test.go index 585168d0157b..d7e9949c2389 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -922,7 +922,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) { // ignore further messages to refresh followers' commit index r.readMessages() s.Append(r.raftLog.nextUnstableEnts()) - r.raftLog.appliedTo(r.raftLog.committed) + r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */) r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) } diff --git a/raft_test.go b/raft_test.go index f59791686d11..6abda3cfe016 100644 --- a/raft_test.go +++ b/raft_test.go @@ -38,7 +38,7 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { // Return committed entries. ents = r.raftLog.nextCommittedEnts(true) - r.raftLog.appliedTo(r.raftLog.committed) + r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */) return ents } @@ -240,9 +240,9 @@ func TestUncommittedEntryLimit(t *testing.T) { // writing, the former). 
const maxEntries = 1024 testEntry := pb.Entry{Data: []byte("testdata")} - maxEntrySize := maxEntries * PayloadSize(testEntry) + maxEntrySize := maxEntries * payloadSize(testEntry) - if n := PayloadSize(pb.Entry{Data: nil}); n != 0 { + if n := payloadSize(pb.Entry{Data: nil}); n != 0 { t.Fatal("entry with no Data must have zero payload size") } @@ -283,7 +283,7 @@ func TestUncommittedEntryLimit(t *testing.T) { if e := maxEntries * numFollowers; len(ms) != e { t.Fatalf("expected %d messages, got %d", e, len(ms)) } - r.reduceUncommittedSize(r.getUncommittedSize(propEnts)) + r.reduceUncommittedSize(payloadsSize(propEnts)) if r.uncommittedSize != 0 { t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize) } @@ -319,7 +319,7 @@ func TestUncommittedEntryLimit(t *testing.T) { if e := 2 * numFollowers; len(ms) != e { t.Fatalf("expected %d messages, got %d", e, len(ms)) } - r.reduceUncommittedSize(r.getUncommittedSize(propEnts)) + r.reduceUncommittedSize(payloadsSize(propEnts)) if n := r.uncommittedSize; n != 0 { t.Fatalf("expected zero uncommitted size, got %d", n) } diff --git a/rawnode.go b/rawnode.go index 675fa5bc8d73..1e88a8011734 100644 --- a/rawnode.go +++ b/rawnode.go @@ -140,7 +140,7 @@ func (rn *RawNode) readyWithoutAccept() Ready { rd := Ready{ Entries: r.raftLog.nextUnstableEnts(), - CommittedEntries: r.raftLog.nextCommittedEnts(!rn.asyncStorageWrites), + CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()), Messages: r.msgs, } if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) { @@ -368,14 +368,12 @@ func needStorageApplyRespMsg(rd Ready) bool { return needStorageApplyMsg(rd) } // message is processed. Used with AsyncStorageWrites. func newStorageApplyMsg(r *raft, rd Ready) pb.Message { ents := rd.CommittedEntries - last := ents[len(ents)-1].Index return pb.Message{ Type: pb.MsgStorageApply, To: LocalApplyThread, From: r.id, Term: 0, // committed entries don't apply under a specific term Entries: ents, - Index: last, Responses: []pb.Message{ newStorageApplyRespMsg(r, ents), }, @@ -385,18 +383,13 @@ func newStorageApplyMsg(r *raft, rd Ready) pb.Message { // newStorageApplyRespMsg creates the message that should be returned to node // after the committed entries in the current Ready (along with those in all // prior Ready structs) have been applied to the local state machine. -func newStorageApplyRespMsg(r *raft, committedEnts []pb.Entry) pb.Message { - last := committedEnts[len(committedEnts)-1].Index - size := r.getUncommittedSize(committedEnts) +func newStorageApplyRespMsg(r *raft, ents []pb.Entry) pb.Message { return pb.Message{ - Type: pb.MsgStorageApplyResp, - To: r.id, - From: LocalApplyThread, - Term: 0, // committed entries don't apply under a specific term - Index: last, - // NOTE: we abuse the LogTerm field to store the aggregate entry size so - // that we don't need to introduce a new field on Message. 
- LogTerm: size, + Type: pb.MsgStorageApplyResp, + To: r.id, + From: LocalApplyThread, + Term: 0, // committed entries don't apply under a specific term + Entries: ents, } } @@ -436,10 +429,18 @@ func (rn *RawNode) acceptReady(rd Ready) { rn.raft.raftLog.acceptUnstable() if len(rd.CommittedEntries) > 0 { ents := rd.CommittedEntries - rn.raft.raftLog.acceptApplying(ents[len(ents)-1].Index) + index := ents[len(ents)-1].Index + rn.raft.raftLog.acceptApplying(index, entsSize(ents), rn.applyUnstableEntries()) } } +// applyUnstableEntries returns whether entries are allowed to be applied once +// they are known to be committed but before they have been written locally to +// stable storage. +func (rn *RawNode) applyUnstableEntries() bool { + return !rn.asyncStorageWrites +} + // HasReady called when RawNode user need to check if any Ready pending. func (rn *RawNode) HasReady() bool { // TODO(nvanbenschoten): order these cases in terms of cost and frequency. @@ -456,7 +457,7 @@ func (rn *RawNode) HasReady() bool { if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 { return true } - if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(!rn.asyncStorageWrites) { + if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) { return true } if len(r.readStates) != 0 { diff --git a/rawnode_test.go b/rawnode_test.go index ceaed813b3c8..3e146f8b457c 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -979,12 +979,12 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := pb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) + maxEntrySize := maxEntries * payloadSize(testEntry) t.Log("maxEntrySize", maxEntrySize) s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) - cfg.MaxUncommittedEntriesSize = maxEntrySize + cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) @@ -1010,7 +1010,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { // Check the size of leader's uncommitted log tail. It should not exceed the // MaxUncommittedEntriesSize limit. - checkUncommitted := func(exp uint64) { + checkUncommitted := func(exp entryPayloadSize) { t.Helper() if a := rawNode.raft.uncommittedSize; exp != a { t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) diff --git a/storage.go b/storage.go index ecccb71b44c6..b781dfa88eb9 100644 --- a/storage.go +++ b/storage.go @@ -131,7 +131,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { } ents := ms.ents[lo-offset : hi-offset] - return limitSize(ents, maxSize), nil + return limitSize(ents, entryEncodingSize(maxSize)), nil } // Term implements the Storage interface. 
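For readers following the log.go changes above: a minimal sketch, under simplified stand-in names, of the accept/acknowledge accounting they introduce. The applyTracker type and its fields are hypothetical stand-ins for raftLog's maxApplyingEntsSize, applyingEntsSize, and applyingEntsPaused; the truncated flag stands in for the `i < l.maxAppliableIndex(allowUnstable)` check in acceptApplying.

// Sketch only: a simplified model of the accept/acknowledge accounting added
// to raftLog above. applyTracker and its fields are hypothetical stand-ins
// for maxApplyingEntsSize, applyingEntsSize, and applyingEntsPaused.
package main

import "fmt"

type applyTracker struct {
	max         uint64 // quota, like maxApplyingEntsSize
	outstanding uint64 // unacknowledged bytes, like applyingEntsSize
	paused      bool   // like applyingEntsPaused
}

// accept mirrors raftLog.acceptApplying: account for a batch handed out for
// application, and pause when the quota is reached or the returned slice was
// truncated by the size limit (the i < maxAppliableIndex case above).
func (t *applyTracker) accept(size uint64, truncated bool) {
	t.outstanding += size
	t.paused = t.outstanding >= t.max || truncated
}

// ack mirrors raftLog.appliedTo: release quota for an acknowledged batch,
// saturating at zero as a defense against accounting underflow.
func (t *applyTracker) ack(size uint64) {
	if t.outstanding > size {
		t.outstanding -= size
	} else {
		t.outstanding = 0
	}
	t.paused = t.outstanding >= t.max
}

func main() {
	t := applyTracker{max: 100}
	t.accept(60, false)
	fmt.Println(t.outstanding, t.paused) // 60 false
	t.accept(60, false)
	fmt.Println(t.outstanding, t.paused) // 120 true: application is paused
	t.ack(60)
	fmt.Println(t.outstanding, t.paused) // 60 false: progress lifts the pause
}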
diff --git a/testdata/async_storage_writes.txt b/testdata/async_storage_writes.txt index bd48ec6e3bae..f6a7ef8fc71d 100644 --- a/testdata/async_storage_writes.txt +++ b/testdata/async_storage_writes.txt @@ -139,7 +139,7 @@ stabilize 1->2 MsgApp Term:1 Log:1/11 Commit:11 1->3 MsgApp Term:1 Log:1/11 Commit:11 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 - 1->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""]] > 2 receiving messages 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages @@ -150,9 +150,9 @@ stabilize Responses: > 1 processing apply thread Processing: - 1->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses: - ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] > 2 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:11 @@ -160,7 +160,7 @@ stabilize 1/11 EntryNormal "" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 Responses:[2->1 MsgAppResp Term:1 Log:0/11] - 2->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""]] > 3 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:11 @@ -168,9 +168,9 @@ stabilize 1/11 EntryNormal "" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 Responses:[3->1 MsgAppResp Term:1 Log:0/11] - 3->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""]] > 1 receiving messages - ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] > 2 processing append thread Processing: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:11 @@ -183,21 +183,21 @@ stabilize 3->1 MsgAppResp Term:1 Log:0/11 > 2 processing apply thread Processing: - 2->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses: - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] > 3 processing apply thread Processing: - 3->ApplyThread MsgStorageApply Term:0 Log:0/11 Entries:[1/11 EntryNormal ""] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses: - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/11 3->1 MsgAppResp Term:1 Log:0/11 > 2 receiving messages - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->2 MsgStorageApplyResp 
Term:0 Log:0/11
+ ApplyThread->2 MsgStorageApplyResp
Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] > 3 receiving messages - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/11 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] propose 1 prop_1 ---- @@ -330,7 +330,7 @@ process-ready 1 2 3 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[1->1 MsgAppResp Term:1 Log:0/14, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14] - 1->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]] > 2 handling Ready > 3 handling Ready @@ -356,7 +356,7 @@ process-ready 1 2 3 1/12 EntryNormal "prop_1" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[2->1 MsgAppResp Term:1 Log:0/13, 2->1 MsgAppResp Term:1 Log:0/14, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14] - 2->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]] > 3 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:12 @@ -366,7 +366,7 @@ process-ready 1 2 3 1/12 EntryNormal "prop_1" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] HardState: Term:1 Vote:1 Commit:12 Responses:[3->1 MsgAppResp Term:1 Log:0/13, 3->1 MsgAppResp Term:1 Log:0/14, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14] - 3->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]] process-append-thread 1 2 3 ---- @@ -417,7 +417,7 @@ process-ready 1 2 3 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[1->1 MsgAppResp Term:1 Log:0/15, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] - 1->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]] > 2 handling Ready > 3 handling Ready @@ -443,7 +443,7 @@ process-ready 1 2 3 1/13 EntryNormal "prop_2" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[2->1 MsgAppResp Term:1 Log:0/14, 2->1 MsgAppResp Term:1 Log:0/15, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] - 2->ApplyThread MsgStorageApply 
Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]] > 3 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:13 @@ -453,7 +453,7 @@ process-ready 1 2 3 1/13 EntryNormal "prop_2" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] HardState: Term:1 Vote:1 Commit:13 Responses:[3->1 MsgAppResp Term:1 Log:0/14, 3->1 MsgAppResp Term:1 Log:0/15, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] - 3->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]] process-append-thread 1 2 3 ---- @@ -482,19 +482,19 @@ process-apply-thread 1 2 3 ---- > 1 processing apply thread Processing: - 1->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses: - ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] > 2 processing apply thread Processing: - 2->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses: - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] > 3 processing apply thread Processing: - 3->ApplyThread MsgStorageApply Term:0 Log:0/12 Entries:[1/12 EntryNormal "prop_1"] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses: - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] deliver-msgs 1 2 3 ---- @@ -504,11 +504,11 @@ AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 2->1 MsgAppResp Term:1 Log:0/14 3->1 MsgAppResp Term:1 Log:0/13 3->1 MsgAppResp Term:1 Log:0/14 -ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/12 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 -ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/12 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 -ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/12 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] process-ready 1 2 3 ---- @@ -521,7 +521,7 @@ process-ready 1 2 3 1->2 MsgApp Term:1 Log:1/15 Commit:14 1->3 MsgApp Term:1 Log:1/15 Commit:14 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] - 1->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] > 2 handling Ready > 
3 handling Ready @@ -543,7 +543,7 @@ process-ready 1 2 3 1/14 EntryNormal "prop_3" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[2->1 MsgAppResp Term:1 Log:0/15, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] - 2->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] > 3 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:14 @@ -551,7 +551,7 @@ process-ready 1 2 3 1/14 EntryNormal "prop_3" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:14 Responses:[3->1 MsgAppResp Term:1 Log:0/15, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] - 3->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] process-append-thread 1 2 3 ---- @@ -580,19 +580,19 @@ process-apply-thread 1 2 3 ---- > 1 processing apply thread Processing: - 1->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses: - ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] > 2 processing apply thread Processing: - 2->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses: - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] > 3 processing apply thread Processing: - 3->ApplyThread MsgStorageApply Term:0 Log:0/13 Entries:[1/13 EntryNormal "prop_2"] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses: - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] deliver-msgs 1 2 3 ---- @@ -602,11 +602,11 @@ AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 2->1 MsgAppResp Term:1 Log:0/15 3->1 MsgAppResp Term:1 Log:0/14 3->1 MsgAppResp Term:1 Log:0/15 -ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/13 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 -ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/13 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 -ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/13 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] process-ready 1 2 3 ---- @@ -619,7 +619,7 @@ process-ready 1 2 3 1->2 MsgApp Term:1 Log:1/15 Commit:15 1->3 MsgApp Term:1 Log:1/15 Commit:15 1->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 - 1->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15] + 1->ApplyThread 
MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] > 2 handling Ready > 3 handling Ready @@ -641,7 +641,7 @@ process-ready 1 2 3 1/15 EntryNormal "prop_4" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 Responses:[2->1 MsgAppResp Term:1 Log:0/15] - 2->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] > 3 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:15 @@ -649,7 +649,7 @@ process-ready 1 2 3 1/15 EntryNormal "prop_4" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 HardState: Term:1 Vote:1 Commit:15 Responses:[3->1 MsgAppResp Term:1 Log:0/15] - 3->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] process-append-thread 2 3 ---- @@ -670,31 +670,31 @@ process-apply-thread 1 2 3 ---- > 1 processing apply thread Processing: - 1->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses: - ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] > 2 processing apply thread Processing: - 2->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses: - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] > 3 processing apply thread Processing: - 3->ApplyThread MsgStorageApply Term:0 Log:0/14 Entries:[1/14 EntryNormal "prop_3"] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses: - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] deliver-msgs 1 2 3 ---- 2->1 MsgAppResp Term:1 Log:0/15 3->1 MsgAppResp Term:1 Log:0/15 -ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 INFO entry at index 15 missing from unstable log; ignoring -ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14 +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 INFO entry at index 15 missing from unstable log; ignoring -ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14 +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] process-ready 1 2 3 ---- @@ -722,27 +722,27 @@ process-apply-thread 1 2 3 ---- > 1 processing apply thread Processing: - 1->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses: - 
ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15 + ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] > 2 processing apply thread Processing: - 2->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses: - ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15 + ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] > 3 processing apply thread Processing: - 3->ApplyThread MsgStorageApply Term:0 Log:0/15 Entries:[1/15 EntryNormal "prop_4"] + 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses: - ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15 + ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] deliver-msgs 1 2 3 ---- 2->1 MsgAppResp Term:1 Log:0/15 3->1 MsgAppResp Term:1 Log:0/15 -ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/15 -ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/15 -ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/15 +ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] +ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] +ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] process-ready 1 2 3 ---- diff --git a/util.go b/util.go index d5ed30b375a4..1bec53578800 100644 --- a/util.go +++ b/util.go @@ -213,12 +213,6 @@ func describeTarget(id uint64) string { } } -// PayloadSize is the size of the payload of this Entry. Notably, it does not -// depend on its Index or Term. -func PayloadSize(e pb.Entry) int { - return len(e.Data) -} - // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string { @@ -267,7 +261,19 @@ func DescribeEntries(ents []pb.Entry, f EntryFormatter) string { return buf.String() } -func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { +// entryEncodingSize represents the protocol buffer encoding size of one or more +// entries. +type entryEncodingSize uint64 + +func entsSize(ents []pb.Entry) entryEncodingSize { + var size entryEncodingSize + for _, ent := range ents { + size += entryEncodingSize(ent.Size()) + } + return size +} + +func limitSize(ents []pb.Entry, maxSize entryEncodingSize) []pb.Entry { if len(ents) == 0 { return ents } @@ -275,13 +281,33 @@ func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { var limit int for limit = 1; limit < len(ents); limit++ { size += ents[limit].Size() - if uint64(size) > maxSize { + if entryEncodingSize(size) > maxSize { break } } return ents[:limit] } +// entryPayloadSize represents the size of one or more entries' payloads. +// Notably, it does not depend on its Index or Term. Entries with empty +// payloads, like those proposed after a leadership change, are considered +// to be zero size. +type entryPayloadSize uint64 + +// payloadSize is the size of the payload of the provided entry. +func payloadSize(e pb.Entry) entryPayloadSize { + return entryPayloadSize(len(e.Data)) +} + +// payloadsSize is the size of the payloads of the provided entries. 
+func payloadsSize(ents []pb.Entry) entryPayloadSize { + var s entryPayloadSize + for _, e := range ents { + s += payloadSize(e) + } + return s +} + func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) { err := cs1.Equivalent(cs2) if err == nil { diff --git a/util_test.go b/util_test.go index e711ec1696c8..fc7ddfbd5263 100644 --- a/util_test.go +++ b/util_test.go @@ -60,7 +60,7 @@ func TestLimitSize(t *testing.T) { for _, tt := range tests { t.Run("", func(t *testing.T) { - require.Equal(t, tt.wentries, limitSize(ents, tt.maxsize)) + require.Equal(t, tt.wentries, limitSize(ents, entryEncodingSize(tt.maxsize))) }) } }
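For context on how the reworked MsgStorageApply/MsgStorageApplyResp plumbing is meant to be consumed, here is a hedged sketch of an apply loop under AsyncStorageWrites, in the spirit of TestCommitPaginationWithAsyncStorageWrites above. applyToStateMachine and the toApply channel are hypothetical application-side pieces; the raft and raftpb identifiers are the ones used in the diff. Stepping each message's Responses back into the node is what acknowledges the outstanding entry bytes and lifts applyingEntsPaused.

// Sketch only: one way an application might drain MsgStorageApply messages
// when AsyncStorageWrites is enabled. applyToStateMachine is a hypothetical
// hook; everything else uses identifiers that appear in the diff above.
package applysketch

import (
	"context"

	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

// applyToStateMachine stands in for application-specific apply logic.
func applyToStateMachine(ents []raftpb.Entry) {}

// runApplyThread consumes MsgStorageApply messages forwarded from the main
// Ready loop. Each message carries Responses; stepping them back into the
// node delivers MsgStorageApplyResp (which now carries the applied Entries),
// letting raft release the MaxCommittedSizePerReady quota it tracks across
// Ready structs.
func runApplyThread(ctx context.Context, n raft.Node, toApply <-chan raftpb.Message) {
	for {
		select {
		case m := <-toApply:
			if m.Type != raftpb.MsgStorageApply {
				continue // only apply-thread traffic is expected here
			}
			applyToStateMachine(m.Entries)
			for _, resp := range m.Responses {
				_ = n.Step(ctx, resp) // acknowledge: un-pauses entry application
			}
		case <-ctx.Done():
			return
		}
	}
}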