Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

master: only fetch necessary DM-worker's config #316

Merged
merged 4 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork
}

// TODO: refine the call stack of this API, query worker configs that we needed only
func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) {
func (s *Server) getWorkerConfigs(ctx context.Context, workerIDs []string) (map[string]config.DBConfig, error) {
var (
wg sync.WaitGroup
workerMutex sync.Mutex
Expand Down Expand Up @@ -1733,7 +1733,11 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf
Type: workerrpc.CmdQueryWorkerConfig,
QueryWorkerConfig: &pb.QueryWorkerConfigRequest{},
}
for worker, client := range s.workerClients {
for _, worker := range workerIDs {
client, ok := s.workerClients[worker]
if !ok {
continue // outer caller can handle the lack of the config
}
wg.Add(1)
go s.ap.Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()
Expand Down Expand Up @@ -1843,7 +1847,17 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

sourceCfgs, err := s.allWorkerConfigs(ctx)
// get workerID from deploy map by sourceID, refactor this when dynamic add/remove worker supported.
workerIDs := make([]string, 0, len(cfg.MySQLInstances))
for _, inst := range cfg.MySQLInstances {
workerID, ok := s.cfg.DeployMap[inst.SourceID]
if !ok {
return nil, nil, terror.ErrMasterTaskConfigExtractor.Generatef("%s relevant worker not found", inst.SourceID)
}
workerIDs = append(workerIDs, workerID)
}

sourceCfgs, err := s.getWorkerConfigs(ctx, workerIDs)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func testGenSubTaskConfig(c *check.C, server *Server, ctrl *gomock.Controller) m
}

func testMockWorkerConfig(c *check.C, server *Server, ctrl *gomock.Controller, password string, result bool) {
// mock QueryWorkerConfig API to be used in s.allWorkerConfigs
// mock QueryWorkerConfig API to be used in s.getWorkerConfigs
for idx, deploy := range server.cfg.Deploy {
dbCfg := &config.DBConfig{
Host: "127.0.0.1",
Expand Down
4 changes: 4 additions & 0 deletions tests/start_task/conf/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"

[[deploy]]
source-id = "mysql-not-exist"
dm-worker = "127.0.0.1:8888"
5 changes: 5 additions & 0 deletions tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ function run() {
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

echo "check un-accessible DM-worker exists"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"transport: Error while dialing dial tcp 127.0.0.1:8888: connect: connection refused" 1

echo "start task and will failed"
task_conf="$cur/conf/dm-task.yaml"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down