Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fourth solution] Fix the potential data loss for clusters with only one member (raft layer change) #14411

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type node struct {
status chan chan Status

rn *RawNode
rd Ready
}

func newNode(rn *RawNode) node {
Expand Down Expand Up @@ -304,7 +305,6 @@ func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready

r := n.rn.raft

Expand All @@ -322,7 +322,7 @@ func (n *node) run() {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
n.rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}

Expand Down Expand Up @@ -393,12 +393,12 @@ func (n *node) run() {
}
case <-n.tickc:
n.rn.Tick()
case readyc <- rd:
n.rn.acceptReady(rd)
case readyc <- n.rd:
n.rn.acceptReady(n.rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
n.rn.Advance(n.rd)
n.rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
Expand Down Expand Up @@ -503,6 +503,25 @@ func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool)
// Ready exposes the node's ready channel as a receive-only channel so that
// callers can receive Ready values from it but cannot send on it.
func (n *node) Ready() <-chan Ready {
	return n.readyc
}

func (n *node) Advance() {
ents := n.rd.Entries

// Provide feedback that the `Entries` have already been successfully
// saved by the user. Note that only the leader needs to do this, because
// a follower will not send the feedback to the leader until it finishes
// saving the `Entries`.
//
// An empty entry is generated in the following two cases:
// 1. a new leader is elected.
// 2. the cluster enters a joint configuration.
// We need to ignore the above two cases because `pr.Match` is updated
// immediately after appending the empty entry. Note that it's safe to do
// so because it can only happen for a multi-member cluster. Please also refer to
// https://github.com/ahrtr/etcd/blob/a79941679f401186dc29e10d4e86a53590c3d9d6/raft/raft.go#L643-L645
if n.isLeader() && (len(ents) > 1 || (len(ents) == 1 && len(ents[0].Data) > 0)) {
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
e := ents[len(ents)-1]
n.Step(context.TODO(), pb.Message{From: n.Status().ID, Type: pb.MsgAppResp, Index: e.Index})
}

select {
case n.advancec <- struct{}{}:
case <-n.done:
Expand Down Expand Up @@ -561,6 +580,11 @@ func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

// isLeader reports whether this node currently considers itself the leader,
// i.e. whether the leader ID in its status snapshot equals its own ID.
//
// A zero-valued Status has ID == Lead == 0; a plain equality check would
// report that as leadership, so a non-zero Lead is required as well.
// NOTE(review): Status() presumably returns such a zero value once the node
// has stopped, and 0 is presumably raft's `None` (no leader) ID — confirm
// against their definitions.
func (n *node) isLeader() bool {
	sts := n.Status()
	// An unset leader ID can never denote this node.
	return sts.Lead != 0 && sts.ID == sts.Lead
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
Expand Down
22 changes: 17 additions & 5 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
case rd := <-n.Ready():
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
for _, e := range rd.CommittedEntries {
rdyEntries = append(rdyEntries, e)
switch e.Type {
case raftpb.EntryNormal:
Expand Down Expand Up @@ -600,11 +600,17 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
HardState: raftpb.HardState{},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: nil,
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
},
}
storage := NewMemoryStorage()
c := &Config{
Expand Down Expand Up @@ -642,8 +648,11 @@ func TestNodeStart(t *testing.T) {

select {
case rd := <-n.Ready():
t.Errorf("unexpected Ready: %+v", rd)
case <-time.After(time.Millisecond):
if !reflect.DeepEqual(rd, wants[2]) {
t.Errorf("Unexpected ready: got = %+v,\n wanted = %+v", rd, wants[2])
}
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for the ready data")
}
}

Expand Down Expand Up @@ -932,6 +941,9 @@ func TestCommitPagination(t *testing.T) {
t.Fatal(err)
}
}
rd = readyWithTimeout(&n)
s.Append(rd.Entries)
n.Advance()

// The 3 proposals will commit in two batches.
rd = readyWithTimeout(&n)
Expand Down
9 changes: 8 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ func newRaft(c *Config) *raft {

// hasLeader reports whether the recorded leader ID is anything other than None.
func (r *raft) hasLeader() bool {
	return r.lead != None
}

// isLeader reports whether the recorded leader ID is this raft instance's
// own ID.
func (r *raft) isLeader() bool {
	return r.lead == r.id
}

// softState returns a newly allocated SoftState populated with the current
// leader ID and raft state.
func (r *raft) softState() *SoftState {
	ss := SoftState{
		Lead:      r.lead,
		RaftState: r.state,
	}
	return &ss
}

func (r *raft) hardState() pb.HardState {
Expand Down Expand Up @@ -635,7 +637,12 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.Progress[r.id].MaybeUpdate(li)
// The entries haven't been confirmed to be saved yet, so we can't
// update `pr.Match` for now. The exception below is a single entry with
// nil Data, which is only appended by `advance` and `becomeLeader`.
if len(es) == 1 && es[0].Data == nil {
r.prs.Progress[r.id].MaybeUpdate(li)
}
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
Expand Down
1 change: 1 addition & 0 deletions raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
commitNoopEntry(r, s)
li := r.raftLog.lastIndex()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})

for _, m := range r.readMessages() {
if tt.acceptors[m.To] {
Expand Down
14 changes: 14 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func TestProgressLeader(t *testing.T) {
if err := r.Step(propMsg); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}
if err := r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}); err != nil {
t.Fatalf("reporting feedback resulted in error: %v", err)
}
}
}

Expand Down Expand Up @@ -3388,6 +3391,9 @@ func TestCommitAfterRemoveNode(t *testing.T) {
{Type: pb.EntryConfChange, Data: ccData},
},
})
// Node 1 acknowledges the config change to its local raft.
r.Step(pb.Message{Type: pb.MsgAppResp, From: 1, Index: r.raftLog.lastIndex()})

