From bfe0d18276da5101eb37a946ce027bf3be8235b9 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 16 Oct 2019 18:14:54 +0800 Subject: [PATCH 1/3] master: only fetch necessary DM-worker's config --- dm/master/server.go | 20 +++++++++++++++++--- dm/master/server_test.go | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/dm/master/server.go b/dm/master/server.go index 26deec4b73..442fa62083 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1695,7 +1695,7 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork } // TODO: refine the call stack of this API, query worker configs that we needed only -func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) { +func (s *Server) getWorkerConfigs(ctx context.Context, workerIDs []string) (map[string]config.DBConfig, error) { var ( wg sync.WaitGroup workerMutex sync.Mutex @@ -1733,7 +1733,11 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf Type: workerrpc.CmdQueryWorkerConfig, QueryWorkerConfig: &pb.QueryWorkerConfigRequest{}, } - for worker, client := range s.workerClients { + for _, worker := range workerIDs { + client, ok := s.workerClients[worker] + if !ok { + continue // outer caller can handle the lack of the config + } wg.Add(1) go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() @@ -1843,7 +1847,17 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } - sourceCfgs, err := s.allWorkerConfigs(ctx) + // get workerID from deploy map by sourceID, refactor this when dynamic add/remove worker supported. + workerIDs := make([]string, 0, len(cfg.MySQLInstances)) + for _, inst := range cfg.MySQLInstances { + if workerID, ok := s.cfg.DeployMap[inst.SourceID]; !ok { + return nil, nil, terror.ErrMasterTaskConfigExtractor.Generatef("%s relevant worker not found", inst.SourceID) + } else { + workerIDs = append(workerIDs, workerID) + } + } + + sourceCfgs, err := s.getWorkerConfigs(ctx, workerIDs) if err != nil { return nil, nil, err } diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 2dbcaf344f..da7f1f8ceb 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -174,7 +174,7 @@ func testGenSubTaskConfig(c *check.C, server *Server, ctrl *gomock.Controller) m } func testMockWorkerConfig(c *check.C, server *Server, ctrl *gomock.Controller, password string, result bool) { - // mock QueryWorkerConfig API to be used in s.allWorkerConfigs + // mock QueryWorkerConfig API to be used in s.getWorkerConfigs for idx, deploy := range server.cfg.Deploy { dbCfg := &config.DBConfig{ Host: "127.0.0.1", From 44d7bcc80c0606f9845831676f78966a01832ce3 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 16 Oct 2019 19:36:16 +0800 Subject: [PATCH 2/3] master: fix CI --- dm/master/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/master/server.go b/dm/master/server.go index 442fa62083..d294734c44 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1850,11 +1850,11 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task // get workerID from deploy map by sourceID, refactor this when dynamic add/remove worker supported. workerIDs := make([]string, 0, len(cfg.MySQLInstances)) for _, inst := range cfg.MySQLInstances { - if workerID, ok := s.cfg.DeployMap[inst.SourceID]; !ok { + workerID, ok := s.cfg.DeployMap[inst.SourceID] + if !ok { return nil, nil, terror.ErrMasterTaskConfigExtractor.Generatef("%s relevant worker not found", inst.SourceID) - } else { - workerIDs = append(workerIDs, workerID) } + workerIDs = append(workerIDs, workerID) } sourceCfgs, err := s.getWorkerConfigs(ctx, workerIDs) From 409aad1cc26f163be3a86eac3c20392e8be00a78 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 17 Oct 2019 12:57:24 +0800 Subject: [PATCH 3/3] tests: address comment --- tests/start_task/conf/dm-master.toml | 4 ++++ tests/start_task/run.sh | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/tests/start_task/conf/dm-master.toml b/tests/start_task/conf/dm-master.toml index 8dc16ebacf..1712700549 100644 --- a/tests/start_task/conf/dm-master.toml +++ b/tests/start_task/conf/dm-master.toml @@ -3,3 +3,7 @@ [[deploy]] source-id = "mysql-replica-01" dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-not-exist" +dm-worker = "127.0.0.1:8888" diff --git a/tests/start_task/run.sh b/tests/start_task/run.sh index 9703b21bb5..f1bb0cd0dd 100644 --- a/tests/start_task/run.sh +++ b/tests/start_task/run.sh @@ -36,6 +36,11 @@ function run() { run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + echo "check un-accessible DM-worker exists" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status" \ + "transport: Error while dialing dial tcp 127.0.0.1:8888: connect: connection refused" 1 + echo "start task and will failed" task_conf="$cur/conf/dm-task.yaml" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \