From 313e96ce1b1dddf83751da65b4a08b1f4252712a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 13:34:34 +0000 Subject: [PATCH 1/5] raft: factor out unapplied config changes check Signed-off-by: Pavel Kalinnikov --- raft.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/raft.go b/raft.go index 1dc8b1ed5bc6..ae7f962f037c 100644 --- a/raft.go +++ b/raft.go @@ -913,17 +913,24 @@ func (r *raft) hup(t CampaignType) { r.logger.Warningf("%x is unpromotable and can not campaign", r.id) return } + if r.hasUnappliedConfChanges() { + r.logger.Warningf("%x cannot campaign at term %d since there are still pending configuration changes to apply", r.id, r.Term) + return + } + + r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) + r.campaign(t) +} + +func (r *raft) hasUnappliedConfChanges() bool { ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) if err != nil { r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) } if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { - r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) - return + return true } - - r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) - r.campaign(t) + return false } // campaign transitions the raft instance to candidate state. This must only be From a22dbb1e4ab94e4ad286e0597a8e35b1cfe052ed Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 13:37:51 +0000 Subject: [PATCH 2/5] raft: don't scan unapplied entries if applied==committed Signed-off-by: Pavel Kalinnikov --- raft.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/raft.go b/raft.go index ae7f962f037c..b1a9504d8537 100644 --- a/raft.go +++ b/raft.go @@ -923,14 +923,14 @@ func (r *raft) hup(t CampaignType) { } func (r *raft) hasUnappliedConfChanges() bool { + if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed + return false + } ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) if err != nil { r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) } - if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { - return true - } - return false + return numOfPendingConf(ents) != 0 } // campaign transitions the raft instance to candidate state. This must only be From 2cb76edb5d584cba21476b21b2d9e9b3dcda9754 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 13:40:14 +0000 Subject: [PATCH 3/5] raft: inline numOfPendingConf() != 0 check Signed-off-by: Pavel Kalinnikov --- raft.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/raft.go b/raft.go index b1a9504d8537..8c91b3528330 100644 --- a/raft.go +++ b/raft.go @@ -930,7 +930,12 @@ func (r *raft) hasUnappliedConfChanges() bool { if err != nil { r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) } - return numOfPendingConf(ents) != 0 + for i := range ents { + if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { + return true + } + } + return false } // campaign transitions the raft instance to candidate state. This must only be @@ -1992,16 +1997,6 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) { } } -func numOfPendingConf(ents []pb.Entry) int { - n := 0 - for i := range ents { - if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { - n++ - } - } - return n -} - func releasePendingReadIndexMessages(r *raft) { if len(r.pendingReadIndexMessages) == 0 { // Fast path for the common case to avoid a call to storage.LastIndex() From d980ecd480cfb3501f35e88750267580154fa6f9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 14:09:41 +0000 Subject: [PATCH 4/5] raft: paginate the unapplied config changes scan Signed-off-by: Pavel Kalinnikov --- raft.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/raft.go b/raft.go index 8c91b3528330..4306a16033a5 100644 --- a/raft.go +++ b/raft.go @@ -926,14 +926,27 @@ func (r *raft) hasUnappliedConfChanges() bool { if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed return false } - ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) - if err != nil { - r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) - } - for i := range ents { - if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { - return true + // Scan all unapplied committed entries to find a config change. Paginate the + // scan, to avoid a potentially unlimited memory spike. + // + // TODO(pavelkalinnikov): this scan does not need to happen each time. It's + // possible to scan once to bootstrap, and then maintain the "has unapplied + // config changes" predicate throughout the lifetime of this instance, as all + // new log entries pass through "for free". + const maxSize = entryEncodingSize(16 << 20) // 16 MiB + for idx, end := r.raftLog.applied+1, r.raftLog.committed+1; idx < end; { + ents, err := r.raftLog.slice(idx, end, maxSize) + if err != nil { + r.logger.Panicf("unexpected error getting unapplied entries from %d: %v", idx, err) + } else if len(ents) == 0 { + r.logger.Panicf("could not read unapplied entries in [%d, %d)", idx, end) + } + for i := range ents { + if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { + return true + } } + idx += uint64(len(ents)) } return false } From 1df762940b8c309a27cfafb086d767c0c7e3f58f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 7 Jun 2023 09:22:56 +0100 Subject: [PATCH 5/5] raft: add raftLog.scan method for paginated reads Signed-off-by: Pavel Kalinnikov --- log.go | 26 ++++++++++++++++++++++++ log_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++ raft.go | 34 ++++++++++++++++--------------- 3 files changed, 102 insertions(+), 16 deletions(-) diff --git a/log.go b/log.go index 177d0c1f76a5..84826c3a0b5f 100644 --- a/log.go +++ b/log.go @@ -461,6 +461,32 @@ func (l *raftLog) restore(s pb.Snapshot) { l.unstable.restore(s) } +// scan visits all log entries in the [lo, hi) range, returning them via the +// given callback. The callback can be invoked multiple times, with consecutive +// sub-ranges of the requested range. Returns up to pageSize bytes worth of +// entries at a time. May return more if a single entry size exceeds the limit. +// +// The entries in [lo, hi) must exist, otherwise scan() eventually returns an +// error (possibly after passing some entries through the callback). +// +// If the callback returns an error, scan terminates and returns this error +// immediately. This can be used to stop the scan early ("break" the loop). +func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error { + for lo < hi { + ents, err := l.slice(lo, hi, pageSize) + if err != nil { + return err + } else if len(ents) == 0 { + return fmt.Errorf("got 0 entries in [%d, %d)", lo, hi) + } + if err := v(ents); err != nil { + return err + } + lo += uint64(len(ents)) + } + return nil +} + // slice returns a slice of log entries from lo through hi-1, inclusive. func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { if err := l.mustCheckOutOfBounds(lo, hi); err != nil { diff --git a/log_test.go b/log_test.go index fe79865d811a..2711ff9ca639 100644 --- a/log_test.go +++ b/log_test.go @@ -980,6 +980,64 @@ func TestSlice(t *testing.T) { } } +func TestScan(t *testing.T) { + offset := uint64(47) + num := uint64(20) + last := offset + num + half := offset + num/2 + entries := func(from, to uint64) []pb.Entry { + res := make([]pb.Entry, 0, to-from) + for i := from; i < to; i++ { + res = append(res, pb.Entry{Index: i, Term: i}) + } + return res + } + entrySize := entsSize(entries(half, half+1)) + + storage := NewMemoryStorage() + require.NoError(t, storage.ApplySnapshot(pb.Snapshot{ + Metadata: pb.SnapshotMetadata{Index: offset}})) + require.NoError(t, storage.Append(entries(offset+1, half))) + l := newLog(storage, raftLogger) + l.append(entries(half, last)...) + + // Test that scan() returns the same entries as slice(), on all inputs. + for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} { + for lo := offset + 1; lo < last; lo++ { + for hi := lo; hi <= last; hi++ { + var got []pb.Entry + require.NoError(t, l.scan(lo, hi, pageSize, func(e []pb.Entry) error { + got = append(got, e...) + require.True(t, len(e) == 1 || entsSize(e) <= pageSize) + return nil + })) + want, err := l.slice(lo, hi, noLimit) + require.NoError(t, err) + require.Equal(t, want, got, "scan() and slice() mismatch on [%d, %d) @ %d", lo, hi, pageSize) + } + } + } + + // Test that the callback error is propagated to the caller. + iters := 0 + require.ErrorIs(t, errBreak, l.scan(offset+1, half, 0, func([]pb.Entry) error { + iters++ + if iters == 2 { + return errBreak + } + return nil + })) + require.Equal(t, 2, iters) + + // Test that we max out the limit, and not just always return a single entry. + // NB: this test works only because the requested range length is even. + require.NoError(t, l.scan(offset+1, offset+11, entrySize*2, func(ents []pb.Entry) error { + require.Len(t, ents, 2) + require.Equal(t, entrySize*2, entsSize(ents)) + return nil + })) +} + func mustTerm(term uint64, err error) uint64 { if err != nil { panic(err) diff --git a/raft.go b/raft.go index 4306a16033a5..535889ea79c8 100644 --- a/raft.go +++ b/raft.go @@ -922,33 +922,35 @@ func (r *raft) hup(t CampaignType) { r.campaign(t) } +// errBreak is a sentinel error used to break a callback-based loop. +var errBreak = errors.New("break") + func (r *raft) hasUnappliedConfChanges() bool { if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed return false } + found := false // Scan all unapplied committed entries to find a config change. Paginate the // scan, to avoid a potentially unlimited memory spike. - // - // TODO(pavelkalinnikov): this scan does not need to happen each time. It's - // possible to scan once to bootstrap, and then maintain the "has unapplied - // config changes" predicate throughout the lifetime of this instance, as all - // new log entries pass through "for free". - const maxSize = entryEncodingSize(16 << 20) // 16 MiB - for idx, end := r.raftLog.applied+1, r.raftLog.committed+1; idx < end; { - ents, err := r.raftLog.slice(idx, end, maxSize) - if err != nil { - r.logger.Panicf("unexpected error getting unapplied entries from %d: %v", idx, err) - } else if len(ents) == 0 { - r.logger.Panicf("could not read unapplied entries in [%d, %d)", idx, end) - } + lo, hi := r.raftLog.applied+1, r.raftLog.committed+1 + // Reuse the maxApplyingEntsSize limit because it is used for similar purposes + // (limiting the read of unapplied committed entries) when raft sends entries + // via the Ready struct for application. + // TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan + // outside the raft package. + pageSize := r.raftLog.maxApplyingEntsSize + if err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error { for i := range ents { if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { - return true + found = true + return errBreak } } - idx += uint64(len(ents)) + return nil + }); err != nil && err != errBreak { + r.logger.Panicf("error scanning unapplied entries [%d, %d): %v", lo, hi, err) } - return false + return found } // campaign transitions the raft instance to candidate state. This must only be