// Stabilize the log and make sure nothing is committed yet.
if ents := nextEnts(r, s); len(ents) > 0 {
t.Fatalf("unexpected committed entries: %v", ents)
Expand All @@ -3401,6 +3407,8 @@ func TestCommitAfterRemoveNode(t *testing.T) {
{Type: pb.EntryNormal, Data: []byte("hello")},
},
})
// Node 1 acknowledges the proposal to its local raft.
r.Step(pb.Message{Type: pb.MsgAppResp, From: 1, Index: r.raftLog.lastIndex()})

// Node 2 acknowledges the config change, committing it.
r.Step(pb.Message{
Expand Down Expand Up @@ -4714,6 +4722,12 @@ func (nw *network) send(msgs ...pb.Message) {
m := msgs[0]
p := nw.peers[m.To]
p.Step(m)
if sm, ok := p.(*raft); ok {
// The leader acknowledges the proposal to its local raft.
if sm.isLeader() && m.Type == pb.MsgProp && len(m.Entries) > 0 && m.Entries[0].Data != nil {
p.Step(pb.Message{From: sm.id, To: sm.id, Type: pb.MsgAppResp, Index: sm.raftLog.lastIndex()})
}
}
msgs = append(msgs[1:], nw.filter(p.readMessages())...)
}
}
Expand Down
9 changes: 9 additions & 0 deletions raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// propose-conf-change 2 v1=true
// v5
err = env.handleProposeConfChange(t, d)
case "report-status":
// The leader acknowledges that the Entries have already been
// successfully persisted.
//
// Example:
//
// report-status 1
// 4
err = env.handleLeaderReportStatus(t, d)
default:
err = fmt.Errorf("unknown command")
}
Expand Down
37 changes: 37 additions & 0 deletions raft/rafttest/interaction_env_handler_leader_report_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"strconv"
"testing"

"github.com/cockroachdb/datadriven"
pb "go.etcd.io/etcd/raft/v3/raftpb"
)

