Skip to content

Commit

Permalink
deal with raft upgrade
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
danhhz authored and tbg committed Jul 17, 2019
1 parent 05509a6 commit 1984343
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 114 deletions.
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ func (s *statusServer) Ranges(
state.Progress[id] = serverpb.RaftState_Progress{
Match: progress.Match,
Next: progress.Next,
Paused: progress.Paused,
Paused: progress.IsPaused(),
PendingSnapshot: progress.PendingSnapshot,
State: progress.State.String(),
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/tracker"
)

const (
Expand Down Expand Up @@ -1196,7 +1197,7 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool
// behind the actual commit index of the range.
if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok {
if uint64(replicaID) == raftStatus.Lead ||
(progress.State == raft.ProgressStateReplicate &&
(progress.State == tracker.StateReplicate &&
progress.Match >= raftStatus.Commit) {
return false
}
Expand All @@ -1214,8 +1215,8 @@ func simulateFilterUnremovableReplicas(
brandNewReplicaID roachpb.ReplicaID,
) []roachpb.ReplicaDescriptor {
status := *raftStatus
status.Progress[uint64(brandNewReplicaID)] = raft.Progress{
State: raft.ProgressStateReplicate,
status.Progress[uint64(brandNewReplicaID)] = tracker.Progress{
State: tracker.StateReplicate,
Match: status.Commit,
}
return filterUnremovableReplicas(&status, replicas, brandNewReplicaID)
Expand Down
29 changes: 15 additions & 14 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/tracker"
)

const firstRange = roachpb.RangeID(1)
Expand Down Expand Up @@ -825,10 +826,10 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
rangeInfo := rangeInfoForRepl(repl, desc)

status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
for _, replica := range replicas {
status.Progress[uint64(replica.NodeID)] = raft.Progress{
status.Progress[uint64(replica.NodeID)] = tracker.Progress{
Match: 10,
}
}
Expand Down Expand Up @@ -5154,18 +5155,18 @@ func TestFilterBehindReplicas(t *testing.T) {
for _, c := range testCases {
t.Run("", func(t *testing.T) {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
status.Lead = c.leader
status.Commit = c.commit
var replicas []roachpb.ReplicaDescriptor
for j, v := range c.progress {
p := raft.Progress{
p := tracker.Progress{
Match: v,
State: raft.ProgressStateReplicate,
State: tracker.StateReplicate,
}
if v == 0 {
p.State = raft.ProgressStateProbe
p.State = tracker.StateProbe
}
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
Expand Down Expand Up @@ -5222,20 +5223,20 @@ func TestFilterUnremovableReplicas(t *testing.T) {
for _, c := range testCases {
t.Run("", func(t *testing.T) {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
// Use an invalid replica ID for the leader. TestFilterBehindReplicas covers
// valid replica IDs.
status.Lead = 99
status.Commit = c.commit
var replicas []roachpb.ReplicaDescriptor
for j, v := range c.progress {
p := raft.Progress{
p := tracker.Progress{
Match: v,
State: raft.ProgressStateReplicate,
State: tracker.StateReplicate,
}
if v == 0 {
p.State = raft.ProgressStateProbe
p.State = tracker.StateProbe
}
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
Expand Down Expand Up @@ -5277,20 +5278,20 @@ func TestSimulateFilterUnremovableReplicas(t *testing.T) {
for _, c := range testCases {
t.Run("", func(t *testing.T) {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
// Use an invalid replica ID for the leader. TestFilterBehindReplicas covers
// valid replica IDs.
status.Lead = 99
status.Commit = c.commit
var replicas []roachpb.ReplicaDescriptor
for j, v := range c.progress {
p := raft.Progress{
p := tracker.Progress{
Match: v,
State: raft.ProgressStateReplicate,
State: tracker.StateReplicate,
}
if v == 0 {
p.State = raft.ProgressStateProbe
p.State = tracker.StateProbe
}
replicaID := uint64(j + 1)
status.Progress[replicaID] = p
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/tracker"
)

const (
Expand Down Expand Up @@ -209,7 +210,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
if pr, ok := raftStatus.Progress[raftStatus.Lead]; ok {
// TODO(tschottdorf): remove this line once we have picked up
// https://github.com/etcd-io/etcd/pull/10279
pr.State = raft.ProgressStateReplicate
pr.State = tracker.StateReplicate
raftStatus.Progress[raftStatus.Lead] = pr
}

Expand All @@ -229,7 +230,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err

func updateRaftProgressFromActivity(
ctx context.Context,
prs map[uint64]raft.Progress,
prs map[uint64]tracker.Progress,
replicas []roachpb.ReplicaDescriptor,
lastUpdate lastUpdateTimesMap,
now time.Time,
Expand Down Expand Up @@ -285,7 +286,7 @@ type truncateDecision struct {
func (td *truncateDecision) raftSnapshotsForIndex(index uint64) int {
var n int
for _, p := range td.Input.RaftStatus.Progress {
if p.State != raft.ProgressStateReplicate {
if p.State != tracker.StateReplicate {
// If the follower isn't replicating, we can't trust its Match in
// the first place. But note that this shouldn't matter in practice
// as we already take care to not cut off these followers when
Expand Down Expand Up @@ -427,7 +428,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
// overlapping bounds that put significant stress on the Raft snapshot
// queue.
if progress.RecentActive {
if progress.State == raft.ProgressStateProbe {
if progress.State == tracker.StateProbe {
decision.ProtectIndex(decision.Input.FirstIndex, truncatableIndexChosenViaProbingFollower)
} else {
decision.ProtectIndex(progress.Match, truncatableIndexChosenViaFollowers)
Expand Down Expand Up @@ -475,7 +476,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
func getQuorumIndex(raftStatus *raft.Status) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress))
for _, progress := range raftStatus.Progress {
if progress.State == raft.ProgressStateReplicate {
if progress.State == tracker.StateReplicate {
match = append(match, progress.Match)
} else {
match = append(match, 0)
Expand Down
69 changes: 39 additions & 30 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/tracker"
)

func TestShouldTruncate(t *testing.T) {
Expand Down Expand Up @@ -85,10 +86,13 @@ func TestGetQuorumIndex(t *testing.T) {
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{State: raft.ProgressStateReplicate, Match: v}
status.Progress[uint64(j)] = tracker.Progress{
State: tracker.StateReplicate,
Match: v,
}
}
quorumMatchedIndex := getQuorumIndex(status)
if c.expected != quorumMatchedIndex {
Expand All @@ -99,10 +103,10 @@ func TestGetQuorumIndex(t *testing.T) {
// Verify that only replicating followers are taken into account (i.e. others
// are treated as Match == 0).
status := &raft.Status{
Progress: map[uint64]raft.Progress{
1: {State: raft.ProgressStateReplicate, Match: 100},
2: {State: raft.ProgressStateSnapshot, Match: 100},
3: {State: raft.ProgressStateReplicate, Match: 90},
Progress: map[uint64]tracker.Progress{
1: {State: tracker.StateReplicate, Match: 100},
2: {State: tracker.StateSnapshot, Match: 100},
3: {State: tracker.StateReplicate, Match: 90},
},
}
assert.Equal(t, uint64(90), getQuorumIndex(status))
Expand Down Expand Up @@ -211,10 +215,15 @@ func TestComputeTruncateDecision(t *testing.T) {
for i, c := range testCases {
t.Run("", func(t *testing.T) {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1}
status.Progress[uint64(j)] = tracker.Progress{
RecentActive: true,
State: tracker.StateReplicate,
Match: v,
Next: v + 1,
}
}
input := truncateDecisionInput{
RaftStatus: status,
Expand Down Expand Up @@ -277,26 +286,26 @@ func TestComputeTruncateDecisionProgressStatusProbe(t *testing.T) {
testutils.RunTrueAndFalse(t, "tooLarge", func(t *testing.T, tooLarge bool) {
testutils.RunTrueAndFalse(t, "active", func(t *testing.T, active bool) {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
Progress: make(map[uint64]tracker.Progress),
}
for j, v := range []uint64{100, 200, 300, 400, 500} {
var pr raft.Progress
var pr tracker.Progress
if v == 100 {
// A probing follower is probed with some index (Next) but
// it has a zero Match (i.e. no idea how much of its log
// agrees with ours).
pr = raft.Progress{
pr = tracker.Progress{
RecentActive: active,
State: raft.ProgressStateProbe,
State: tracker.StateProbe,
Match: 0,
Next: v,
}
} else { // everyone else
pr = raft.Progress{
pr = tracker.Progress{
Match: v,
Next: v + 1,
RecentActive: true,
State: raft.ProgressStateReplicate,
State: tracker.StateReplicate,
}
}
status.Progress[uint64(j)] = pr
Expand Down Expand Up @@ -335,15 +344,15 @@ func TestTruncateDecisionNumSnapshots(t *testing.T) {
defer leaktest.AfterTest(t)()

status := raft.Status{
Progress: map[uint64]raft.Progress{
Progress: map[uint64]tracker.Progress{
// Fully caught up.
5: {State: raft.ProgressStateReplicate, Match: 11, Next: 12},
5: {State: tracker.StateReplicate, Match: 11, Next: 12},
// Behind.
6: {State: raft.ProgressStateReplicate, Match: 10, Next: 11},
6: {State: tracker.StateReplicate, Match: 10, Next: 11},
// Last MsgApp in flight, so basically caught up.
7: {State: raft.ProgressStateReplicate, Match: 10, Next: 12},
8: {State: raft.ProgressStateProbe}, // irrelevant
9: {State: raft.ProgressStateSnapshot}, // irrelevant
7: {State: tracker.StateReplicate, Match: 10, Next: 12},
8: {State: tracker.StateProbe}, // irrelevant
9: {State: tracker.StateSnapshot}, // irrelevant
},
}

Expand Down Expand Up @@ -374,12 +383,12 @@ func TestUpdateRaftStatusActivity(t *testing.T) {
defer leaktest.AfterTest(t)()

type testCase struct {
prs []raft.Progress
prs []tracker.Progress
replicas []roachpb.ReplicaDescriptor
lastUpdate lastUpdateTimesMap
now time.Time

exp []raft.Progress
exp []tracker.Progress
}

now := timeutil.Now()
Expand All @@ -388,40 +397,40 @@ func TestUpdateRaftStatusActivity(t *testing.T) {
// No data, no crash.
{},
// No knowledge = no update.
{prs: []raft.Progress{{RecentActive: true}}, exp: []raft.Progress{{RecentActive: true}}},
{prs: []raft.Progress{{RecentActive: false}}, exp: []raft.Progress{{RecentActive: false}}},
{prs: []tracker.Progress{{RecentActive: true}}, exp: []tracker.Progress{{RecentActive: true}}},
{prs: []tracker.Progress{{RecentActive: false}}, exp: []tracker.Progress{{RecentActive: false}}},
// See replica in descriptor but then don't find it in the map. Assumes the follower is not
// active.
{
replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}},
prs: []raft.Progress{{RecentActive: true}},
exp: []raft.Progress{{RecentActive: false}},
prs: []tracker.Progress{{RecentActive: true}},
exp: []tracker.Progress{{RecentActive: false}},
},
// Three replicas in descriptor. The first one responded recently, the second didn't,
// the third did but it doesn't have a Progress.
{
replicas: []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}},
prs: []raft.Progress{{RecentActive: false}, {RecentActive: true}},
prs: []tracker.Progress{{RecentActive: false}, {RecentActive: true}},
lastUpdate: map[roachpb.ReplicaID]time.Time{
1: now.Add(-1 * MaxQuotaReplicaLivenessDuration / 2),
2: now.Add(-1 - MaxQuotaReplicaLivenessDuration),
3: now,
},
now: now,

exp: []raft.Progress{{RecentActive: true}, {RecentActive: false}},
exp: []tracker.Progress{{RecentActive: true}, {RecentActive: false}},
},
}

ctx := context.Background()

for _, tc := range tcs {
t.Run("", func(t *testing.T) {
prs := make(map[uint64]raft.Progress)
prs := make(map[uint64]tracker.Progress)
for i, pr := range tc.prs {
prs[uint64(i+1)] = pr
}
expPRs := make(map[uint64]raft.Progress)
expPRs := make(map[uint64]tracker.Progress)
for i, pr := range tc.exp {
expPRs[uint64(i+1)] = pr
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/tracker"
)

const (
Expand Down Expand Up @@ -66,7 +66,7 @@ func (rq *raftSnapshotQueue) shouldQueue(
if status := repl.RaftStatus(); status != nil {
// raft.Status.Progress is only populated on the Raft group leader.
for _, p := range status.Progress {
if p.State == raft.ProgressStateSnapshot {
if p.State == tracker.StateSnapshot {
if log.V(2) {
log.Infof(ctx, "raft snapshot needed, enqueuing")
}
Expand All @@ -84,7 +84,7 @@ func (rq *raftSnapshotQueue) process(
if status := repl.RaftStatus(); status != nil {
// raft.Status.Progress is only populated on the Raft group leader.
for id, p := range status.Progress {
if p.State == raft.ProgressStateSnapshot {
if p.State == tracker.StateSnapshot {
if log.V(1) {
log.Infof(ctx, "sending raft snapshot")
}
Expand Down
Loading

0 comments on commit 1984343

Please sign in to comment.