Skip to content

Commit

Permalink
master(dm): clean and treat invalid load task (#4004) (#4145)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
1 parent b394044 commit e73e411
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 14 deletions.
57 changes: 44 additions & 13 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2269,16 +2269,56 @@ func (s *Scheduler) getNextLoadTaskTransfer(worker, source string) (string, stri
return "", ""
}

// hasLoadTaskByWorkerAndSource check whether there is a load subtask for the worker and source.
// hasLoadTaskByWorkerAndSource checks whether there is an existing load subtask for the worker and source.
// An entry in s.loadTasks only counts when s.subTaskCfgs still holds a subtask config for the same
// task and source; entries whose subtask config is gone are skipped.
func (s *Scheduler) hasLoadTaskByWorkerAndSource(worker, source string) bool {
for _, sourceWorkerMap := range s.loadTasks {
if workerName, ok := sourceWorkerMap[source]; ok && workerName == worker {
for taskName, sourceWorkerMap := range s.loadTasks {
// don't consider a removed subtask: the task must still be present in s.subTaskCfgs
// and must still have a config for this source
subtasksV, ok := s.subTaskCfgs.Load(taskName)
if !ok {
continue
}
subtasks := subtasksV.(map[string]config.SubTaskConfig)
if _, ok2 := subtasks[source]; !ok2 {
continue
}

if workerName, ok2 := sourceWorkerMap[source]; ok2 && workerName == worker {
return true
}
}
return false
}

// TryResolveLoadTask checks, for every given source, whether the source has a load task with local
// files while it is not bound to the worker that can access those files. If so, it triggers a
// source transfer.
//
// Sources that are not currently bound to any worker are skipped. Errors are logged together with
// the source and worker they belong to and are not returned, so a failure for one source does not
// prevent the remaining sources from being processed.
func (s *Scheduler) TryResolveLoadTask(sources []string) {
	// resolve handles one source. s.mu is taken per source (not around the whole loop) and is
	// released by defer so that every exit path unlocks it.
	resolve := func(source string) {
		s.mu.Lock()
		defer s.mu.Unlock()

		worker, ok := s.bounds[source]
		if !ok {
			return
		}

		workerName := worker.baseInfo.Name
		if err := s.tryResolveLoadTask(workerName, source); err != nil {
			s.logger.Error("tryResolveLoadTask failed",
				zap.String("source", source),
				zap.String("worker", workerName),
				zap.Error(err))
		}
	}

	for _, source := range sources {
		resolve(source)
	}
}

// tryResolveLoadTask transfers originSource away from originWorker when that pair has no load task
// of its own and getNextLoadTaskTransfer proposes a worker/source pair to transfer to.
// It returns nil when nothing needs to be done, otherwise the result of transferWorkerAndSource.
// The callers visible in this file invoke it while holding s.mu.
func (s *Scheduler) tryResolveLoadTask(originWorker, originSource string) error {
	// the current binding already serves a load task, keep it as is
	if s.hasLoadTaskByWorkerAndSource(originWorker, originSource) {
		return nil
	}

	nextWorker, nextSource := s.getNextLoadTaskTransfer(originWorker, originSource)
	if nextWorker != "" || nextSource != "" {
		return s.transferWorkerAndSource(originWorker, originSource, nextWorker, nextSource)
	}

	// no candidate to transfer to
	return nil
}

func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -2296,16 +2336,7 @@ func (s *Scheduler) handleLoadTaskDel(loadTask ha.LoadTask) error {
delete(s.loadTasks, loadTask.Task)
}

if s.hasLoadTaskByWorkerAndSource(originWorker, loadTask.Source) {
return nil
}

worker, source := s.getNextLoadTaskTransfer(originWorker, loadTask.Source)
if worker == "" && source == "" {
return nil
}

return s.transferWorkerAndSource(originWorker, loadTask.Source, worker, source)
return s.tryResolveLoadTask(originWorker, loadTask.Source)
}

func (s *Scheduler) handleLoadTaskPut(loadTask ha.LoadTask) {
Expand Down
12 changes: 12 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,13 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}
s.subTaskCfgs.Store(task1, map[string]config.SubTaskConfig{
sourceID1: {},
})
s.subTaskCfgs.Store(task2, map[string]config.SubTaskConfig{
sourceID1: {},
sourceID2: {},
})

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
Expand Down Expand Up @@ -1651,6 +1658,11 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
c.Assert(s.bounds[sourceID2], DeepEquals, worker4)
c.Assert(worker2.stage, Equals, WorkerFree)

// after stop-task, hasLoadTaskByWorkerAndSource is no longer valid
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsTrue)
s.subTaskCfgs.Delete(task2)
c.Assert(s.hasLoadTaskByWorkerAndSource(workerName4, sourceID2), IsFalse)

cancel1()
wg.Wait()
}
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
release()
}

