Skip to content

Commit

Permalink
[wip] raft/rafttest: introduce datadriven testing
Browse files Browse the repository at this point in the history
It has often been tedious to test the interactions between multi-member
Raft groups, especially when many steps were required to reach a certain
scenario. Often, this boilerplate was as boring as it is hard to write
and hard to maintain, making it attractive to resort to shortcuts
whenever possible, which in turn tended to undercut how meaningful and
maintainable the tests ended up being - that is, if the tests were even
written, which sometimes they weren't.

This change introduces a datadriven framework specifically for testing
deterministically the interaction between multiple members of a raft group
with the goal of reducing the friction for writing these tests to near
zero.

In the near term, this will be used to add thorough testing for joint
consensus (which is already available today, but wildly undertested),
but just converting an existing test into this framework has shown that
the concise representation and built-in inspection of log messages
highlights unexpected behavior much more readily than the previous unit
tests did (the test in question is `snapshot_succeed_via_app_resp`; the
reader is invited to compare the old and new version of it).

The main building block is `InteractionEnv`, which holds on to the state
of the whole system and exposes various relevant methods for
manipulating it, including but not limited to adding nodes, delivering
and dropping messages, and proposing configuration changes. All of this
is extensible so that in the future I hope to use it to explore the
phenomena discussed in

#7625 (comment)

which requires injecting appropriate "crash points" in the Ready
handling loop. Discussions of the "what if X happened in state Y"
can quickly be made concrete by "scripting up an interaction test".

Additionally, this framework is intentionally not kept internal to the
raft package.. Though this is in its infancy, a goal is that it should
be possible for a suite of interaction tests to allow applications to
validate that their Storage implementation behaves accordingly, simply
by running a raft-provided interaction suite against their Storage.
  • Loading branch information
tbg committed Aug 9, 2019
1 parent f57c16c commit 09788c9
Show file tree
Hide file tree
Showing 24 changed files with 1,477 additions and 130 deletions.
17 changes: 17 additions & 0 deletions raft/interaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package raft_test

import (
"testing"

"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/rafttest"
)

func TestInteraction(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
env := rafttest.NewInteractionEnv(nil)
datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
return env.Handle(t, *d)
})
})
}
13 changes: 11 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -529,7 +530,6 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}

r.sendAppend(id)
})
}
Expand Down Expand Up @@ -795,7 +795,16 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.Voters.IDs() {
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
Expand Down
120 changes: 0 additions & 120 deletions raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)

var (
Expand Down Expand Up @@ -112,125 +111,6 @@ func TestSnapshotSucceed(t *testing.T) {
}
}

// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
// shot is sent to a follower at the most recent index (i.e. the snapshot index
// is the leader's last index is the committed index). In that situation, a bug
// in the past left the follower in probing status until the next log entry was
// committed.
func TestSnapshotSucceedViaAppResp(t *testing.T) {
s1 := NewMemoryStorage()
// Create a single-node leader.
n1 := newTestRaft(1, []uint64{1}, 10, 1, s1)
n1.becomeCandidate()
n1.becomeLeader()
// We need to add a second empty entry so that we can truncate the first
// one away.
n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}})

rd := newReady(n1, &SoftState{}, pb.HardState{})
s1.Append(rd.Entries)
s1.SetHardState(rd.HardState)

if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp {
t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1)
}

// Force a log truncation.
if err := s1.Compact(1); err != nil {
t.Fatal(err)
}

// Add a follower to the group. Do this in a clandestine way for simplicity.
// Also set up a snapshot that will be sent to the follower.
n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
s1.snapshot = pb.Snapshot{
Metadata: pb.SnapshotMetadata{
ConfState: pb.ConfState{Voters: []uint64{1, 2}},
Index: s1.lastIndex(),
Term: s1.ents[len(s1.ents)-1].Term,
},
}

noMessage := pb.MessageType(-1)
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
t.Helper()
for i, msg := range from.msgs {
if msg.From != from.id || msg.To != to.id || msg.Type != typ {
continue
}
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
if len(msg.Entries) > 0 {
t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) }))
}
if err := to.Step(msg); err != nil {
t.Fatalf("%v: %s", msg, err)
}
from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
return msg
}
if typ == noMessage {
if len(from.msgs) == 0 {
return pb.Message{}
}
t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
}
t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
return pb.Message{} // unreachable
}

// Create the follower that will receive the snapshot.
s2 := NewMemoryStorage()
n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2)

// Let the leader probe the follower.
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
t.Fatalf("expected message to be sent")
}
if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
// For this test to work, the leader must not have anything to append
// to the follower right now.
t.Fatalf("unexpectedly appending entries %v", msg.Entries)
}

// Follower rejects the append (because it doesn't have any log entries)
if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
}

