diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 3396e60f0726..757826cc9e1e 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) { } // finish apply, unblock raft routine - <-ap.raftDone + <-ap.notifyc select { case <-continueC: diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 457b27c8b7e0..d4bc99a544a7 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -951,7 +951,7 @@ func TestSnapshot(t *testing.T) { <-ch } -// snapshot should snapshot the store and cut the persistent +// TestSnapshotOrdering ensures that when apply snapshot, etcdserver renames snap db to db before raft persists snapshot to wal and snap files. func TestSnapshotOrdering(t *testing.T) { n := newNopReadyNode() st := store.New() @@ -968,12 +968,13 @@ func TestSnapshotOrdering(t *testing.T) { } rs := raft.NewMemoryStorage() + p := mockstorage.NewStorageRecorderStream(testdir) tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) r := newRaftNode(raftNodeConfig{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, transport: tr, - storage: mockstorage.NewStorageRecorder(testdir), + storage: p, raftStorage: rs, }) s := &EtcdServer{ @@ -997,55 +998,102 @@ func TestSnapshotOrdering(t *testing.T) { s.start() defer s.Stop() - // submit applied entries and snap entries - idx := uint64(0) - outdated := 0 - accepted := 0 - for k := 1; k <= 101; k++ { - idx++ - ch := s.w.Register(uint64(idx)) - req := &pb.Request{Method: "QGET", ID: uint64(idx)} - ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)} - ready := raft.Ready{Entries: []raftpb.Entry{ent}} - n.readyc <- ready - - ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}} - n.readyc <- ready - - // "idx" applied - <-ch - - // one snapshot for every two messages - if k%2 != 0 { - continue - } - - n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} - // get the snapshot sent by the transport - snapMsg := <-snapDoneC - // If the snapshot trails applied records, recovery will panic - // since there's no allocated snapshot at the place of the - // snapshot record. This only happens when the applier and the - // snapshot sender get out of sync. - if snapMsg.Snapshot.Metadata.Index == idx { - idx++ - snapMsg.Snapshot.Metadata.Index = idx - ready = raft.Ready{Snapshot: snapMsg.Snapshot} - n.readyc <- ready - accepted++ - } else { - outdated++ + actionc := p.Chan() + n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} + ac := <-actionc + // MsgSnap triggers raftNode to call Save() + if ac.Name != "Save" { + t.Fatalf("expect save() is called, but got %v", ac.Name) + } + // get the snapshot sent by the transport + snapMsg := <-snapDoneC + + // Snapshot ready triggers etcd server to rename snapshot db to db the first + // and raftnode to persist snapshot to wal and snap files the second. + snapMsg.Snapshot.Metadata.Index = 1 + ready := raft.Ready{Snapshot: snapMsg.Snapshot} + n.readyc <- ready + var seenDBFilePath bool + timer := time.After(5 * time.Second) + for { + select { + case ac := <-actionc: + switch ac.Name { + case "DBFilePath": + seenDBFilePath = true + case "SaveSnap": + if !seenDBFilePath { + t.Fatalf("expect DBFilePath calls before SaveSnap, but it is other way around") + } + return + default: + continue + } + case <-timer: + t.Fatalf("timeout waiting on actions") } - // don't wait for the snapshot to complete, move to next message - } - if accepted != 50 { - t.Errorf("accepted=%v, want 50", accepted) - } - if outdated != 0 { - t.Errorf("outdated=%v, want 0", outdated) } } +// func TestSnapshotOrdering2(t *testing.T) { +// testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") +// if err != nil { +// t.Fatalf("Couldn't open tempdir (%v)", err) +// } +// defer os.RemoveAll(testdir) +// snapDir := testdir + "/member/snap" +// if err := os.MkdirAll(snapDir, 0755); err != nil { +// t.Fatalf("Couldn't make snap dir (%v)", err) +// } + +// n := newNopReadyNode() +// s := raft.NewMemoryStorage() +// s.Append([]raftpb.Entry{{Index: 1}}) +// st := mockstore.NewRecorderStream() +// p := mockstorage.NewStorageRecorderStream(snapDir) +// r := newRaftNode(raftNodeConfig{ +// Node: n, +// raftStorage: s, +// storage: p, +// transport: rafthttp.NewNopTransporter(), +// }) +// srv := &EtcdServer{ +// Cfg: &ServerConfig{ +// DataDir: testdir, +// }, +// r: *r, +// store: st, +// cluster: membership.NewCluster("abc"), +// } +// // create snap db file. +// beconf := backend.DefaultBackendConfig() +// beconf.Path = snapDir + "/0000000000000001.snap.db" +// be := backend.New(beconf) +// kv := mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) +// // write consistIndex to db +// kv.Commit() +// kv.Close() +// be.Close() +// // create normal db file. +// beconf = backend.DefaultBackendConfig() +// beconf.Path = snapDir + "/db" +// be = backend.New(beconf) +// srv.be = be +// srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) + +// srv.start() +// defer srv.Stop() + +// snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}} +// ready := raft.Ready{Snapshot: snap} +// n.readyc <- ready +// acs, err := p.Wait(2) +// if err != nil { +// t.Fatal(err) +// } +// log.Printf("actions %+v", acs) +// } + // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend()