diff --git a/storage/raft_log_queue.go b/storage/raft_log_queue.go
index f7c1fffac8ad..125883721b0a 100644
--- a/storage/raft_log_queue.go
+++ b/storage/raft_log_queue.go
@@ -17,6 +17,7 @@
 package storage
 
 import (
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/client"
@@ -31,6 +32,9 @@ import (
 const (
 	// raftLogQueueMaxSize is the max size of the queue.
 	raftLogQueueMaxSize = 100
+	// raftLogPadding is the number of log entries that should be kept for
+	// lagging replicas.
+	raftLogPadding = 1000
 	// RaftLogQueueTimerDuration is the duration between checking the
 	// raft logs.
 	RaftLogQueueTimerDuration = time.Second
@@ -68,6 +72,8 @@ func (*raftLogQueue) acceptsUnsplitRanges() bool {
 // can be truncated and the oldest index that cannot be pruned.
 func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
 	rangeID := r.RangeID
+	// TODO(peter): r.store.RaftStatus(rangeID) differs from r.RaftStatus() in
+	// tests, causing TestGetTruncatableIndexes to fail. Figure out why and fix.
 	raftStatus := r.store.RaftStatus(rangeID)
 	if raftStatus == nil {
 		if log.V(1) {
@@ -76,18 +82,14 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
 		return 0, 0, nil
 	}
 
-	// Is this the raft leader?
+	// Is this the raft leader? We only perform log truncation on the raft leader
+	// which has the up to date info on followers.
 	if raftStatus.RaftState != raft.StateLeader {
 		return 0, 0, nil
 	}
 
-	// Find the oldest index still in use by the range.
-	oldestIndex := raftStatus.Applied
-	for _, progress := range raftStatus.Progress {
-		if progress.Match < oldestIndex {
-			oldestIndex = progress.Match
-		}
-	}
+	// Calculate the quorum applied index and adjust based on padding.
+	oldestIndex := getQuorumAppliedIndex(raftStatus, raftLogPadding)
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -108,9 +110,9 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
 // shouldQueue determines whether a range should be queued for truncating. This
 // is true only if the replica is the raft leader and if the total number of
 // the range's raft log's stale entries exceeds RaftLogQueueStaleThreshold.
-func (*raftLogQueue) shouldQueue(now roachpb.Timestamp, r *Replica, _ config.SystemConfig) (shouldQ bool,
-	priority float64) {
-
+func (*raftLogQueue) shouldQueue(
+	now roachpb.Timestamp, r *Replica, _ config.SystemConfig,
+) (shouldQ bool, priority float64) {
 	truncatableIndexes, _, err := getTruncatableIndexes(r)
 	if err != nil {
 		log.Warning(err)
@@ -124,7 +126,6 @@ func (*raftLogQueue) shouldQueue(now roachpb.Timestamp, r *Replica, _ config.Sys
 // leader and if the total number of the range's raft log's stale entries
 // exceeds RaftLogQueueStaleThreshold.
 func (rlq *raftLogQueue) process(now roachpb.Timestamp, r *Replica, _ config.SystemConfig) error {
-
 	truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r)
 	if err != nil {
 		return err
@@ -155,3 +156,38 @@ func (*raftLogQueue) timer() time.Duration {
 func (*raftLogQueue) purgatoryChan() <-chan struct{} {
 	return nil
 }
+
+type uint64Slice []uint64
+
+func (p uint64Slice) Len() int           { return len(p) }
+func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p uint64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+// getQuorumAppliedIndex returns the index which a quorum of the nodes have
+// applied. The returned value is adjusted by padding to allow retaining
+// additional entries, but this adjustment is limited so that we won't keep
+// entries which all nodes have applied.
+func getQuorumAppliedIndex(raftStatus *raft.Status, padding uint64) uint64 {
+	appliedIndexes := make([]uint64, 0, len(raftStatus.Progress))
+	for _, progress := range raftStatus.Progress {
+		index := progress.Match
+		if index > raftStatus.Applied {
+			index = raftStatus.Applied
+		}
+		appliedIndexes = append(appliedIndexes, index)
+	}
+	sort.Sort(uint64Slice(appliedIndexes))
+
+	n := len(appliedIndexes)
+	q := computeQuorum(n)
+	index := appliedIndexes[n-q]
+	if index >= padding {
+		index -= padding
+	} else {
+		index = 0
+	}
+	if index < appliedIndexes[0] {
+		index = appliedIndexes[0]
+	}
+	return index
+}
diff --git a/storage/raft_log_queue_test.go b/storage/raft_log_queue_test.go
index 05b4024d183e..2aeada7c7aa1 100644
--- a/storage/raft_log_queue_test.go
+++ b/storage/raft_log_queue_test.go
@@ -24,8 +24,48 @@ import (
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/util"
 	"github.com/cockroachdb/cockroach/util/leaktest"
+	"github.com/coreos/etcd/raft"
 )
 
+func TestGetQuorumAppliedIndex(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testCases := []struct {
+		applied  uint64
+		progress []uint64
+		padding  uint64
+		expected uint64
+	}{
+		// Basic cases.
+		{1, []uint64{1}, 0, 1},
+		{2, []uint64{1, 2}, 0, 1},
+		{3, []uint64{1, 2, 3}, 0, 2},
+		{4, []uint64{1, 2, 3, 4}, 0, 2},
+		{5, []uint64{1, 2, 3, 4, 5}, 0, 3},
+		// Sorting.
+		{5, []uint64{5, 4, 3, 2, 1}, 0, 3},
+		// Padding.
+		{3, []uint64{1, 2, 3}, 1, 1},
+		{3, []uint64{1, 2, 3}, 2, 1},
+		{3, []uint64{1, 2, 3}, 3, 1},
+		// Applied limits progress values.
+		{1, []uint64{2, 2, 2}, 0, 1},
+	}
+	for i, c := range testCases {
+		status := &raft.Status{
+			Applied:  c.applied,
+			Progress: make(map[uint64]raft.Progress),
+		}
+		for j, v := range c.progress {
+			status.Progress[uint64(j)] = raft.Progress{Match: v}
+		}
+		quorumAppliedIndex := getQuorumAppliedIndex(status, c.padding)
+		if c.expected != quorumAppliedIndex {
+			t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumAppliedIndex)
+		}
+	}
+}
+
 // TestGetTruncatableIndexes verifies that the correctly returns when there are
 // indexes to be truncated.
 func TestGetTruncatableIndexes(t *testing.T) {