From 2ba42990ecca0d589aa2f48e59780ff22463c715 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 2 Nov 2016 19:36:07 -0700 Subject: [PATCH] ctlv3: fix migration --- e2e/ctl_v3_migrate_test.go | 4 +- etcdctl/ctlv3/command/migrate_command.go | 53 ++++++++++++++++++++---- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/e2e/ctl_v3_migrate_test.go b/e2e/ctl_v3_migrate_test.go index c88c7b290d6..3136c4920f8 100644 --- a/e2e/ctl_v3_migrate_test.go +++ b/e2e/ctl_v3_migrate_test.go @@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) { if len(resp.Kvs) != 1 { t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs) } - if resp.Kvs[0].CreateRevision != 4 { - t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision) + if resp.Kvs[0].CreateRevision != 7 { + t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision) } } diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index 5b616c577f2..cf16099d417 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -27,11 +27,14 @@ import ( "github.com/coreos/etcd/client" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" @@ -74,18 +77,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { writer, reader, errc = defaultTransformer() } - st := rebuildStoreV2() + st, index := rebuildStoreV2() be := prepareBackend() defer be.Close() - maxIndexc := make(chan uint64, 1) go func() { - maxIndexc <- writeStore(writer, st) + writeStore(writer, st) writer.Close() }() readKeys(reader, be) - mvcc.UpdateConsistentIndex(be, <-maxIndexc) + mvcc.UpdateConsistentIndex(be, index) err := <-errc if err != nil { fmt.Println("failed to transform keys") @@ -106,7 +108,10 @@ func prepareBackend() backend.Backend { return be } -func rebuildStoreV2() store.Store { +func rebuildStoreV2() (store.Store, uint64) { + var index uint64 + cl := membership.NewCluster("") + waldir := migrateWALdir if len(waldir) == 0 { waldir = path.Join(migrateDatadir, "member", "wal") @@ -122,6 +127,7 @@ func rebuildStoreV2() store.Store { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + index = snapshot.Metadata.Index } w, err := wal.OpenForRead(waldir, walsnap) @@ -143,9 +149,15 @@ func rebuildStoreV2() store.Store { } } - applier := etcdserver.NewApplierV2(st, nil) + cl.SetStore(st) + cl.Recover(api.UpdateCapability) + + applier := etcdserver.NewApplierV2(st, cl) for _, ent := range ents { - if ent.Type != raftpb.EntryNormal { + if ent.Type == raftpb.EntryConfChange { + var cc raftpb.ConfChange + pbutil.MustUnmarshal(&cc, ent.Data) + applyConf(cc, cl) continue } @@ -160,9 +172,34 @@ func rebuildStoreV2() store.Store { applyRequest(req, applier) } } + if ent.Index > index { + index = ent.Index + } } - return st + return st, index +} + +func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { + if err := cl.ValidateConfigurationChange(cc); err != nil { + return + } + switch cc.Type { + case raftpb.ConfChangeAddNode: + m := new(membership.Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + panic(err) + } + cl.AddMember(m) + case raftpb.ConfChangeRemoveNode: + cl.RemoveMember(types.ID(cc.NodeID)) + case raftpb.ConfChangeUpdateNode: + m := new(membership.Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + panic(err) + } + cl.UpdateRaftAttributes(m.ID, m.RaftAttributes) + } } func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {