Skip to content

Commit

Permalink
storage: use quorum-applied-index instead of oldest-index
Browse files Browse the repository at this point in the history
When deciding where to truncate the raft log, use the quorum applied
index (minus `raftLogPadding`) instead of the oldest index. The oldest
index can get excessively (and arbitrarily) old if a replica goes down
for a significant period of time.

See cockroachdb#6012.
  • Loading branch information
petermattis committed Jun 3, 2016
1 parent 382976b commit d53ea6b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 12 deletions.
60 changes: 48 additions & 12 deletions storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package storage

import (
"sort"
"time"

"github.com/cockroachdb/cockroach/client"
Expand All @@ -31,6 +32,9 @@ import (
const (
// raftLogQueueMaxSize is the max size of the queue.
raftLogQueueMaxSize = 100
// raftLogPadding is the number of log entries that should be kept for
// lagging replicas.
raftLogPadding = 1000
// RaftLogQueueTimerDuration is the duration between checking the
// raft logs.
RaftLogQueueTimerDuration = time.Second
Expand Down Expand Up @@ -68,6 +72,8 @@ func (*raftLogQueue) acceptsUnsplitRanges() bool {
// can be truncated and the oldest index that cannot be pruned.
func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
rangeID := r.RangeID
// TODO(peter): r.store.RaftStatus(rangeID) differs from r.RaftStatus() in
// tests, causing TestGetTruncatableIndexes to fail. Figure out why and fix.
raftStatus := r.store.RaftStatus(rangeID)
if raftStatus == nil {
if log.V(1) {
Expand All @@ -76,18 +82,14 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
return 0, 0, nil
}

// Is this the raft leader?
// Is this the raft leader? We only perform log truncation on the raft leader
// which has the up to date info on followers.
if raftStatus.RaftState != raft.StateLeader {
return 0, 0, nil
}

// Find the oldest index still in use by the range.
oldestIndex := raftStatus.Applied
for _, progress := range raftStatus.Progress {
if progress.Match < oldestIndex {
oldestIndex = progress.Match
}
}
// Calculate the quorum applied index and adjust based on padding.
oldestIndex := getQuorumAppliedIndex(raftStatus, raftLogPadding)

r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -108,9 +110,9 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
// shouldQueue determines whether a range should be queued for truncating. This
// is true only if the replica is the raft leader and if the total number of
// the range's raft log's stale entries exceeds RaftLogQueueStaleThreshold.
func (*raftLogQueue) shouldQueue(now roachpb.Timestamp, r *Replica, _ config.SystemConfig) (shouldQ bool,
priority float64) {

func (*raftLogQueue) shouldQueue(
now roachpb.Timestamp, r *Replica, _ config.SystemConfig,
) (shouldQ bool, priority float64) {
truncatableIndexes, _, err := getTruncatableIndexes(r)
if err != nil {
log.Warning(err)
Expand All @@ -124,7 +126,6 @@ func (*raftLogQueue) shouldQueue(now roachpb.Timestamp, r *Replica, _ config.Sys
// leader and if the total number of the range's raft log's stale entries
// exceeds RaftLogQueueStaleThreshold.
func (rlq *raftLogQueue) process(now roachpb.Timestamp, r *Replica, _ config.SystemConfig) error {

truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r)
if err != nil {
return err
Expand Down Expand Up @@ -155,3 +156,38 @@ func (*raftLogQueue) timer() time.Duration {
func (*raftLogQueue) purgatoryChan() <-chan struct{} {
// Always nil: this queue never hands back a channel from here.
return nil
}

// uint64Slice attaches the Len, Less and Swap methods to []uint64 so that it
// can be handed to sort.Sort, which orders the values ascending.
type uint64Slice []uint64

// Len reports the number of elements in the slice.
func (s uint64Slice) Len() int {
	return len(s)
}

// Less reports whether the element at index a is strictly smaller than the
// element at index b.
func (s uint64Slice) Less(a, b int) bool {
	return s[b] > s[a]
}

// Swap exchanges the elements at indexes a and b.
func (s uint64Slice) Swap(a, b int) {
	s[b], s[a] = s[a], s[b]
}

// getQuorumAppliedIndex returns the index which a quorum of the nodes have
// applied. Each entry in raftStatus.Progress contributes its Match index,
// capped at raftStatus.Applied. The returned value is reduced by padding to
// allow retaining additional entries, but this reduction is limited so that
// the result never drops below the smallest contributed index, i.e. we won't
// keep entries which all nodes have applied.
//
// If raftStatus.Progress is empty there is nothing to compute a quorum from
// and 0 is returned.
func getQuorumAppliedIndex(raftStatus *raft.Status, padding uint64) uint64 {
	// Guard against an empty progress map: any index into the resulting empty
	// slice below would panic.
	if len(raftStatus.Progress) == 0 {
		return 0
	}

	appliedIndexes := make([]uint64, 0, len(raftStatus.Progress))
	for _, progress := range raftStatus.Progress {
		index := progress.Match
		if index > raftStatus.Applied {
			index = raftStatus.Applied
		}
		appliedIndexes = append(appliedIndexes, index)
	}
	sort.Sort(uint64Slice(appliedIndexes))

	// The slice is sorted ascending, so the element at n-q is the q-th
	// largest: at least q of the collected indexes are >= it.
	n := len(appliedIndexes)
	q := computeQuorum(n)
	index := appliedIndexes[n-q]

	// Apply the padding, saturating at zero because the indexes are unsigned.
	if index >= padding {
		index -= padding
	} else {
		index = 0
	}

	// Padding must not pull the result below the minimum collected index.
	if index < appliedIndexes[0] {
		index = appliedIndexes[0]
	}
	return index
}
40 changes: 40 additions & 0 deletions storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,48 @@ import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/coreos/etcd/raft"
)

// TestGetQuorumAppliedIndex runs getQuorumAppliedIndex over a table of applied
// index, per-node match indexes and padding, and compares the result with the
// expected value for each row.
func TestGetQuorumAppliedIndex(t *testing.T) {
	defer leaktest.AfterTest(t)()

	type quorumCase struct {
		applied  uint64
		progress []uint64
		padding  uint64
		expected uint64
	}
	cases := []quorumCase{
		// Basic cases.
		{applied: 1, progress: []uint64{1}, padding: 0, expected: 1},
		{applied: 2, progress: []uint64{1, 2}, padding: 0, expected: 1},
		{applied: 3, progress: []uint64{1, 2, 3}, padding: 0, expected: 2},
		{applied: 4, progress: []uint64{1, 2, 3, 4}, padding: 0, expected: 2},
		{applied: 5, progress: []uint64{1, 2, 3, 4, 5}, padding: 0, expected: 3},
		// Sorting.
		{applied: 5, progress: []uint64{5, 4, 3, 2, 1}, padding: 0, expected: 3},
		// Padding.
		{applied: 3, progress: []uint64{1, 2, 3}, padding: 1, expected: 1},
		{applied: 3, progress: []uint64{1, 2, 3}, padding: 2, expected: 1},
		{applied: 3, progress: []uint64{1, 2, 3}, padding: 3, expected: 1},
		// Applied limits progress values.
		{applied: 1, progress: []uint64{2, 2, 2}, padding: 0, expected: 1},
	}

	for idx, tc := range cases {
		// Build the progress map keyed by the position of each match index.
		progress := make(map[uint64]raft.Progress)
		for id, match := range tc.progress {
			progress[uint64(id)] = raft.Progress{Match: match}
		}
		status := &raft.Status{
			Applied:  tc.applied,
			Progress: progress,
		}

		if actual := getQuorumAppliedIndex(status, tc.padding); actual != tc.expected {
			t.Fatalf("%d: expected %d, but got %d", idx, tc.expected, actual)
		}
	}
}

// TestGetTruncatableIndexes verifies that getTruncatableIndexes returns
// correctly when there are indexes to be truncated.
func TestGetTruncatableIndexes(t *testing.T) {
Expand Down

0 comments on commit d53ea6b

Please sign in to comment.