From c795f3be8f63737dab37b2664286776e7ba38d65 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 4 Aug 2020 10:23:55 +0800 Subject: [PATCH 1/9] mydumper, worker: add terror for can't acquire global lock --- _utils/terror_gen/errors_release.txt | 1 + dm/worker/task_checker.go | 7 ++++++- mydumper/mydumper.go | 7 +++++++ pkg/terror/error_list.go | 2 ++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 44d903dbb0..20e4136361 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -214,6 +214,7 @@ ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high] ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error" ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file." +ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWL, or add --no-locks to extra-args of mydumpers" ErrLoadUnitCreateSchemaFile,[code=34001:class=load-unit:scope=internal:level=medium], "Message: generate schema file, Workaround: Please check the `loaders` config in task configuration file." ErrLoadUnitInvalidFileEnding,[code=34002:class=load-unit:scope=internal:level=high], "Message: corresponding ending of sql: ')' not found" ErrLoadUnitParseQuoteValues,[code=34003:class=load-unit:scope=internal:level=high], "Message: parse quote values error" diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index fd3f4031db..4186ee9399 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -274,12 +274,16 @@ func isResumableError(err *pb.ProcessError) bool { return false } } - if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { + + switch err.ErrCode { + case int32(terror.ErrParserParseRelayLog.Code()): for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(strings.ToLower(err.Message), strings.ToLower(msg)) { return false } } + case int32(terror.ErrDumpUnitGlobalLock.Code()): + return false } if _, ok := retry.UnresumableErrCodes[err.ErrCode]; ok { @@ -296,6 +300,7 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, } // TODO: use different strategies based on the error detail + // lance: here for _, processErr := range stStatus.Result.Errors { if !isResumableError(processErr) { return ResumeNoSense diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index b003ae6993..2d4e621212 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -168,11 +168,18 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { continue case "ERROR": m.logger.Error(string(msg)) + if strings.HasPrefix(string(msg), "Couldn't acquire global lock") { + err = terror.ErrDumpUnitGlobalLock + } continue } } stderr.Write(line) stderr.WriteByte('\n') + if err != nil { + stdout.Write(stderr.Bytes()) + return stdout.Bytes(), err + } } if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 3a9118c1cd..4bbad30cf6 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -279,6 +279,7 @@ const ( codeDumpUnitRuntime ErrCode = iota + 32001 codeDumpUnitGenTableRouter codeDumpUnitGenBAList + codeDumpUnitGlobalLock ) // Load unit error code @@ -808,6 +809,7 @@ var ( ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "") ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.") ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.") + ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWL, or add --no-locks to extra-args of mydumpers") // Load unit error ErrLoadUnitCreateSchemaFile = New(codeLoadUnitCreateSchemaFile, ClassLoadUnit, ScopeInternal, LevelMedium, "generate schema file", "Please check the `loaders` config in task configuration file.") From 6667e57e5f840ac8f786930964c7462c585fd27f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 4 Aug 2020 11:13:56 +0800 Subject: [PATCH 2/9] add test --- tests/full_mode/run.sh | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index 5b74d6bd4a..8912838d3b 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -6,7 +6,45 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function fail_acquire_global_lock() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + cp $cur/data/db1.prepare.user.sql $WORK_DIR/db1.prepare.user.sql + sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db1.prepare.user.sql + run_sql_file $WORK_DIR/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_count 'Query OK, 0 rows affected' 8 + cp $cur/data/db2.prepare.user.sql $WORK_DIR/db2.prepare.user.sql + sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db2.prepare.user.sql + run_sql_file $WORK_DIR/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_count 'Query OK, 0 rows affected' 8 + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml + + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "Couldn't acquire global lock" + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "Couldn't acquire global lock" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Paused\"" 4 + + cleanup_data full_mode + cleanup_process $* +} + function run() { + fail_acquire_global_lock + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 From 63f13cc38ab7578a35422566e15ddbfb022a18b2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 4 Aug 2020 11:23:01 +0800 Subject: [PATCH 3/9] remove comment --- dm/worker/task_checker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 4186ee9399..e9d81ad346 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -300,7 +300,6 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, } // TODO: use different strategies based on the error detail - // lance: here for _, processErr := range stStatus.Result.Errors { if !isResumableError(processErr) { return ResumeNoSense From 8c017f5c03d762303cdddef19a09496da8a01d88 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 4 Aug 2020 17:41:11 +0800 Subject: [PATCH 4/9] address comment --- _utils/terror_gen/errors_release.txt | 2 +- pkg/terror/error_list.go | 2 +- tests/full_mode/run.sh | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 20e4136361..ae914c05ec 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -214,7 +214,7 @@ ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high] ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error" ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file." -ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWL, or add --no-locks to extra-args of mydumpers" +ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers" ErrLoadUnitCreateSchemaFile,[code=34001:class=load-unit:scope=internal:level=medium], "Message: generate schema file, Workaround: Please check the `loaders` config in task configuration file." ErrLoadUnitInvalidFileEnding,[code=34002:class=load-unit:scope=internal:level=high], "Message: corresponding ending of sql: ')' not found" ErrLoadUnitParseQuoteValues,[code=34003:class=load-unit:scope=internal:level=high], "Message: parse quote values error" diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 4bbad30cf6..afa406bbe9 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -809,7 +809,7 @@ var ( ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "") ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.") ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.") - ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWL, or add --no-locks to extra-args of mydumpers") + ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers") // Load unit error ErrLoadUnitCreateSchemaFile = New(codeLoadUnitCreateSchemaFile, ClassLoadUnit, ScopeInternal, LevelMedium, "generate schema file", "Please check the `loaders` config in task configuration file.") diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index 8912838d3b..cc2ee7b7d0 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -36,7 +36,8 @@ function fail_acquire_global_lock() { check_log_contains $WORK_DIR/worker2/log/dm-worker.log "Couldn't acquire global lock" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Paused\"" 4 + "\"stage\": \"Paused\"" 4 \ + "Please check upstream privilege about FTWRL" 2 cleanup_data full_mode cleanup_process $* From 433edcb16b8397d6026f2f273afd9168d4520588 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 5 Aug 2020 10:53:55 +0800 Subject: [PATCH 5/9] address comment --- mydumper/mydumper.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 2d4e621212..a435346233 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -152,6 +152,11 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // so we parse all these lines and translate into our own logs. scanner := bufio.NewScanner(stderrPipe) for scanner.Scan() { + // handle errors in scanning + if err != nil { + stdout.Write(stderr.Bytes()) + return stdout.Bytes(), err + } line := scanner.Bytes() if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 { level := string(line[loc[2]:loc[3]]) @@ -176,10 +181,6 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { } stderr.Write(line) stderr.WriteByte('\n') - if err != nil { - stdout.Write(stderr.Bytes()) - return stdout.Bytes(), err - } } if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) From 44855a000628f16a5c65cd675eb895692ac0926e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 5 Aug 2020 11:01:49 +0800 Subject: [PATCH 6/9] address comment --- mydumper/mydumper.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index a435346233..1ca9146b44 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -152,11 +152,6 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // so we parse all these lines and translate into our own logs. scanner := bufio.NewScanner(stderrPipe) for scanner.Scan() { - // handle errors in scanning - if err != nil { - stdout.Write(stderr.Bytes()) - return stdout.Bytes(), err - } line := scanner.Bytes() if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 { level := string(line[loc[2]:loc[3]]) @@ -164,23 +159,26 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { switch level { case "DEBUG": m.logger.Debug(string(msg)) - continue case "INFO": m.logger.Info(string(msg)) - continue case "WARNING": m.logger.Warn(string(msg)) - continue case "ERROR": m.logger.Error(string(msg)) if strings.HasPrefix(string(msg), "Couldn't acquire global lock") { err = terror.ErrDumpUnitGlobalLock } - continue } + } else { + stderr.Write(line) + stderr.WriteByte('\n') + } + + // handle errors in scanning + if err != nil { + stdout.Write(stderr.Bytes()) + return stdout.Bytes(), err } - stderr.Write(line) - stderr.WriteByte('\n') } if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) From 10c276ae162ee6eb2eac14e1ed1d820bf09c1e82 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 5 Aug 2020 11:09:57 +0800 Subject: [PATCH 7/9] address comment --- mydumper/mydumper.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 1ca9146b44..522c0e49ac 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -133,6 +133,7 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { var ( stdout bytes.Buffer stderr bytes.Buffer + errMsg []byte ) cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...) cmd.Stdout = &stdout @@ -167,6 +168,7 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { m.logger.Error(string(msg)) if strings.HasPrefix(string(msg), "Couldn't acquire global lock") { err = terror.ErrDumpUnitGlobalLock + errMsg = msg } } } else { @@ -176,8 +178,7 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // handle errors in scanning if err != nil { - stdout.Write(stderr.Bytes()) - return stdout.Bytes(), err + return errMsg, err } } if err = scanner.Err(); err != nil { From d739a17dd2a0f9784a65e80ed50c9f9c012a6c73 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 5 Aug 2020 11:51:22 +0800 Subject: [PATCH 8/9] address comment --- mydumper/mydumper.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 522c0e49ac..ce01efe3a2 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -133,7 +133,6 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { var ( stdout bytes.Buffer stderr bytes.Buffer - errMsg []byte ) cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...) cmd.Stdout = &stdout @@ -152,6 +151,11 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // // so we parse all these lines and translate into our own logs. scanner := bufio.NewScanner(stderrPipe) + // store first error detected in mydumper's log + var ( + firstErr error + errMsg []byte + ) for scanner.Scan() { line := scanner.Bytes() if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 { @@ -160,27 +164,30 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { switch level { case "DEBUG": m.logger.Debug(string(msg)) + continue case "INFO": m.logger.Info(string(msg)) + continue case "WARNING": m.logger.Warn(string(msg)) + continue case "ERROR": m.logger.Error(string(msg)) - if strings.HasPrefix(string(msg), "Couldn't acquire global lock") { - err = terror.ErrDumpUnitGlobalLock + if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") { + firstErr = terror.ErrDumpUnitGlobalLock errMsg = msg } + continue } - } else { - stderr.Write(line) - stderr.WriteByte('\n') } + stderr.Write(line) + stderr.WriteByte('\n') + } - // handle errors in scanning - if err != nil { - return errMsg, err - } + if firstErr != nil { + return errMsg, firstErr } + if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err) From 419b8efa1d6bcd6fccf26e1648e427c50ad73990 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 5 Aug 2020 11:53:52 +0800 Subject: [PATCH 9/9] add TODO --- mydumper/mydumper.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index ce01efe3a2..fbbbe65544 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -152,6 +152,8 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // so we parse all these lines and translate into our own logs. scanner := bufio.NewScanner(stderrPipe) // store first error detected in mydumper's log + // TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier + // and using non-block IO to drain and output mydumper's stderr var ( firstErr error errMsg []byte