Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130991: raft,kvserver: rm error from FirstIndex/LastIndex r=tbg a=pav-kv

Both `raft.Storage` implementations never return an error from these methods. They always know the current log bounds, by virtue of being a write-through cache.

Epic: none
Release note: none

131006: replica_rac2: fix missing lock in test r=tbg a=pav-kv

Fixes #130927

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 19, 2024
3 parents 4cfdbe3 + 17f1275 + 58e8cdc commit b7ce25b
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 87 deletions.
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,25 @@ type testRaftNode struct {
}

func (rn *testRaftNode) TermLocked() uint64 {
rn.r.mu.AssertHeld()
rn.r.mu.AssertRHeld()
fmt.Fprintf(rn.b, " RaftNode.TermLocked() = %d\n", rn.term)
return rn.term
}

func (rn *testRaftNode) LeaderLocked() roachpb.ReplicaID {
rn.r.mu.AssertHeld()
rn.r.mu.AssertRHeld()
fmt.Fprintf(rn.b, " RaftNode.LeaderLocked() = %s\n", rn.leader)
return rn.leader
}

func (rn *testRaftNode) LogMarkLocked() rac2.LogMark {
rn.r.mu.AssertHeld()
rn.r.mu.AssertRHeld()
fmt.Fprintf(rn.b, " RaftNode.LogMarkLocked() = %+v\n", rn.mark)
return rn.mark
}

func (rn *testRaftNode) NextUnstableIndexLocked() uint64 {
rn.r.mu.AssertHeld()
rn.r.mu.AssertRHeld()
fmt.Fprintf(rn.b, " RaftNode.NextUnstableIndexLocked() = %d\n", rn.nextUnstableIndex)
return rn.nextUnstableIndex
}
Expand Down Expand Up @@ -330,8 +330,10 @@ func TestProcessorBasic(t *testing.T) {
var mark rac2.LogMark
d.ScanArgs(t, "log-term", &mark.Term)
d.ScanArgs(t, "log-index", &mark.Index)
r.mu.Lock()
r.initRaft(mark)
p.InitRaftLocked(ctx, r.raftNode)
r.mu.Unlock()
return builderStr()

case "set-raft-state":
Expand Down
24 changes: 8 additions & 16 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,12 @@ func (r *Replica) raftLastIndexRLocked() kvpb.RaftIndex {

// LastIndex implements the raft.Storage interface.
// LastIndex requires that r.mu is held for reading.
func (r *replicaRaftStorage) LastIndex() (uint64, error) {
index, err := r.TypedLastIndex()
if err != nil {
r.reportRaftStorageError(err)
}
return uint64(index), err
func (r *replicaRaftStorage) LastIndex() uint64 {
return uint64(r.TypedLastIndex())
}

func (r *replicaRaftStorage) TypedLastIndex() (kvpb.RaftIndex, error) {
return (*Replica)(r).raftLastIndexRLocked(), nil
func (r *replicaRaftStorage) TypedLastIndex() kvpb.RaftIndex {
return (*Replica)(r).raftLastIndexRLocked()
}

// GetLastIndex returns the index of the last entry in the replica's Raft log.
Expand All @@ -203,16 +199,12 @@ func (r *Replica) raftFirstIndexRLocked() kvpb.RaftIndex {

// FirstIndex implements the raft.Storage interface.
// FirstIndex requires that r.mu is held for reading.
func (r *replicaRaftStorage) FirstIndex() (uint64, error) {
index, err := r.TypedFirstIndex()
if err != nil {
r.reportRaftStorageError(err)
}
return uint64(index), err
func (r *replicaRaftStorage) FirstIndex() uint64 {
return uint64(r.TypedFirstIndex())
}

func (r *replicaRaftStorage) TypedFirstIndex() (kvpb.RaftIndex, error) {
return (*Replica)(r).raftFirstIndexRLocked(), nil
func (r *replicaRaftStorage) TypedFirstIndex() kvpb.RaftIndex {
return (*Replica)(r).raftFirstIndexRLocked()
}

// GetFirstIndex returns the index of the first entry in the replica's Raft log.
Expand Down
6 changes: 2 additions & 4 deletions pkg/raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
if len(peers) == 0 {
return errors.New("must provide at least one peer to Bootstrap")
}
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
} else if lastIndex != 0 {
lastIndex := rn.raft.raftLog.storage.LastIndex()
if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}

Expand Down
17 changes: 3 additions & 14 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,10 @@ func newLog(storage Storage, logger Logger) *raftLog {
func newLogWithSize(
storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize,
) *raftLog {
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
firstIndex, lastIndex := storage.FirstIndex(), storage.LastIndex()
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv)
panic(err) // TODO(pav-kv): the storage should always cache the last term.
}
last := entryID{term: lastTerm, index: lastIndex}
return &raftLog{
Expand Down Expand Up @@ -336,11 +329,7 @@ func (l *raftLog) firstIndex() uint64 {
if i, ok := l.unstable.maybeFirstIndex(); ok {
return i
}
index, err := l.storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
return index
return l.storage.FirstIndex()
}

func (l *raftLog) lastIndex() uint64 {
Expand Down
5 changes: 1 addition & 4 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
if err := s.ApplySnapshot(snap); err != nil {
return err
}
fi, err := s.FirstIndex()
if err != nil {
return err
}
fi := s.FirstIndex()
// At the time of writing and for *MemoryStorage, applying a
// snapshot also truncates appropriately, but this would change with
// other storage engines potentially.
Expand Down
9 changes: 1 addition & 8 deletions pkg/raft/rafttest/interaction_env_handler_raft_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,7 @@ func (env *InteractionEnv) handleRaftLog(t *testing.T, d datadriven.TestData) er
// RaftLog pretty prints the raft log to the output buffer.
func (env *InteractionEnv) RaftLog(idx int) error {
s := env.Nodes[idx].Storage
fi, err := s.FirstIndex()
if err != nil {
return err
}
li, err := s.LastIndex()
if err != nil {
return err
}
fi, li := s.FirstIndex(), s.LastIndex()
if li < fi {
// TODO(tbg): this is what MemoryStorage returns, but unclear if it's
// the "correct" thing to do.
Expand Down
21 changes: 6 additions & 15 deletions pkg/raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
// down to the bits. Note that this comes from the Storage, which
// will not reflect any unstable entries that we'll only be presented
// with in the next Ready.
lastIndex, err = s.LastIndex()
require.NoError(t, err)

lastIndex = s.LastIndex()
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
require.NoError(t, err)
require.Len(t, entries, 2)
Expand Down Expand Up @@ -393,9 +391,7 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
// down to the bits. Note that this comes from the Storage, which
// will not reflect any unstable entries that we'll only be presented
// with in the next Ready.
lastIndex, err = s.LastIndex()
require.NoError(t, err)

lastIndex = s.LastIndex()
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
require.NoError(t, err)
require.Len(t, entries, 2)
Expand Down Expand Up @@ -491,9 +487,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
require.NoError(t, err)
proposeConfChangeAndApply(cc2)

lastIndex, err := s.LastIndex()
require.NoError(t, err)

lastIndex := s.LastIndex()
// the last three entries should be: ConfChange cc1, cc1, cc2
entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
require.NoError(t, err)
Expand Down Expand Up @@ -547,17 +541,14 @@ func TestRawNodeStart(t *testing.T) {
}
bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
require.NotEmpty(t, cs.Voters, "no voters specified")
fi, err := storage.FirstIndex()
require.NoError(t, err)
fi := storage.FirstIndex()
require.GreaterOrEqual(t, fi, uint64(2), "FirstIndex >= 2 is prerequisite for bootstrap")

_, err = storage.Entries(fi, fi, math.MaxUint64)
_, err := storage.Entries(fi, fi, math.MaxUint64)
// TODO(tbg): match exact error
require.Error(t, err, "should not have been able to load first index")

li, err := storage.LastIndex()
require.NoError(t, err)

li := storage.LastIndex()
_, err = storage.Entries(li, li, math.MaxUint64)
require.Error(t, err, "should not have been able to load last index")

Expand Down
13 changes: 7 additions & 6 deletions pkg/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ type LogStorage interface {
// entry may not be available.
Term(index uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// TODO(pav-kv): replace this with LastEntryID() which never fails.
LastIndex() uint64
// FirstIndex returns the index of the first log entry that is possibly
// available via Entries. Older entries have been incorporated into the
// StateStorage.Snapshot.
Expand All @@ -85,7 +86,7 @@ type LogStorage interface {
//
// TODO(pav-kv): replace this with a Prev() method equivalent to logSlice's
// prev field. The log storage is just a storage-backed logSlice.
FirstIndex() (uint64, error)
FirstIndex() uint64
}

// StateStorage provides read access to the state machine storage.
Expand Down Expand Up @@ -188,23 +189,23 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
}

// LastIndex implements the Storage interface.
func (ms *MemoryStorage) LastIndex() (uint64, error) {
func (ms *MemoryStorage) LastIndex() uint64 {
ms.Lock()
defer ms.Unlock()
ms.callStats.lastIndex++
return ms.lastIndex(), nil
return ms.lastIndex()
}

func (ms *MemoryStorage) lastIndex() uint64 {
return ms.ents[0].Index + uint64(len(ms.ents)) - 1
}

// FirstIndex implements the Storage interface.
func (ms *MemoryStorage) FirstIndex() (uint64, error) {
func (ms *MemoryStorage) FirstIndex() uint64 {
ms.Lock()
defer ms.Unlock()
ms.callStats.firstIndex++
return ms.firstIndex(), nil
return ms.firstIndex()
}

func (ms *MemoryStorage) firstIndex() uint64 {
Expand Down
20 changes: 4 additions & 16 deletions pkg/raft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,17 @@ func TestStorageEntries(t *testing.T) {
func TestStorageLastIndex(t *testing.T) {
ents := index(3).terms(3, 4, 5)
s := &MemoryStorage{ents: ents}

last, err := s.LastIndex()
require.NoError(t, err)
require.Equal(t, uint64(5), last)

require.Equal(t, uint64(5), s.LastIndex())
require.NoError(t, s.Append(index(6).terms(5)))
last, err = s.LastIndex()
require.NoError(t, err)
require.Equal(t, uint64(6), last)
require.Equal(t, uint64(6), s.LastIndex())
}

func TestStorageFirstIndex(t *testing.T) {
ents := index(3).terms(3, 4, 5)
s := &MemoryStorage{ents: ents}

first, err := s.FirstIndex()
require.NoError(t, err)
require.Equal(t, uint64(4), first)

require.Equal(t, uint64(4), s.FirstIndex())
require.NoError(t, s.Compact(4))
first, err = s.FirstIndex()
require.NoError(t, err)
require.Equal(t, uint64(5), first)
require.Equal(t, uint64(5), s.FirstIndex())
}

func TestStorageCompact(t *testing.T) {
Expand Down

0 comments on commit b7ce25b

Please sign in to comment.