From 19b79e03763080848cef6ba403c5afce7045572f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 18 Nov 2022 16:17:48 +0000 Subject: [PATCH 1/4] raftlog: minimize Iterator/Visit Remove unused context; remove unnecessary Index comparison, guarantees are given by the MVCC iterator option; squash a few lines. Release note: None --- pkg/kv/kvserver/loqrecovery/collect.go | 2 +- pkg/kv/kvserver/raftlog/iter_bench_test.go | 15 +++++-------- pkg/kv/kvserver/raftlog/iterator.go | 26 ++++------------------ 3 files changed, 11 insertions(+), 32 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 6c6356729213..994e2c9469fc 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -82,7 +82,7 @@ func GetDescriptorChangesFromRaftLog( rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader, ) ([]loqrecoverypb.DescriptorChangeInfo, error) { var changes []loqrecoverypb.DescriptorChangeInfo - if err := raftlog.Visit(context.Background(), rangeID, reader, lo, hi, func(ctx context.Context, e *raftlog.Entry) error { + if err := raftlog.Visit(rangeID, reader, lo, hi, func(e *raftlog.Entry) error { raftCmd := e.Cmd switch { case raftCmd.ReplicatedEvalResult.Split != nil: diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 92e5c0244ead..78df79de51eb 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -11,7 +11,6 @@ package raftlog import ( - "context" "math" "math/rand" "testing" @@ -136,6 +135,7 @@ func BenchmarkIterator(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { it := NewIterator(rangeID, &mockReader{}, IterOptions{Hi: 123456}) + setMockIter(it) it.Close() } }) @@ -169,8 +169,7 @@ func BenchmarkIterator(b *testing.B) { b.Run("NextPooled", func(b *testing.B) { benchForOp(b, func(it *Iterator) (bool, error) { - ok, err := it.Next() - if err != nil || 
!ok { + if ok, err := it.Next(); err != nil || !ok { return false, err } ent := it.Entry() @@ -192,12 +191,10 @@ func BenchmarkVisit(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - err := Visit(context.Background(), rangeID, eng, 0, math.MaxUint64, - func(ctx context.Context, entry *Entry) error { - entry.Release() - return nil - }) - if err != nil { + if err := Visit(rangeID, eng, 0, math.MaxUint64, func(entry *Entry) error { + entry.Release() + return nil + }); err != nil { b.Fatal(err) } } diff --git a/pkg/kv/kvserver/raftlog/iterator.go b/pkg/kv/kvserver/raftlog/iterator.go index 6c4736b7516b..b483d6bd6fbf 100644 --- a/pkg/kv/kvserver/raftlog/iterator.go +++ b/pkg/kv/kvserver/raftlog/iterator.go @@ -11,13 +11,10 @@ package raftlog import ( - "context" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/iterutil" - "github.com/cockroachdb/errors" ) type storageIter interface { @@ -89,9 +86,7 @@ func NewIterator(rangeID roachpb.RangeID, eng Reader, opts IterOptions) *Iterato // Close releases the resources associated with this Iterator. func (it *Iterator) Close() { - if it.iter != nil { - it.iter.Close() - } + it.iter.Close() } func (it *Iterator) load() (bool, error) { @@ -146,13 +141,7 @@ func (it *Iterator) Entry() *Entry { // // The closure may invoke `(*Entry).Release` if it is no longer going to access // any memory in the current Entry. 
-func Visit( - ctx context.Context, - rangeID roachpb.RangeID, - eng Reader, - lo, hi uint64, - fn func(context.Context, *Entry) error, -) error { +func Visit(rangeID roachpb.RangeID, eng Reader, lo, hi uint64, fn func(*Entry) error) error { it := NewIterator(rangeID, eng, IterOptions{Hi: hi}) defer it.Close() ok, err := it.SeekGE(lo) @@ -160,15 +149,8 @@ func Visit( return err } for ; ok; ok, err = it.Next() { - ent := it.Entry() - if ent.Index >= hi { - return nil - } - if err := fn(ctx, ent); err != nil { - if errors.Is(err, iterutil.StopIteration()) { - return nil - } - return err + if err := fn(it.Entry()); err != nil { + return iterutil.Map(err) } } return err From 37326191e4bce7afe83ea6d9b4aaf35bc6bf4d6f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 18 Nov 2022 17:00:09 +0000 Subject: [PATCH 2/4] raftlog: return raftpb.Entry from Iterator The log storage in Replica iterates over raw raftpb.Entry (see iterateEntries). This commit makes raftlog.Iterator usable in that context, by making it return raftpb.Entry instead of the raftlog.Entry wrapper. Leave it to the caller to convert to raftlog.Entry if necessary. 
Release note: None --- pkg/kv/kvserver/loqrecovery/BUILD.bazel | 1 + pkg/kv/kvserver/loqrecovery/collect.go | 7 ++++- pkg/kv/kvserver/raftlog/entry.go | 16 ++++++++++ pkg/kv/kvserver/raftlog/iter_bench_test.go | 6 ++-- pkg/kv/kvserver/raftlog/iter_test.go | 8 ++--- pkg/kv/kvserver/raftlog/iterator.go | 34 ++++++++-------------- 6 files changed, 41 insertions(+), 31 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index b3ea61a00bc0..76eb75c4592e 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@io_etcd_go_etcd_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 994e2c9469fc..fa1b387e2ee8 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/errors" + "go.etcd.io/etcd/raft/v3/raftpb" ) // CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery. 
@@ -82,7 +83,11 @@ func GetDescriptorChangesFromRaftLog( rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader, ) ([]loqrecoverypb.DescriptorChangeInfo, error) { var changes []loqrecoverypb.DescriptorChangeInfo - if err := raftlog.Visit(rangeID, reader, lo, hi, func(e *raftlog.Entry) error { + if err := raftlog.Visit(reader, rangeID, lo, hi, func(ent raftpb.Entry) error { + e, err := raftlog.NewEntry(ent) + if err != nil { + return err + } raftCmd := e.Cmd switch { case raftCmd.ReplicatedEvalResult.Split != nil: diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go index 315ce3ee0c9b..daa9d8a58089 100644 --- a/pkg/kv/kvserver/raftlog/entry.go +++ b/pkg/kv/kvserver/raftlog/entry.go @@ -76,6 +76,22 @@ func NewEntryFromRawValue(b []byte) (*Entry, error) { return e, nil } +// raftEntryFromRawValue decodes a raft.Entry from a raw MVCC value. +// +// Same as NewEntryFromRawValue, but doesn't decode the command and doesn't use +// the pool of entries. +func raftEntryFromRawValue(b []byte) (raftpb.Entry, error) { + var meta enginepb.MVCCMetadata + if err := protoutil.Unmarshal(b, &meta); err != nil { + return raftpb.Entry{}, errors.Wrap(err, "decoding raft log MVCCMetadata") + } + var entry raftpb.Entry + if err := storage.MakeValue(meta).GetProto(&entry); err != nil { + return raftpb.Entry{}, errors.Wrap(err, "unmarshalling raft Entry") + } + return entry, nil +} + func (e *Entry) load() error { if len(e.Data) == 0 { // Raft-proposed empty entry. 
diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 78df79de51eb..dad05ea5cbd2 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -172,8 +172,7 @@ func BenchmarkIterator(b *testing.B) { if ok, err := it.Next(); err != nil || !ok { return false, err } - ent := it.Entry() - ent.Release() + _ = it.Entry() return true, nil }) }) @@ -191,8 +190,7 @@ func BenchmarkVisit(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - if err := Visit(rangeID, eng, 0, math.MaxUint64, func(entry *Entry) error { - entry.Release() + if err := Visit(eng, rangeID, 0, math.MaxUint64, func(entry raftpb.Entry) error { return nil }); err != nil { b.Fatal(err) diff --git a/pkg/kv/kvserver/raftlog/iter_test.go b/pkg/kv/kvserver/raftlog/iter_test.go index f5c6e8d1ee7d..b9b067d763e1 100644 --- a/pkg/kv/kvserver/raftlog/iter_test.go +++ b/pkg/kv/kvserver/raftlog/iter_test.go @@ -101,9 +101,9 @@ type modelIter struct { hi uint64 } -func (it *modelIter) load() (*Entry, error) { +func (it *modelIter) load() (raftpb.Entry, error) { // Only called when on valid position. 
- return NewEntry(it.ents[it.idx]) + return it.ents[it.idx], nil } func (it *modelIter) check() error { @@ -139,7 +139,7 @@ func (it *modelIter) Next() (bool, error) { return err == nil, err } -func (it *modelIter) Entry() *Entry { +func (it *modelIter) Entry() raftpb.Entry { e, err := it.load() if err != nil { panic(err) // bug in modelIter @@ -275,7 +275,7 @@ func TestIterator(t *testing.T) { type iter interface { SeekGE(idx uint64) (bool, error) Next() (bool, error) - Entry() *Entry + Entry() raftpb.Entry } func consumeIter(it iter, lo uint64) ([]uint64, error) { diff --git a/pkg/kv/kvserver/raftlog/iterator.go b/pkg/kv/kvserver/raftlog/iterator.go index b483d6bd6fbf..ed6dcaf8ca00 100644 --- a/pkg/kv/kvserver/raftlog/iterator.go +++ b/pkg/kv/kvserver/raftlog/iterator.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "go.etcd.io/etcd/raft/v3/raftpb" ) type storageIter interface { @@ -52,12 +53,12 @@ type Iterator struct { // TODO(tbg): we're not reusing memory here. Since all of our allocs come // from protobuf marshaling, this is hard to avoid but we can do a little // better and at least avoid a few allocations. - entry *Entry + entry raftpb.Entry } // IterOptions are options to NewIterator. type IterOptions struct { - // Hi ensures the Iterator never seeks to any Entry with index >= Hi. This is + // Hi ensures the Iterator never seeks to any entry with index >= Hi. This is // useful when the caller is interested in a slice [Lo, Hi) of the raft log. 
Hi uint64 } @@ -90,44 +91,36 @@ func (it *Iterator) Close() { } func (it *Iterator) load() (bool, error) { - ok, err := it.iter.Valid() - if err != nil || !ok { + if ok, err := it.iter.Valid(); err != nil || !ok { return false, err } - - it.entry, err = NewEntryFromRawValue(it.iter.UnsafeValue()) - if err != nil { + var err error + if it.entry, err = raftEntryFromRawValue(it.iter.UnsafeValue()); err != nil { return false, err } - return true, nil } // SeekGE positions the Iterator at the first raft log with index greater than // or equal to idx. Returns (true, nil) on success, (false, nil) if no such -// Entry exists. +// entry exists. func (it *Iterator) SeekGE(idx uint64) (bool, error) { it.iter.SeekGE(storage.MakeMVCCMetadataKey(it.prefixBuf.RaftLogKey(idx))) return it.load() } // Next returns (true, nil) when the (in ascending index order) next entry is -// available via Entry. It returns (false, nil) if there are no more -// entries. +// available via Entry(). It returns (false, nil) if there are no more entries. // -// Note that a valid raft log has no gaps, and that the Iterator does not -// validate that. +// NB: a valid raft log has no gaps, but the Iterator does not validate that. func (it *Iterator) Next() (bool, error) { it.iter.Next() return it.load() } -// Entry returns the Entry the iterator is currently positioned at. This +// Entry returns the raft entry the iterator is currently positioned at. This // must only be called after a prior successful call to SeekGE or Next. -// -// The caller may invoke `(*Entry).Release` if it no longer wishes to access the -// current Entry. -func (it *Iterator) Entry() *Entry { +func (it *Iterator) Entry() raftpb.Entry { return it.entry } @@ -138,10 +131,7 @@ func (it *Iterator) Entry() *Entry { // // The closure may return iterutil.StopIteration(), which will stop iteration // without returning an error. 
-// -// The closure may invoke `(*Entry).Release` if it is no longer going to access -// any memory in the current Entry. -func Visit(rangeID roachpb.RangeID, eng Reader, lo, hi uint64, fn func(*Entry) error) error { +func Visit(eng Reader, rangeID roachpb.RangeID, lo, hi uint64, fn func(raftpb.Entry) error) error { it := NewIterator(rangeID, eng, IterOptions{Hi: hi}) defer it.Close() ok, err := it.SeekGE(lo) From 5b0181830494e91849b098329016f339f933f5d9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 18 Nov 2022 17:22:42 +0000 Subject: [PATCH 3/4] kvserver: use raftlog.Visit to iterate Raft log Release note: None --- pkg/kv/kvserver/replica_raftstorage.go | 47 ++------------------------ 1 file changed, 2 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 051dda2fd953..a61996a57155 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -19,17 +19,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -194,7 
+193,7 @@ func entries( return nil } - if err := iterateEntries(ctx, reader, rangeID, expectedIndex, hi, scanFunc); err != nil { + if err := raftlog.Visit(reader, rangeID, expectedIndex, hi, scanFunc); err != nil { return nil, err } // Cache the fetched entries, if we may. @@ -245,48 +244,6 @@ func entries( return nil, raft.ErrUnavailable } -// iterateEntries iterates over each of the Raft log entries in the range -// [lo,hi). At each step of the iteration, f() is invoked with the current log -// entry. -// -// The function does not accept a maximum number of entries or bytes. Instead, -// callers should enforce any limits by returning iterutil.StopIteration from -// the iteration function to terminate iteration early, if necessary. -func iterateEntries( - ctx context.Context, - reader storage.Reader, - rangeID roachpb.RangeID, - lo, hi uint64, - f func(raftpb.Entry) error, -) error { - key := keys.RaftLogKey(rangeID, lo) - endKey := keys.RaftLogKey(rangeID, hi) - iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ - UpperBound: endKey, - }) - defer iter.Close() - - var meta enginepb.MVCCMetadata - var ent raftpb.Entry - - iter.SeekGE(storage.MakeMVCCMetadataKey(key)) - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil || !ok { - return err - } - - if err := protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { - return errors.Wrap(err, "unable to decode MVCCMetadata") - } - if err := storage.MakeValue(meta).GetProto(&ent); err != nil { - return errors.Wrap(err, "unable to unmarshal raft Entry") - } - if err := f(ent); err != nil { - return iterutil.Map(err) - } - } -} - // invalidLastTerm is an out-of-band value for r.mu.lastTerm that // invalidates lastTerm caching and forces retrieval of Term(lastTerm) // from the raftEntryCache/RocksDB. 
From 5c87a9d5ab90aebaefdb5e47b8f206bfb5095a8f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 18 Nov 2022 23:46:51 +0000 Subject: [PATCH 4/4] raftlog: remove obsolete test Iterator no longer uses the pool of entries. Release note: None --- pkg/kv/kvserver/raftlog/iter_bench_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index dad05ea5cbd2..06b0ac2ea26d 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -166,16 +166,6 @@ func BenchmarkIterator(b *testing.B) { b.Run("SeekGE", func(b *testing.B) { benchForOp(b, (*Iterator).Next) }) - - b.Run("NextPooled", func(b *testing.B) { - benchForOp(b, func(it *Iterator) (bool, error) { - if ok, err := it.Next(); err != nil || !ok { - return false, err - } - _ = it.Entry() - return true, nil - }) - }) } // Visit benchmarks Visit on a pebble engine, i.e. the results will measure