diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 0628875560..a08c3380af 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -219,7 +219,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high] ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s" ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid" ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid" -ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error" +ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error, with output (may empty): %s" ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file." ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers" diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 9acaba3d34..e80786207c 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -279,6 +279,9 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, // TODO: use different strategies based on the error detail for _, processErr := range stStatus.Result.Errors { if !isResumableError(processErr) { + failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) { + tsc.l.Info("error is not resumable", zap.Stringer("error", processErr)) + }) return ResumeNoSense } } diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index c7859a08fd..75f0c891e6 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dumpling/v4/export" @@ -90,7 +91,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { err := os.RemoveAll(m.cfg.Dir) if err != nil { m.logger.Error("fail to remove output directory", zap.String("directory", m.cfg.Dir), log.ShortError(err)) - errs = append(errs, unit.NewProcessError(err)) + errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "fail to remove output directory: "+m.cfg.Dir))) pr <- pb.ProcessResult{ IsCanceled: false, Errors: errs, @@ -103,7 +104,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) { if err != nil { dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc() - errs = append(errs, unit.NewProcessError(err)) + errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, ""))) } if len(errs) == 0 { diff --git a/errors.toml b/errors.toml index b1b5afa43d..bee8246056 100644 --- a/errors.toml +++ b/errors.toml @@ -1325,7 +1325,7 @@ workaround = "" tags = ["internal", "high"] [error.DM-dump-unit-32001] -message = "mydumper runs with error" +message = "mydumper runs with error, with output (may empty): %s" description = "" workaround = "" tags = ["internal", "high"] diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 7b14deea75..fe0209cb8e 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -17,7 +17,6 @@ import ( "bufio" "bytes" "context" - "fmt" "os" "os/exec" "regexp" @@ -99,11 +98,11 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) { } // Cmd cannot be reused, so we create a new cmd when begin processing - output, err := m.spawn(ctx) + err = m.spawn(ctx) if err != nil { mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc() - errs = append(errs, unit.NewProcessError(fmt.Errorf("%s. %s", err.Error(), output))) + errs = append(errs, unit.NewProcessError(err)) } else { select { case <-ctx.Done(): @@ -129,7 +128,7 @@ var mydumperLogRegexp = regexp.MustCompile( `^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `, ) -func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { +func (m *Mydumper) spawn(ctx context.Context) error { var ( stdout bytes.Buffer stderr bytes.Buffer @@ -138,10 +137,10 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { cmd.Stdout = &stdout stderrPipe, err := cmd.StderrPipe() if err != nil { - return nil, terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, "") } if err = cmd.Start(); err != nil { - return nil, terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, "") } // Read the stderr from mydumper, which contained the logs. @@ -151,6 +150,11 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { // // so we parse all these lines and translate into our own logs. scanner := bufio.NewScanner(stderrPipe) + // store first error detected in mydumper's log + // TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier + // and using non-block IO to drain and output mydumper's stderr + var firstErr error + for scanner.Scan() { line := scanner.Bytes() if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 { @@ -168,20 +172,28 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) { continue case "ERROR": m.logger.Error(string(msg)) + if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") { + firstErr = terror.ErrDumpUnitGlobalLock + } continue } } stderr.Write(line) stderr.WriteByte('\n') } + + if firstErr != nil { + return firstErr + } + if err = scanner.Err(); err != nil { stdout.Write(stderr.Bytes()) - return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes()) } err = cmd.Wait() stdout.Write(stderr.Bytes()) - return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err) + return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes()) } // Close implements Unit.Close diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index e4190c6109..88237adae5 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -49,6 +49,8 @@ var ( UnresumableErrCodes = map[int32]struct{}{ int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {}, int32(terror.ErrSyncerShardDDLConflict.Code()): {}, + int32(terror.ErrDumpUnitGlobalLock.Code()): {}, + int32(terror.ErrDumpUnitRuntime.Code()): {}, } ) diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index fafb4af46f..4764d5d60a 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -849,7 +849,7 @@ var ( ErrPreviousGTIDsNotValid = New(codePreviousGTIDsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "previousGTIDs %s not valid", "") // Dump unit error - ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "") + ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error, with output (may empty): %s", "") ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.") ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.") ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers") diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index 6c0dcf9eeb..0666fbd9f2 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -6,7 +6,63 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function fail_acquire_global_lock() { + export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + cp $cur/data/db1.prepare.user.sql $WORK_DIR/db1.prepare.user.sql + sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db1.prepare.user.sql + run_sql_file $WORK_DIR/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_count 'Query OK, 0 rows affected' 8 + cp $cur/data/db2.prepare.user.sql $WORK_DIR/db2.prepare.user.sql + sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db2.prepare.user.sql + run_sql_file $WORK_DIR/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_count 'Query OK, 0 rows affected' 8 + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --remove-meta" \ + "\"result\": true" 1 + + # TaskCheckInterval set to 500ms + sleep 1 + + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "you need (at least one of) the RELOAD privilege(s) for this operation" + check_log_contains $WORK_DIR/worker1/log/dm-worker.log "error is not resumable" + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "you need (at least one of) the RELOAD privilege(s) for this operation" + check_log_contains $WORK_DIR/worker2/log/dm-worker.log "error is not resumable" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Paused\"" 2 \ + "you need (at least one of) the RELOAD privilege(s) for this operation" 2 + + cleanup_data full_mode + cleanup_process $* +} + function run() { + fail_acquire_global_lock + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2