Skip to content

Commit

Permalink
*: don't auto-resume for dump unit error (pingcap#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Aug 7, 2020
1 parent 2374a8d commit 21a6e6e
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high]
ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s"
ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid"
ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper runs with error, with output (may empty): %s"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrDumpUnitGlobalLock,[code=32004:class=dump-unit:scope=internal:level=high], "Message: Couldn't acquire global lock, Workaround: Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers"
Expand Down
9 changes: 5 additions & 4 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,21 +269,19 @@ func isResumableError(err *pb.ProcessError) bool {
return false
}
}

for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(strings.ToLower(err.RawCause), strings.ToLower(msg)) {
return false
}
}

switch err.ErrCode {
case int32(terror.ErrParserParseRelayLog.Code()):
if err.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Message), strings.ToLower(msg)) {
return false
}
}
case int32(terror.ErrDumpUnitGlobalLock.Code()):
return false
}

if _, ok := retry.UnresumableErrCodes[err.ErrCode]; ok {
Expand All @@ -302,6 +300,9 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
// TODO: use different strategies based on the error detail
for _, processErr := range stStatus.Result.Errors {
if !isResumableError(processErr) {
failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) {
tsc.l.Info("error is not resumable", zap.Stringer("error", processErr))
})
return ResumeNoSense
}
}
Expand Down
24 changes: 10 additions & 14 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"regexp"
Expand Down Expand Up @@ -99,11 +98,11 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

// Cmd cannot be reused, so we create a new cmd when begin processing
output, err := m.spawn(ctx)
err = m.spawn(ctx)

if err != nil {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
errs = append(errs, unit.NewProcessError(fmt.Errorf("%s. %s", err.Error(), output)))
errs = append(errs, unit.NewProcessError(err))
} else {
select {
case <-ctx.Done():
Expand All @@ -129,7 +128,7 @@ var mydumperLogRegexp = regexp.MustCompile(
`^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `,
)

func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
func (m *Mydumper) spawn(ctx context.Context) error {
var (
stdout bytes.Buffer
stderr bytes.Buffer
Expand All @@ -138,10 +137,10 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
cmd.Stdout = &stdout
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return nil, terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, "")
}
if err = cmd.Start(); err != nil {
return nil, terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, "")
}

// Read the stderr from mydumper, which contained the logs.
Expand All @@ -154,10 +153,8 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
// store first error detected in mydumper's log
// TODO(lance6716): if mydumper will not exit when detected error happens, we should return firstErr earlier
// and using non-block IO to drain and output mydumper's stderr
var (
firstErr error
errMsg []byte
)
var firstErr error

for scanner.Scan() {
line := scanner.Bytes()
if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 {
Expand All @@ -177,7 +174,6 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
m.logger.Error(string(msg))
if firstErr == nil && strings.HasPrefix(string(msg), "Couldn't acquire global lock") {
firstErr = terror.ErrDumpUnitGlobalLock
errMsg = msg
}
continue
}
Expand All @@ -187,17 +183,17 @@ func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
}

if firstErr != nil {
return errMsg, firstErr
return firstErr
}

if err = scanner.Err(); err != nil {
stdout.Write(stderr.Bytes())
return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes())
}

err = cmd.Wait()
stdout.Write(stderr.Bytes())
return stdout.Bytes(), terror.ErrDumpUnitRuntime.Delegate(err)
return terror.ErrDumpUnitRuntime.Delegate(err, stdout.Bytes())
}

// Close implements Unit.Close
Expand Down
2 changes: 2 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
UnresumableErrCodes = map[int32]struct{}{
int32(terror.ErrSyncUnitDDLWrongSequence.Code()): {},
int32(terror.ErrSyncerShardDDLConflict.Code()): {},
int32(terror.ErrDumpUnitGlobalLock.Code()): {},
int32(terror.ErrDumpUnitRuntime.Code()): {},
}
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ var (
ErrPreviousGTIDsNotValid = New(codePreviousGTIDsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "previousGTIDs %s not valid", "")

// Dump unit error
ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error", "")
ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper runs with error, with output (may empty): %s", "")
ErrDumpUnitGenTableRouter = New(codeDumpUnitGenTableRouter, ClassDumpUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrDumpUnitGenBAList = New(codeDumpUnitGenBAList, ClassDumpUnit, ScopeInternal, LevelHigh, "generate block allow list", "Please check the `block-allow-list` config in task configuration file.")
ErrDumpUnitGlobalLock = New(codeDumpUnitGlobalLock, ClassDumpUnit, ScopeInternal, LevelHigh, "Couldn't acquire global lock", "Please check upstream privilege about FTWRL, or add `--no-locks` to extra-args of mydumpers")
Expand Down
8 changes: 8 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

function fail_acquire_global_lock() {
export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
Expand All @@ -32,8 +34,14 @@ function fail_acquire_global_lock() {
sed -i '/timezone/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml

# TaskCheckInterval set to 500ms
sleep 1

check_log_contains $WORK_DIR/worker1/log/dm-worker.log "Couldn't acquire global lock"
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "error is not resumable"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "Couldn't acquire global lock"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "error is not resumable"

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Paused\"" 4 \
Expand Down

0 comments on commit 21a6e6e

Please sign in to comment.