Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

ddl: pause subtask when exec ddl err (#1055) #1059

Merged
merged 3 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func (s *Syncer) handleQueryEventOptimistic(

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

for _, table := range onlineDDLTableNames {
Expand Down
17 changes: 14 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,11 +931,18 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
}
}
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation)
s.runFatalChan <- unit.NewProcessError(err)
}
s.appendExecErrors(&ExecErrorContext{
err: err,
location: sqlJob.currentLocation.Clone(),
jobs: fmt.Sprintf("%v", sqlJob.ddls),
})
s.jobWg.Done()
continue
}

switch s.cfg.ShardMode {
Expand Down Expand Up @@ -964,15 +971,16 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
err = s.optimist.DoneOperation(*(s.optimist.PendingOperation()))
}
}
s.jobWg.Done()
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
continue
}
s.jobWg.Done()
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
}
}
Expand Down Expand Up @@ -1884,9 +1892,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
// return nil here to avoid duplicate error message
err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation))
Expand Down Expand Up @@ -2069,7 +2079,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

if len(onlineDDLTableNames) > 0 {
Expand Down
36 changes: 36 additions & 0 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,41 @@ function DM_REPLACE_ERROR_MULTIPLE() {
run_case REPLACE_ERROR_MULTIPLE "double-source-optimistic" "init_table 11 21" "clean_table" "optimistic"
}

# DM_EXEC_ERROR_SKIP_CASE exercises the "DDL fails when executed downstream,
# then the user skips it" path: it provokes a duplicate-entry failure for an
# ADD UNIQUE INDEX statement, checks that the error is reported by
# query-status, skips it with handle-error, and checks the downstream row count.
# Relies on ${db}, ${tb}, ${tb1}, $WORK_DIR and $MASTER_PORT being set by the
# surrounding script, and on the run_sql_* / run_dm_ctl* helpers defined elsewhere.
function DM_EXEC_ERROR_SKIP_CASE() {
# Insert one row per upstream source, then insert the same two rows directly
# into the downstream table so that column `a` holds duplicate values there.
run_sql_source1 "insert into ${db}.${tb1} values(1,1);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2);"
run_sql_tidb "insert into ${db}.${tb} values(1,1);"
run_sql_tidb "insert into ${db}.${tb} values(2,2);"
# Wait for 4 rows downstream: presumably the 2 replicated upstream rows plus
# the 2 written directly above (assumes ${tb1} is routed to ${tb} -- confirm
# against the task config).
run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 4"

# Issue the DDL on both sources; it is expected to fail when applied
# downstream because of the duplicate values of `a` created above.
run_sql_source1 "alter table ${db}.${tb1} add unique index ua(a);"
run_sql_source2 "alter table ${db}.${tb1} add unique index ua(a);"

# Poll query-status until the duplicate-entry error shows up.
# NOTE(review): the trailing number looks like the expected match count -- confirm
# against run_dm_ctl_with_retry.
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Error 1062: Duplicate entry " 1

# Write two more rows directly downstream before skipping the failed DDL.
run_sql_tidb "insert into ${db}.${tb} values(3,3);"
run_sql_tidb "insert into ${db}.${tb} values(4,4);"

# Skip the failed statement for task "test"; the output is expected to contain
# "result": true (presumably twice, per the trailing 2 -- confirm).
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip" \
"\"result\": true" 2

# After the skip the downstream table should settle at 6 rows.
run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 6"
}

# DM_EXEC_ERROR_SKIP runs the EXEC_ERROR_SKIP scenario twice through run_case,
# once with the pessimistic and once with the optimistic double-source setup.
# NOTE(review): run_case is defined elsewhere; its arguments appear to be
# (case name, task config, init command, cleanup command, shard mode), and it
# presumably dispatches to DM_EXEC_ERROR_SKIP_CASE -- confirm against run_case.
function DM_EXEC_ERROR_SKIP() {
# The init command is one double-quoted string: ${db}/${tb1} are expanded here,
# the escaped quotes are passed through literally, and the backslash-newline
# joins the two create-table commands into a single argument.
run_case EXEC_ERROR_SKIP "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "pessimistic"
# Same scenario and init command, using the optimistic configuration.
run_case EXEC_ERROR_SKIP "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "optimistic"
}

function run() {
init_cluster
init_database
Expand All @@ -377,6 +412,7 @@ function run() {
DM_REPLACE_ERROR
DM_REPLACE_ERROR_SHARDING
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP
}

cleanup_data $db
Expand Down