go s.scheduler.TryResolveLoadTask(sources)

resp.Result = true
if cfg.RemoveMeta {
resp.Msg = "`remove-meta` in task config is deprecated, please use `start-task ... --remove-meta` instead"
Expand Down
7 changes: 6 additions & 1 deletion dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,18 @@ function join_string() {

# shortcut for start task on one DM-worker
# usage: dmctl_start_task_standalone [task_conf [remove_meta]]
#   $1 - path of the task config file, defaults to $cur/conf/dm-task.yaml
#   $2 - extra argument appended to the start-task command (e.g. "--remove-meta"), defaults to empty
# expects "result": true twice and "source": "$SOURCE_ID1" once in the dmctl output
function dmctl_start_task_standalone() {
if [ $# -ge 2 ]; then
remove_meta=$2
else
remove_meta=""
fi
if [ $# -ge 1 ]; then
task_conf=$1
else
task_conf="$cur/conf/dm-task.yaml"
fi
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $task_conf" \
"start-task $task_conf $remove_meta" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1
}
Expand Down
41 changes: 41 additions & 0 deletions dm/tests/load_task/conf/dm-task-standalone.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
# Task config for the load_task integration test: task "load_task1" with a single
# upstream source (mysql-replica-01), limited to the load_task1 database.
name: load_task1
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

# one upstream instance; every per-unit config below refers to the shared "global"/"instance" entries
mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
# allow-list containing only the load_task1 database
do-dbs: ["load_task1"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
# NOTE(review): relative path, presumably resolved against the DM-worker working directory; confirm
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
41 changes: 41 additions & 0 deletions dm/tests/load_task/conf/dm-task2-standalone.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
# Task config for the load_task integration test: task "load_task2" with a single
# upstream source (mysql-replica-01), limited to the load_task2 database.
name: load_task2
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

# one upstream instance; every per-unit config below refers to the shared "global"/"instance" entries
mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
# allow-list containing only the load_task2 database
do-dbs: ["load_task2"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
# NOTE(review): relative path, presumably resolved against the DM-worker working directory; confirm
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
81 changes: 81 additions & 0 deletions dm/tests/load_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,94 @@ function test_transfer_two_sources() {
"\"taskStatus\": \"Running\"" 4
}

# stop_task_left_load covers stopping a task while its load stage is unfinished:
#   1. start load_task1 on worker1 (started with the LoadDataSlowDownByTask failpoint for load_task1),
#      start worker2, kill worker1 and wait until query-status reports the "different worker in load stage" message
#   2. stop load_task1 without cleaning meta, start load_task2 and wait until it reaches the Sync unit
#   3. restart worker1 and expect it to have no bound source
#   4. start load_task1 again and expect the source to be bound to worker1
#   5. stop load_task1, restart worker1, start load_task1 with --remove-meta and expect worker1 to stay unbound
# Finally the processes and the load_task1/load_task2 data are cleaned up.
function stop_task_left_load() {
echo "start DM master, workers and sources"
run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1

# worker1 is started with the failpoint enabled for load_task1 only
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"

# worker2 is started without any failpoint
export GO_FAILPOINTS=""
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# kill worker1; load_task1 will be transferred to worker2, which lacks the local files
ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task1" \
"different worker in load stage, previous worker: worker1, current worker: worker2" 1

# now stop this task without cleaning meta (this leaves a load_task KV in etcd)
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task load_task1" \
"\"result\": true" 2

dmctl_start_task_standalone "$cur/conf/dm-task2-standalone.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task2" \
"\"unit\": \"Sync\"" 1

# after worker1 comes back online it still has the unfinished load_task1, but load_task1 is stopped,
# so the source should not be rebound to worker1
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/loader/LoadDataSlowDownByTask=return(\"load_task1\")"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"\"" 1

# start-task again, expect the source to be automatically transferred back to worker1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task-standalone.yaml"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"mysql-replica-01\"" 1

# repeat the stop/kill sequence and check that start-task --remove-meta does not cause a transfer
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task load_task1" \
"\"result\": true" 2
ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task2" \
"\"unit\": \"Sync\"" 1

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

dmctl_start_task_standalone "$cur/conf/dm-task-standalone.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status load_task1" \
"\"unit\": \"Sync\"" 1
# worker1 must still have no bound source
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker1" \
"\"source\": \"\"" 1

cleanup_process $*
cleanup_data load_task1
cleanup_data load_task2
}

function run() {
echo "import prepare data"
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

stop_task_left_load

echo "start DM master, workers and sources"
run_dm_master $WORK_DIR/master $MASTER_PORT1 $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1
Expand Down

0 comments on commit e73e411

Please sign in to comment.