Skip to content

Commit

Permalink
Merge pull request cockroachdb#10482 from petermattis/pmattis/preserv…
Browse files Browse the repository at this point in the history
…e-raft-log-during-upreplication

storage: compute quorum commit index instead of using raft.Status.Commit
  • Loading branch information
petermattis authored Nov 7, 2016
2 parents fc842bc + 561398f commit 09c0f88
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 48 deletions.
65 changes: 46 additions & 19 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package storage

import (
"sort"
"time"

"github.com/coreos/etcd/raft"
Expand Down Expand Up @@ -136,42 +137,54 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, err
func computeTruncatableIndex(
raftStatus *raft.Status, raftLogSize, targetSize int64, firstIndex, pendingSnapshotIndex uint64,
) uint64 {
truncatableIndex := raftStatus.Commit
quorumIndex := getQuorumIndex(raftStatus, pendingSnapshotIndex)
truncatableIndex := quorumIndex

if raftLogSize <= targetSize {
// Only truncate to one of the behind indexes if the raft log is less than
// the target size. If the raft log is greater than the target size we
// Only truncate to one of the follower indexes if the raft log is less
// than the target size. If the raft log is greater than the target size we
// always truncate to the quorum commit index.
truncatableIndex = getBehindIndex(raftStatus)
for _, progress := range raftStatus.Progress {
index := progress.Match
if truncatableIndex > index {
truncatableIndex = index
}
}
// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range. We don't want to truncate the log in a
// way that will require that new replica to be caught up via a Raft
// snapshot.
if pendingSnapshotIndex > 0 && truncatableIndex > pendingSnapshotIndex {
truncatableIndex = pendingSnapshotIndex
}
if truncatableIndex < firstIndex {
truncatableIndex = firstIndex
}
}

// Never truncate past the quorum committed index.
if truncatableIndex > raftStatus.Commit {
truncatableIndex = raftStatus.Commit
if truncatableIndex < firstIndex {
truncatableIndex = firstIndex
}
// Never truncate past the quorum commit index (this can only occur if
// firstIndex > quorumIndex).
if truncatableIndex > quorumIndex {
truncatableIndex = quorumIndex
}
return truncatableIndex
}

// getBehindIndex returns the raft log index of the oldest node or the quorum
// commit index if all nodes are caught up.
func getBehindIndex(raftStatus *raft.Status) uint64 {
behind := raftStatus.Commit
// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The pendingSnapshotIndex indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet.
func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress)+1)
for _, progress := range raftStatus.Progress {
index := progress.Match
if behind > index {
behind = index
}
match = append(match, progress.Match)
}
if pendingSnapshotIndex != 0 {
match = append(match, pendingSnapshotIndex)
}
return behind
sort.Sort(uint64Slice(match))
quorum := computeQuorum(len(match))
return match[len(match)-quorum]
}

// shouldQueue determines whether a range should be queued for truncating. This
Expand Down Expand Up @@ -224,3 +237,17 @@ func (*raftLogQueue) timer() time.Duration {
func (*raftLogQueue) purgatoryChan() <-chan struct{} {
return nil
}

var _ sort.Interface = uint64Slice(nil)

// uint64Slice implements sort.Interface
type uint64Slice []uint64

// Len implements sort.Interface
func (a uint64Slice) Len() int { return len(a) }

// Swap implements sort.Interface
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less implements sort.Interface
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
63 changes: 34 additions & 29 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,38 @@ import (
"github.com/pkg/errors"
)

func TestGetBehindIndex(t *testing.T) {
func TestGetQuorumIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
progress []uint64
commit uint64
expected uint64
progress []uint64
pendingSnapshotIndex uint64
expected uint64
}{
// Basic cases.
{[]uint64{1}, 1, 1},
{[]uint64{1, 2}, 2, 1},
{[]uint64{2, 3, 4}, 4, 2},
{[]uint64{1, 2, 3, 4, 5}, 3, 1},
// sorting.
{[]uint64{5, 4, 3, 2, 1}, 3, 1},
{[]uint64{1}, 0, 1},
{[]uint64{2}, 1, 1},
{[]uint64{1, 2}, 0, 1},
{[]uint64{2, 3}, 1, 2},
{[]uint64{1, 2, 3}, 0, 2},
{[]uint64{2, 3, 4}, 1, 2},
{[]uint64{1, 2, 3, 4}, 0, 2},
{[]uint64{2, 3, 4, 5}, 1, 3},
{[]uint64{1, 2, 3, 4, 5}, 0, 3},
{[]uint64{2, 3, 4, 5, 6}, 1, 3},
// Sorting.
{[]uint64{5, 4, 3, 2, 1}, 0, 3},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
out := getBehindIndex(status)
if !reflect.DeepEqual(c.expected, out) {
t.Errorf("%d: getBehindIndex(...) expected %d, but got %d", i, c.expected, out)
quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
}
}
}
Expand All @@ -69,32 +74,32 @@ func TestComputeTruncatableIndex(t *testing.T) {

testCases := []struct {
progress []uint64
commit uint64
raftLogSize int64
firstIndex uint64
pendingSnapshot uint64
expected uint64
}{
{[]uint64{1, 2}, 1, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 5, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 5, 100, 2, 0, 2},
{[]uint64{5, 5, 5}, 5, 100, 2, 0, 5},
{[]uint64{5, 5, 5}, 5, 100, 2, 1, 2},
{[]uint64{5, 5, 5}, 5, 100, 2, 3, 3},
{[]uint64{1, 2, 3, 4}, 3, 100, 1, 0, 1},
{[]uint64{1, 2, 3, 4}, 3, 100, 2, 0, 2},
{[]uint64{1, 2}, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 100, 2, 0, 2},
{[]uint64{5, 5, 5}, 100, 2, 0, 5},
{[]uint64{5, 5, 5}, 100, 2, 1, 2},
{[]uint64{5, 5, 5}, 100, 2, 3, 3},
{[]uint64{1, 2, 3, 4}, 100, 1, 0, 1},
{[]uint64{1, 2, 3, 4}, 100, 2, 0, 2},
// If over targetSize, should truncate to quorum committed index.
{[]uint64{1, 2, 3, 4}, 3, 2000, 1, 0, 3},
{[]uint64{1, 2, 3, 4}, 3, 2000, 2, 0, 3},
{[]uint64{1, 2, 3, 4}, 3, 2000, 3, 0, 3},
// Never truncate past raftStatus.Commit.
{[]uint64{4, 5, 6}, 3, 100, 4, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 1, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 2, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 3, 0, 3},
// The pending snapshot index affects the quorum commit index.
{[]uint64{4}, 2000, 1, 1, 1},
// Never truncate past the quorum commit index.
{[]uint64{3, 3, 6}, 100, 4, 0, 3},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
Expand Down

0 comments on commit 09c0f88

Please sign in to comment.