Skip to content

Commit

Permalink
Merge pull request #6794 from xiang90/fix_migration
Browse files Browse the repository at this point in the history
ctlv3: fix migration
  • Loading branch information
xiang90 authored Nov 3, 2016
2 parents 4d2ec2f + 2ba4299 commit c5ac021
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
4 changes: 2 additions & 2 deletions e2e/ctl_v3_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
}
if resp.Kvs[0].CreateRevision != 4 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
}
}

Expand Down
53 changes: 45 additions & 8 deletions etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
Expand Down Expand Up @@ -74,18 +77,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}

st := rebuildStoreV2()
st, index := rebuildStoreV2()
be := prepareBackend()
defer be.Close()

maxIndexc := make(chan uint64, 1)
go func() {
maxIndexc <- writeStore(writer, st)
writeStore(writer, st)
writer.Close()
}()

readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
mvcc.UpdateConsistentIndex(be, index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
Expand All @@ -106,7 +108,10 @@ func prepareBackend() backend.Backend {
return be
}

func rebuildStoreV2() store.Store {
func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")

waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
Expand All @@ -122,6 +127,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
}

w, err := wal.OpenForRead(waldir, walsnap)
Expand All @@ -143,9 +149,15 @@ func rebuildStoreV2() store.Store {
}
}

applier := etcdserver.NewApplierV2(st, nil)
cl.SetStore(st)
cl.Recover(api.UpdateCapability)

applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue
}

Expand All @@ -160,9 +172,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier)
}
}
if ent.Index > index {
index = ent.Index
}
}

return st
return st, index
}

func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
}

func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
Expand Down

0 comments on commit c5ac021

Please sign in to comment.