From a9f4c984c501e560e29183c0ae88c566c18767e0 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Wed, 20 Mar 2019 16:29:15 +0800 Subject: [PATCH 01/22] support restore tasks after dm-worker restart --- dm/worker/config.go | 5 ++ dm/worker/meta.go | 115 +++++++++++++++++++++++++++++++++++++++++ dm/worker/meta_test.go | 56 ++++++++++++++++++++ dm/worker/server.go | 23 ++++++++- dm/worker/worker.go | 14 +++++ 5 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 dm/worker/meta.go create mode 100644 dm/worker/meta_test.go diff --git a/dm/worker/config.go b/dm/worker/config.go index 20af28e60e..7ca9e62918 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -72,6 +72,7 @@ type Config struct { EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` RelayDir string `toml:"relay-dir" json:"relay-dir"` + MetaDir string `toml:"meta-dir" json:"meta-dir"` ServerID int `toml:"server-id" json:"server-id"` Flavor string `toml:"flavor" json:"flavor"` Charset string `toml:"charset" json:"charset"` @@ -191,6 +192,10 @@ func (c *Config) Parse(arguments []string) error { } c.From.Password = pswd + if len(c.MetaDir) == 0 { + c.MetaDir = "./dm_worker" + } + return c.verify() } diff --git a/dm/worker/meta.go b/dm/worker/meta.go new file mode 100644 index 0000000000..b08c361494 --- /dev/null +++ b/dm/worker/meta.go @@ -0,0 +1,115 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "encoding/json" + "io/ioutil" + "path" + "sync" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/errors" +) + +// Meta information contains +// * sub-task +type Meta struct { + SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks"` +} + +// FileMetaDB stores meta information in disk +type FileMetaDB struct { + lock sync.Mutex // we need to ensure only a thread can access to `metaDB` at a time + meta *Meta + path string +} + +// NewFileMetaDB return a meta file db +func NewFileMetaDB(dir string) (*FileMetaDB, error) { + metaDB := &FileMetaDB{ + path: path.Join(dir, "meta"), + meta: &Meta{ + SubTasks: make(map[string]*config.SubTaskConfig), + }, + } + // ignore all errors -- file maybe not created yet (and it is fine). + content, err := ioutil.ReadFile(metaDB.path) + if err == nil { + err1 := json.Unmarshal(content, metaDB.meta) + if err1 != nil { + return nil, errors.Annotatef(err1, "decode meta %s", content) + } + } else { + log.Warnf("failed to open meta file %s, going to create a new one: %v", dir, err) + } + + return metaDB, nil +} + +// Close closes meta DB +func (metaDB *FileMetaDB) Close() error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + return errors.Trace(metaDB.save()) +} + +func (metaDB *FileMetaDB) save() error { + serialized, err := json.Marshal(metaDB.meta) + if err != nil { + return errors.Trace(err) + } + if err := ioutil.WriteFile(metaDB.path, serialized, 0644); err != nil { + return errors.Trace(err) + } + return nil +} + +// Get returns `Meta` object +func (metaDB *FileMetaDB) Get() *Meta { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + meta := &Meta{ + SubTasks: make(map[string]*config.SubTaskConfig), + } + + for name, task := range metaDB.meta.SubTasks { + meta.SubTasks[name] = task + } + + return meta +} + +// Set sets subtask in Meta +func (metaDB *FileMetaDB) Set(subTask *config.SubTaskConfig) error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + metaDB.meta.SubTasks[subTask.Name] = subTask + + return errors.Trace(metaDB.save()) +} + +// Del deletes subtask in Meta +func (metaDB *FileMetaDB) Del(name string) error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + delete(metaDB.meta.SubTasks, name) + + return errors.Trace(metaDB.save()) +} diff --git a/dm/worker/meta_test.go b/dm/worker/meta_test.go new file mode 100644 index 0000000000..9ea09b1986 --- /dev/null +++ b/dm/worker/meta_test.go @@ -0,0 +1,56 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/dm/dm/config" +) + +func TestWorker(t *testing.T) { + TestingT(t) +} + +type testWorker struct{} + +var _ = Suite(&testWorker{}) + +func (t *testWorker) TestFileMetaDB(c *C) { + dir := c.MkDir() + + metaDB, err := NewFileMetaDB(dir) + c.Assert(err, IsNil) + c.Assert(metaDB.meta.SubTasks, HasLen, 0) + + meta := metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 0) + + err = metaDB.Set(&config.SubTaskConfig{ + Name: "task1", + }) + c.Assert(err, IsNil) + + meta = metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 1) + c.Assert(meta.SubTasks["task1"], NotNil) + + c.Assert(metaDB.Close(), IsNil) + + metaDB, err = NewFileMetaDB(dir) + c.Assert(err, IsNil) + c.Assert(metaDB.meta.SubTasks, HasLen, 1) + c.Assert(meta.SubTasks["task1"], NotNil) +} diff --git a/dm/worker/server.go b/dm/worker/server.go index f02040d862..667faadc4d 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -149,6 +149,14 @@ func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) }, nil } + if err = s.worker.meta.Set(cfg); err != nil { + log.Errorf("[server] insert task %+v into meta: %v", cfg, errors.ErrorStack(err)) + return &pb.CommonWorkerResponse{ + Result: false, + Msg: fmt.Sprintf("insert task %+v into meta: %v", cfg, errors.ErrorStack(err)), + }, nil + } + err = s.worker.StartSubTask(cfg) if err != nil { log.Errorf("[server] start sub task %s error %v", cfg.Name, errors.ErrorStack(err)) @@ -177,7 +185,12 @@ func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskReque var err error switch req.Op { case pb.TaskOp_Stop: - err = s.worker.StopSubTask(name) + if err = s.worker.meta.Del(name); err != nil { + log.Errorf("update task %s into meta: %v", name, errors.ErrorStack(err)) + resp.Msg = fmt.Sprintf("update task %s into meta: %v", name, errors.ErrorStack(err)) + } else { + err = s.worker.StopSubTask(name) + } case pb.TaskOp_Pause: err = s.worker.PauseSubTask(name) case pb.TaskOp_Resume: @@ -211,6 +224,14 @@ func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest }, nil } + if err = s.worker.meta.Set(cfg); err != nil { + log.Errorf("[server] update task %+v into meta: %v", cfg, errors.ErrorStack(err)) + return &pb.CommonWorkerResponse{ + Result: false, + Msg: fmt.Sprintf("update task %+v into meta: %v", cfg, errors.ErrorStack(err)), + }, nil + } + err = s.worker.UpdateSubTask(cfg) if err != nil { log.Errorf("[server] update sub task %s error %v", cfg.Name, errors.ErrorStack(err)) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index aaed438db6..6b21e0145e 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -53,6 +53,8 @@ type Worker struct { subTasks map[string]*SubTask relayHolder *RelayHolder relayPurger *purger.Purger + + meta *FileMetaDB } // NewWorker creates a new Worker @@ -86,6 +88,18 @@ func (w *Worker) Init() error { InitConditionHub(w) + w.meta, err = NewFileMetaDB(w.cfg.MetaDir) + if err != nil { + return errors.Trace(err) + } + + meta := w.meta.Get() + for taskName, subtask := range meta.SubTasks { + if err = w.StartSubTask(subtask); err != nil { + return errors.Annotatef(err, "restore task %s (%+v) in worker starting", taskName, subtask) + } + } + return nil } From 6cc540d5ee0d34f942cc098900e8fe75691270f4 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Wed, 20 Mar 2019 19:18:33 +0800 Subject: [PATCH 02/22] *: fix encrypt password --- checker/checker.go | 7 +++++ dm/worker/config.go | 41 ++------------------------- dm/worker/meta.go | 68 ++++++++++++++++++++++++++++++++++++--------- dm/worker/server.go | 8 +++--- dm/worker/worker.go | 31 ++++++--------------- 5 files changed, 78 insertions(+), 77 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index 58036c9422..06a8a82877 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -126,6 +126,13 @@ func (c *Checker) Init() error { } instance.targetDBInfo.Password = pswd } + if len(instance.sourceDBinfo.Password) > 0 { + pswd, err2 := utils.Decrypt(instance.sourceDBinfo.Password) + if err2 != nil { + return errors.Annotatef(err2, "can not decrypt password %s", instance.sourceDBinfo.Password) + } + instance.sourceDBinfo.Password = pswd + } instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo) if err != nil { return errors.Trace(err) diff --git a/dm/worker/config.go b/dm/worker/config.go index 7ca9e62918..06a77b81fa 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -111,29 +111,12 @@ func (c *Config) String() string { // Toml returns TOML format representation of config func (c *Config) Toml() (string, error) { var b bytes.Buffer - var pswd string - var err error - - enc := toml.NewEncoder(&b) - if len(c.From.Password) > 0 { - pswd, err = utils.Encrypt(c.From.Password) - if err != nil { - return "", errors.Annotatef(err, "can not encrypt password %s", c.From.Password) - } - } - c.From.Password = pswd - err = enc.Encode(c) + err := toml.NewEncoder(&b).Encode(c) if err != nil { log.Errorf("[worker] marshal config to toml error %v", err) } - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) - if err != nil { - return "", errors.Annotatef(err, "can not decrypt password %s", c.From.Password) - } - } - c.From.Password = pswd + return string(b.String()), nil } @@ -182,18 +165,8 @@ func (c *Config) Parse(arguments []string) error { return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0)) } - // try decrypt password - var pswd string - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) - if err != nil { - return errors.Annotatef(err, "can not decrypt password %s", c.From.Password) - } - } - c.From.Password = pswd - if len(c.MetaDir) == 0 { - c.MetaDir = "./dm_worker" + c.MetaDir = "./dm_worker_meta" } return c.verify() @@ -252,13 +225,5 @@ func (c *Config) Reload() error { return errors.Trace(err) } - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) - if err != nil { - return errors.Annotatef(err, "can not decrypt password %s", c.From.Password) - } - } - c.From.Password = pswd - return nil } diff --git a/dm/worker/meta.go b/dm/worker/meta.go index b08c361494..671ab8cbad 100644 --- a/dm/worker/meta.go +++ b/dm/worker/meta.go @@ -14,11 +14,13 @@ package worker import ( - "encoding/json" + "bytes" "io/ioutil" + "os" "path" "sync" + "github.com/BurntSushi/toml" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/errors" @@ -27,7 +29,38 @@ import ( // Meta information contains // * sub-task type Meta struct { - SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks"` + SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks" toml:"sub-tasks"` +} + +// Toml returns TOML format representation of config +func (m *Meta) Toml() (string, error) { + var b bytes.Buffer + enc := toml.NewEncoder(&b) + err := enc.Encode(m) + if err != nil { + return "", errors.Trace(err) + } + return b.String(), nil +} + +// DecodeFile loads and decodes config from file +func (m *Meta) DecodeFile(fpath string) error { + _, err := toml.DecodeFile(fpath, m) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// Decode loads config from file data +func (m *Meta) Decode(data string) error { + _, err := toml.Decode(data, m) + if err != nil { + return errors.Trace(err) + } + + return nil } // FileMetaDB stores meta information in disk @@ -45,15 +78,23 @@ func NewFileMetaDB(dir string) (*FileMetaDB, error) { SubTasks: make(map[string]*config.SubTaskConfig), }, } - // ignore all errors -- file maybe not created yet (and it is fine). - content, err := ioutil.ReadFile(metaDB.path) - if err == nil { - err1 := json.Unmarshal(content, metaDB.meta) - if err1 != nil { - return nil, errors.Annotatef(err1, "decode meta %s", content) - } - } else { - log.Warnf("failed to open meta file %s, going to create a new one: %v", dir, err) + + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, errors.Annotatef(err, "create meta directory") + } + + fd, err := os.Open(metaDB.path) + if os.IsNotExist(err) { + log.Warnf("failed to open meta file %s, going to create a new one: %v", metaDB.path, err) + return metaDB, nil + } else if err != nil { + return nil, errors.Trace(err) + } + defer fd.Close() + + err = metaDB.meta.DecodeFile(metaDB.path) + if err != nil { + return metaDB, errors.Trace(err) } return metaDB, nil @@ -68,11 +109,12 @@ func (metaDB *FileMetaDB) Close() error { } func (metaDB *FileMetaDB) save() error { - serialized, err := json.Marshal(metaDB.meta) + serialized, err := metaDB.meta.Toml() if err != nil { return errors.Trace(err) } - if err := ioutil.WriteFile(metaDB.path, serialized, 0644); err != nil { + + if err := ioutil.WriteFile(metaDB.path, []byte(serialized), 0644); err != nil { return errors.Trace(err) } return nil diff --git a/dm/worker/server.go b/dm/worker/server.go index 667faadc4d..02923240f7 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -150,10 +150,10 @@ func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) } if err = s.worker.meta.Set(cfg); err != nil { - log.Errorf("[server] insert task %+v into meta: %v", cfg, errors.ErrorStack(err)) + log.Errorf("[server] insert task %s into meta: %v", cfg, errors.ErrorStack(err)) return &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("insert task %+v into meta: %v", cfg, errors.ErrorStack(err)), + Msg: fmt.Sprintf("insert task %s into meta: %v", cfg, errors.ErrorStack(err)), }, nil } @@ -225,10 +225,10 @@ func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest } if err = s.worker.meta.Set(cfg); err != nil { - log.Errorf("[server] update task %+v into meta: %v", cfg, errors.ErrorStack(err)) + log.Errorf("[server] update task %s into meta: %v", cfg, errors.ErrorStack(err)) return &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("update task %+v into meta: %v", cfg, errors.ErrorStack(err)), + Msg: fmt.Sprintf("update task %s into meta: %v", cfg, errors.ErrorStack(err)), }, nil } diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 6b21e0145e..7d8d247989 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/streamer" - "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay/purger" ) @@ -93,13 +92,6 @@ func (w *Worker) Init() error { return errors.Trace(err) } - meta := w.meta.Get() - for taskName, subtask := range meta.SubTasks { - if err = w.StartSubTask(subtask); err != nil { - return errors.Annotatef(err, "restore task %s (%+v) in worker starting", taskName, subtask) - } - } - return nil } @@ -111,6 +103,14 @@ func (w *Worker) Start() { log.Info("[worker] start running") + // restore tasks + meta := w.meta.Get() + for taskName, subtask := range meta.SubTasks { + if err := w.StartSubTask(subtask); err != nil { + panic(fmt.Sprintf("restore task %s (%s) in worker starting: %v", taskName, subtask, err)) + } + } + // start relay w.relayHolder.Start() @@ -177,21 +177,8 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error { log.Infof("[worker] starting sub task with config: %v", cfg) - // try decrypt password for To DB - var ( - pswdTo string - err error - ) - if len(cfg.To.Password) > 0 { - pswdTo, err = utils.Decrypt(cfg.To.Password) - if err != nil { - return errors.Trace(err) - } - } - cfg.To.Password = pswdTo - st := NewSubTask(cfg) - err = st.Init() + err := st.Init() if err != nil { return errors.Trace(err) } From 25075a5162dec8edd596f9c3e9bf17b5b26fcc9c Mon Sep 17 00:00:00 2001 From: gregoryIan Date: Wed, 20 Mar 2019 21:24:48 +0800 Subject: [PATCH 03/22] fix password --- checker/checker.go | 15 +++++++-------- dm/worker/config.go | 5 +---- loader/db.go | 5 ++--- mydumper/mydumper.go | 35 ++++++++++++++++++++++++++++------- pkg/utils/db.go | 18 ++++++++++++++++++ relay/relay.go | 6 ++---- syncer/db.go | 5 +++-- syncer/heartbeat.go | 4 ++-- 8 files changed, 63 insertions(+), 30 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index 06a8a82877..65253c08b3 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -107,7 +107,13 @@ func (c *Checker) Init() error { User: instance.cfg.From.User, Password: instance.cfg.From.Password, } - + if len(instance.sourceDBinfo.Password) > 0 { + pswd, err2 := utils.Decrypt(instance.sourceDBinfo.Password) + if err2 != nil { + return errors.Annotatef(err2, "can not decrypt password %s", instance.sourceDBinfo.Password) + } + instance.sourceDBinfo.Password = pswd + } instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo) if err != nil { return errors.Trace(err) @@ -126,13 +132,6 @@ func (c *Checker) Init() error { } instance.targetDBInfo.Password = pswd } - if len(instance.sourceDBinfo.Password) > 0 { - pswd, err2 := utils.Decrypt(instance.sourceDBinfo.Password) - if err2 != nil { - return errors.Annotatef(err2, "can not decrypt password %s", instance.sourceDBinfo.Password) - } - instance.sourceDBinfo.Password = pswd - } instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo) if err != nil { return errors.Trace(err) diff --git a/dm/worker/config.go b/dm/worker/config.go index 06a77b81fa..2bc5081e49 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -213,14 +213,11 @@ func (c *Config) UpdateConfigFile(content string) error { // Reload reload configure from ConfigFile func (c *Config) Reload() error { - var pswd string - var err error - if c.ConfigFile == "" { c.ConfigFile = "dm-worker-config.bak" } - err = c.configFromFile(c.ConfigFile) + err := c.configFromFile(c.ConfigFile) if err != nil { return errors.Trace(err) } diff --git a/loader/db.go b/loader/db.go index 1701c25475..44bac03f33 100644 --- a/loader/db.go +++ b/loader/db.go @@ -15,12 +15,12 @@ package loader import ( "database/sql" - "fmt" "time" "github.com/go-sql-driver/mysql" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" ) @@ -195,8 +195,7 @@ func executeSQLImp(db *sql.DB, sqls []string) error { } func createConn(cfg *config.SubTaskConfig) (*Conn, error) { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port) - db, err := sql.Open("mysql", dbDSN) + db, err := utils.OpenDB(cfg.To.Host, cfg.To.Port, cfg.To.User, cfg.To.Password, "5m") if err != nil { return nil, errors.Trace(err) } diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index b18f9bf738..e8b3ae3c89 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/errors" "github.com/siddontang/go/sync2" ) @@ -42,13 +44,19 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper { m := &Mydumper{ cfg: cfg, } - m.args = m.constructArgs() + return m } // Init implements Unit.Init func (m *Mydumper) Init() error { - return nil // always return nil + var err error + m.args, err = m.constructArgs() + if err != nil { + return errors.Trace(err) + } + + return nil } // Process implements Unit.Process @@ -68,7 +76,6 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) { // Cmd cannot be reused, so we create a new cmd when begin processing cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...) - log.Infof("[mydumper] starting mydumper using args %v", cmd.Args) output, err := cmd.CombinedOutput() if err != nil { @@ -146,9 +153,21 @@ func (m *Mydumper) IsFreshTask() (bool, error) { } // constructArgs constructs arguments for exec.Command -func (m *Mydumper) constructArgs() []string { +func (m *Mydumper) constructArgs() ([]string, error) { cfg := m.cfg db := cfg.From + + var ( + password string + err error + ) + if len(db.Password) > 0 { + password, err = utils.Decrypt(password) + if err != nil { + return nil, errors.Annotatef(err, "can not decrypt password %s of db %+v", db) + } + } + ret := []string{ "--host", db.Host, @@ -156,8 +175,6 @@ func (m *Mydumper) constructArgs() []string { strconv.Itoa(db.Port), "--user", db.User, - "--password", - db.Password, "--outputdir", cfg.Dir, // use LoaderConfig.Dir as --outputdir } @@ -177,7 +194,11 @@ func (m *Mydumper) constructArgs() []string { if len(extraArgs) > 0 { ret = append(ret, ParseArgLikeBash(extraArgs)...) } - return ret + + log.Infof("[mydumper] create mydumper using args %v", ret) + + ret = append(ret, "--password", password) + return ret, nil } // logArgs constructs arguments for log from SubTaskConfig diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 60b407c0ee..d7e48ca7b7 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -262,3 +262,21 @@ func IsErrDupEntry(err error) bool { func IsNoSuchThreadError(err error) bool { return IsMySQLError(err, tmysql.ErrNoSuchThread) } + +// OpenDB returns a db fd wih encrypted password +func OpenDB(host string, port int, user, password, timeout string) (db *sql.DB, err error) { + if len(password) > 0 { + password, err = Decrypt(password) + if err != nil { + return nil, errors.Annotatef(err, "can not decrypt password %s of user %s for db %s:%d", password, user, host, port) + } + } + + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=%s", user, password, host, port, timeout) + db, err = sql.Open("mysql", dbDSN) + if err != nil { + return nil, errors.Trace(err) + } + + return +} diff --git a/relay/relay.go b/relay/relay.go index d258e8a5ee..fa137d11e3 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -131,8 +131,7 @@ func NewRelay(cfg *Config) *Relay { // Init implements the dm.Unit interface. func (r *Relay) Init() error { cfg := r.cfg.From - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, showStatusConnectionTimeout) - db, err := sql.Open("mysql", dbDSN) + db, err := utils.OpenDB(cfg.Host, cfg.Port, cfg.User, cfg.Password, showStatusConnectionTimeout) if err != nil { return errors.Trace(err) } @@ -1014,8 +1013,7 @@ func (r *Relay) Reload(newCfg *Config) error { r.db.Close() cfg := r.cfg.From - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, showStatusConnectionTimeout) - db, err := sql.Open("mysql", dbDSN) + db, err := utils.OpenDB(cfg.Host, cfg.Port, cfg.User, cfg.Password, showStatusConnectionTimeout) if err != nil { return errors.Trace(err) } diff --git a/syncer/db.go b/syncer/db.go index ff9e5bb3b4..c11abbee70 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" @@ -246,8 +247,8 @@ func (conn *Conn) executeSQLJobImp(jobs []*job) *ExecErrorContext { } func createDB(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=%s", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout) - db, err := sql.Open("mysql", dbDSN) + db, err := utils.OpenDB(dbCfg.Host, dbCfg.Port, dbCfg.User, dbCfg.Password, timeout) + if err != nil { return nil, errors.Trace(err) } diff --git a/syncer/heartbeat.go b/syncer/heartbeat.go index d1863b0a45..dd391b338a 100644 --- a/syncer/heartbeat.go +++ b/syncer/heartbeat.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" ) // privileges: SELECT, UPDATE, optionaly INSERT, optionaly CREATE. @@ -122,8 +123,7 @@ func (h *Heartbeat) AddTask(name string) error { if h.master == nil { // open DB dbCfg := h.cfg.masterCfg - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=1m", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port) - master, err := sql.Open("mysql", dbDSN) + master, err := utils.OpenDB(dbCfg.Host, dbCfg.Port, dbCfg.User, dbCfg.Password, "1m") if err != nil { return errors.Trace(err) } From 69b6406a811570c3b7b4b4e5227aa595e757ac40 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 10:03:27 +0800 Subject: [PATCH 04/22] *: refine password --- dm/config/subtask.go | 26 ++++++++++++++++++++++++++ dm/worker/config.go | 18 ++++++++++++++++++ dm/worker/worker.go | 11 ++++++++++- loader/db.go | 5 +++-- mydumper/mydumper.go | 29 +++++------------------------ pkg/utils/db.go | 4 ++-- relay/relay.go | 6 ++++-- syncer/db.go | 5 ++--- syncer/heartbeat.go | 4 ++-- 9 files changed, 72 insertions(+), 36 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index ceee578361..84040e080c 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -289,3 +289,29 @@ func (c *SubTaskConfig) Parse(arguments []string) error { return errors.Trace(c.adjust()) } + +// DecryptPassword tries to decrypt db password in config +func (c *SubTaskConfig) DecryptPassword() error { + // try decrypt password for To DB + var ( + pswdTo string + pswdFrom string + err error + ) + if len(c.To.Password) > 0 { + pswdTo, err = utils.Decrypt(c.To.Password) + if err != nil { + return errors.Trace(err) + } + } + if len(c.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(c.From.Password) + if err != nil { + return errors.Trace(err) + } + } + c.From.Password = pswdFrom + c.To.Password = pswdTo + + return nil +} diff --git a/dm/worker/config.go b/dm/worker/config.go index cf161e5bec..4fa6b3743f 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -235,3 +235,21 @@ func (c *Config) Reload() error { return nil } + +// DecryptPassword tries to decrypt db password in config +func (c *Config) DecryptPassword() error { + // try decrypt password for To DB + var ( + pswdFrom string + err error + ) + if len(c.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(c.From.Password) + if err != nil { + return errors.Trace(err) + } + } + c.From.Password = pswdFrom + + return nil +} diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 8021a81d53..d8145c22cf 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -189,9 +189,13 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error { w.copyConfigFromWorker(cfg) log.Infof("[worker] starting sub task with config: %v", cfg) + err := cfg.DecryptPassword() + if err != nil { + return errors.Trace(err) + } st := NewSubTask(cfg) - err := st.Init() + err = st.Init() if err != nil { return errors.Trace(err) } @@ -584,6 +588,11 @@ func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error { return errors.Trace(err) } + err = newCfg.DecryptPassword() + if err != nil { + return errors.Trace(err) + } + log.Infof("[worker] update relay configure with config: %v", newCfg) // Update SubTask configure diff --git a/loader/db.go b/loader/db.go index e6637a4042..1701c25475 100644 --- a/loader/db.go +++ b/loader/db.go @@ -15,12 +15,12 @@ package loader import ( "database/sql" + "fmt" "time" "github.com/go-sql-driver/mysql" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" ) @@ -195,7 +195,8 @@ func executeSQLImp(db *sql.DB, sqls []string) error { } func createConn(cfg *config.SubTaskConfig) (*Conn, error) { - db, err := utils.OpenDBWithEncryptedPwd(cfg.To.Host, cfg.To.Port, cfg.To.User, cfg.To.Password, "5m") + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port) + db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, errors.Trace(err) } diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index e8b3ae3c89..b042e11ccf 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -26,8 +26,6 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/utils" - "github.com/pingcap/errors" "github.com/siddontang/go/sync2" ) @@ -44,19 +42,13 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper { m := &Mydumper{ cfg: cfg, } - + m.args = m.constructArgs() return m } // Init implements Unit.Init func (m *Mydumper) Init() error { - var err error - m.args, err = m.constructArgs() - if err != nil { - return errors.Trace(err) - } - - return nil + return nil // always return nil } // Process implements Unit.Process @@ -153,21 +145,10 @@ func (m *Mydumper) IsFreshTask() (bool, error) { } // constructArgs constructs arguments for exec.Command -func (m *Mydumper) constructArgs() ([]string, error) { +func (m *Mydumper) constructArgs() []string { cfg := m.cfg db := cfg.From - var ( - password string - err error - ) - if len(db.Password) > 0 { - password, err = utils.Decrypt(password) - if err != nil { - return nil, errors.Annotatef(err, "can not decrypt password %s of db %+v", db) - } - } - ret := []string{ "--host", db.Host, @@ -197,8 +178,8 @@ func (m *Mydumper) constructArgs() ([]string, error) { log.Infof("[mydumper] create mydumper using args %v", ret) - ret = append(ret, "--password", password) - return ret, nil + ret = append(ret, "--password", db.Password) + return ret } // logArgs constructs arguments for log from SubTaskConfig diff --git a/pkg/utils/db.go b/pkg/utils/db.go index f3fdafe332..133d53c8be 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -264,7 +264,7 @@ func IsNoSuchThreadError(err error) bool { } // OpenDBWithEncryptedPwd returns a db fd with encrypted password -func OpenDBWithEncryptedPwd(host string, port int, user, password, timeout string) (db *sql.DB, err error) { +/*func OpenDBWithEncryptedPwd(host string, port int, user, password, timeout string) (db *sql.DB, err error) { if len(password) > 0 { password, err = Decrypt(password) if err != nil { @@ -279,4 +279,4 @@ func OpenDBWithEncryptedPwd(host string, port int, user, password, timeout strin } return -} +}*/ diff --git a/relay/relay.go b/relay/relay.go index 887d76e444..4ffebb4b42 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -139,7 +139,8 @@ func (r *Relay) Init() (err error) { }() cfg := r.cfg.From - db, err := utils.OpenDBWithEncryptedPwd(cfg.Host, cfg.Port, cfg.User, cfg.Password, showStatusConnectionTimeout) + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, showStatusConnectionTimeout) + db, err := sql.Open("mysql", dbDSN) if err != nil { return errors.Trace(err) } @@ -1026,7 +1027,8 @@ func (r *Relay) Reload(newCfg *Config) error { r.db.Close() cfg := r.cfg.From - db, err := utils.OpenDBWithEncryptedPwd(cfg.Host, cfg.Port, cfg.User, cfg.Password, showStatusConnectionTimeout) + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, showStatusConnectionTimeout) + db, err := sql.Open("mysql", dbDSN) if err != nil { return errors.Trace(err) } diff --git a/syncer/db.go b/syncer/db.go index 93ff08d32f..ff9e5bb3b4 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -20,7 +20,6 @@ import ( "time" "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" @@ -247,8 +246,8 @@ func (conn *Conn) executeSQLJobImp(jobs []*job) *ExecErrorContext { } func createDB(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - db, err := utils.OpenDBWithEncryptedPwd(dbCfg.Host, dbCfg.Port, dbCfg.User, dbCfg.Password, timeout) - + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=%s", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout) + db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, errors.Trace(err) } diff --git a/syncer/heartbeat.go b/syncer/heartbeat.go index d47edb47ca..d1863b0a45 100644 --- a/syncer/heartbeat.go +++ b/syncer/heartbeat.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/utils" ) // privileges: SELECT, UPDATE, optionaly INSERT, optionaly CREATE. @@ -123,7 +122,8 @@ func (h *Heartbeat) AddTask(name string) error { if h.master == nil { // open DB dbCfg := h.cfg.masterCfg - master, err := utils.OpenDBWithEncryptedPwd(dbCfg.Host, dbCfg.Port, dbCfg.User, dbCfg.Password, "1m") + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=1m", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port) + master, err := sql.Open("mysql", dbDSN) if err != nil { return errors.Trace(err) } From 0131160ed65ed968be672e7c63a7f09a6d7d7241 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 11:44:54 +0800 Subject: [PATCH 05/22] *: refine password --- dm/config/subtask.go | 46 +++++++++++++++++++++++++++++++------------- dm/worker/config.go | 35 ++++++++++++++++++++++----------- dm/worker/relay.go | 25 ++++++++++++------------ dm/worker/worker.go | 29 +++++++++++----------------- 4 files changed, 81 insertions(+), 54 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 84040e080c..855aa5bb98 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -253,7 +253,8 @@ func (c *SubTaskConfig) adjust() error { } } - return nil + _, err := c.DecryptPassword() + return err } // Parse parses flag definitions from the argument list. @@ -291,27 +292,46 @@ func (c *SubTaskConfig) Parse(arguments []string) error { } // DecryptPassword tries to decrypt db password in config -func (c *SubTaskConfig) DecryptPassword() error { - // try decrypt password for To DB +func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) { + clone, err := c.Clone() + if err != nil { + return nil, errors.Trace(err) + } + var ( pswdTo string pswdFrom string - err error ) - if len(c.To.Password) > 0 { - pswdTo, err = utils.Decrypt(c.To.Password) + if len(clone.To.Password) > 0 { + pswdTo, err = utils.Decrypt(clone.To.Password) if err != nil { - return errors.Trace(err) + return nil, errors.Annotatef(err, "can not decrypt password %s of downstream DB", clone.To.Password) } } - if len(c.From.Password) > 0 { - pswdFrom, err = utils.Decrypt(c.From.Password) + if len(clone.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(clone.From.Password) if err != nil { - return errors.Trace(err) + return nil, errors.Annotatef(err, "can not decrypt password %s of source DB", clone.From.Password) } } - c.From.Password = pswdFrom - c.To.Password = pswdTo + clone.From.Password = pswdFrom + clone.To.Password = pswdTo + + return clone, nil +} + +// Clone returns a replica of SubTaskConfig +func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) { + content, err := c.Toml() + if err != nil { + return nil, errors.Trace(err) + } + + clone := &SubTaskConfig{} + _, err = toml.Decode(content, clone) + if err != nil { + return nil, errors.Trace(err) + } - return nil + return clone, nil } diff --git a/dm/worker/config.go b/dm/worker/config.go index 4fa6b3743f..221faa8437 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -189,25 +189,36 @@ func (c *Config) verify() error { return errors.Errorf("dm-worker should bind a non-empty source ID which represents a MySQL/MariaDB instance or a replica group. \n notice: if you use old version dm-ansible, please update to newest version.") } + var err error if len(c.RelayBinLogName) > 0 { - _, err := streamer.GetBinlogFileIndex(c.RelayBinLogName) + _, err = streamer.GetBinlogFileIndex(c.RelayBinLogName) if err != nil { return errors.Annotatef(err, "relay-binlog-name %s", c.RelayBinLogName) } } if len(c.RelayBinlogGTID) > 0 { - _, err := gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID) + _, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID) if err != nil { return errors.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID) } } - return nil + + _, err = c.DecryptPassword() + return err } // configFromFile loads config from file. func (c *Config) configFromFile(path string) error { _, err := toml.DecodeFile(path, c) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + + err = c.verify() + if err != nil { + return errors.Trace(err) + } + return nil } // UpdateConfigFile write configure to local file @@ -236,20 +247,22 @@ func (c *Config) Reload() error { return nil } -// DecryptPassword tries to decrypt db password in config -func (c *Config) DecryptPassword() error { +// DecryptPassword return a decrypted config replica in config +func (c *Config) DecryptPassword() (*Config, error) { // try decrypt password for To DB + + clone := c.Clone() var ( pswdFrom string err error ) - if len(c.From.Password) > 0 { - pswdFrom, err = utils.Decrypt(c.From.Password) + if len(clone.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(clone.From.Password) if err != nil { - return errors.Trace(err) + return nil, errors.Annotatef(err, "can not decrypt password %s", clone.From.Password) } } - c.From.Password = pswdFrom + clone.From.Password = pswdFrom - return nil + return clone, nil } diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 597642487a..6caa8ebbd8 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -43,21 +43,22 @@ type RelayHolder struct { // NewRelayHolder creates a new RelayHolder func NewRelayHolder(cfg *Config) *RelayHolder { + clone, _ := cfg.DecryptPassword() relayCfg := &relay.Config{ - EnableGTID: cfg.EnableGTID, - AutoFixGTID: cfg.AutoFixGTID, - Flavor: cfg.Flavor, - RelayDir: cfg.RelayDir, - ServerID: cfg.ServerID, - Charset: cfg.Charset, + EnableGTID: clone.EnableGTID, + AutoFixGTID: clone.AutoFixGTID, + Flavor: clone.Flavor, + RelayDir: clone.RelayDir, + ServerID: clone.ServerID, + Charset: clone.Charset, From: relay.DBConfig{ - Host: cfg.From.Host, - Port: cfg.From.Port, - User: cfg.From.User, - Password: cfg.From.Password, + Host: clone.From.Host, + Port: clone.From.Port, + User: clone.From.User, + Password: clone.From.Password, }, - BinLogName: cfg.RelayBinLogName, - BinlogGTID: cfg.RelayBinlogGTID, + BinLogName: clone.RelayBinLogName, + BinlogGTID: clone.RelayBinlogGTID, } h := &RelayHolder{ diff --git a/dm/worker/worker.go b/dm/worker/worker.go index d8145c22cf..d7c70fa631 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -106,6 +106,12 @@ func (w *Worker) Start() { log.Info("[worker] start running") + // start relay + w.relayHolder.Start() + + // start purger + w.relayPurger.Start() + // restore tasks meta := w.meta.Get() for taskName, subtask := range meta.SubTasks { @@ -114,12 +120,6 @@ func (w *Worker) Start() { } } - // start relay - w.relayHolder.Start() - - // start purger - w.relayPurger.Start() - // start tracer if w.tracer.Enable() { w.tracer.Start() @@ -189,13 +189,10 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error { w.copyConfigFromWorker(cfg) log.Infof("[worker] starting sub task with config: %v", cfg) - err := cfg.DecryptPassword() - if err != nil { - return errors.Trace(err) - } + cloneCfg, _ := cfg.DecryptPassword() - st := NewSubTask(cfg) - err = st.Init() + st := NewSubTask(cloneCfg) + err := st.Init() if err != nil { return errors.Trace(err) } @@ -588,18 +585,14 @@ func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error { return errors.Trace(err) } - err = newCfg.DecryptPassword() - if err != nil { - return errors.Trace(err) - } - log.Infof("[worker] update relay configure with config: %v", newCfg) + cloneCfg, _ := newCfg.DecryptPassword() // Update SubTask configure for _, st := range w.subTasks { cfg := config.NewSubTaskConfig() - cfg.From = newCfg.From + cfg.From = cloneCfg.From stage := st.Stage() if stage == pb.Stage_Paused { From c470b4559319283dbab426f38e3a377d1ab41e3b Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 11:54:15 +0800 Subject: [PATCH 06/22] *: add config test case --- dm/worker/config_test.go | 21 +++++++++++++++++++++ dm/worker/dm-worker.toml | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 dm/worker/config_test.go diff --git a/dm/worker/config_test.go b/dm/worker/config_test.go new file mode 100644 index 0000000000..a8b31ac0c9 --- /dev/null +++ b/dm/worker/config_test.go @@ -0,0 +1,21 @@ +package worker + +import ( + . "github.com/pingcap/check" +) + +func (t *testWorker) TestConfig(c *C) { + cfg := &Config{} + + err := cfg.configFromFile("./dm-worker.toml") + c.Assert(err, IsNil) + c.Assert(cfg.SourceID, Equals, "mysql-replica-01") + + clone1 := cfg.Clone() + c.Assert(cfg, DeepEquals, clone1) + clone1.From.Password = "1234" + + clone2, err := cfg.DecryptPassword() + c.Assert(err, IsNil) + c.Assert(clone2, DeepEquals, clone1) +} diff --git a/dm/worker/dm-worker.toml b/dm/worker/dm-worker.toml index f8b178970c..1e180431ba 100644 --- a/dm/worker/dm-worker.toml +++ b/dm/worker/dm-worker.toml @@ -29,7 +29,7 @@ enable-gtid = false [from] host = "127.0.0.1" user = "root" -password = "" +password = "Up8156jArvIPymkVC+5LxkAT6rek" port = 3306 #relay log purge strategy From e6753165c5732c6bd3d86e19a8865fb5130d7475 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 12:50:42 +0800 Subject: [PATCH 07/22] add test case --- dm/config/subtask_test.go | 52 +++++++++++++++++++++++++++++++++++++++ dm/worker/config_test.go | 23 ++++++++++++++++- 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 dm/config/subtask_test.go diff --git a/dm/config/subtask_test.go b/dm/config/subtask_test.go new file mode 100644 index 0000000000..0000325d1d --- /dev/null +++ b/dm/config/subtask_test.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + . "github.com/pingcap/check" +) + +func (t *testConfig) TestSubTask(c *C) { + cfg := &SubTaskConfig{ + From: DBConfig{ + Host: "127.0.0.1", + Port: 3306, + User: "root", + Password: "Up8156jArvIPymkVC+5LxkAT6rek", + }, + To: DBConfig{ + Host: "127.0.0.1", + Port: 4306, + User: "root", + Password: "", + }, + } + + clone1, err := cfg.Clone() + c.Assert(err, IsNil) + c.Assert(cfg, DeepEquals, clone1) + + clone1.From.Password = "1234" + clone2, err := cfg.DecryptPassword() + c.Assert(err, IsNil) + c.Assert(clone2, DeepEquals, clone1) + + cfg.From.Password = "xxx" + clone3, err := cfg.DecryptPassword() + c.Assert(err, NotNil) + + cfg.From.Password = "" + clone3, err = cfg.DecryptPassword() + c.Assert(clone3, DeepEquals, cfg) +} diff --git a/dm/worker/config_test.go b/dm/worker/config_test.go index a8b31ac0c9..548e085dbd 100644 --- a/dm/worker/config_test.go +++ b/dm/worker/config_test.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package worker import ( @@ -13,9 +26,17 @@ func (t *testWorker) TestConfig(c *C) { clone1 := cfg.Clone() c.Assert(cfg, DeepEquals, clone1) - clone1.From.Password = "1234" + clone1.From.Password = "1234" clone2, err := cfg.DecryptPassword() c.Assert(err, IsNil) c.Assert(clone2, DeepEquals, clone1) + + cfg.From.Password = "xxx" + clone3, err := cfg.DecryptPassword() + c.Assert(err, NotNil) + + cfg.From.Password = "" + clone3, err = cfg.DecryptPassword() + c.Assert(clone3, DeepEquals, cfg) } From f1be6b966bb450fafc398353a60b03473d4a88dc Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 16:45:10 +0800 Subject: [PATCH 08/22] *: fix ci --- mydumper/mydumper_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mydumper/mydumper_test.go b/mydumper/mydumper_test.go index 512d3dd5c3..bf14427b4e 100644 --- a/mydumper/mydumper_test.go +++ b/mydumper/mydumper_test.go @@ -53,9 +53,10 @@ func (m *testMydumperSuite) SetUpSuite(c *C) { } func (m *testMydumperSuite) TestArgs(c *C) { - expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root --password 123 " + + expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root " + "--outputdir ./dumped_data --threads 4 --chunk-filesize 64 --skip-tz-utc " + - "--regex ^(?!(mysql|information_schema|performance_schema))") + "--regex ^(?!(mysql|information_schema|performance_schema)) " + + "--password 123") m.cfg.MydumperConfig.ExtraArgs = "--regex '^(?!(mysql|information_schema|performance_schema))'" mydumper := NewMydumper(m.cfg) args := mydumper.constructArgs() From 6a4c824eeec30522c6dde1c7cf07b250f9ede2b5 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 17:59:55 +0800 Subject: [PATCH 09/22] add test case to restart dm-worker --- tests/all_mode/run.sh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 1d5f85dbe0..7359eff7ce 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -27,6 +27,14 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + # kill dm-worker + killall dm-master.test + wait_process_exit dm-worker.test + + # restart dm-worker + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 From 8db23bdac8bb5c9e1c0fff4454ba694f31e87819 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 18:07:16 +0800 Subject: [PATCH 10/22] fix ci --- tests/all_mode/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 7359eff7ce..8ba35276ae 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -28,7 +28,7 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml # kill dm-worker - killall dm-master.test + pkill -hup dm-worker.test 2>/dev/null || true wait_process_exit dm-worker.test # restart dm-worker From 97ce3a71cbc740c723c2d84718f797d426b4e538 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 18:16:12 +0800 Subject: [PATCH 11/22] *: restore dm-worker --- tests/_utils/restart_dm_worker | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100755 tests/_utils/restart_dm_worker diff --git a/tests/_utils/restart_dm_worker b/tests/_utils/restart_dm_worker new file mode 100755 index 0000000000..b11c1c8dcd --- /dev/null +++ b/tests/_utils/restart_dm_worker @@ -0,0 +1,21 @@ +#!/bin/sh +# parameter 1: work directory +# parameter 2: worker-addr port +# parameter 3: config file for DM-worker + +set -eu + +workdir=$1 +port=$2 +conf=$3 + +PWD=$(pwd) +binary=$PWD/bin/dm-worker.test + +echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" +cd $workdir +$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ + --worker-addr=:$port --relay-dir="$workdir/relay_log" \ + --log-file="$workdir/log/dm-worker.log" -L=debug --config="$conf" \ + > $workdir/log/stdout.log 2>&1 & +cd $PWD From 414c97600d3653201067de5d0d202020e8cdf6c3 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 18:23:04 +0800 Subject: [PATCH 12/22] fix ci --- tests/_utils/restart_dm_worker | 4 ++++ tests/all_mode/run.sh | 8 ++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/_utils/restart_dm_worker b/tests/_utils/restart_dm_worker index b11c1c8dcd..170f42bffc 100755 --- a/tests/_utils/restart_dm_worker +++ b/tests/_utils/restart_dm_worker @@ -12,6 +12,10 @@ conf=$3 PWD=$(pwd) binary=$PWD/bin/dm-worker.test +pkill -hup dm-worker.test 2>/dev/null || true +source $cur/../_utils/wait_process_exit +wait_process_exit dm-worker.test + echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 8ba35276ae..e66665b8ca 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -27,13 +27,9 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # kill dm-worker - pkill -hup dm-worker.test 2>/dev/null || true - wait_process_exit dm-worker.test - # restart dm-worker - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + restart_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + restart_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 From 7ec9ff66e95d351ef61b9b03adda82274018d444 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 18:27:33 +0800 Subject: [PATCH 13/22] fix ci --- tests/_utils/restart_dm_worker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/_utils/restart_dm_worker b/tests/_utils/restart_dm_worker index 170f42bffc..e78cd4424c 100755 --- a/tests/_utils/restart_dm_worker +++ b/tests/_utils/restart_dm_worker @@ -13,7 +13,7 @@ PWD=$(pwd) binary=$PWD/bin/dm-worker.test pkill -hup dm-worker.test 2>/dev/null || true -source $cur/../_utils/wait_process_exit +source ./wait_process_exit wait_process_exit dm-worker.test echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" From b338e391020a328366317917eaaae60a4827b7ba Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Thu, 21 Mar 2019 18:31:24 +0800 Subject: [PATCH 14/22] fix ci --- tests/_utils/restart_dm_worker | 4 ---- tests/all_mode/run.sh | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/_utils/restart_dm_worker b/tests/_utils/restart_dm_worker index e78cd4424c..b11c1c8dcd 100755 --- a/tests/_utils/restart_dm_worker +++ b/tests/_utils/restart_dm_worker @@ -12,10 +12,6 @@ conf=$3 PWD=$(pwd) binary=$PWD/bin/dm-worker.test -pkill -hup dm-worker.test 2>/dev/null || true -source ./wait_process_exit -wait_process_exit dm-worker.test - echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index e66665b8ca..b943b8bc96 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -27,6 +27,9 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + pkill -hup dm-worker.test 2>/dev/null || true + wait_process_exit dm-worker.test + # restart dm-worker restart_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml restart_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml From f036005df715c47c9b71849b0fcd07b1aa931e70 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 25 Mar 2019 10:38:32 +0800 Subject: [PATCH 15/22] Update dm/worker/config.go Co-Authored-By: GregoryIan --- dm/worker/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/config.go b/dm/worker/config.go index 221faa8437..5d78e90324 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -247,7 +247,7 @@ func (c *Config) Reload() error { return nil } -// DecryptPassword return a decrypted config replica in config +// DecryptPassword returns a decrypted config replica in config func (c *Config) DecryptPassword() (*Config, error) { // try decrypt password for To DB From be2b474710257152bb4ca5873fb63d000b543496 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 25 Mar 2019 10:54:34 +0800 Subject: [PATCH 16/22] Apply suggestions from code review Co-Authored-By: GregoryIan --- dm/worker/meta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/meta.go b/dm/worker/meta.go index 671ab8cbad..915bee7df3 100644 --- a/dm/worker/meta.go +++ b/dm/worker/meta.go @@ -70,7 +70,7 @@ type FileMetaDB struct { path string } -// NewFileMetaDB return a meta file db +// NewFileMetaDB returns a meta file db func NewFileMetaDB(dir string) (*FileMetaDB, error) { metaDB := &FileMetaDB{ path: path.Join(dir, "meta"), From f9895cc8d12e273e672dd8e83cd4e77059111f6f Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 10:56:03 +0800 Subject: [PATCH 17/22] *: refine code --- checker/checker.go | 18 +++--------------- dm/config/subtask.go | 4 ++-- dm/master/server.go | 2 +- dm/worker/config.go | 8 ++++++-- pkg/utils/encrypt.go | 4 ++-- 5 files changed, 14 insertions(+), 22 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index e8b6797be3..15dff813fd 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -72,8 +72,10 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) * } for _, cfg := range cfgs { + // we have verify it in subtask config + replica, _ := cfg.DecryptPassword() c.instances = append(c.instances, &mysqlInstance{ - cfg: cfg, + cfg: replica, }) } @@ -118,13 +120,6 @@ func (c *Checker) Init() (err error) { User: instance.cfg.From.User, Password: instance.cfg.From.Password, } - if len(instance.sourceDBinfo.Password) > 0 { - pswd, err2 := utils.Decrypt(instance.sourceDBinfo.Password) - if err2 != nil { - return errors.Annotatef(err2, "can not decrypt password %s", instance.sourceDBinfo.Password) - } - instance.sourceDBinfo.Password = pswd - } instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo) if err != nil { return errors.Trace(err) @@ -136,13 +131,6 @@ func (c *Checker) Init() (err error) { User: instance.cfg.To.User, Password: instance.cfg.To.Password, } - if len(instance.targetDBInfo.Password) > 0 { - pswd, err2 := utils.Decrypt(instance.targetDBInfo.Password) - if err2 != nil { - return errors.Annotatef(err2, "can not decrypt password %s", instance.targetDBInfo.Password) - } - instance.targetDBInfo.Password = pswd - } instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo) if err != nil { return errors.Trace(err) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 855aa5bb98..40bcde4bfe 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -305,13 +305,13 @@ func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) { if len(clone.To.Password) > 0 { pswdTo, err = utils.Decrypt(clone.To.Password) if err != nil { - return nil, errors.Annotatef(err, "can not decrypt password %s of downstream DB", clone.To.Password) + return nil, errors.Annotatef(err, "downstream DB") } } if len(clone.From.Password) > 0 { pswdFrom, err = utils.Decrypt(clone.From.Password) if err != nil { - return nil, errors.Annotatef(err, "can not decrypt password %s of source DB", clone.From.Password) + return nil, errors.Annotatef(err, "source DB") } } clone.From.Password = pswdFrom diff --git a/dm/master/server.go b/dm/master/server.go index 91879e6631..fe30966ad4 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1718,7 +1718,7 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C if err != nil { return &pb.CheckTaskResponse{ Result: false, - Msg: errors.Cause(err).Error(), + Msg: errors.ErrorStack(err), }, nil } diff --git a/dm/worker/config.go b/dm/worker/config.go index 221faa8437..f4c4e608e4 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -204,7 +204,11 @@ func (c *Config) verify() error { } _, err = c.DecryptPassword() - return err + if err != nil { + return errors.Trace(err) + } + + return nil } // configFromFile loads config from file. @@ -259,7 +263,7 @@ func (c *Config) DecryptPassword() (*Config, error) { if len(clone.From.Password) > 0 { pswdFrom, err = utils.Decrypt(clone.From.Password) if err != nil { - return nil, errors.Annotatef(err, "can not decrypt password %s", clone.From.Password) + return nil, errors.Trace(err) } } clone.From.Password = pswdFrom diff --git a/pkg/utils/encrypt.go b/pkg/utils/encrypt.go index c9d4c13616..00017448f2 100644 --- a/pkg/utils/encrypt.go +++ b/pkg/utils/encrypt.go @@ -34,12 +34,12 @@ func Encrypt(plaintext string) (string, error) { func Decrypt(ciphertextB64 string) (string, error) { ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64) if err != nil { - return "", errors.Trace(err) + return "", errors.Annotatef(err, "can not decrypt password %s", ciphertextB64) } plaintext, err := encrypt.Decrypt(ciphertext) if err != nil { - return "", errors.Trace(err) + return "", errors.Annotatef(err, "can not decrypt password %s", ciphertextB64) } return string(plaintext), nil } From 9ac42b881bba58c649d76c52de3427c1469a785f Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 11:18:42 +0800 Subject: [PATCH 18/22] *: fix test --- tests/_utils/run_dm_worker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/_utils/run_dm_worker b/tests/_utils/run_dm_worker index ae948ed604..4e939b0404 100755 --- a/tests/_utils/run_dm_worker +++ b/tests/_utils/run_dm_worker @@ -17,7 +17,7 @@ ln -s $PWD/bin/mydumper $workdir/bin/mydumper echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir -$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ +$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.$(date +"%s").out" DEVEL \ --worker-addr=:$port --relay-dir="$workdir/relay_log" \ --log-file="$workdir/log/dm-worker.log" -L=debug --config="$conf" \ > $workdir/log/stdout.log 2>&1 & From cf28596004caae0efed35df49079c40a7b732839 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 13:16:29 +0800 Subject: [PATCH 19/22] *: refine code --- dm/worker/config.go | 2 -- dm/worker/meta.go | 2 +- tests/_utils/restart_dm_worker | 21 --------------------- tests/_utils/run_dm_worker | 2 +- 4 files changed, 2 insertions(+), 25 deletions(-) delete mode 100755 tests/_utils/restart_dm_worker diff --git a/dm/worker/config.go b/dm/worker/config.go index 00a6bd7994..7265708a27 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -253,8 +253,6 @@ func (c *Config) Reload() error { // DecryptPassword returns a decrypted config replica in config func (c *Config) DecryptPassword() (*Config, error) { - // try decrypt password for To DB - clone := c.Clone() var ( pswdFrom string diff --git a/dm/worker/meta.go b/dm/worker/meta.go index 915bee7df3..bf7757c536 100644 --- a/dm/worker/meta.go +++ b/dm/worker/meta.go @@ -80,7 +80,7 @@ func NewFileMetaDB(dir string) (*FileMetaDB, error) { } if err := os.MkdirAll(dir, 0700); err != nil { - return nil, errors.Annotatef(err, "create meta directory") + return nil, errors.Annotatef(err, "create meta directory %s", dir) } fd, err := os.Open(metaDB.path) diff --git a/tests/_utils/restart_dm_worker b/tests/_utils/restart_dm_worker deleted file mode 100755 index b11c1c8dcd..0000000000 --- a/tests/_utils/restart_dm_worker +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/sh -# parameter 1: work directory -# parameter 2: worker-addr port -# parameter 3: config file for DM-worker - -set -eu - -workdir=$1 -port=$2 -conf=$3 - -PWD=$(pwd) -binary=$PWD/bin/dm-worker.test - -echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" -cd $workdir -$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ - --worker-addr=:$port --relay-dir="$workdir/relay_log" \ - --log-file="$workdir/log/dm-worker.log" -L=debug --config="$conf" \ - > $workdir/log/stdout.log 2>&1 & -cd $PWD diff --git a/tests/_utils/run_dm_worker b/tests/_utils/run_dm_worker index 4e939b0404..e4055e5257 100755 --- a/tests/_utils/run_dm_worker +++ b/tests/_utils/run_dm_worker @@ -13,7 +13,7 @@ mkdir -p $workdir/relay_log $workdir/dumped_data $workdir/log $workdir/bin PWD=$(pwd) binary=$PWD/bin/dm-worker.test -ln -s $PWD/bin/mydumper $workdir/bin/mydumper +[ -f $workdir/bin/mydumper ] || ln -s $PWD/bin/mydumper $workdir/bin/mydumper echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir From ae28bc0942eae20c6ed978ece3cedd9d20b6413f Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 13:25:04 +0800 Subject: [PATCH 20/22] *: fix ci --- tests/all_mode/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index b943b8bc96..1488d38f2c 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -31,8 +31,8 @@ function run() { wait_process_exit dm-worker.test # restart dm-worker - restart_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - restart_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 From 710250937c742a53787ce71fb18fb2104e445d53 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 14:38:35 +0800 Subject: [PATCH 21/22] *: change context dependency --- dm/tracer/server.go | 2 +- go.mod | 1 - pkg/tracing/tracer.go | 2 +- pkg/tracing/tracer_test.go | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dm/tracer/server.go b/dm/tracer/server.go index 99babf056e..a64eeac1ad 100644 --- a/dm/tracer/server.go +++ b/dm/tracer/server.go @@ -14,6 +14,7 @@ package tracer import ( + "context" "net" "net/http" "sync" @@ -23,7 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/common" diff --git a/go.mod b/go.mod index ca2c402a4f..233a039fa7 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc // indirect - golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e golang.org/x/sys v0.0.0-20190116161447-11f53e031339 google.golang.org/grpc v1.17.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go index 016af3daeb..e3a6aa5c1c 100644 --- a/pkg/tracing/tracer.go +++ b/pkg/tracing/tracer.go @@ -14,12 +14,12 @@ package tracing import ( + "context" "sync" "time" "github.com/pingcap/errors" "github.com/siddontang/go/sync2" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/pb" diff --git a/pkg/tracing/tracer_test.go b/pkg/tracing/tracer_test.go index 8fb5bbe4ce..b7cd566a06 100644 --- a/pkg/tracing/tracer_test.go +++ b/pkg/tracing/tracer_test.go @@ -14,6 +14,7 @@ package tracing import ( + "context" "fmt" "net" "sync" @@ -26,7 +27,6 @@ import ( "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/common" From 6a80363bb572d16bdcc273412ca48d3f6958da9d Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 25 Mar 2019 19:27:58 +0800 Subject: [PATCH 22/22] address comments --- dm/worker/meta_test.go | 4 ++++ dm/worker/server.go | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dm/worker/meta_test.go b/dm/worker/meta_test.go index 9ea09b1986..631892ef07 100644 --- a/dm/worker/meta_test.go +++ b/dm/worker/meta_test.go @@ -53,4 +53,8 @@ func (t *testWorker) TestFileMetaDB(c *C) { c.Assert(err, IsNil) c.Assert(metaDB.meta.SubTasks, HasLen, 1) c.Assert(meta.SubTasks["task1"], NotNil) + + c.Assert(metaDB.Del("task1"), IsNil) + meta = metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 0) } diff --git a/dm/worker/server.go b/dm/worker/server.go index 02923240f7..e4d8149805 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -186,8 +186,8 @@ func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskReque switch req.Op { case pb.TaskOp_Stop: if err = s.worker.meta.Del(name); err != nil { - log.Errorf("update task %s into meta: %v", name, errors.ErrorStack(err)) resp.Msg = fmt.Sprintf("update task %s into meta: %v", name, errors.ErrorStack(err)) + log.Errorf(resp.Msg) } else { err = s.worker.StopSubTask(name) } @@ -225,10 +225,11 @@ func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest } if err = s.worker.meta.Set(cfg); err != nil { - log.Errorf("[server] update task %s into meta: %v", cfg, errors.ErrorStack(err)) + errMsg := fmt.Sprintf("[server] update task %s into meta: %v", cfg, errors.ErrorStack(err)) + log.Errorf(errMsg) return &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("update task %s into meta: %v", cfg, errors.ErrorStack(err)), + Msg: errMsg, }, nil }