// handleLeaderReportStatus implements the "report-status" command. The node
// index is taken from the command's first argument and the log index is
// parsed from the command's input; both are then forwarded to ReportStatus.
//
// The test is failed immediately if the input is not a valid integer or is
// negative. ReportStatus converts the index to uint64, so a negative value
// would otherwise silently wrap around to a huge log index.
func (env *InteractionEnv) handleLeaderReportStatus(t *testing.T, d datadriven.TestData) error {
	idx := firstAsNodeIdx(t, d)
	logIndex, err := strconv.Atoi(d.Input)
	if err != nil {
		t.Fatalf("Invalid input for the report-status command: %s, error: %v", d.Input, err)
	}
	if logIndex < 0 {
		t.Fatalf("Invalid input for the report-status command: %s, error: log index must not be negative", d.Input)
	}
	return env.ReportStatus(idx, logIndex)
}

// ReportStatus steps a MsgAppResp into the RawNode at position idx of
// env.Nodes. The message is addressed from and to that node's own ID and
// carries the given log index. The error returned by Step is passed through.
func (env *InteractionEnv) ReportStatus(idx int, index int) error {
	rn := env.Nodes[idx].RawNode
	self := rn.Status().ID
	ack := pb.Message{
		Type:  pb.MsgAppResp,
		From:  self,
		To:    self,
		Index: uint64(index),
	}
	return rn.Step(ack)
}
12 changes: 10 additions & 2 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
for i, tc := range testCases {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
s := newTestMemoryStorage(withPeers(1))
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
if err != nil {
Expand Down Expand Up @@ -280,6 +280,9 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
}
rawNode.ProposeConfChange(ccv2)
}
if err = rawNode.Step(pb.Message{From: rawNode.raft.id, To: rawNode.raft.id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
proposed = true
}
}
Expand Down Expand Up @@ -433,6 +436,8 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
t.Fatal(err)
}
rawNode.ProposeConfChange(testCc)
r := rawNode.raft
rawNode.Step(pb.Message{From: r.id, To: r.id, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
proposed = true
}
}
Expand Down Expand Up @@ -743,6 +748,7 @@ func TestRawNodeStart(t *testing.T) {
}
rawNode.Campaign()
rawNode.Propose([]byte("foo"))
rawNode.Step(pb.Message{From: rawNode.raft.id, To: rawNode.raft.id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()})
if !rawNode.HasReady() {
t.Fatal("expected a Ready")
}
Expand Down Expand Up @@ -981,6 +987,8 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
// log to grow indefinitely.
for i := 0; i < 1024; i++ {
rawNode.Propose(data)
id := rawNode.raft.id
rawNode.Step(pb.Message{From: id, To: id, Type: pb.MsgAppResp, Index: rawNode.raft.raftLog.lastIndex()})
}

// Check the size of leader's uncommitted log tail. It should not exceed the
Expand Down
6 changes: 6 additions & 0 deletions raft/testdata/campaign_learner_must_vote.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ v3
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
4
----
ok

# Commit and fully apply said conf change. n1 and n2 now consider n3 a voter.
stabilize 1 2
----
Expand Down
6 changes: 6 additions & 0 deletions raft/testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ v2
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
4
----
ok

# Pull n2 out of thin air.
add-nodes 1
----
Expand Down
6 changes: 6 additions & 0 deletions raft/testdata/confchange_v1_remove_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ r1
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
5
----
ok

raft-state
----
1: StateLeader (Voter)
Expand Down
20 changes: 20 additions & 0 deletions raft/testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ v2 v3
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
4
----
ok

# Add two "empty" nodes to the cluster, n2 and n3.
add-nodes 2
----
Expand Down Expand Up @@ -204,6 +210,12 @@ r2 r3
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
5
----
ok

# n1 sends out MsgApps.
stabilize 1
----
Expand Down Expand Up @@ -246,6 +258,12 @@ propose 1 bar
----
ok

# The leader acknowledges that the entries have already been persisted.
report-status 1
7
----
ok

# n1 switches to the joint config, then initiates a transition into the final
# config.
stabilize 1
Expand All @@ -260,9 +278,11 @@ stabilize 1
1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"]
1->2 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"]
1->3 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"]
1->1 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"]
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/6
3->1 MsgAppResp Term:1 Log:0/6
1->1 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"]
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:6
Expand Down
Loading