diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index 7956872505..c7187f000e 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/backoff" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" ) @@ -231,38 +232,22 @@ func (tsc *realTaskStatusChecker) run() { // isResumableError checks the error message and returns whether we need to // resume the task and retry func isResumableError(err *pb.ProcessError) bool { - // not elegant code, because TiDB doesn't expose some error - unsupportedDDLMsgs := []string{ - "can't drop column with index", - "unsupported add column", - "unsupported modify column", - "unsupported modify", - "unsupported drop integer primary key", - } - unsupportedDMLMsgs := []string{ - "Error 1062: Duplicate entry", - "Error 1406: Data too long for column", - } - parseRelayLogErrMsg := []string{ - "binlog checksum mismatch, data may be corrupted", - "get event err EOF", - } - switch err.Type { case pb.ErrorType_ExecSQL: - for _, msg := range unsupportedDDLMsgs { + // not elegant code, because TiDB doesn't expose some error + for _, msg := range retry.UnsupportedDDLMsgs { if err.Error != nil && strings.Contains(err.Error.RawCause, msg) { return false } } - for _, msg := range unsupportedDMLMsgs { + for _, msg := range retry.UnsupportedDMLMsgs { if err.Error != nil && strings.Contains(err.Error.RawCause, msg) { return false } } case pb.ErrorType_UnknownError: if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { - for _, msg := range parseRelayLogErrMsg { + for _, msg := range retry.ParseRelayLogErrMsgs { if strings.Contains(err.Error.Message, msg) { return false } diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index fb2e10a2f8..ce8464652b 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -14,12 +14,38 @@ package retry import ( + "strings" + "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" gmysql "github.com/siddontang/go-mysql/mysql" ) +var ( + // UnsupportedDDLMsgs list the error messages of some unsupported DDL in TiDB + UnsupportedDDLMsgs = []string{ + "can't drop column with index", + "unsupported add column", + "unsupported modify column", + "unsupported modify charset", + "unsupported modify collate", + "unsupported drop integer primary key", + } + + // UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery + UnsupportedDMLMsgs = []string{ + "Error 1062: Duplicate entry", + "Error 1406: Data too long for column", + } + + // ParseRelayLogErrMsgs list the error messages of some un-recoverable relay log parsing error, which is used in task auto recovery. + ParseRelayLogErrMsgs = []string{ + "binlog checksum mismatch, data may be corrupted", + "get event err EOF", + } +) + // IsRetryableError tells whether this error should retry func IsRetryableError(err error) bool { err = errors.Cause(err) // check the original error @@ -35,3 +61,18 @@ func IsRetryableError(err error) bool { } return false } + +// IsRetryableErrorFastFailFilter tells whether this error should retry, +// filtering some incompatible DDL error to achieve fast fail. +func IsRetryableErrorFastFailFilter(err error) bool { + err2 := errors.Cause(err) // check the original error + if mysqlErr, ok := err2.(*mysql.MySQLError); ok && mysqlErr.Number == tmysql.ErrUnknown { + for _, msg := range UnsupportedDDLMsgs { + if strings.Contains(mysqlErr.Message, msg) { + return false + } + } + } + + return IsRetryableError(err) +} diff --git a/syncer/db.go b/syncer/db.go index 0d53fa8bc0..61d2973d58 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -168,7 +168,7 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func( FirstRetryDuration: retryTimeout, BackoffStrategy: retry.Stable, IsRetryableFn: func(retryTime int, err error) bool { - if retry.IsRetryableError(err) { + if retry.IsRetryableErrorFastFailFilter(err) { tctx.L().Warn("execute statements", zap.Int("retry", retryTime), zap.String("queries", utils.TruncateInterface(queries, -1)), zap.String("arguments", utils.TruncateInterface(args, -1)))