From 9a0e4dfe4fa463c607aba26ba73e18987a5eb574 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 2 Nov 2016 19:36:07 -0700 Subject: [PATCH] ctlv3: fix migration --- e2e/ctl_v3_migrate_test.go | 4 +- etcdctl/ctlv3/command/migrate_command.go | 53 ++++++++++++++++++++---- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/e2e/ctl_v3_migrate_test.go b/e2e/ctl_v3_migrate_test.go index c88c7b290d6..3136c4920f8 100644 --- a/e2e/ctl_v3_migrate_test.go +++ b/e2e/ctl_v3_migrate_test.go @@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) { if len(resp.Kvs) != 1 { t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs) } - if resp.Kvs[0].CreateRevision != 4 { - t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision) + if resp.Kvs[0].CreateRevision != 7 { + t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision) } } diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index c4e8d7e6d59..be0ae06fdef 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -27,11 +27,14 @@ import ( "github.com/coreos/etcd/client" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" @@ -76,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { writer, reader, errc = defaultTransformer() } - st := rebuildStoreV2() + st, index := rebuildStoreV2() be := prepareBackend() defer be.Close() - maxIndexc := make(chan uint64, 1) go func() { - maxIndexc <- writeStore(writer, st) + writeStore(writer, st) writer.Close() }() readKeys(reader, be) - mvcc.UpdateConsistentIndex(be, <-maxIndexc) + mvcc.UpdateConsistentIndex(be, index) err := <-errc if err != nil { fmt.Println("failed to transform keys") @@ -108,7 +110,10 @@ func prepareBackend() backend.Backend { return be } -func rebuildStoreV2() store.Store { +func rebuildStoreV2() (store.Store, uint64) { + var index uint64 + cl := membership.NewCluster("") + waldir := migrateWALdir if len(waldir) == 0 { waldir = path.Join(migrateDatadir, "member", "wal") @@ -124,6 +129,7 @@ func rebuildStoreV2() store.Store { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + index = snapshot.Metadata.Index } w, err := wal.OpenForRead(waldir, walsnap) @@ -145,9 +151,15 @@ func rebuildStoreV2() store.Store { } } - applier := etcdserver.NewApplierV2(st, nil) + cl.SetStore(st) + cl.Recover(api.UpdateCapability) + + applier := etcdserver.NewApplierV2(st, cl) for _, ent := range ents { - if ent.Type != raftpb.EntryNormal { + if ent.Type == raftpb.EntryConfChange { + var cc raftpb.ConfChange + pbutil.MustUnmarshal(&cc, ent.Data) + applyConf(cc, cl) continue } @@ -162,9 +174,34 @@ func rebuildStoreV2() store.Store { applyRequest(req, applier) } } + if ent.Index > index { + index = ent.Index + } } - return st + return st, index +} + +func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { + if err := cl.ValidateConfigurationChange(cc); err != nil { + return + } + switch cc.Type { + case raftpb.ConfChangeAddNode: + m := new(membership.Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + panic(err) + } + cl.AddMember(m) + case raftpb.ConfChangeRemoveNode: + cl.RemoveMember(types.ID(cc.NodeID)) + case raftpb.ConfChangeUpdateNode: + m := new(membership.Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + panic(err) + } + cl.UpdateRaftAttributes(m.ID, m.RaftAttributes) + } } func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {