Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

fix data race in import unit(loader) #349

Merged
merged 9 commits into from
Nov 14, 2019
Merged
100 changes: 70 additions & 30 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,49 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) {

// Close closes worker
func (w *Worker) Close() {
// simulate the case that doesn't wait all doJob goroutine exit
failpoint.Inject("workerCantClose", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "workerCantClose"))
failpoint.Return()
})

if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
w.wg.Wait()
w.tctx.L().Info("already closed...")
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
return
}

w.tctx.L().Info("start to close...")
close(w.jobQueue)
w.wg.Wait()
w.tctx.L().Info("closed !!!")
}

func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *sync.WaitGroup, runFatalChan chan *pb.ProcessError) {
func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalChan chan *pb.ProcessError) {
atomic.StoreInt64(&w.closed, 0)
defer workerWg.Done()

newCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
// make sure all doJob goroutines exit
w.Close()
}()

ctctx := w.tctx.WithContext(newCtx)

doJob := func() {
defer w.wg.Done()
for {
select {
case <-newCtx.Done():
w.tctx.L().Debug("execution goroutine exits")
w.tctx.L().Info("context canceled, execution goroutine exits")
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
return
case job, ok := <-w.jobQueue:
if !ok || job == nil {
if !ok {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
w.tctx.L().Info("job queue was closed, execution goroutine exits")
return
}
if job == nil {
w.tctx.L().Info("jobs are finished, execution goroutine exits")
return
}
sqls := make([]string, 0, 3)
Expand All @@ -149,7 +166,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *

failpoint.Inject("LoadDataSlowDown", nil)

if err := w.conn.executeSQL(ctctx, sqls); err != nil {
err := w.conn.executeSQL(ctctx, sqls)
failpoint.Inject("executeSQLError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "executeSQLError"))
err = errors.New("inject failpoint executeSQLError")
})
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
Expand All @@ -164,15 +186,19 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *
for {
select {
case <-newCtx.Done():
w.tctx.L().Info("context canceled, main goroutine exits")
return
case job, ok := <-fileJobQueue:
if !ok {
w.tctx.L().Debug("main routine exit.")
w.tctx.L().Info("file queue was closed, main routine exit.")
return
}

w.wg.Add(1)
go doJob()
go func() {
defer w.wg.Done()
doJob()
}()

// restore a table
if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil {
Expand All @@ -192,12 +218,17 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in
return err
}

// dispatchSQL completed, send nil.
failpoint.Inject("dispatchError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "dispatchError"))
failpoint.Return(errors.New("inject failpoint dispatchError"))
})

// dispatchSQL completed, send nil to make sure all dmls are applied to target database
// we don't want to close and re-make chan frequently
// but if we need to re-call w.run, we need re-make jobQueue chan
w.jobQueue <- nil

w.wg.Wait()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath))
return nil
}
Expand Down Expand Up @@ -333,7 +364,6 @@ type Loader struct {
bwList *filter.Filter
columnMapping *cm.Mapping

pool []*Worker
closed sync2.AtomicBool

toDB *conn.BaseDB
Expand All @@ -355,7 +385,6 @@ func NewLoader(cfg *config.SubTaskConfig) *Loader {
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
pool: make([]*Worker, 0, cfg.PoolSize),
tctx: tcontext.Background().WithLogger(log.With(zap.String("task", cfg.Name), zap.String("unit", "load"))),
}
loader.fileJobQueueClosed.Set(true) // not open yet
Expand Down Expand Up @@ -449,7 +478,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {

err := l.Restore(newCtx)
close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
wg.Wait() // wait for receive all fatal from l.runFatalChan

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
l.workerWg.Wait()
})

wg.Wait() // wait for receive all fatal from l.runFatalChan

if err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name).Inc()
Expand Down Expand Up @@ -536,13 +571,24 @@ func (l *Loader) Restore(ctx context.Context) error {

go l.PrintStatus(ctx)

if err := l.restoreData(ctx); err != nil {
if errors.Cause(err) == context.Canceled {
return nil
}
begin := time.Now()
err = l.restoreData(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
return err
}

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.tctx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
failpoint.Return(nil)
})

// make sure all workers exit
l.closeFileJobQueue() // all data file dispatched, close it
l.workerWg.Wait()
if err == nil {
l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

Expand Down Expand Up @@ -576,13 +622,11 @@ func (l *Loader) Close() {
func (l *Loader) stopLoad() {
// before re-write workflow, simply close all job queue and job workers
// when resuming, re-create them
l.tctx.L().Info("stop importing data process")

l.closeFileJobQueue()
l.workerWg.Wait()

for _, worker := range l.pool {
worker.Close()
}
l.pool = l.pool[:0]
l.tctx.L().Debug("all workers have been closed")
}

Expand Down Expand Up @@ -709,9 +753,10 @@ func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
}

l.workerWg.Add(1) // for every worker goroutine, Add(1)
go worker.run(ctx, l.fileJobQueue, l.workerWg, l.runFatalChan)

l.pool = append(l.pool, worker)
go func() {
defer l.workerWg.Done()
worker.run(ctx, l.fileJobQueue, l.runFatalChan)
}()
}
return nil
}
Expand Down Expand Up @@ -1106,17 +1151,12 @@ func (l *Loader) restoreData(ctx context.Context) error {
select {
case <-ctx.Done():
l.tctx.L().Warn("stop dispatch data file job", log.ShortError(ctx.Err()))
l.closeFileJobQueue()
return ctx.Err()
case l.fileJobQueue <- j:
}
}
l.closeFileJobQueue() // all data file dispatched, close it

l.tctx.L().Info("all data files have been dispatched, waiting for them finished")
l.workerWg.Wait()

l.tctx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions tests/import_goroutine_leak/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Master Configuration.

[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"

[[deploy]]
source-id = "mysql-replica-02"
dm-worker = "127.0.0.1:8263"
49 changes: 49 additions & 0 deletions tests/import_goroutine_leak/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
name: test
task-mode: full
is-sharding: false
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["import_goroutine_leak"]

mydumpers:
global:
mydumper-path: "./bin/mydumper"
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "--statement-size=100"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-01"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3306
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-02"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3307
Loading