diff --git a/syncer/optimist.go b/syncer/optimist.go index de363bce4f..1391388fc2 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -204,7 +204,8 @@ func (s *Syncer) handleQueryEventOptimistic( err = s.execError.Get() if err != nil { - return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) + s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err)) + return nil } for _, table := range onlineDDLTableNames { diff --git a/syncer/syncer.go b/syncer/syncer.go index fef973a1fe..b09c399cdf 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -931,11 +931,18 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } } if err != nil { + s.execError.Set(err) + if !utils.IsContextCanceledError(err) { + err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation) + s.runFatalChan <- unit.NewProcessError(err) + } s.appendExecErrors(&ExecErrorContext{ err: err, location: sqlJob.currentLocation.Clone(), jobs: fmt.Sprintf("%v", sqlJob.ddls), }) + s.jobWg.Done() + continue } switch s.cfg.ShardMode { @@ -964,15 +971,16 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, err = s.optimist.DoneOperation(*(s.optimist.PendingOperation())) } } - s.jobWg.Done() if err != nil { s.execError.Set(err) if !utils.IsContextCanceledError(err) { err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation) s.runFatalChan <- unit.NewProcessError(err) } + s.jobWg.Done() continue } + s.jobWg.Done() s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls))) } } @@ -1884,9 +1892,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // when add ddl job, will execute ddl and then flush checkpoint. // if execute ddl failed, the execErrorDetected will be true. + // return nil here to avoid duplicate error message err = s.execError.Get() if err != nil { - return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) + s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err)) + return nil } s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) @@ -2069,7 +2079,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e err = s.execError.Get() if err != nil { - return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) + s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err)) + return nil } if len(onlineDDLTableNames) > 0 { diff --git a/tests/handle_error/run.sh b/tests/handle_error/run.sh index d70f04e69f..775d6eb631 100644 --- a/tests/handle_error/run.sh +++ b/tests/handle_error/run.sh @@ -369,6 +369,41 @@ function DM_REPLACE_ERROR_MULTIPLE() { run_case REPLACE_ERROR_MULTIPLE "double-source-optimistic" "init_table 11 21" "clean_table" "optimistic" } +function DM_EXEC_ERROR_SKIP_CASE() { + run_sql_source1 "insert into ${db}.${tb1} values(1,1);" + run_sql_source2 "insert into ${db}.${tb1} values(2,2);" + run_sql_tidb "insert into ${db}.${tb} values(1,1);" + run_sql_tidb "insert into ${db}.${tb} values(2,2);" + run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 4" + + run_sql_source1 "alter table ${db}.${tb1} add unique index ua(a);" + run_sql_source2 "alter table ${db}.${tb1} add unique index ua(a);" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "Error 1062: Duplicate entry " 1 + + run_sql_tidb "insert into ${db}.${tb} values(3,3);" + run_sql_tidb "insert into ${db}.${tb} values(4,4);" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "handle-error test skip" \ + "\"result\": true" 2 + + run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 6" +} + +function DM_EXEC_ERROR_SKIP() { + run_case EXEC_ERROR_SKIP "double-source-pessimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \ + "clean_table" "pessimistic" + run_case EXEC_ERROR_SKIP "double-source-optimistic" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \ + run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \ + "clean_table" "optimistic" +} + function run() { init_cluster init_database @@ -377,6 +412,7 @@ function run() { DM_REPLACE_ERROR DM_REPLACE_ERROR_SHARDING DM_REPLACE_ERROR_MULTIPLE + DM_EXEC_ERROR_SKIP } cleanup_data $db