Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
master: only fetch necessary DM-worker's config (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Oct 17, 2019
1 parent 5ba776e commit 39d8f5d
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 4 deletions.
20 changes: 17 additions & 3 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork
}

// TODO: refine the call stack of this API, query worker configs that we needed only
func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) {
func (s *Server) getWorkerConfigs(ctx context.Context, workerIDs []string) (map[string]config.DBConfig, error) {
var (
wg sync.WaitGroup
workerMutex sync.Mutex
Expand Down Expand Up @@ -1733,7 +1733,11 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf
Type: workerrpc.CmdQueryWorkerConfig,
QueryWorkerConfig: &pb.QueryWorkerConfigRequest{},
}
for worker, client := range s.workerClients {
for _, worker := range workerIDs {
client, ok := s.workerClients[worker]
if !ok {
continue // outer caller can handle the lack of the config
}
wg.Add(1)
go s.ap.Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()
Expand Down Expand Up @@ -1843,7 +1847,17 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

sourceCfgs, err := s.allWorkerConfigs(ctx)
// get workerID from deploy map by sourceID, refactor this when dynamic add/remove worker supported.
workerIDs := make([]string, 0, len(cfg.MySQLInstances))
for _, inst := range cfg.MySQLInstances {
workerID, ok := s.cfg.DeployMap[inst.SourceID]
if !ok {
return nil, nil, terror.ErrMasterTaskConfigExtractor.Generatef("%s relevant worker not found", inst.SourceID)
}
workerIDs = append(workerIDs, workerID)
}

sourceCfgs, err := s.getWorkerConfigs(ctx, workerIDs)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func testGenSubTaskConfig(c *check.C, server *Server, ctrl *gomock.Controller) m
}

func testMockWorkerConfig(c *check.C, server *Server, ctrl *gomock.Controller, password string, result bool) {
// mock QueryWorkerConfig API to be used in s.allWorkerConfigs
// mock QueryWorkerConfig API to be used in s.getWorkerConfigs
for idx, deploy := range server.cfg.Deploy {
dbCfg := &config.DBConfig{
Host: "127.0.0.1",
Expand Down
4 changes: 4 additions & 0 deletions tests/start_task/conf/dm-master.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"

[[deploy]]
source-id = "mysql-not-exist"
dm-worker = "127.0.0.1:8888"
5 changes: 5 additions & 0 deletions tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ function run() {
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

echo "check un-accessible DM-worker exists"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"transport: Error while dialing dial tcp 127.0.0.1:8888: connect: connection refused" 1

echo "start task and will failed"
task_conf="$cur/conf/dm-task.yaml"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down

0 comments on commit 39d8f5d

Please sign in to comment.