From ceac2bba8dea855518a5f0fa5f1d99e0ccd224c4 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 11 Nov 2019 13:35:49 +0800 Subject: [PATCH 1/6] fix data race in import unit(loader) --- loader/loader.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index c82298ee33..6329327bc9 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -105,23 +105,32 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) { // Close closes worker func (w *Worker) Close() { if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) { + w.tctx.L().Info("already closed...") return } + w.tctx.L().Info("start to close...") + close(w.jobQueue) w.wg.Wait() + + w.tctx.L().Info("closed !!!") } func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) { atomic.StoreInt64(&w.closed, 0) - defer workerWg.Done() newCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + w.Close() + workerWg.Done() + }() ctctx := w.tctx.WithContext(newCtx) - doJob := func() { + w.wg.Add(1) + go func() { defer w.wg.Done() for { select { @@ -158,7 +167,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * w.loader.finishedDataSize.Add(job.offset - job.lastOffset) } } - } + }() // worker main routine for { @@ -171,9 +180,6 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * return } - w.wg.Add(1) - go doJob() - // restore a table if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil { // expect pause rather than exit @@ -192,12 +198,6 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in return err } - // dispatchSQL completed, send nil. 
- // we don't want to close and re-make chan frequently - // but if we need to re-call w.run, we need re-make jobQueue chan - w.jobQueue <- nil - - w.wg.Wait() w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath)) return nil } From ebbd060d0b813e506f203f12280893004a3586c2 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Mon, 11 Nov 2019 14:00:38 +0800 Subject: [PATCH 2/6] add import goroutine leak integration test --- loader/loader.go | 52 ++++++++++--- .../import_goroutine_leak/conf/dm-master.toml | 9 +++ tests/import_goroutine_leak/conf/dm-task.yaml | 49 ++++++++++++ .../conf/dm-worker1.toml | 13 ++++ .../conf/dm-worker2.toml | 13 ++++ tests/import_goroutine_leak/run.sh | 77 +++++++++++++++++++ 6 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 tests/import_goroutine_leak/conf/dm-master.toml create mode 100644 tests/import_goroutine_leak/conf/dm-task.yaml create mode 100644 tests/import_goroutine_leak/conf/dm-worker1.toml create mode 100644 tests/import_goroutine_leak/conf/dm-worker2.toml create mode 100644 tests/import_goroutine_leak/run.sh diff --git a/loader/loader.go b/loader/loader.go index 6329327bc9..8e9b853169 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -104,16 +104,20 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) { // Close closes worker func (w *Worker) Close() { + failpoint.Inject("workerCantClose", func(_ failpoint.Value) { + w.tctx.L().Info("", zap.String("failpoint", "workerCantClose")) + failpoint.Return() + }) + if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) { + w.wg.Wait() w.tctx.L().Info("already closed...") return } w.tctx.L().Info("start to close...") - close(w.jobQueue) w.wg.Wait() - w.tctx.L().Info("closed !!!") } @@ -129,16 +133,19 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * ctctx := w.tctx.WithContext(newCtx) - w.wg.Add(1) - go func() { - defer w.wg.Done() + doJob := func() { for { select { case <-newCtx.Done(): - w.tctx.L().Debug("execution goroutine exits") + w.tctx.L().Info("context canceled, execution goroutine exits") return case job, ok := <-w.jobQueue: - if !ok || job == nil { + if job == nil { + w.tctx.L().Info("jobs are finished, execution goroutine exits") + return + } + if !ok { + w.tctx.L().Info("job queue was closed, execution goroutine exits") return } sqls := make([]string, 0, 3) @@ -158,7 +165,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * failpoint.Inject("LoadDataSlowDown", nil) - if err := w.conn.executeSQL(ctctx, sqls); err != nil { + err := w.conn.executeSQL(ctctx, sqls) + failpoint.Inject("executeSQLError", func(_ failpoint.Value) { + w.tctx.L().Info("", zap.String("failpoint", "executeSQLError")) + err = errors.New("inject failpoint executeSQLError") + }) + if err != nil { // expect pause rather than exit err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream) runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err) @@ -167,19 +179,26 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg * w.loader.finishedDataSize.Add(job.offset - job.lastOffset) } } - }() + } // worker main routine for { select { case <-newCtx.Done(): + w.tctx.L().Info("context canceled, main goroutine exits") return case job, ok := <-fileJobQueue: if !ok { - w.tctx.L().Debug("main routine exit.") + w.tctx.L().Info("file queue was closed, main routine exit.") return } + w.wg.Add(1) + go func() { + defer w.wg.Done() + doJob() + }() + // 
restore a table if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil { // expect pause rather than exit @@ -198,6 +217,17 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in return err } + failpoint.Inject("dispatchError", func(_ failpoint.Value) { + w.tctx.L().Info("", zap.String("failpoint", "dispatchError")) + failpoint.Return(errors.New("inject failpoint dispatchError")) + }) + + // dispatchSQL completed, send nil to make sure all dmls are applied to target database + // we don't want to close and re-make chan frequently + // but if we need to re-call w.run, we need re-make jobQueue chan + w.jobQueue <- nil + w.wg.Wait() + w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath)) return nil } @@ -576,6 +606,8 @@ func (l *Loader) Close() { func (l *Loader) stopLoad() { // before re-write workflow, simply close all job queue and job workers // when resuming, re-create them + l.tctx.L().Info("stop importing data process") + l.closeFileJobQueue() l.workerWg.Wait() diff --git a/tests/import_goroutine_leak/conf/dm-master.toml b/tests/import_goroutine_leak/conf/dm-master.toml new file mode 100644 index 0000000000..334e0de993 --- /dev/null +++ b/tests/import_goroutine_leak/conf/dm-master.toml @@ -0,0 +1,9 @@ +# Master Configuration. + +[[deploy]] +source-id = "mysql-replica-01" +dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-replica-02" +dm-worker = "127.0.0.1:8263" diff --git a/tests/import_goroutine_leak/conf/dm-task.yaml b/tests/import_goroutine_leak/conf/dm-task.yaml new file mode 100644 index 0000000000..fe57809826 --- /dev/null +++ b/tests/import_goroutine_leak/conf/dm-task.yaml @@ -0,0 +1,49 @@ +--- +name: test +task-mode: full +is-sharding: false +meta-schema: "dm_meta" +remove-meta: false +enable-heartbeat: true +timezone: "Asia/Shanghai" + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + black-white-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: + instance: + do-dbs: ["import_goroutine_leak"] + +mydumpers: + global: + mydumper-path: "./bin/mydumper" + threads: 4 + chunk-filesize: 0 + skip-tz-utc: true + extra-args: "--statement-size=100" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/import_goroutine_leak/conf/dm-worker1.toml b/tests/import_goroutine_leak/conf/dm-worker1.toml new file mode 100644 index 0000000000..1cb2c0a256 --- /dev/null +++ b/tests/import_goroutine_leak/conf/dm-worker1.toml @@ -0,0 +1,13 @@ +# Worker Configuration. + +source-id = "mysql-replica-01" +flavor = "" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3306 diff --git a/tests/import_goroutine_leak/conf/dm-worker2.toml b/tests/import_goroutine_leak/conf/dm-worker2.toml new file mode 100644 index 0000000000..b1d74402eb --- /dev/null +++ b/tests/import_goroutine_leak/conf/dm-worker2.toml @@ -0,0 +1,13 @@ +# Worker Configuration. 
+ +source-id = "mysql-replica-02" +flavor = "" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3307 diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh new file mode 100644 index 0000000000..a10be10146 --- /dev/null +++ b/tests/import_goroutine_leak/run.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +COUNT=200 +function prepare_datafile() { + for i in $(seq 2); do + data_file="$WORK_DIR/db$i.prepare.sql" + echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file + echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file + echo 'USE import_goroutine_leak;' >> $data_file + echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);" >> $data_file + for j in $(seq $COUNT); do + echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);" >> $data_file + done + done +} + +function run() { + prepare_datafile + + run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 + + + # check workers of import unit exit + inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" + "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + "github.com/pingcap/dm/loader/executeSQLError=return(1)" + "github.com/pingcap/dm/loader/workerCantClose=return(1)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + dmctl_start_task + + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + # dm-worker1 panics + err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l` + if [ $err_cnt -ne 1 ]; then + echo "dm-worker1 doesn't panic, panic count ${err_cnt}" + exit 2 + fi + + # check workers of import unit exit + inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" + "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + "github.com/pingcap/dm/loader/executeSQLError=return(1)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + export GO_FAILPOINTS='' +} + +cleanup_data import_goroutine_leak +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! 
>>>>>>" From 060fb922d9d2597ba28bcbb5f5f18925e49f416c Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Tue, 12 Nov 2019 14:03:23 +0800 Subject: [PATCH 3/6] fix goroutine leak in restoreData --- loader/loader.go | 50 +++++++++++++++++------------- tests/import_goroutine_leak/run.sh | 23 +++++++++++++- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 8e9b853169..5f108476c8 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -104,6 +104,7 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) { // Close closes worker func (w *Worker) Close() { + // simulate the case that doesn't wait all doJob goroutine exit failpoint.Inject("workerCantClose", func(_ failpoint.Value) { w.tctx.L().Info("", zap.String("failpoint", "workerCantClose")) failpoint.Return() @@ -121,14 +122,14 @@ func (w *Worker) Close() { w.tctx.L().Info("closed !!!") } -func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) { +func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalChan chan *pb.ProcessError) { atomic.StoreInt64(&w.closed, 0) newCtx, cancel := context.WithCancel(ctx) defer func() { cancel() + // make sure all doJob goroutines exit w.Close() - workerWg.Done() }() ctctx := w.tctx.WithContext(newCtx) @@ -363,7 +364,6 @@ type Loader struct { bwList *filter.Filter columnMapping *cm.Mapping - pool []*Worker closed sync2.AtomicBool toDB *conn.BaseDB @@ -385,7 +385,6 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader { db2Tables: make(map[string]Tables2DataFiles), tableInfos: make(map[string]*tableInfo), workerWg: new(sync.WaitGroup), - pool: make([]*Worker, 0, cfg.PoolSize), tctx: tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", "load"))), } loader.fileJobQueueClosed.Set(true) // not open yet @@ -479,7 +478,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) { err := l.Restore(newCtx) close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan - wg.Wait() // wait for receive all fatal from l.runFatalChan + + failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { + l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) + l.workerWg.Wait() + }) + + wg.Wait() // wait for receive all fatal from l.runFatalChan if err != nil { loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc() @@ -566,13 +571,24 @@ func (l *Loader) Restore(ctx context.Context) error { go l.PrintStatus(ctx) - if err := l.restoreData(ctx); err != nil { - if errors.Cause(err) == context.Canceled { - return nil - } + begin := time.Now() + err = l.restoreData(ctx) + if err != nil && errors.Cause(err) != context.Canceled { return err } + failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { + l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) + failpoint.Return(nil) + }) + + // make sure all workers exit + l.closeFileJobQueue() // all data file dispatched, close it + l.workerWg.Wait() + if err == nil { + l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) + } + return nil } @@ -611,10 +627,6 @@ func (l *Loader) stopLoad() { l.closeFileJobQueue() l.workerWg.Wait() - for _, worker := range l.pool { - worker.Close() - } - l.pool = l.pool[:0] l.tctx.L().Debug("all workers have been closed") } @@ -741,9 +753,10 @@ func (l *Loader) initAndStartWorkerPool(ctx context.Context) error { } l.workerWg.Add(1) 
// for every worker goroutine, Add(1) - go worker.run(ctx, l.fileJobQueue, l.workerWg, l.runFatalChan) - - l.pool = append(l.pool, worker) + go func() { + defer l.workerWg.Done() + worker.run(ctx, l.fileJobQueue, l.runFatalChan) + }() } return nil } @@ -1138,17 +1151,12 @@ func (l *Loader) restoreData(ctx context.Context) error { select { case <-ctx.Done(): l.tctx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err())) - l.closeFileJobQueue() return ctx.Err() case l.fileJobQueue <- j: } } - l.closeFileJobQueue() // all data file dispatched, close it l.tctx.L().Info("all data files have been dispatched, waiting for them finished") - l.workerWg.Wait() - - l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) return nil } diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh index a10be10146..384d85b46c 100644 --- a/tests/import_goroutine_leak/run.sh +++ b/tests/import_goroutine_leak/run.sh @@ -27,7 +27,8 @@ function run() { run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 - # check workers of import unit exit + echo "dm-worker paninc, doJob of import unit workers don't exit" + # check doJobs of import unit worker exit inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" "github.com/pingcap/dm/loader/executeSQLError=return(1)" @@ -54,6 +55,26 @@ function run() { exit 2 fi + echo "dm-workers paninc again, workers of import unit don't exit" + # check workers of import unit exit + inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)" + "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + "github.com/pingcap/dm/loader/executeSQLError=return(1)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + sleep 2s + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + # dm-worker1 panics + err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l` + if [ $err_cnt -ne 2 ]; then + echo "dm-worker1 doesn't panic again, panic count ${err_cnt}" + exit 2 + fi + # check workers of import unit exit inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" From 36ba61ad3fbb7e9c77f93e65d198a71aede60187 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Wed, 13 Nov 2019 13:05:27 +0800 Subject: [PATCH 4/6] address comment --- loader/loader.go | 8 ++++---- tests/import_goroutine_leak/run.sh | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index 5f108476c8..cd06f442f1 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -141,14 +141,14 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh w.tctx.L().Info("context canceled, execution goroutine exits") return case job, ok := <-w.jobQueue: - if job == nil { - w.tctx.L().Info("jobs are finished, execution goroutine exits") - return - } if !ok { w.tctx.L().Info("job queue was closed, execution goroutine exits") return } + if job == nil { + w.tctx.L().Info("jobs are finished, execution goroutine exits") + return + } sqls := make([]string, 0, 3) sqls = append(sqls, fmt.Sprintf("USE `%s`;", job.schema)) sqls = append(sqls, job.sql) diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh index 
384d85b46c..ae70acdf8b 100644 --- a/tests/import_goroutine_leak/run.sh +++ b/tests/import_goroutine_leak/run.sh @@ -55,7 +55,7 @@ function run() { exit 2 fi - echo "dm-workers paninc again, workers of import unit don't exit" + echo "dm-workers panic again, workers of import unit don't exit" # check workers of import unit exit inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)" "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" From 3ee0c419cc91136336a94aa07e457c75c1cd3c56 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Wed, 13 Nov 2019 20:58:00 +0800 Subject: [PATCH 5/6] refine old logic of restoreData --- loader/loader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/loader/loader.go b/loader/loader.go index cd06f442f1..dab8bd279a 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -573,9 +573,6 @@ func (l *Loader) Restore(ctx context.Context) error { begin := time.Now() err = l.restoreData(ctx) - if err != nil && errors.Cause(err) != context.Canceled { - return err - } failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) @@ -585,8 +582,11 @@ func (l *Loader) Restore(ctx context.Context) error { // make sure all workers exit l.closeFileJobQueue() // all data file dispatched, close it l.workerWg.Wait() + if err == nil { l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) + } else if errors.Cause(err) != context.Canceled { + return err } return nil From a6c0c901aab376feeeaa40f9578970ff3df470e9 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 13 Nov 2019 23:10:23 +0800 Subject: [PATCH 6/6] worker: fix data race ``` 2019-11-13T14:04:06.914Z] WARNING: DATA RACE [2019-11-13T14:04:06.914Z] Write at 0x00c0003bb5a0 by goroutine 97: [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*SubTask).Init() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/subtask.go:111 +0xf5 [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*SubTask).Run() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/subtask.go:169 +0x2cc [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*SubTask).Resume() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/subtask.go:412 +0x80e [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*Worker).handleTask() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:880 +0x1ecc [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*Worker).Start.func1() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:178 +0x80 [2019-11-13T14:04:06.914Z] [2019-11-13T14:04:06.914Z] Previous read at 0x00c0003bb5a0 by goroutine 147: [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*Worker).doFetchDDLInfo() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:398 +0x203 [2019-11-13T14:04:06.914Z] github.com/pingcap/dm/dm/worker.(*Worker).FetchDDLInfo.func1() [2019-11-13T14:04:06.914Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:374 +0x98 [2019-11-13T14:04:06.915Z] [2019-11-13T14:04:06.915Z] Goroutine 97 (running) created at: [2019-11-13T14:04:06.915Z] 
github.com/pingcap/dm/dm/worker.(*Worker).Start() [2019-11-13T14:04:06.915Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:176 +0x26f [2019-11-13T14:04:06.915Z] github.com/pingcap/dm/dm/worker.(*Server).Start.func1() [2019-11-13T14:04:06.915Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/server.go:87 +0x96 [2019-11-13T14:04:06.915Z] [2019-11-13T14:04:06.915Z] Goroutine 147 (running) created at: [2019-11-13T14:04:06.915Z] github.com/pingcap/dm/dm/worker.(*Worker).FetchDDLInfo() [2019-11-13T14:04:06.915Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/worker.go:372 +0x1f4 [2019-11-13T14:04:06.915Z] github.com/pingcap/dm/dm/worker.(*Server).FetchDDLInfo() [2019-11-13T14:04:06.915Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/worker/server.go:286 +0x28b [2019-11-13T14:04:06.915Z] github.com/pingcap/dm/dm/pb._Worker_FetchDDLInfo_Handler() [2019-11-13T14:04:06.915Z] /home/jenkins/agent/workspace/dm_ghpr_test/go/src/github.com/pingcap/dm/dm/pb/dmworker.pb.go:3865 +0xe5 [2019-11-13T14:04:06.915Z] google.golang.org/grpc.(*Server).processStreamingRPC() [2019-11-13T14:04:06.915Z] /go/pkg/mod/google.golang.org/grpc@v1.23.0/server.go:1199 +0x15f2 [2019-11-13T14:04:06.915Z] google.golang.org/grpc.(*Server).handleStream() [2019-11-13T14:04:06.915Z] /go/pkg/mod/google.golang.org/grpc@v1.23.0/server.go:1279 +0x12e5 [2019-11-13T14:04:06.915Z] google.golang.org/grpc.(*Server).serveStreams.func1.1() [2019-11-13T14:04:06.915Z] /go/pkg/mod/google.golang.org/grpc@v1.23.0/server.go:710 +0xac ``` --- dm/worker/subtask.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 59c085a9e8..6b58e36297 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -93,10 +93,11 @@ func NewSubTask(cfg *config.SubTaskConfig) *SubTask { // NewSubTaskWithStage creates a new SubTask with stage func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage) *SubTask { st := SubTask{ - cfg: cfg, - units: createUnits(cfg), - stage: stage, - l: log.With(zap.String("subtask", cfg.Name)), + cfg: cfg, + units: createUnits(cfg), + stage: stage, + l: log.With(zap.String("subtask", cfg.Name)), + DDLInfo: make(chan *pb.DDLInfo, 1), } taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) return &st @@ -108,8 +109,6 @@ func (st *SubTask) Init() error { return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode) } - st.DDLInfo = make(chan *pb.DDLInfo, 1) - initializeUnitSuccess := true // when error occurred, initialized units should be closed // when continue sub task from loader / syncer, ahead units should be closed
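
Note for reviewers (not part of the patches above): below is a minimal, self-contained Go sketch of the worker lifecycle this series converges on in loader/loader.go — one execution goroutine per data file, a nil job sent as a flush marker after dispatch, and an idempotent, CAS-guarded Close() called from run()'s defer so nothing leaks. All names here (miniWorker, job, restoreFile, files) are invented stand-ins for the real Worker/fileJob/jobQueue types; error handling, failpoints, and runFatalChan are deliberately omitted.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// job is a single SQL statement to apply; a nil *job acts as a flush marker.
// (Simplified illustration only — not the actual loader.fileJob/job types.)
type job struct{ sql string }

// miniWorker mirrors the shape the series gives loader.Worker: one long-lived
// job queue, a WaitGroup for execution goroutines, and an idempotent Close
// guarded by a CAS on `closed`.
type miniWorker struct {
	closed   int64
	wg       sync.WaitGroup
	jobQueue chan *job
}

// run consumes data-file names, starts one execution goroutine per file,
// and always closes the worker on exit so no goroutine is leaked.
func (w *miniWorker) run(ctx context.Context, files <-chan string) {
	newCtx, cancel := context.WithCancel(ctx)
	defer func() {
		cancel()
		w.Close() // make sure all execution goroutines exit
	}()

	doJob := func() {
		defer w.wg.Done()
		for {
			select {
			case <-newCtx.Done():
				return
			case j, ok := <-w.jobQueue:
				if !ok || j == nil {
					return // queue closed, or flush marker: this file is done
				}
				fmt.Println("execute:", j.sql)
			}
		}
	}

	for {
		select {
		case <-newCtx.Done():
			return
		case f, ok := <-files:
			if !ok {
				return // all files dispatched
			}
			w.wg.Add(1)
			go doJob()
			w.restoreFile(f)
		}
	}
}

// restoreFile dispatches the statements of one file, then sends a nil job and
// waits, so everything dispatched so far is applied before moving on.
func (w *miniWorker) restoreFile(name string) {
	w.jobQueue <- &job{sql: "INSERT INTO t VALUES (1) /* from " + name + " */"}
	w.jobQueue <- nil
	w.wg.Wait()
}

// Close is idempotent: only the first caller closes the queue; every caller
// waits for in-flight execution goroutines before returning.
func (w *miniWorker) Close() {
	if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
		w.wg.Wait()
		return
	}
	close(w.jobQueue)
	w.wg.Wait()
}

func main() {
	files := make(chan string, 2)
	files <- "db1.t1.sql"
	files <- "db1.t2.sql"
	close(files)

	w := &miniWorker{jobQueue: make(chan *job, 16)}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		w.run(context.Background(), files)
	}()
	wg.Wait() // run returns only after Close has drained its goroutines
}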