diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go
index 59c085a9e8..6b58e36297 100644
--- a/dm/worker/subtask.go
+++ b/dm/worker/subtask.go
@@ -93,10 +93,11 @@ func NewSubTask(cfg *config.SubTaskConfig) *SubTask {
 // NewSubTaskWithStage creates a new SubTask with stage
 func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage) *SubTask {
 	st := SubTask{
-		cfg:   cfg,
-		units: createUnits(cfg),
-		stage: stage,
-		l:     log.With(zap.String("subtask", cfg.Name)),
+		cfg:     cfg,
+		units:   createUnits(cfg),
+		stage:   stage,
+		l:       log.With(zap.String("subtask", cfg.Name)),
+		DDLInfo: make(chan *pb.DDLInfo, 1),
 	}
 	taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
 	return &st
@@ -108,8 +109,6 @@ func (st *SubTask) Init() error {
 		return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
 	}
 
-	st.DDLInfo = make(chan *pb.DDLInfo, 1)
-
 	initializeUnitSuccess := true
 	// when error occurred, initialized units should be closed
 	// when continue sub task from loader / syncer, ahead units should be closed
diff --git a/loader/loader.go b/loader/loader.go
index c82298ee33..dab8bd279a 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -104,32 +104,49 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
 
 // Close closes worker
 func (w *Worker) Close() {
+	// simulate the case that doesn't wait all doJob goroutine exit
+	failpoint.Inject("workerCantClose", func(_ failpoint.Value) {
+		w.tctx.L().Info("", zap.String("failpoint", "workerCantClose"))
+		failpoint.Return()
+	})
+
 	if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
+		w.wg.Wait()
+		w.tctx.L().Info("already closed...")
 		return
 	}
+
+	w.tctx.L().Info("start to close...")
+
 	close(w.jobQueue)
 	w.wg.Wait()
+	w.tctx.L().Info("closed !!!")
 }
 
-func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) {
+func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalChan chan *pb.ProcessError) {
 	atomic.StoreInt64(&w.closed, 0)
-	defer workerWg.Done()
 
 	newCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	defer func() {
+		cancel()
+		// make sure all doJob goroutines exit
+		w.Close()
+	}()
+
 	ctctx := w.tctx.WithContext(newCtx)
 
 	doJob := func() {
-		defer w.wg.Done()
 		for {
 			select {
 			case <-newCtx.Done():
-				w.tctx.L().Debug("execution goroutine exits")
+				w.tctx.L().Info("context canceled, execution goroutine exits")
 				return
 			case job, ok := <-w.jobQueue:
-				if !ok || job == nil {
+				if !ok {
+					w.tctx.L().Info("job queue was closed, execution goroutine exits")
+					return
+				}
+				if job == nil {
+					w.tctx.L().Info("jobs are finished, execution goroutine exits")
 					return
 				}
 				sqls := make([]string, 0, 3)
@@ -149,7 +166,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *
 
 				failpoint.Inject("LoadDataSlowDown", nil)
 
-				if err := w.conn.executeSQL(ctctx, sqls); err != nil {
+				err := w.conn.executeSQL(ctctx, sqls)
+				failpoint.Inject("executeSQLError", func(_ failpoint.Value) {
+					w.tctx.L().Info("", zap.String("failpoint", "executeSQLError"))
+					err = errors.New("inject failpoint executeSQLError")
+				})
+				if err != nil {
 					// expect pause rather than exit
 					err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
 					runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
@@ -164,15 +186,19 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *
 
 	for {
 		select {
 		case <-newCtx.Done():
+			w.tctx.L().Info("context canceled, main goroutine exits")
 			return
 		case job, ok := <-fileJobQueue:
 			if !ok {
-				w.tctx.L().Debug("main routine exit.")
+				w.tctx.L().Info("file queue was closed, main routine exit.")
 				return
 			}
 			w.wg.Add(1)
-			go doJob()
+			go func() {
+				defer w.wg.Done()
+				doJob()
+			}()
 
 			// restore a table
 			if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil {
@@ -192,12 +218,17 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in
 		return err
 	}
 
-	// dispatchSQL completed, send nil.
+	failpoint.Inject("dispatchError", func(_ failpoint.Value) {
+		w.tctx.L().Info("", zap.String("failpoint", "dispatchError"))
+		failpoint.Return(errors.New("inject failpoint dispatchError"))
+	})
+
+	// dispatchSQL completed, send nil to make sure all dmls are applied to target database
 	// we don't want to close and re-make chan frequently
 	// but if we need to re-call w.run, we need re-make jobQueue chan
 	w.jobQueue <- nil
 
-	w.wg.Wait()
+	w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath))
 	return nil
 }
@@ -333,7 +364,6 @@ type Loader struct {
 
 	bwList        *filter.Filter
 	columnMapping *cm.Mapping
-	pool          []*Worker
 	closed        sync2.AtomicBool
 
 	toDB *conn.BaseDB
@@ -355,7 +385,6 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader {
 		db2Tables:  make(map[string]Tables2DataFiles),
 		tableInfos: make(map[string]*tableInfo),
 		workerWg:   new(sync.WaitGroup),
-		pool:       make([]*Worker, 0, cfg.PoolSize),
 		tctx:       tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", "load"))),
 	}
 	loader.fileJobQueueClosed.Set(true) // not open yet
@@ -449,7 +478,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
 
 	err := l.Restore(newCtx)
 	close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
-	wg.Wait()             // wait for receive all fatal from l.runFatalChan
+
+	failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
+		l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
+		l.workerWg.Wait()
+	})
+
+	wg.Wait() // wait for receive all fatal from l.runFatalChan
 
 	if err != nil {
 		loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc()
@@ -536,10 +571,21 @@ func (l *Loader) Restore(ctx context.Context) error {
 
 	go l.PrintStatus(ctx)
 
-	if err := l.restoreData(ctx); err != nil {
-		if errors.Cause(err) == context.Canceled {
-			return nil
-		}
+	begin := time.Now()
+	err = l.restoreData(ctx)
+
+	failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
+		l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
+		failpoint.Return(nil)
+	})
+
+	// make sure all workers exit
+	l.closeFileJobQueue() // all data file dispatched, close it
+	l.workerWg.Wait()
+
+	if err == nil {
+		l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
+	} else if errors.Cause(err) != context.Canceled {
 		return err
 	}
 
@@ -576,13 +622,11 @@ func (l *Loader) Close() {
 func (l *Loader) stopLoad() {
 	// before re-write workflow, simply close all job queue and job workers
 	// when resuming, re-create them
+	l.tctx.L().Info("stop importing data process")
+
 	l.closeFileJobQueue()
 	l.workerWg.Wait()
 
-	for _, worker := range l.pool {
-		worker.Close()
-	}
-	l.pool = l.pool[:0]
 	l.tctx.L().Debug("all workers have been closed")
 }
 
@@ -709,9 +753,10 @@ func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
 		}
 
 		l.workerWg.Add(1) // for every worker goroutine, Add(1)
-		go worker.run(ctx, l.fileJobQueue, l.workerWg, l.runFatalChan)
-
-		l.pool = append(l.pool, worker)
+		go func() {
+			defer l.workerWg.Done()
+			worker.run(ctx, l.fileJobQueue, l.runFatalChan)
+		}()
 	}
 	return nil
 }
@@ -1106,17 +1151,12 @@ func (l *Loader) restoreData(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			l.tctx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err()))
-			l.closeFileJobQueue()
 			return ctx.Err()
 		case l.fileJobQueue <- j:
 		}
 	}
-	l.closeFileJobQueue() // all data file dispatched, close it
 
 	l.tctx.L().Info("all data files have been dispatched, waiting for them finished")
-	l.workerWg.Wait()
-
-	l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
 	return nil
 }
diff --git a/tests/import_goroutine_leak/conf/dm-master.toml b/tests/import_goroutine_leak/conf/dm-master.toml
new file mode 100644
index 0000000000..334e0de993
--- /dev/null
+++ b/tests/import_goroutine_leak/conf/dm-master.toml
@@ -0,0 +1,9 @@
+# Master Configuration.
+
+[[deploy]]
+source-id = "mysql-replica-01"
+dm-worker = "127.0.0.1:8262"
+
+[[deploy]]
+source-id = "mysql-replica-02"
+dm-worker = "127.0.0.1:8263"
diff --git a/tests/import_goroutine_leak/conf/dm-task.yaml b/tests/import_goroutine_leak/conf/dm-task.yaml
new file mode 100644
index 0000000000..fe57809826
--- /dev/null
+++ b/tests/import_goroutine_leak/conf/dm-task.yaml
@@ -0,0 +1,49 @@
+---
+name: test
+task-mode: full
+is-sharding: false
+meta-schema: "dm_meta"
+remove-meta: false
+enable-heartbeat: true
+timezone: "Asia/Shanghai"
+
+target-database:
+  host: "127.0.0.1"
+  port: 4000
+  user: "root"
+  password: ""
+
+mysql-instances:
+  - source-id: "mysql-replica-01"
+    black-white-list: "instance"
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+  - source-id: "mysql-replica-02"
+    black-white-list: "instance"
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+black-white-list:
+  instance:
+    do-dbs: ["import_goroutine_leak"]
+
+mydumpers:
+  global:
+    mydumper-path: "./bin/mydumper"
+    threads: 4
+    chunk-filesize: 0
+    skip-tz-utc: true
+    extra-args: "--statement-size=100"
+
+loaders:
+  global:
+    pool-size: 16
+    dir: "./dumped_data"
+
+syncers:
+  global:
+    worker-count: 16
+    batch: 100
diff --git a/tests/import_goroutine_leak/conf/dm-worker1.toml b/tests/import_goroutine_leak/conf/dm-worker1.toml
new file mode 100644
index 0000000000..1cb2c0a256
--- /dev/null
+++ b/tests/import_goroutine_leak/conf/dm-worker1.toml
@@ -0,0 +1,13 @@
+# Worker Configuration.
+
+source-id = "mysql-replica-01"
+flavor = ""
+enable-gtid = false
+relay-binlog-name = ""
+relay-binlog-gtid = ""
+
+[from]
+host = "127.0.0.1"
+user = "root"
+password = ""
+port = 3306
diff --git a/tests/import_goroutine_leak/conf/dm-worker2.toml b/tests/import_goroutine_leak/conf/dm-worker2.toml
new file mode 100644
index 0000000000..b1d74402eb
--- /dev/null
+++ b/tests/import_goroutine_leak/conf/dm-worker2.toml
@@ -0,0 +1,13 @@
+# Worker Configuration.
+
+source-id = "mysql-replica-02"
+flavor = ""
+enable-gtid = false
+relay-binlog-name = ""
+relay-binlog-gtid = ""
+
+[from]
+host = "127.0.0.1"
+user = "root"
+password = ""
+port = 3307
diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh
new file mode 100644
index 0000000000..ae70acdf8b
--- /dev/null
+++ b/tests/import_goroutine_leak/run.sh
@@ -0,0 +1,98 @@
+#!/bin/bash
+
+set -eu
+
+cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $cur/../_utils/test_prepare
+WORK_DIR=$TEST_DIR/$TEST_NAME
+
+COUNT=200
+function prepare_datafile() {
+    for i in $(seq 2); do
+        data_file="$WORK_DIR/db$i.prepare.sql"
+        echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
+        echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
+        echo 'USE import_goroutine_leak;' >> $data_file
+        echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);" >> $data_file
+        for j in $(seq $COUNT); do
+            echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);" >> $data_file
+        done
+    done
+}
+
+function run() {
+    prepare_datafile
+
+    run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1
+    run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2
+
+
+    echo "dm-worker panic, doJob of import unit workers don't exit"
+    # check doJobs of import unit worker exit
+    inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
+        "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
+        "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+        "github.com/pingcap/dm/loader/workerCantClose=return(1)"
+    )
+    export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
+
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+    dmctl_start_task
+
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+
+    # dm-worker1 panics
+    err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
+    if [ $err_cnt -ne 1 ]; then
+        echo "dm-worker1 doesn't panic, panic count ${err_cnt}"
+        exit 2
+    fi
+
+    echo "dm-workers panic again, workers of import unit don't exit"
+    # check workers of import unit exit
+    inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
+        "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
+        "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+    )
+    export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    sleep 2s
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+
+    # dm-worker1 panics
+    err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
+    if [ $err_cnt -ne 2 ]; then
+        echo "dm-worker1 doesn't panic again, panic count ${err_cnt}"
+        exit 2
+    fi
+
+    # check workers of import unit exit
+    inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
+        "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
+        "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+    )
+    export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    export GO_FAILPOINTS=''
+}
+
+cleanup_data import_goroutine_leak
+# also cleanup dm processes in case of last run failed
cleanup_process $*
+run $*
+cleanup_process $*
+
+echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
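
The following is a minimal, self-contained sketch (not part of the patch above) of the goroutine-ownership pattern the loader.go changes follow: run() owns its doJob goroutines and always drains them via a deferred Close(), Close() waits even when called twice, and the caller only waits for the goroutine that runs run(). The names worker, doJob, and files are simplified stand-ins, not the real loader API.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type worker struct {
	jobQueue chan string
	wg       sync.WaitGroup // tracks doJob goroutines
	closed   int64
}

// close is idempotent and always waits for doJob goroutines,
// mirroring the behavior Worker.Close gains in the patch.
func (w *worker) close() {
	if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
		w.wg.Wait()
		return
	}
	close(w.jobQueue)
	w.wg.Wait()
}

// run spawns one doJob goroutine per incoming file and guarantees,
// through the deferred close, that none of them outlive it.
func (w *worker) run(ctx context.Context, files <-chan string) {
	newCtx, cancel := context.WithCancel(ctx)
	defer func() {
		cancel()
		w.close() // every exit path drains doJob goroutines
	}()

	doJob := func() {
		for {
			select {
			case <-newCtx.Done():
				return
			case job, ok := <-w.jobQueue:
				if !ok || job == "" {
					return // queue closed or file finished
				}
				fmt.Println("execute", job) // stand-in for executeSQL
			}
		}
	}

	for {
		select {
		case <-newCtx.Done():
			return
		case f, ok := <-files:
			if !ok {
				return
			}
			w.wg.Add(1)
			go func() {
				defer w.wg.Done()
				doJob()
			}()
			w.jobQueue <- f  // stand-in for dispatching the file's SQL
			w.jobQueue <- "" // file dispatched; tell one doJob to stop
		}
	}
}

func main() {
	files := make(chan string)
	w := &worker{jobQueue: make(chan string, 16)}

	var workerWg sync.WaitGroup
	workerWg.Add(1)
	go func() {
		defer workerWg.Done()
		w.run(context.Background(), files)
	}()

	files <- "db1.table1.sql"
	close(files)    // no more files: run returns and closes the worker
	workerWg.Wait() // nothing is left running, so nothing can leak
}

The key design choice, visible in the diff, is that the caller no longer keeps a worker pool to close separately: whoever starts run() only waits for run() itself, and run() is responsible for shutting down everything it started.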