Skip to content

Commit

Permalink
syncer: fast fail on some error in ddl execution (pingcap#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Oct 17, 2019
1 parent bdee293 commit ef36c8a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
25 changes: 5 additions & 20 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/backoff"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
)

Expand Down Expand Up @@ -231,38 +232,22 @@ func (tsc *realTaskStatusChecker) run() {
// isResumableError checks the error message and returns whether we need to
// resume the task and retry
func isResumableError(err *pb.ProcessError) bool {
// not elegant code, because TiDB doesn't expose some error
unsupportedDDLMsgs := []string{
"can't drop column with index",
"unsupported add column",
"unsupported modify column",
"unsupported modify",
"unsupported drop integer primary key",
}
unsupportedDMLMsgs := []string{
"Error 1062: Duplicate entry",
"Error 1406: Data too long for column",
}
parseRelayLogErrMsg := []string{
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}

switch err.Type {
case pb.ErrorType_ExecSQL:
for _, msg := range unsupportedDDLMsgs {
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if err.Error != nil && strings.Contains(err.Error.RawCause, msg) {
return false
}
}
for _, msg := range unsupportedDMLMsgs {
for _, msg := range retry.UnsupportedDMLMsgs {
if err.Error != nil && strings.Contains(err.Error.RawCause, msg) {
return false
}
}
case pb.ErrorType_UnknownError:
if err.Error != nil && err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range parseRelayLogErrMsg {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(err.Error.Message, msg) {
return false
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,38 @@
package retry

import (
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
)

var (
// UnsupportedDDLMsgs list the error messages of some unsupported DDL in TiDB
UnsupportedDDLMsgs = []string{
"can't drop column with index",
"unsupported add column",
"unsupported modify column",
"unsupported modify charset",
"unsupported modify collate",
"unsupported drop integer primary key",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
UnsupportedDMLMsgs = []string{
"Error 1062: Duplicate entry",
"Error 1406: Data too long for column",
}

// ParseRelayLogErrMsgs list the error messages of some un-recoverable relay log parsing error, which is used in task auto recovery.
ParseRelayLogErrMsgs = []string{
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}
)

// IsRetryableError tells whether this error should retry
func IsRetryableError(err error) bool {
err = errors.Cause(err) // check the original error
Expand All @@ -35,3 +61,18 @@ func IsRetryableError(err error) bool {
}
return false
}

// IsRetryableErrorFastFailFilter tells whether this error should retry,
// filtering some incompatible DDL error to achieve fast fail.
func IsRetryableErrorFastFailFilter(err error) bool {
err2 := errors.Cause(err) // check the original error
if mysqlErr, ok := err2.(*mysql.MySQLError); ok && mysqlErr.Number == tmysql.ErrUnknown {
for _, msg := range UnsupportedDDLMsgs {
if strings.Contains(mysqlErr.Message, msg) {
return false
}
}
}

return IsRetryableError(err)
}
2 changes: 1 addition & 1 deletion syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (conn *Conn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(
FirstRetryDuration: retryTimeout,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsRetryableError(err) {
if retry.IsRetryableErrorFastFailFilter(err) {
tctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)))
Expand Down

0 comments on commit ef36c8a

Please sign in to comment.