Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
fanminshi committed May 5, 2017
1 parent 0121c23 commit c1713e0
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,101 @@ func TestSnapshot(t *testing.T) {
<-ch
}

// snapshot should snapshot the store and cut the persistent
func TestSnapshotOrdering(t *testing.T) {
n := newNopReadyNode()
st := store.New()
cl := membership.NewCluster("abc")
cl.SetStore(st)

testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
if err != nil {
t.Fatalf("Couldn't open tempdir (%v)", err)
}
defer os.RemoveAll(testdir)
if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
t.Fatalf("Couldn't make snap dir (%v)", err)
}

rs := raft.NewMemoryStorage()
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
r := newRaftNode(raftNodeConfig{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
})
s := &EtcdServer{
Cfg: &ServerConfig{
DataDir: testdir,
},
r: *r,
store: st,
cluster: cl,
SyncTicker: &time.Ticker{},
}
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}

be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.be = be

s.start()
defer s.Stop()

// submit applied entries and snap entries
idx := uint64(0)
outdated := 0
accepted := 0
for k := 1; k <= 101; k++ {
idx++
ch := s.w.Register(uint64(idx))
req := &pb.Request{Method: "QGET", ID: uint64(idx)}
ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
ready := raft.Ready{Entries: []raftpb.Entry{ent}}
n.readyc <- ready

ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
n.readyc <- ready

// "idx" applied
<-ch

// one snapshot for every two messages
if k%2 != 0 {
continue
}

n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
// get the snapshot sent by the transport
snapMsg := <-snapDoneC
// If the snapshot trails applied records, recovery will panic
// since there's no allocated snapshot at the place of the
// snapshot record. This only happens when the applier and the
// snapshot sender get out of sync.
if snapMsg.Snapshot.Metadata.Index == idx {
idx++
snapMsg.Snapshot.Metadata.Index = idx
ready = raft.Ready{Snapshot: snapMsg.Snapshot}
n.readyc <- ready
accepted++
} else {
outdated++
}
// don't wait for the snapshot to complete, move to next message
}
if accepted != 50 {
t.Errorf("accepted=%v, want 50", accepted)
}
if outdated != 0 {
t.Errorf("outdated=%v, want 0", outdated)
}
}

// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
Expand Down

0 comments on commit c1713e0

Please sign in to comment.