Skip to content

Commit

Permalink
storage: fix NPE while printing trivial truncateDecision
Browse files Browse the repository at this point in the history
Fixes #34398.

Release note: None
  • Loading branch information
tbg committed Jan 30, 2019
1 parent 395d842 commit a49e3b9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
20 changes: 10 additions & 10 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLo
}

// newTruncateDecision returns a truncateDecision for the given Replica if no
// error occurs. If no truncation can be carried out, a zero decision is
// returned.
func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, error) {
// error occurs. If input data to establish a truncateDecision is missing, a
// zero decision is returned.
func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, error) {
rangeID := r.RangeID
now := timeutil.Now()

Expand Down Expand Up @@ -123,20 +123,20 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
r.mu.Unlock()

if err != nil {
return nil, errors.Errorf("error retrieving first index for r%d: %s", rangeID, err)
return truncateDecision{}, errors.Errorf("error retrieving first index for r%d: %s", rangeID, err)
}

if raftStatus == nil {
if log.V(6) {
log.Infof(ctx, "the raft group doesn't exist for r%d", rangeID)
}
return &truncateDecision{}, nil
return truncateDecision{}, nil
}

// Is this the raft leader? We only perform log truncation on the raft leader
// which has the up to date info on followers.
if raftStatus.RaftState != raft.StateLeader {
return &truncateDecision{}, nil
return truncateDecision{}, nil
}

// For all our followers, overwrite the RecentActive field (which is always
Expand All @@ -155,7 +155,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
}

input := truncateDecisionInput{
RaftStatus: raftStatus,
RaftStatus: *raftStatus,
LogSize: raftLogSize,
MaxLogSize: targetSize,
FirstIndex: firstIndex,
Expand All @@ -164,7 +164,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
}

decision := computeTruncateDecision(input)
return &decision, nil
return decision, nil
}

func updateRaftProgressFromActivity(
Expand Down Expand Up @@ -203,7 +203,7 @@ const (
)

type truncateDecisionInput struct {
RaftStatus *raft.Status // never nil
RaftStatus raft.Status
LogSize, MaxLogSize int64
FirstIndex, LastIndex uint64
PendingPreemptiveSnapshotIndex uint64
Expand Down Expand Up @@ -315,7 +315,7 @@ func (td *truncateDecision) ShouldTruncate() bool {
// snapshots. See #8629.
func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(input.RaftStatus)
decision.QuorumIndex = getQuorumIndex(&input.RaftStatus)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestComputeTruncateDecision(t *testing.T) {
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
}}
for i, c := range testCases {
status := &raft.Status{
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {

testutils.RunTrueAndFalse(t, "tooLarge", func(t *testing.T, tooLarge bool) {
testutils.RunTrueAndFalse(t, "active", func(t *testing.T, active bool) {
status := &raft.Status{
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for j, v := range []uint64{500, 400, 300, 200, 100} {
Expand Down Expand Up @@ -277,10 +277,20 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {
})
}

func TestTruncateDecisionZeroValue(t *testing.T) {
defer leaktest.AfterTest(t)()

var decision truncateDecision
assert.False(t, decision.ShouldTruncate())
assert.Zero(t, decision.NumNewRaftSnapshots())
assert.Zero(t, decision.NumTruncatableIndexes())
assert.Equal(t, "should truncate: false [truncate 0 entries to first index 0 (chosen via: )]", decision.String())
}

func TestTruncateDecisionNumSnapshots(t *testing.T) {
defer leaktest.AfterTest(t)()

status := &raft.Status{
status := raft.Status{
Progress: map[uint64]raft.Progress{
// Fully caught up.
5: {State: raft.ProgressStateReplicate, Match: 11, Next: 12},
Expand Down

0 comments on commit a49e3b9

Please sign in to comment.