diff --git a/dm/master/server.go b/dm/master/server.go index 26deec4b73..d294734c44 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1695,7 +1695,7 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork } // TODO: refine the call stack of this API, query worker configs that we needed only -func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) { +func (s *Server) getWorkerConfigs(ctx context.Context, workerIDs []string) (map[string]config.DBConfig, error) { var ( wg sync.WaitGroup workerMutex sync.Mutex @@ -1733,7 +1733,11 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf Type: workerrpc.CmdQueryWorkerConfig, QueryWorkerConfig: &pb.QueryWorkerConfigRequest{}, } - for worker, client := range s.workerClients { + for _, worker := range workerIDs { + client, ok := s.workerClients[worker] + if !ok { + continue // outer caller can handle the lack of the config + } wg.Add(1) go s.ap.Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() @@ -1843,7 +1847,17 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } - sourceCfgs, err := s.allWorkerConfigs(ctx) + // get workerID from deploy map by sourceID, refactor this when dynamic add/remove worker supported. + workerIDs := make([]string, 0, len(cfg.MySQLInstances)) + for _, inst := range cfg.MySQLInstances { + workerID, ok := s.cfg.DeployMap[inst.SourceID] + if !ok { + return nil, nil, terror.ErrMasterTaskConfigExtractor.Generatef("%s relevant worker not found", inst.SourceID) + } + workerIDs = append(workerIDs, workerID) + } + + sourceCfgs, err := s.getWorkerConfigs(ctx, workerIDs) if err != nil { return nil, nil, err } diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 2dbcaf344f..da7f1f8ceb 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -174,7 +174,7 @@ func testGenSubTaskConfig(c *check.C, server *Server, ctrl *gomock.Controller) m } func testMockWorkerConfig(c *check.C, server *Server, ctrl *gomock.Controller, password string, result bool) { - // mock QueryWorkerConfig API to be used in s.allWorkerConfigs + // mock QueryWorkerConfig API to be used in s.getWorkerConfigs for idx, deploy := range server.cfg.Deploy { dbCfg := &config.DBConfig{ Host: "127.0.0.1", diff --git a/tests/start_task/conf/dm-master.toml b/tests/start_task/conf/dm-master.toml index 8dc16ebacf..1712700549 100644 --- a/tests/start_task/conf/dm-master.toml +++ b/tests/start_task/conf/dm-master.toml @@ -3,3 +3,7 @@ [[deploy]] source-id = "mysql-replica-01" dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-not-exist" +dm-worker = "127.0.0.1:8888" diff --git a/tests/start_task/run.sh b/tests/start_task/run.sh index 9703b21bb5..f1bb0cd0dd 100644 --- a/tests/start_task/run.sh +++ b/tests/start_task/run.sh @@ -36,6 +36,11 @@ function run() { run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + echo "check un-accessible DM-worker exists" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status" \ + "transport: Error while dialing dial tcp 127.0.0.1:8888: connect: connection refused" 1 + echo "start task and will failed" task_conf="$cur/conf/dm-task.yaml" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \