Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
92143: kvserver,raftlog: dedup Raft log iteration r=tbg a=pavelkalinnikov

The log storage in `Replica` iterates over raw `raftpb.Entry` values (see `iterateEntries`). The `raftlog` package also provides `raftlog.Iterator` and `raftlog.Visit`, which do the same thing but wrap each entry in a `raftlog.Entry`.

This PR makes `raftlog.Iterator` return `raftpb.Entry` instead of the wrapped entry. This enables `Replica` to use it too, and benefit from the fact that it's unit-tested.

Part of cockroachdb#91979
Release note: None

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Nov 21, 2022
2 parents f0554bc + 5c87a9d commit ae919e3
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 112 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/loqrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)

Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)

// CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
Expand Down Expand Up @@ -82,7 +83,11 @@ func GetDescriptorChangesFromRaftLog(
rangeID roachpb.RangeID, lo, hi uint64, reader storage.Reader,
) ([]loqrecoverypb.DescriptorChangeInfo, error) {
var changes []loqrecoverypb.DescriptorChangeInfo
if err := raftlog.Visit(context.Background(), rangeID, reader, lo, hi, func(ctx context.Context, e *raftlog.Entry) error {
if err := raftlog.Visit(reader, rangeID, lo, hi, func(ent raftpb.Entry) error {
e, err := raftlog.NewEntry(ent)
if err != nil {
return err
}
raftCmd := e.Cmd
switch {
case raftCmd.ReplicatedEvalResult.Split != nil:
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/raftlog/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ func NewEntryFromRawValue(b []byte) (*Entry, error) {
return e, nil
}

// raftEntryFromRawValue decodes a raftpb.Entry from the raw bytes of a raft
// log value: b is first unmarshalled as an enginepb.MVCCMetadata, and the
// entry is then extracted from the value built from that metadata.
//
// Unlike NewEntryFromRawValue, it does not decode the command carried by the
// entry and does not use the pool of entries.
//
// On failure it returns the zero raftpb.Entry together with a wrapped error.
func raftEntryFromRawValue(b []byte) (raftpb.Entry, error) {
	var (
		meta enginepb.MVCCMetadata
		ent  raftpb.Entry
	)
	if err := protoutil.Unmarshal(b, &meta); err != nil {
		return raftpb.Entry{}, errors.Wrap(err, "decoding raft log MVCCMetadata")
	}
	if err := storage.MakeValue(meta).GetProto(&ent); err != nil {
		return raftpb.Entry{}, errors.Wrap(err, "unmarshalling raft Entry")
	}
	return ent, nil
}

func (e *Entry) load() error {
if len(e.Data) == 0 {
// Raft-proposed empty entry.
Expand Down
23 changes: 4 additions & 19 deletions pkg/kv/kvserver/raftlog/iter_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package raftlog

import (
"context"
"math"
"math/rand"
"testing"
Expand Down Expand Up @@ -136,6 +135,7 @@ func BenchmarkIterator(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
it := NewIterator(rangeID, &mockReader{}, IterOptions{Hi: 123456})
setMockIter(it)
it.Close()
}
})
Expand Down Expand Up @@ -166,18 +166,6 @@ func BenchmarkIterator(b *testing.B) {
b.Run("SeekGE", func(b *testing.B) {
benchForOp(b, (*Iterator).Next)
})

b.Run("NextPooled", func(b *testing.B) {
benchForOp(b, func(it *Iterator) (bool, error) {
ok, err := it.Next()
if err != nil || !ok {
return false, err
}
ent := it.Entry()
ent.Release()
return true, nil
})
})
}

// Visit benchmarks Visit on a pebble engine, i.e. the results will measure
Expand All @@ -192,12 +180,9 @@ func BenchmarkVisit(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := Visit(context.Background(), rangeID, eng, 0, math.MaxUint64,
func(ctx context.Context, entry *Entry) error {
entry.Release()
return nil
})
if err != nil {
if err := Visit(eng, rangeID, 0, math.MaxUint64, func(entry raftpb.Entry) error {
return nil
}); err != nil {
b.Fatal(err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/raftlog/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ type modelIter struct {
hi uint64
}

func (it *modelIter) load() (*Entry, error) {
func (it *modelIter) load() (raftpb.Entry, error) {
// Only called when on valid position.
return NewEntry(it.ents[it.idx])
return it.ents[it.idx], nil
}

func (it *modelIter) check() error {
Expand Down Expand Up @@ -139,7 +139,7 @@ func (it *modelIter) Next() (bool, error) {
return err == nil, err
}

func (it *modelIter) Entry() *Entry {
func (it *modelIter) Entry() raftpb.Entry {
e, err := it.load()
if err != nil {
panic(err) // bug in modelIter
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestIterator(t *testing.T) {
type iter interface {
SeekGE(idx uint64) (bool, error)
Next() (bool, error)
Entry() *Entry
Entry() raftpb.Entry
}

func consumeIter(it iter, lo uint64) ([]uint64, error) {
Expand Down
58 changes: 15 additions & 43 deletions pkg/kv/kvserver/raftlog/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
package raftlog

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)

type storageIter interface {
Expand Down Expand Up @@ -55,12 +53,12 @@ type Iterator struct {
// TODO(tbg): we're not reusing memory here. Since all of our allocs come
// from protobuf marshaling, this is hard to avoid but we can do a little
// better and at least avoid a few allocations.
entry *Entry
entry raftpb.Entry
}

// IterOptions are options to NewIterator.
type IterOptions struct {
// Hi ensures the Iterator never seeks to any Entry with index >= Hi. This is
// Hi ensures the Iterator never seeks to any entry with index >= Hi. This is
// useful when the caller is interested in a slice [Lo, Hi) of the raft log.
Hi uint64
}
Expand Down Expand Up @@ -89,50 +87,40 @@ func NewIterator(rangeID roachpb.RangeID, eng Reader, opts IterOptions) *Iterato

// Close releases the resources associated with this Iterator.
func (it *Iterator) Close() {
if it.iter != nil {
it.iter.Close()
}
it.iter.Close()
}

func (it *Iterator) load() (bool, error) {
ok, err := it.iter.Valid()
if err != nil || !ok {
if ok, err := it.iter.Valid(); err != nil || !ok {
return false, err
}

it.entry, err = NewEntryFromRawValue(it.iter.UnsafeValue())
if err != nil {
var err error
if it.entry, err = raftEntryFromRawValue(it.iter.UnsafeValue()); err != nil {
return false, err
}

return true, nil
}

// SeekGE positions the Iterator at the first raft log with index greater than
// or equal to idx. Returns (true, nil) on success, (false, nil) if no such
// Entry exists.
// entry exists.
func (it *Iterator) SeekGE(idx uint64) (bool, error) {
	// Translate the raft log index into its storage key, position the
	// underlying iterator there, and decode whatever it lands on.
	seekKey := storage.MakeMVCCMetadataKey(it.prefixBuf.RaftLogKey(idx))
	it.iter.SeekGE(seekKey)
	return it.load()
}

// Next returns (true, nil) when the (in ascending index order) next entry is
// available via Entry. It returns (false, nil) if there are no more
// entries.
// available via Entry(). It returns (false, nil) if there are no more entries.
//
// Note that a valid raft log has no gaps, and that the Iterator does not
// validate that.
// NB: a valid raft log has no gaps, but the Iterator does not validate that.
func (it *Iterator) Next() (bool, error) {
	// Advance the underlying storage iterator by one position; load then
	// checks validity and decodes the entry found there.
	it.iter.Next()
	return it.load()
}

// Entry returns the Entry the iterator is currently positioned at. This
// Entry returns the raft entry the iterator is currently positioned at. This
// must only be called after a prior successful call to SeekGE or Next.
//
// The caller may invoke `(*Entry).Release` if it no longer wishes to access the
// current Entry.
func (it *Iterator) Entry() *Entry {
func (it *Iterator) Entry() raftpb.Entry {
return it.entry
}

Expand All @@ -143,32 +131,16 @@ func (it *Iterator) Entry() *Entry {
//
// The closure may return iterutil.StopIteration(), which will stop iteration
// without returning an error.
//
// The closure may invoke `(*Entry).Release` if it is no longer going to access
// any memory in the current Entry.
func Visit(
ctx context.Context,
rangeID roachpb.RangeID,
eng Reader,
lo, hi uint64,
fn func(context.Context, *Entry) error,
) error {
func Visit(eng Reader, rangeID roachpb.RangeID, lo, hi uint64, fn func(raftpb.Entry) error) error {
it := NewIterator(rangeID, eng, IterOptions{Hi: hi})
defer it.Close()
ok, err := it.SeekGE(lo)
if err != nil {
return err
}
for ; ok; ok, err = it.Next() {
ent := it.Entry()
if ent.Index >= hi {
return nil
}
if err := fn(ctx, ent); err != nil {
if errors.Is(err, iterutil.StopIteration()) {
return nil
}
return err
if err := fn(it.Entry()); err != nil {
return iterutil.Map(err)
}
}
return err
Expand Down
47 changes: 2 additions & 45 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -194,7 +193,7 @@ func entries(
return nil
}

if err := iterateEntries(ctx, reader, rangeID, expectedIndex, hi, scanFunc); err != nil {
if err := raftlog.Visit(reader, rangeID, expectedIndex, hi, scanFunc); err != nil {
return nil, err
}
// Cache the fetched entries, if we may.
Expand Down Expand Up @@ -245,48 +244,6 @@ func entries(
return nil, raft.ErrUnavailable
}

// iterateEntries walks the Raft log entries of the given range whose indexes
// fall in [lo,hi), in iteration order, and calls f once per decoded entry.
//
// There is no built-in limit on the number of entries or bytes visited. A
// caller that wants to stop early returns iterutil.StopIteration from f; any
// error returned by f is passed through iterutil.Map before being returned.
//
// Decoding failures and iterator errors are returned to the caller. A nil
// error is returned once the iterator is exhausted.
func iterateEntries(
	ctx context.Context,
	reader storage.Reader,
	rangeID roachpb.RangeID,
	lo, hi uint64,
	f func(raftpb.Entry) error,
) error {
	startKey := keys.RaftLogKey(rangeID, lo)
	limitKey := keys.RaftLogKey(rangeID, hi)
	it := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		UpperBound: limitKey,
	})
	defer it.Close()

	// Both messages are declared once and reused as decode targets for every
	// entry visited.
	var (
		meta  enginepb.MVCCMetadata
		entry raftpb.Entry
	)

	for it.SeekGE(storage.MakeMVCCMetadataKey(startKey)); ; it.Next() {
		valid, err := it.Valid()
		if err != nil || !valid {
			// err is nil here when the iterator has simply run out of entries.
			return err
		}

		if err = protoutil.Unmarshal(it.UnsafeValue(), &meta); err != nil {
			return errors.Wrap(err, "unable to decode MVCCMetadata")
		}
		if err = storage.MakeValue(meta).GetProto(&entry); err != nil {
			return errors.Wrap(err, "unable to unmarshal raft Entry")
		}
		if err = f(entry); err != nil {
			return iterutil.Map(err)
		}
	}
}

// invalidLastTerm is an out-of-band value for r.mu.lastTerm that
// invalidates lastTerm caching and forces retrieval of Term(lastTerm)
// from the raftEntryCache/RocksDB.
Expand Down

0 comments on commit ae919e3

Please sign in to comment.