const expIdx = 2
// Leader sends snapshot due to RejectHint of zero (we set up the raft log
// to start at index 2).
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
}

// n2 reacts to snapshot with MsgAppResp.
if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
}

// Leader sends MsgApp to communicate commit index.
if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
}

// Follower responds.
mustSend(n2, n1, pb.MsgAppResp)

// Leader has correct state for follower.
pr := n1.prs.Progress[2]
if pr.State != tracker.StateReplicate {
t.Fatalf("unexpected state %v", pr)
}
if pr.Match != expIdx || pr.Next != expIdx+1 {
t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
}

// Leader and follower are done.
mustSend(n1, n2, noMessage)
mustSend(n2, n1, noMessage)
}

func TestSnapshotAbort(t *testing.T) {
storage := NewMemoryStorage()
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
Expand Down
65 changes: 65 additions & 0 deletions raft/raftpb/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package raftpb

import (
"fmt"
"strconv"
"strings"

"github.com/gogo/protobuf/proto"
)
Expand Down Expand Up @@ -103,3 +105,66 @@ func (c *ConfChangeV2) LeaveJoint() bool {
cpy.Context = nil
return proto.Equal(&cpy, &ConfChangeV2{})
}

// ConfChangesFromString parses a Space-delimited sequence of operations into a
// slice of ConfChangeSingle. The supported operations are:
// - vn: make n a voter,
// - ln: make n a learner,
// - rn: remove n, and
// - un: update n.
func ConfChangesFromString(s string) ([]ConfChangeSingle, error) {
var ccs []ConfChangeSingle
toks := strings.Split(strings.TrimSpace(s), " ")
if toks[0] == "" {
toks = nil
}
for _, tok := range toks {
if len(tok) < 2 {
return nil, fmt.Errorf("unknown token %s", tok)
}
var cc ConfChangeSingle
switch tok[0] {
case 'v':
cc.Type = ConfChangeAddNode
case 'l':
cc.Type = ConfChangeAddLearnerNode
case 'r':
cc.Type = ConfChangeRemoveNode
case 'u':
cc.Type = ConfChangeUpdateNode
default:
return nil, fmt.Errorf("unknown input: %s", tok)
}
id, err := strconv.ParseUint(tok[1:], 10, 64)
if err != nil {
return nil, err
}
cc.NodeID = id
ccs = append(ccs, cc)
}
return ccs, nil
}

// ConfChangesToString is the inverse to ConfChangesFromString.
func ConfChangesToString(ccs []ConfChangeSingle) string {
var buf strings.Builder
for i, cc := range ccs {
if i > 0 {
buf.WriteByte(' ')
}
switch cc.Type {
case ConfChangeAddNode:
buf.WriteByte('v')
case ConfChangeAddLearnerNode:
buf.WriteByte('l')
case ConfChangeRemoveNode:
buf.WriteByte('r')
case ConfChangeUpdateNode:
buf.WriteByte('u')
default:
buf.WriteString("unknown")
}
fmt.Fprintf(&buf, "%d", cc.NodeID)
}
return buf.String()
}
94 changes: 94 additions & 0 deletions raft/rafttest/interaction_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"fmt"
"math"

"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
)

// InteractionOpts blob
type InteractionOpts struct {
OnConfig func(*raft.Config)
}

type State struct {
AppliedIndex uint64
pb.ConfState
Content string
}

type Storage interface {
raft.Storage
SetHardState(state pb.HardState) error
ApplySnapshot(pb.Snapshot) error
Compact(newFirstIndex uint64) error
Append([]pb.Entry) error
}

type InteractionEnv struct {
Options *InteractionOpts
Nodes []*raft.RawNode
Storages []Storage
Configs []*raft.Config
Messages []pb.Message

Histories [][]pb.Snapshot

Output *RedirectLogger
}

func NewInteractionEnv(opts *InteractionOpts) *InteractionEnv {
if opts == nil {
opts = &InteractionOpts{}
}
return &InteractionEnv{
Options: opts,
Output: &RedirectLogger{},
}
}

func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
return &raft.Config{
ID: id,
Applied: applied,
ElectionTick: 3,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: math.MaxUint64,
MaxInflightMsgs: math.MaxInt32,
}
}

func defaultEntryFormatter(b []byte) string {
return fmt.Sprintf("%q", b)
}

type Store struct {
Storage
SnapshotOverride func() (pb.Snapshot, error)
}

func (s Store) Snapshot() (pb.Snapshot, error) {
if s.SnapshotOverride != nil {
return s.SnapshotOverride()
}
return s.Storage.Snapshot()
}

var _ raft.Storage = Store{}
Loading

0 comments on commit 09788c9

Please sign in to comment.