From 09788c95d1ef88124be7eb7de47833c932ec106d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 9 Aug 2019 23:27:27 +0200 Subject: [PATCH] [wip] raft/rafttest: introduce datadriven testing It has often been tedious to test the interactions between multi-member Raft groups, especially when many steps were required to reach a certain scenario. Often, this boilerplate was as boring as it was hard to write and hard to maintain, making it attractive to resort to shortcuts whenever possible, which in turn tended to undercut how meaningful and maintainable the tests ended up being - that is, if the tests were even written, which sometimes they weren't. This change introduces a datadriven framework specifically for deterministically testing the interaction between multiple members of a raft group with the goal of reducing the friction for writing these tests to near zero. In the near term, this will be used to add thorough testing for joint consensus (which is already available today, but wildly undertested), but just converting an existing test into this framework has shown that the concise representation and built-in inspection of log messages highlight unexpected behavior much more readily than the previous unit tests did (the test in question is `snapshot_succeed_via_app_resp`; the reader is invited to compare the old and new version of it). The main building block is `InteractionEnv`, which holds on to the state of the whole system and exposes various relevant methods for manipulating it, including but not limited to adding nodes, delivering and dropping messages, and proposing configuration changes. All of this is extensible so that in the future I hope to use it to explore the phenomena discussed in https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263 which requires injecting appropriate "crash points" in the Ready handling loop. 
Discussions of the "what if X happened in state Y" kind can quickly be made concrete by "scripting up an interaction test". Additionally, this framework is intentionally not kept internal to the raft package. Though this is in its infancy, a goal is that it should be possible for a suite of interaction tests to allow applications to validate that their Storage implementation behaves accordingly, simply by running a raft-provided interaction suite against their Storage. --- raft/interaction_test.go | 17 +++ raft/raft.go | 13 +- raft/raft_snap_test.go | 120 --------------- raft/raftpb/confchange.go | 65 ++++++++ raft/rafttest/interaction_env.go | 94 ++++++++++++ raft/rafttest/interaction_env_handler.go | 100 +++++++++++++ .../interaction_env_handler_add_nodes.go | 116 ++++++++++++++ .../interaction_env_handler_campaign.go | 30 ++++ .../interaction_env_handler_compact.go | 38 +++++ .../interaction_env_handler_deliver_msgs.go | 57 +++++++ .../interaction_env_handler_handle_ready.go | 92 ++++++++++++ .../interaction_env_handler_log_level.go | 37 +++++ ...raction_env_handler_propose_conf_change.go | 79 ++++++++++ .../interaction_env_handler_raft_log.go | 59 ++++++++ .../interaction_env_handler_stabilize.go | 76 ++++++++++ .../interaction_env_handler_status.go | 39 +++++ .../interaction_env_handler_tick_heartbeat.go | 33 ++++ raft/rafttest/interaction_env_logger.go | 96 ++++++++++++ raft/rafttest/network.go | 8 +- raft/testdata/campaign.txt | 117 +++++++++++++++ raft/testdata/confchange_v1.txt | 78 ++++++++++ .../snapshot_succeed_via_app_resp.txt | 141 ++++++++++++++++++ raft/tracker/progress.go | 2 + raft/util.go | 100 ++++++++++++- 24 files changed, 1477 insertions(+), 130 deletions(-) create mode 100644 raft/interaction_test.go create mode 100644 raft/rafttest/interaction_env.go create mode 100644 raft/rafttest/interaction_env_handler.go create mode 100644 raft/rafttest/interaction_env_handler_add_nodes.go create mode 100644 
raft/rafttest/interaction_env_handler_campaign.go create mode 100644 raft/rafttest/interaction_env_handler_compact.go create mode 100644 raft/rafttest/interaction_env_handler_deliver_msgs.go create mode 100644 raft/rafttest/interaction_env_handler_handle_ready.go create mode 100644 raft/rafttest/interaction_env_handler_log_level.go create mode 100644 raft/rafttest/interaction_env_handler_propose_conf_change.go create mode 100644 raft/rafttest/interaction_env_handler_raft_log.go create mode 100644 raft/rafttest/interaction_env_handler_stabilize.go create mode 100644 raft/rafttest/interaction_env_handler_status.go create mode 100644 raft/rafttest/interaction_env_handler_tick_heartbeat.go create mode 100644 raft/rafttest/interaction_env_logger.go create mode 100644 raft/testdata/campaign.txt create mode 100644 raft/testdata/confchange_v1.txt create mode 100644 raft/testdata/snapshot_succeed_via_app_resp.txt diff --git a/raft/interaction_test.go b/raft/interaction_test.go new file mode 100644 index 000000000000..80713dc294c2 --- /dev/null +++ b/raft/interaction_test.go @@ -0,0 +1,17 @@ +package raft_test + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/rafttest" +) + +func TestInteraction(t *testing.T) { + datadriven.Walk(t, "testdata", func(t *testing.T, path string) { + env := rafttest.NewInteractionEnv(nil) + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + return env.Handle(t, *d) + }) + }) +} diff --git a/raft/raft.go b/raft/raft.go index 62e79642c3c4..83d783eb3387 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strings" "sync" "time" @@ -529,7 +530,6 @@ func (r *raft) bcastAppend() { if id == r.id { return } - r.sendAppend(id) }) } @@ -795,7 +795,16 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.Voters.IDs() { + var ids []uint64 + { + idMap := r.prs.Voters.IDs() + ids = make([]uint64, 0, len(idMap)) + for id := 
range idMap { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + } + for _, id := range ids { if id == r.id { continue } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index d49c8837e9b6..070f3a2b9c02 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -18,7 +18,6 @@ import ( "testing" pb "go.etcd.io/etcd/raft/raftpb" - "go.etcd.io/etcd/raft/tracker" ) var ( @@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) { } } -// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- -// shot is sent to a follower at the most recent index (i.e. the snapshot index -// is the leader's last index is the committed index). In that situation, a bug -// in the past left the follower in probing status until the next log entry was -// committed. -func TestSnapshotSucceedViaAppResp(t *testing.T) { - s1 := NewMemoryStorage() - // Create a single-node leader. - n1 := newTestRaft(1, []uint64{1}, 10, 1, s1) - n1.becomeCandidate() - n1.becomeLeader() - // We need to add a second empty entry so that we can truncate the first - // one away. - n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - - rd := newReady(n1, &SoftState{}, pb.HardState{}) - s1.Append(rd.Entries) - s1.SetHardState(rd.HardState) - - if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { - t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) - } - - // Force a log truncation. - if err := s1.Compact(1); err != nil { - t.Fatal(err) - } - - // Add a follower to the group. Do this in a clandestine way for simplicity. - // Also set up a snapshot that will be sent to the follower. 
- n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) - s1.snapshot = pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - ConfState: pb.ConfState{Voters: []uint64{1, 2}}, - Index: s1.lastIndex(), - Term: s1.ents[len(s1.ents)-1].Term, - }, - } - - noMessage := pb.MessageType(-1) - mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { - t.Helper() - for i, msg := range from.msgs { - if msg.From != from.id || msg.To != to.id || msg.Type != typ { - continue - } - t.Log(DescribeMessage(msg, func([]byte) string { return "" })) - if len(msg.Entries) > 0 { - t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) - } - if err := to.Step(msg); err != nil { - t.Fatalf("%v: %s", msg, err) - } - from.msgs = append(from.msgs[:i], from.msgs[i+1:]...) - return msg - } - if typ == noMessage { - if len(from.msgs) == 0 { - return pb.Message{} - } - t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs) - } - t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs) - return pb.Message{} // unreachable - } - - // Create the follower that will receive the snapshot. - s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) - - // Let the leader probe the follower. - if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { - t.Fatalf("expected message to be sent") - } - if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 { - // For this test to work, the leader must not have anything to append - // to the follower right now. - t.Fatalf("unexpectedly appending entries %v", msg.Entries) - } - - // Follower rejects the append (because it doesn't have any log entries) - if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject { - t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) - } - - const expIdx = 2 - // Leader sends snapshot due to RejectHint of zero (we set up the raft log - // to start at index 2). 
- if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { - t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) - } - - // n2 reacts to snapshot with MsgAppResp. - if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx { - t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index) - } - - // Leader sends MsgApp to communicate commit index. - if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx { - t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit) - } - - // Follower responds. - mustSend(n2, n1, pb.MsgAppResp) - - // Leader has correct state for follower. - pr := n1.prs.Progress[2] - if pr.State != tracker.StateReplicate { - t.Fatalf("unexpected state %v", pr) - } - if pr.Match != expIdx || pr.Next != expIdx+1 { - t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next) - } - - // Leader and follower are done. - mustSend(n1, n2, noMessage) - mustSend(n2, n1, noMessage) -} - func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go index a91c18dc12c4..46a7a70212e4 100644 --- a/raft/raftpb/confchange.go +++ b/raft/raftpb/confchange.go @@ -16,6 +16,8 @@ package raftpb import ( "fmt" + "strconv" + "strings" "github.com/gogo/protobuf/proto" ) @@ -103,3 +105,66 @@ func (c *ConfChangeV2) LeaveJoint() bool { cpy.Context = nil return proto.Equal(&cpy, &ConfChangeV2{}) } + +// ConfChangesFromString parses a Space-delimited sequence of operations into a +// slice of ConfChangeSingle. The supported operations are: +// - vn: make n a voter, +// - ln: make n a learner, +// - rn: remove n, and +// - un: update n. 
+func ConfChangesFromString(s string) ([]ConfChangeSingle, error) { + var ccs []ConfChangeSingle + toks := strings.Split(strings.TrimSpace(s), " ") + if toks[0] == "" { + toks = nil + } + for _, tok := range toks { + if len(tok) < 2 { + return nil, fmt.Errorf("unknown token %s", tok) + } + var cc ConfChangeSingle + switch tok[0] { + case 'v': + cc.Type = ConfChangeAddNode + case 'l': + cc.Type = ConfChangeAddLearnerNode + case 'r': + cc.Type = ConfChangeRemoveNode + case 'u': + cc.Type = ConfChangeUpdateNode + default: + return nil, fmt.Errorf("unknown input: %s", tok) + } + id, err := strconv.ParseUint(tok[1:], 10, 64) + if err != nil { + return nil, err + } + cc.NodeID = id + ccs = append(ccs, cc) + } + return ccs, nil +} + +// ConfChangesToString is the inverse to ConfChangesFromString. +func ConfChangesToString(ccs []ConfChangeSingle) string { + var buf strings.Builder + for i, cc := range ccs { + if i > 0 { + buf.WriteByte(' ') + } + switch cc.Type { + case ConfChangeAddNode: + buf.WriteByte('v') + case ConfChangeAddLearnerNode: + buf.WriteByte('l') + case ConfChangeRemoveNode: + buf.WriteByte('r') + case ConfChangeUpdateNode: + buf.WriteByte('u') + default: + buf.WriteString("unknown") + } + fmt.Fprintf(&buf, "%d", cc.NodeID) + } + return buf.String() +} diff --git a/raft/rafttest/interaction_env.go b/raft/rafttest/interaction_env.go new file mode 100644 index 000000000000..aba94fdd58c4 --- /dev/null +++ b/raft/rafttest/interaction_env.go @@ -0,0 +1,94 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" +) + +// InteractionOpts blob +type InteractionOpts struct { + OnConfig func(*raft.Config) +} + +type State struct { + AppliedIndex uint64 + pb.ConfState + Content string +} + +type Storage interface { + raft.Storage + SetHardState(state pb.HardState) error + ApplySnapshot(pb.Snapshot) error + Compact(newFirstIndex uint64) error + Append([]pb.Entry) error +} + +type InteractionEnv struct { + Options *InteractionOpts + Nodes []*raft.RawNode + Storages []Storage + Configs []*raft.Config + Messages []pb.Message + + Histories [][]pb.Snapshot + + Output *RedirectLogger +} + +func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv { + if opts == nil { + opts = &InteractionOpts{} + } + return &InteractionEnv{ + Options: opts, + Output: &RedirectLogger{}, + } +} + +func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config { + return &raft.Config{ + ID: id, + Applied: applied, + ElectionTick: 3, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: math.MaxUint64, + MaxInflightMsgs: math.MaxInt32, + } +} + +func defaultEntryFormatter(b []byte) string { + return fmt.Sprintf("%q", b) +} + +type Store struct { + Storage + SnapshotOverride func() (pb.Snapshot, error) +} + +func (s Store) Snapshot() (pb.Snapshot, error) { + if s.SnapshotOverride != nil { + return s.SnapshotOverride() + } + return s.Storage.Snapshot() +} + +var _ raft.Storage = Store{} diff --git a/raft/rafttest/interaction_env_handler.go b/raft/rafttest/interaction_env_handler.go new file mode 100644 index 000000000000..12d9219343c8 --- /dev/null +++ b/raft/rafttest/interaction_env_handler.go @@ -0,0 +1,100 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the 
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { + env.Output.Reset() + var err error + switch d.Cmd { + case "add-nodes": + err = env.handleAddNodes(t, d) + case "campaign": + err = env.handleCampaign(t, d) + case "compact": + err = env.handleCompact(t, d) + case "deliver-msgs": + err = env.handleDeliverMsgs(t, d) + case "process-ready": + err = env.handleProcessReady(t, d) + case "log-level": + err = env.handleLogLevel(t, d) + case "raft-log": + err = env.handleRaftLog(t, d) + case "stabilize": + err = env.handleStabilize(t, d) + case "status": + err = env.handleStatus(t, d) + case "tick-heartbeat": + err = env.handleTickHeartbeat(t, d) + case "propose-conf-change": + err = env.handleProposeConfChange(t, d) + default: + err = fmt.Errorf("unknown command") + } + if err != nil { + env.Output.WriteString(err.Error()) + } + // NB: the highest log level suppresses all output, including that of the + // handlers. This comes in useful during setup which can be chatty. + // However, errors are always logged. 
+ if env.Output.Len() == 0 { + return "ok" + } + if env.Output.Lvl == len(lvlNames)-1 { + if err != nil { + return err.Error() + } + return "ok (quiet)" + } + return env.Output.String() +} + +func firstAsInt(t *testing.T, d datadriven.TestData) int { + t.Helper() + n, err := strconv.Atoi(d.CmdArgs[0].Key) + if err != nil { + t.Fatal(err) + } + return n +} + +func firstAsNodeIdx(t *testing.T, d datadriven.TestData) int { + t.Helper() + n := firstAsInt(t, d) + return n - 1 +} + +func ints(t *testing.T, d datadriven.TestData) []int { + var ints []int + for i := 0; i < len(d.CmdArgs); i++ { + if len(d.CmdArgs[i].Vals) != 0 { + continue + } + n, err := strconv.Atoi(d.CmdArgs[i].Key) + if err != nil { + t.Fatal(err) + } + ints = append(ints, n) + } + return ints +} diff --git a/raft/rafttest/interaction_env_handler_add_nodes.go b/raft/rafttest/interaction_env_handler_add_nodes.go new file mode 100644 index 000000000000..03b304a75c1a --- /dev/null +++ b/raft/rafttest/interaction_env_handler_add_nodes.go @@ -0,0 +1,116 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "errors" + "fmt" + "reflect" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error { + n := firstAsInt(t, d) + var snap raftpb.Snapshot + for _, arg := range d.CmdArgs[1:] { + for i := range arg.Vals { + switch arg.Key { + case "voters": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Voters = append(snap.Metadata.ConfState.Voters, id) + case "learners": + var id uint64 + arg.Scan(t, i, &id) + snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id) + case "index": + arg.Scan(t, i, &snap.Metadata.Index) + case "content": + arg.Scan(t, i, &snap.Data) + } + } + } + return env.AddNodes(n, snap) +} + +func (env *InteractionEnv) AddNodes(n int, snap raftpb.Snapshot) error { + bootstrap := !reflect.DeepEqual(snap, raftpb.Snapshot{}) + for i := 0; i < n; i++ { + id := uint64(1 + len(env.Nodes)) + s := Store{ + Storage: raft.NewMemoryStorage(), + // When you ask for a snapshot, you get the most recent snapshot. + // + // TODO(tbg): this is sort of clunky, but MemoryStorage itself will + // give you some fixed snapshot and also the snapshot changes + // whenever you compact the logs and vice versa, so it's all a bit + // awkward to use. + SnapshotOverride: func() (raftpb.Snapshot, error) { + snaps := env.Histories[int(id-1)] + return snaps[len(snaps)-1], nil + }, + } + if bootstrap { + // NB: we could make this work with 1, but MemoryStorage just + // doesn't play well with that and it's not a loss of generality. 
+ if snap.Metadata.Index <= 1 { + return errors.New("index must be specified as > 1 due to bootstrap") + } + snap.Metadata.Term = 1 + if err := s.ApplySnapshot(snap); err != nil { + return err + } + fi, err := s.FirstIndex() + if err != nil { + return err + } + // At the time of writing and for *MemoryStorage, applying a + // snapshot also truncates appropriately, but this would change with + // other storage engines potentially. + if exp := snap.Metadata.Index + 1; fi != exp { + return fmt.Errorf("failed to establish first index %d; got %d", exp, fi) + } + } + cfg := defaultRaftConfig(id, snap.Metadata.Index, s) + if env.Options.OnConfig != nil { + env.Options.OnConfig(cfg) + if cfg.ID != id { + // This could be supported but then we need to do more work + // translating back and forth -- not worth it. + return errors.New("OnConfig must not change the ID") + } + } + if cfg.Logger != nil { + return errors.New("OnConfig must not set Logger") + } + cfg.Logger = env.Output + + rn, err := raft.NewRawNode(cfg) + if err != nil { + return err + } + env.Configs = append(env.Configs, cfg) + env.Nodes = append(env.Nodes, rn) + // TODO(tbg): allow a more general Storage, as long as it also allows + // us to apply snapshots, append entries, and update the HardState. + env.Storages = append(env.Storages, s) + env.Histories = append(env.Histories, []raftpb.Snapshot{snap}) + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_campaign.go b/raft/rafttest/interaction_env_handler_campaign.go new file mode 100644 index 000000000000..02c4f1a5fb4a --- /dev/null +++ b/raft/rafttest/interaction_env_handler_campaign.go @@ -0,0 +1,30 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCampaign(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Campaign(t, idx) +} + +func (env *InteractionEnv) Campaign(t *testing.T, idx int) error { + return env.Nodes[idx].Campaign() +} diff --git a/raft/rafttest/interaction_env_handler_compact.go b/raft/rafttest/interaction_env_handler_compact.go new file mode 100644 index 000000000000..76cf61a8c2a8 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_compact.go @@ -0,0 +1,38 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleCompact(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + newFirstIndex, err := strconv.ParseUint(d.CmdArgs[1].Key, 10, 64) + if err != nil { + return err + } + return env.Compact(idx, newFirstIndex) +} + +func (env *InteractionEnv) Compact(idx int, newFirstIndex uint64) error { + if err := env.Storages[idx].Compact(newFirstIndex); err != nil { + return err + } + return env.RaftLog(idx) +} diff --git a/raft/rafttest/interaction_env_handler_deliver_msgs.go b/raft/rafttest/interaction_env_handler_deliver_msgs.go new file mode 100644 index 000000000000..5ccfbce96a83 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_deliver_msgs.go @@ -0,0 +1,57 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "errors" + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error { + if len(env.Messages) == 0 { + return errors.New("no messages to deliver") + } + + msgs := env.Messages + env.Messages = nil + + return env.Deliver(msgs) +} + +func (env *InteractionEnv) Deliver(msgs []raftpb.Message) error { + for _, msg := range msgs { + toIdx := int(msg.To - 1) + var drop bool + if toIdx >= len(env.Nodes) { + // Drop messages for peers that don't exist yet. + drop = true + env.Output.WriteString("dropped: ") + } + fmt.Fprintln(env.Output, raft.DescribeMessage(msg, defaultEntryFormatter)) + if drop { + continue + } + if err := env.Nodes[toIdx].Step(msg); err != nil { + env.Output.WriteString(err.Error()) + continue + } + } + return nil +} diff --git a/raft/rafttest/interaction_env_handler_handle_ready.go b/raft/rafttest/interaction_env_handler_handle_ready.go new file mode 100644 index 000000000000..6c67732f26d0 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_handle_ready.go @@ -0,0 +1,92 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/quorum" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProcessReady(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.ProcessReady(idx) +} + +func (env *InteractionEnv) ProcessReady(idx int) error { + // TODO(tbg): Allow simulating crashes here. + rn, s := env.Nodes[idx], env.Storages[idx] + rd := rn.Ready() + if !raft.IsEmptyHardState(rd.HardState) { + if err := s.SetHardState(rd.HardState); err != nil { + return err + } + } + if err := s.Append(rd.Entries); err != nil { + return err + } + if !raft.IsEmptySnap(rd.Snapshot) { + if err := s.ApplySnapshot(rd.Snapshot); err != nil { + return err + } + } + for _, ent := range rd.CommittedEntries { + var update []byte + switch ent.Type { + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + update = cc.Context + rn.ApplyConfChange(cc) + case raftpb.EntryConfChangeV2: + var cc raftpb.ConfChangeV2 + if err := cc.Unmarshal(ent.Data); err != nil { + return err + } + rn.ApplyConfChange(cc) + update = cc.Context + default: + update = ent.Data + } + lastSnap := env.Histories[idx][len(env.Histories[idx])-1] + + // Record the current state. + var snap raftpb.Snapshot + snap.Data = append(snap.Data, lastSnap.Data...) + // NB: this hard-codes an "appender" state machine. + snap.Data = append(snap.Data, update...) 
+ snap.Metadata.Index = ent.Index + snap.Metadata.Term = ent.Term + cfg := rn.Status().Config + snap.Metadata.ConfState = raftpb.ConfState{ + Voters: cfg.Voters[0].Slice(), + VotersOutgoing: cfg.Voters[1].Slice(), + Learners: quorum.MajorityConfig(cfg.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(cfg.LearnersNext).Slice(), + } + env.Histories[idx] = append(env.Histories[idx], snap) + } + for _, msg := range rd.Messages { + env.Messages = append(env.Messages, msg) + } + rn.Advance(rd) + env.Output.WriteString(raft.DescribeReady(rd, defaultEntryFormatter)) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_log_level.go b/raft/rafttest/interaction_env_handler_log_level.go new file mode 100644 index 000000000000..a61ba37cd891 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_log_level.go @@ -0,0 +1,37 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleLogLevel(t *testing.T, d datadriven.TestData) error { + return env.LogLevel(d.CmdArgs[0].Key) +} + +func (env *InteractionEnv) LogLevel(name string) error { + for i, s := range lvlNames { + if strings.ToLower(s) == strings.ToLower(name) { + env.Output.Lvl = i + return nil + } + } + return fmt.Errorf("log levels must be either of %v", lvlNames) +} diff --git a/raft/rafttest/interaction_env_handler_propose_conf_change.go b/raft/rafttest/interaction_env_handler_propose_conf_change.go new file mode 100644 index 000000000000..54a6b4593612 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_propose_conf_change.go @@ -0,0 +1,79 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleProposeConfChange(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + var v1 bool + transition := raftpb.ConfChangeTransitionAuto + for _, arg := range d.CmdArgs[1:] { + for _, val := range arg.Vals { + switch arg.Key { + case "v1": + var err error + v1, err = strconv.ParseBool(val) + if err != nil { + return err + } + case "transition": + switch val { + case "implicit": + transition = raftpb.ConfChangeTransitionJointImplicit + case "explicit": + transition = raftpb.ConfChangeTransitionJointExplicit + default: + return fmt.Errorf("unknown transition %s", val) + } + default: + return fmt.Errorf("unknown command %s", arg.Key) + } + } + } + + ccs, err := raftpb.ConfChangesFromString(d.Input) + if err != nil { + return err + } + + var c raftpb.ConfChangeI + if v1 { + if len(ccs) > 1 || transition != raftpb.ConfChangeTransitionAuto { + return fmt.Errorf("v1 conf change can only have one operation and no transition") + } + c = raftpb.ConfChange{ + Type: ccs[0].Type, + NodeID: ccs[0].NodeID, + } + } else { + c = raftpb.ConfChangeV2{ + Transition: transition, + Changes: ccs, + } + } + return env.ProposeConfChange(idx, c) +} + +func (env *InteractionEnv) ProposeConfChange(idx int, c raftpb.ConfChangeI) error { + return env.Nodes[idx].ProposeConfChange(c) +} diff --git a/raft/rafttest/interaction_env_handler_raft_log.go b/raft/rafttest/interaction_env_handler_raft_log.go new file mode 100644 index 000000000000..4ab999f8748c --- /dev/null +++ b/raft/rafttest/interaction_env_handler_raft_log.go @@ -0,0 +1,59 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "math" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" +) + +func (env *InteractionEnv) writeErr(err error) { + if err != nil { + env.Output.WriteString(err.Error()) + } +} + +func (env *InteractionEnv) handleRaftLog(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.RaftLog(idx) +} + +func (env *InteractionEnv) RaftLog(idx int) error { + s := env.Storages[idx] + fi, err := s.FirstIndex() + if err != nil { + return err + } + li, err := s.LastIndex() + if err != nil { + return err + } + if li < fi { + // TODO(tbg): this is what MemoryStorage returns, but unclear if it's + // the "correct" thing to do. + fmt.Fprintf(env.Output, "log is empty: first index=%d, last index=%d", fi, li) + return nil + } + ents, err := s.Entries(fi, li+1, math.MaxUint64) + if err != nil { + return err + } + env.Output.WriteString(raft.DescribeEntries(ents, defaultEntryFormatter)) + return err +} diff --git a/raft/rafttest/interaction_env_handler_stabilize.go b/raft/rafttest/interaction_env_handler_stabilize.go new file mode 100644 index 000000000000..d3e56e6eda12 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_stabilize.go @@ -0,0 +1,76 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) error { + var idxs []int + for _, id := range ints(t, d) { + idxs = append(idxs, id-1) + } + return env.Stabilize(idxs...) +} + +func (env *InteractionEnv) Stabilize(idxs ...int) error { + var nodes []*raft.RawNode + for _, idx := range idxs { + nodes = append(nodes, env.Nodes[idx]) + } + if len(nodes) == 0 { + nodes = env.Nodes + } + for { + done := true + for _, rn := range nodes { + if rn.HasReady() { + done = false + idx := int(rn.Status().ID - 1) + fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1) + env.ProcessReady(idx) + } + } + var msgs []raftpb.Message + for _, rn := range nodes { + msgs, env.Messages = splitMsgs(env.Messages, rn.Status().ID) + if len(msgs) > 0 { + fmt.Fprintf(env.Output, "> delivering messages\n") + env.Deliver(msgs) + done = false + } + if done { + return nil + } + } + } +} + +func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) { + for _, msg := range msgs { + if msg.To == to { + toMsgs = append(toMsgs, msg) + } else { + rmdr = append(rmdr, msg) + } + } + return toMsgs, rmdr +} diff --git a/raft/rafttest/interaction_env_handler_status.go b/raft/rafttest/interaction_env_handler_status.go new file mode 100644 index 000000000000..ff15d07d2f9c --- /dev/null +++ b/raft/rafttest/interaction_env_handler_status.go @@ -0,0 +1,39 @@ +// Copyright 
2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/datadriven" + "go.etcd.io/etcd/raft/tracker" +) + +func (env *InteractionEnv) handleStatus(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Status(idx) +} + +func (env *InteractionEnv) Status(idx int) error { + st := env.Nodes[idx].Status() + m := tracker.ProgressMap{} + for id, pr := range st.Progress { + pr := pr // loop-local copy + m[id] = &pr + } + fmt.Fprint(env.Output, m) + return nil +} diff --git a/raft/rafttest/interaction_env_handler_tick_heartbeat.go b/raft/rafttest/interaction_env_handler_tick_heartbeat.go new file mode 100644 index 000000000000..716410249100 --- /dev/null +++ b/raft/rafttest/interaction_env_handler_tick_heartbeat.go @@ -0,0 +1,33 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafttest + +import ( + "testing" + + "github.com/cockroachdb/datadriven" +) + +func (env *InteractionEnv) handleTickHeartbeat(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + return env.Tick(idx, env.Configs[idx].HeartbeatTick) +} + +func (env *InteractionEnv) Tick(idx int, num int) error { + for i := 0; i < num; i++ { + env.Nodes[idx].Tick() + } + return nil +} diff --git a/raft/rafttest/interaction_env_logger.go b/raft/rafttest/interaction_env_logger.go new file mode 100644 index 000000000000..551e61a4a27d --- /dev/null +++ b/raft/rafttest/interaction_env_logger.go @@ -0,0 +1,96 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafttest + +import ( + "fmt" + "strings" +) + +type RedirectLogger struct { + strings.Builder + Lvl int // 0 = DEBUG, 1 = INFO, 2 = WARNING, 3 = ERROR, 4 = FATAL, 5 = NONE +} + +var lvlNames = [...]string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL", "NONE"} + +func (RedirectLogger) levStr(i int) string { + return lvlNames[i] +} + +func (l *RedirectLogger) printf(lvl int, format string, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, l.levStr(lvl), " ") + fmt.Fprintf(l, format, args...) + if n := len(format); n > 0 && format[n-1] != '\n' { + l.WriteByte('\n') + } + } +} +func (l *RedirectLogger) print(lvl int, args ...interface{}) { + if l.Lvl <= lvl { + fmt.Fprint(l, l.levStr(lvl), " ") + fmt.Fprintln(l, args...) 
+ } +} + +func (l *RedirectLogger) Debug(v ...interface{}) { + l.print(0, v...) +} + +func (l *RedirectLogger) Debugf(format string, v ...interface{}) { + l.printf(0, format, v...) +} + +func (l *RedirectLogger) Info(v ...interface{}) { + l.print(1, v...) +} + +func (l *RedirectLogger) Infof(format string, v ...interface{}) { + l.printf(1, format, v...) +} + +func (l *RedirectLogger) Warning(v ...interface{}) { + l.print(2, v...) +} + +func (l *RedirectLogger) Warningf(format string, v ...interface{}) { + l.printf(2, format, v...) +} + +func (l *RedirectLogger) Error(v ...interface{}) { + l.print(3, v...) +} + +func (l *RedirectLogger) Errorf(format string, v ...interface{}) { + l.printf(3, format, v...) +} + +func (l *RedirectLogger) Fatal(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { + + l.printf(4, format, v...) +} + +func (l *RedirectLogger) Panic(v ...interface{}) { + l.print(4, v...) +} + +func (l *RedirectLogger) Panicf(format string, v ...interface{}) { + l.printf(4, format, v...) +} diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go index ee30fc0c426c..5e7cdf5cedd7 100644 --- a/raft/rafttest/network.go +++ b/raft/rafttest/network.go @@ -44,6 +44,7 @@ type network interface { } type raftNetwork struct { + rand *rand.Rand mu sync.Mutex disconnected map[uint64]bool dropmap map[conn]float64 @@ -62,6 +63,7 @@ type delay struct { func newRaftNetwork(nodes ...uint64) *raftNetwork { pn := &raftNetwork{ + rand: rand.New(rand.NewSource(1)), recvQueues: make(map[uint64]chan raftpb.Message), dropmap: make(map[conn]float64), delaymap: make(map[conn]delay), @@ -91,12 +93,12 @@ func (rn *raftNetwork) send(m raftpb.Message) { if to == nil { return } - if drop != 0 && rand.Float64() < drop { + if drop != 0 && rn.rand.Float64() < drop { return } // TODO: shall we dl without blocking the send call? 
- if dl.d != 0 && rand.Float64() < dl.rate { - rd := rand.Int63n(int64(dl.d)) + if dl.d != 0 && rn.rand.Float64() < dl.rate { + rd := rn.rand.Int63n(int64(dl.d)) time.Sleep(time.Duration(rd)) } diff --git a/raft/testdata/campaign.txt b/raft/testdata/campaign.txt new file mode 100644 index 000000000000..754bc0a82c36 --- /dev/null +++ b/raft/testdata/campaign.txt @@ -0,0 +1,117 @@ +log-level info +---- +ok + +add-nodes 3 voters=(1,2,3) index=2 +---- +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1 + +stabilize +---- +> 1 handling Ready +Ready MustSync=true: +Lead:0 State:StateCandidate +HardState Term:1 Vote:1 Commit:2 +Messages: +1->2 MsgVote Term:1 Log:1/2 +1->3 MsgVote Term:1 Log:1/2 +> delivering messages +1->2 MsgVote Term:1 Log:1/2 +INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] +INFO 2 became follower at term 1 +INFO 2 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> delivering messages +1->3 MsgVote Term:1 Log:1/2 +INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] +INFO 3 became follower at term 1 +INFO 3 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> 2 handling Ready +Ready 
MustSync=true: +HardState Term:1 Vote:1 Commit:2 +Messages: +2->1 MsgVoteResp Term:1 Log:0/0 +> 3 handling Ready +Ready MustSync=true: +HardState Term:1 Vote:1 Commit:2 +Messages: +3->1 MsgVoteResp Term:1 Log:0/0 +> delivering messages +2->1 MsgVoteResp Term:1 Log:0/0 +INFO 1 received MsgVoteResp from 2 at term 1 +INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections +INFO 1 became leader at term 1 +3->1 MsgVoteResp Term:1 Log:0/0 +> 1 handling Ready +Ready MustSync=true: +Lead:1 State:StateLeader +Entries: +1/3 EntryNormal "" +Messages: +1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages +1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> delivering messages +1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 2 handling Ready +Ready MustSync=true: +Lead:1 State:StateFollower +Entries: +1/3 EntryNormal "" +Messages: +2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready +Ready MustSync=true: +Lead:1 State:StateFollower +Entries: +1/3 EntryNormal "" +Messages: +3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages +2->1 MsgAppResp Term:1 Log:0/3 +3->1 MsgAppResp Term:1 Log:0/3 +> 1 handling Ready +Ready MustSync=false: +HardState Term:1 Vote:1 Commit:3 +CommittedEntries: +1/3 EntryNormal "" +Messages: +1->2 MsgApp Term:1 Log:1/3 Commit:3 +1->3 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages +1->2 MsgApp Term:1 Log:1/3 Commit:3 +> delivering messages +1->3 MsgApp Term:1 Log:1/3 Commit:3 +> 2 handling Ready +Ready MustSync=false: +HardState Term:1 Vote:1 Commit:3 +CommittedEntries: +1/3 EntryNormal "" +Messages: +2->1 MsgAppResp Term:1 Log:0/3 +> 3 handling Ready +Ready MustSync=false: +HardState Term:1 Vote:1 Commit:3 +CommittedEntries: +1/3 EntryNormal "" +Messages: +3->1 MsgAppResp Term:1 Log:0/3 +> delivering messages +2->1 MsgAppResp Term:1 Log:0/3 +3->1 MsgAppResp Term:1 Log:0/3 diff --git 
a/raft/testdata/confchange_v1.txt b/raft/testdata/confchange_v1.txt new file mode 100644 index 000000000000..e626992a6027 --- /dev/null +++ b/raft/testdata/confchange_v1.txt @@ -0,0 +1,78 @@ +add-nodes 1 voters=(1) index=2 +---- +INFO 1 switched to configuration voters=(1) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 became leader at term 1 + +propose-conf-change 1 +v2 v3 +---- +ok + +add-nodes 2 + +process-ready 1 +---- +INFO 2 switched to configuration voters=() +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +stabilize 1 +---- +> 1 handling Ready +INFO 1 switched to configuration voters=(1 2 3)&&(1) autoleave +INFO initiating automatic transition out of joint configuration voters=(1 2 3)&&(1) autoleave +Ready MustSync=true: +Lead:1 State:StateLeader +HardState Term:1 Vote:1 Commit:4 +Entries: +1/3 EntryNormal "" +1/4 EntryConfChangeV2 v2 v3 +CommittedEntries: +1/3 EntryNormal "" +1/4 EntryConfChangeV2 v2 v3 +> 1 handling Ready +Ready MustSync=true: +Entries: +1/5 EntryConfChangeV2 + +# NB: this test is broken from here on because the leader doesn't propagate the +# commit index proactively, see the buglet #11002. 
+ +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok + +stabilize 1 +---- +ok + +stabilize 2 +---- +ok diff --git a/raft/testdata/snapshot_succeed_via_app_resp.txt b/raft/testdata/snapshot_succeed_via_app_resp.txt new file mode 100644 index 000000000000..d54b4163d0bd --- /dev/null +++ b/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -0,0 +1,141 @@ +# TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- +# shot is sent to a follower at the most recent index (i.e. the snapshot index +# is the leader's last index, which is also the committed index). In that situation, a bug +# in the past left the follower in probing status until the next log entry was +# committed. +# +# See https://github.com/etcd-io/etcd/pull/10308 for additional background. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with two nodes, but the config already has a third. +add-nodes 2 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +# Fully replicate everything, including the leader's empty entry. +stabilize +---- +ok (quiet) + +compact 1 11 +---- +ok (quiet) + +# Drop inflight messages to n3. +deliver-msgs 3 +---- +ok (quiet) + +# Show the Raft log messages from now on. +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateProbe match=0 next=11 paused inactive + +# Add the node that will receive a snapshot (it has no state at all, does not +# even have a config). +add-nodes 1 +---- +INFO 3 switched to configuration voters=() +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] + +# Time passes on the leader so that it will try the previously missing follower +# again.
+tick-heartbeat 1 +---- +ok + +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 +1->3 MsgHeartbeat Term:1 Log:0/0 + +# Iterate until no more work is done by the new peer. It receives the heartbeat +# and responds. +stabilize 3 +---- +> delivering messages +1->3 MsgHeartbeat Term:1 Log:0/0 +INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1] +INFO 3 became follower at term 1 +> 3 handling Ready +Ready MustSync=true: +Lead:1 State:StateFollower +HardState Term:1 Commit:0 +Messages: +3->1 MsgHeartbeatResp Term:1 Log:0/0 + +# The leader in turn will realize that n3 needs a snapshot, which it initiates. +stabilize 1 +---- +> delivering messages +3->1 MsgHeartbeatResp Term:1 Log:0/0 +DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] +DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11] +> 1 handling Ready +Ready MustSync=false: +Messages: +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateSnapshot match=0 next=11 paused pendingSnap=11 + +# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. +# The snapshot fully catches the follower up (i.e. there are no more log entries it +# needs to apply after). The bug was that the leader failed to realize that the follower +# was now fully caught up. 
+stabilize 3 +---- +> delivering messages +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] +INFO log [committed=0, applied=0, unstable.offset=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1] +INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] +> 3 handling Ready +Ready MustSync=false: +HardState Term:1 Commit:11 +Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] +Messages: +3->1 MsgAppResp Term:1 Log:0/11 + +# The MsgAppResp lets the leader move the follower back to replicating state. +# The leader then sends a MsgApp to communicate the updated commit index. +stabilize 1 +---- +> delivering messages +3->1 MsgAppResp Term:1 Log:0/11 +DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] +> 1 handling Ready +Ready MustSync=false: +Messages: +1->3 MsgApp Term:1 Log:1/11 Commit:11 + +status 1 +---- +1: StateReplicate match=11 next=12 inactive +2: StateReplicate match=11 next=12 +3: StateReplicate match=11 next=12 + +stabilize +---- +ok diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index 697277b26430..62c81f45af81 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -52,6 +52,8 @@ type Progress struct { // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. + // + // TODO(tbg): the leader should always have this set to true. RecentActive bool // ProbeSent is used while this follower is in StateProbe.
When ProbeSent is diff --git a/raft/util.go b/raft/util.go index 8394f647cc3a..881a6e14e241 100644 --- a/raft/util.go +++ b/raft/util.go @@ -17,6 +17,7 @@ package raft import ( "bytes" "fmt" + "strings" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -60,6 +61,69 @@ func voteRespMsgType(msgt pb.MessageType) pb.MessageType { } } +func DescribeHardState(hs pb.HardState) string { + var buf strings.Builder + fmt.Fprintf(&buf, "Term:%d", hs.Term) + if hs.Vote != 0 { + fmt.Fprintf(&buf, " Vote:%d", hs.Vote) + } + fmt.Fprintf(&buf, " Commit:%d", hs.Commit) + return buf.String() +} + +func DescribeSoftState(ss SoftState) string { + return fmt.Sprintf("Lead:%d State:%s", ss.Lead, ss.RaftState) +} + +func DescribeConfState(state pb.ConfState) string { + return fmt.Sprintf( + "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v", + state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, + ) +} + +func DescribeSnapshot(snap pb.Snapshot) string { + m := snap.Metadata + return fmt.Sprintf("Index:%d Term:%d ConfState:%s", m.Index, m.Term, DescribeConfState(m.ConfState)) +} + +func DescribeReady(rd Ready, f EntryFormatter) string { + var buf strings.Builder + if rd.SoftState != nil { + fmt.Fprint(&buf, DescribeSoftState(*rd.SoftState)) + buf.WriteByte('\n') + } + if !IsEmptyHardState(rd.HardState) { + fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState)) + buf.WriteByte('\n') + } + if len(rd.ReadStates) > 0 { + fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates) + } + if len(rd.Entries) > 0 { + buf.WriteString("Entries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.Entries, f)) + } + if !IsEmptySnap(rd.Snapshot) { + fmt.Fprintf(&buf, "Snapshot %s\n", DescribeSnapshot(rd.Snapshot)) + } + if len(rd.CommittedEntries) > 0 { + buf.WriteString("CommittedEntries:\n") + fmt.Fprint(&buf, DescribeEntries(rd.CommittedEntries, f)) + } + if len(rd.Messages) > 0 { + buf.WriteString("Messages:\n") + for _, msg := range rd.Messages { + fmt.Fprint(&buf, DescribeMessage(msg, f)) 
+ buf.WriteByte('\n') + } + } + if buf.Len() > 0 { + return fmt.Sprintf("Ready MustSync=%t:\n%s", rd.MustSync, buf.String()) + } + return "" +} + // EntryFormatter can be implemented by the application to provide human-readable formatting // of entry data. Nil is a valid EntryFormatter and will use a default format. type EntryFormatter func([]byte) string @@ -86,7 +150,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { fmt.Fprintf(&buf, "]") } if !IsEmptySnap(m.Snapshot) { - fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot) + fmt.Fprintf(&buf, " Snapshot: %s", DescribeSnapshot(m.Snapshot)) } return buf.String() } @@ -100,13 +164,39 @@ func PayloadSize(e pb.Entry) int { // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string { + if f == nil { + f = func(data []byte) string { return fmt.Sprintf("%q", data) } + } + + formatConfChange := func(cc pb.ConfChangeI) string { + // TODO(tbg): give the EntryFormatter a type argument so that it gets + // a chance to expose the Context. + return pb.ConfChangesToString(cc.AsV2().Changes) + } + var formatted string - if e.Type == pb.EntryNormal && f != nil { + switch e.Type { + case pb.EntryNormal: formatted = f(e.Data) - } else { - formatted = fmt.Sprintf("%q", e.Data) + case pb.EntryConfChange: + var cc pb.ConfChange + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + case pb.EntryConfChangeV2: + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(e.Data); err != nil { + formatted = err.Error() + } else { + formatted = formatConfChange(cc) + } + } + if formatted != "" { + formatted = " " + formatted } - return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) + return fmt.Sprintf("%d/%d %s%s", e.Term, e.Index, e.Type, formatted) } // DescribeEntries calls DescribeEntry for each Entry, adding a newline to