From fbba6095ccdda70281c4853b4b938a0287f1f9aa Mon Sep 17 00:00:00 2001
From: Simon Fell
Date: Thu, 25 May 2017 12:45:55 -0700
Subject: [PATCH 1/2] Add Integ test that reproduces installSnapshot problem
 detailed in issue #212

---
 Makefile      |   2 +-
 integ_test.go | 126 ++++++++++++++++++++++++++++++++++++++------------
 raft.go       |   1 +
 3 files changed, 99 insertions(+), 30 deletions(-)

diff --git a/Makefile b/Makefile
index 556aa2e20f8..92a0c0b44c0 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ test:
 	go test -timeout=30s ./...
 
 integ: test
-	INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
+	INTEG_TESTS=yes go test -timeout=23s -run=Integ ./...
 
 deps:
 	go get -d -v ./...
diff --git a/integ_test.go b/integ_test.go
index c4bf67a7acf..66654be428d 100644
--- a/integ_test.go
+++ b/integ_test.go
@@ -34,14 +34,36 @@ type RaftEnv struct {
 	logger   *log.Logger
 }
 
+// Release shuts down and cleans up any stored data; it's not restartable after this
 func (r *RaftEnv) Release() {
-	r.logger.Printf("[WARN] Release node at %v", r.raft.localAddr)
+	r.Shutdown()
+	os.RemoveAll(r.dir)
+}
+
+// Shutdown shuts down raft & transport, but keeps track of its data; it's restartable
+// after a Shutdown() by calling Restart()
+func (r *RaftEnv) Shutdown() {
+	r.logger.Printf("[WARN] Shutdown node at %v", r.raft.localAddr)
 	f := r.raft.Shutdown()
 	if err := f.Error(); err != nil {
 		panic(err)
 	}
 	r.trans.Close()
-	os.RemoveAll(r.dir)
+}
+
+// Restart will start a raft node that was previously Shutdown()
+func (r *RaftEnv) Restart(t *testing.T) {
+	trans, err := NewTCPTransport(r.raft.localAddr, nil, 2, time.Second, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	r.trans = trans
+	r.logger.Printf("[INFO] Starting node at %v", trans.LocalAddr())
+	raft, err := NewRaft(r.conf, r.fsm, r.store, r.store, r.snapshot, r.peers, r.trans)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	r.raft = raft
 }
 
 func MakeRaft(t *testing.T, conf *Config) *RaftEnv {
@@ -68,18 +90,19 @@ func MakeRaft(t *testing.T, conf *Config) *RaftEnv {
 		store:    stable,
 		snapshot: snap,
 		fsm:      &MockFSM{},
-		logger:   log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds),
 	}
 
 	trans, err := NewTCPTransport("127.0.0.1:0", nil, 2, time.Second, nil)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
+	env.logger = log.New(os.Stdout, trans.LocalAddr()+" :", log.Lmicroseconds)
 	env.trans = trans
 	env.peers = NewJSONPeers(dir, trans)
 
 	env.logger.Printf("[INFO] Starting node at %v", trans.LocalAddr())
+	conf.Logger = env.logger
 	raft, err := NewRaft(conf, env.fsm, stable, stable, snap, env.peers, trans)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -134,33 +157,52 @@ func NoErr(err error, t *testing.T) {
 func CheckConsistent(envs []*RaftEnv, t *testing.T) {
 	limit := time.Now().Add(400 * time.Millisecond)
 	first := envs[0]
+	first.fsm.Lock()
+	defer first.fsm.Unlock()
 	var err error
 CHECK:
 	l1 := len(first.fsm.logs)
 	for i := 1; i < len(envs); i++ {
 		env := envs[i]
+		env.fsm.Lock()
 		l2 := len(env.fsm.logs)
 		if l1 != l2 {
 			err = fmt.Errorf("log length mismatch %d %d", l1, l2)
+			env.fsm.Unlock()
 			goto ERR
 		}
 		for idx, log := range first.fsm.logs {
 			other := env.fsm.logs[idx]
 			if bytes.Compare(log, other) != 0 {
-				err = fmt.Errorf("log %d mismatch %v %v", idx, log, other)
+				err = fmt.Errorf("log entry %d mismatch between %s/%s : '%s' / '%s'", idx, first.raft.localAddr, env.raft.localAddr, log, other)
+				env.fsm.Unlock()
 				goto ERR
 			}
 		}
+		env.fsm.Unlock()
 	}
 	return
 ERR:
 	if time.Now().After(limit) {
 		t.Fatalf("%v", err)
 	}
+	first.fsm.Unlock()
 	time.Sleep(20 * time.Millisecond)
+	first.fsm.Lock()
 	goto CHECK
 }
 
+// return a log entry that's at least sz long that has the prefix 'test i '
+func logBytes(i, sz int) []byte {
+	var logBuffer bytes.Buffer
+	fmt.Fprintf(&logBuffer, "test %d ", i)
+	for logBuffer.Len() < sz {
+		logBuffer.WriteByte('x')
+	}
+	return logBuffer.Bytes()
+
+}
+
 // Tests Raft by creating a cluster, growing it to 5 nodes while
 // causing various stressful conditions
 func TestRaft_Integ(t *testing.T) {
@@ -178,15 +220,21 @@ func TestRaft_Integ(t *testing.T) {
 	env1 := MakeRaft(t, conf)
 	NoErr(WaitFor(env1, Leader), t)
 
-	// Do some commits
-	var futures []Future
-	for i := 0; i < 100; i++ {
-		futures = append(futures, env1.raft.Apply([]byte(fmt.Sprintf("test%d", i)), 0))
-	}
-	for _, f := range futures {
-		NoErr(WaitFuture(f, t), t)
-		env1.logger.Printf("[DEBUG] Applied %v", f)
+	totalApplied := 0
+	applyAndWait := func(leader *RaftEnv, n int, sz int) {
+		// Do some commits
+		var futures []ApplyFuture
+		for i := 0; i < n; i++ {
+			futures = append(futures, leader.raft.Apply(logBytes(i, sz), 0))
+		}
+		for _, f := range futures {
+			NoErr(WaitFuture(f, t), t)
+			leader.logger.Printf("[DEBUG] Applied at %d, size %d", f.Index(), sz)
+		}
+		totalApplied += n
 	}
+	// Do some commits
+	applyAndWait(env1, 100, 10)
 
 	// Do a snapshot
 	NoErr(WaitFuture(env1.raft.Snapshot(), t), t)
@@ -205,15 +253,42 @@ func TestRaft_Integ(t *testing.T) {
 	NoErr(err, t)
 
 	// Do some more commits
-	futures = nil
-	for i := 0; i < 100; i++ {
-		futures = append(futures, leader.raft.Apply([]byte(fmt.Sprintf("test%d", i)), 0))
-	}
-	for _, f := range futures {
-		NoErr(WaitFuture(f, t), t)
-		leader.logger.Printf("[DEBUG] Applied %v", f)
+	applyAndWait(leader, 100, 10)
+
+	// snapshot the leader
+	NoErr(WaitFuture(leader.raft.Snapshot(), t), t)
+
+	CheckConsistent(append([]*RaftEnv{env1}, envs...), t)
+
+	// shut down a follower
+	disconnected := envs[len(envs)-1]
+	disconnected.Shutdown()
+
+	// Do some more commits [make sure the resulting snapshot will be a reasonable size]
+	applyAndWait(leader, 100, 10000)
+
+	// snapshot the leader [the leader's log should be compacted past the disconnected follower's log now]
+	NoErr(WaitFuture(leader.raft.Snapshot(), t), t)
+
+	// Unfortunately we need to wait for the leader to start backing off RPCs to the down follower
+	// such that when the follower comes back up it'll run an election before it gets an RPC from
+	// the leader
+	time.Sleep(time.Second * 5)
+
+	// start the now out-of-date follower back up
+	disconnected.Restart(t)
+
+	// wait for it to get caught up
+	timeout := time.Now().Add(time.Second * 10)
+	for disconnected.raft.getLastApplied() < leader.raft.getLastApplied() {
+		time.Sleep(time.Millisecond)
+		if time.Now().After(timeout) {
+			t.Fatalf("Gave up waiting for follower to get caught up to leader")
+		}
 	}
+	CheckConsistent(append([]*RaftEnv{env1}, envs...), t)
+
 
 	// Shoot two nodes in the head!
 	rm1, rm2 := envs[0], envs[1]
 	rm1.Release()
@@ -226,14 +301,7 @@ func TestRaft_Integ(t *testing.T) {
 	NoErr(err, t)
 
 	// Do some more commits
-	futures = nil
-	for i := 0; i < 100; i++ {
-		futures = append(futures, leader.raft.Apply([]byte(fmt.Sprintf("test%d", i)), 0))
-	}
-	for _, f := range futures {
-		NoErr(WaitFuture(f, t), t)
-		leader.logger.Printf("[DEBUG] Applied %v", f)
-	}
+	applyAndWait(leader, 100, 10)
 
 	// Join a few new nodes!
 	for i := 0; i < 2; i++ {
@@ -258,8 +326,8 @@ func TestRaft_Integ(t *testing.T) {
 	allEnvs := append([]*RaftEnv{env1}, envs...)
 	CheckConsistent(allEnvs, t)
 
-	if len(env1.fsm.logs) != 300 {
-		t.Fatalf("should apply 300 logs! %d", len(env1.fsm.logs))
+	if len(env1.fsm.logs) != totalApplied {
+		t.Fatalf("should apply %d logs! %d", totalApplied, len(env1.fsm.logs))
 	}
 
 	for _, e := range envs {
diff --git a/raft.go b/raft.go
index 8c718356507..1c90498c69c 100644
--- a/raft.go
+++ b/raft.go
@@ -1534,6 +1534,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 
 	// Ignore an older term
 	if req.Term < r.getCurrentTerm() {
+		r.logger.Printf("[INFO] raft: Ignoring installSnapshot request with older term of %d vs currentTerm %d", req.Term, r.getCurrentTerm())
 		return
 	}
 

From 93d025691229fccc6ed78d7307f2ceb2ce0e3087 Mon Sep 17 00:00:00 2001
From: Simon Fell
Date: Thu, 25 May 2017 13:31:27 -0700
Subject: [PATCH 2/2] Ensure installSnapshot always consumes the snapshot data
 from the stream

---
 raft.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/raft.go b/raft.go
index 1c90498c69c..c5dac73377b 100644
--- a/raft.go
+++ b/raft.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"os"
 	"strconv"
@@ -1529,6 +1530,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 	}
 	var rpcErr error
 	defer func() {
+		io.Copy(ioutil.Discard, rpc.Reader) // ensure we always consume all the snapshot data from the stream [see issue #212]
 		rpc.Respond(resp, rpcErr)
 	}()
 