Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
ddl: pause subtask when exec ddl err (#1055) (#1059)
Browse files Browse the repository at this point in the history
Co-authored-by: gmhdbjd <[email protected]>
  • Loading branch information
ti-srebot and GMHDBJD authored Sep 18, 2020
1 parent 1005725 commit 0e7f728
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
3 changes: 2 additions & 1 deletion syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func (s *Syncer) handleQueryEventOptimistic(

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

for _, table := range onlineDDLTableNames {
Expand Down
17 changes: 14 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,11 +931,18 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
}
}
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation)
s.runFatalChan <- unit.NewProcessError(err)
}
s.appendExecErrors(&ExecErrorContext{
err: err,
location: sqlJob.currentLocation.Clone(),
jobs: fmt.Sprintf("%v", sqlJob.ddls),
})
s.jobWg.Done()
continue
}

switch s.cfg.ShardMode {
Expand Down Expand Up @@ -964,15 +971,16 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
err = s.optimist.DoneOperation(*(s.optimist.PendingOperation()))
}
}
s.jobWg.Done()
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
continue
}
s.jobWg.Done()
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
}
}
Expand Down Expand Up @@ -1884,9 +1892,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
// return nil here to avoid duplicate error message
err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation))
Expand Down Expand Up @@ -2069,7 +2079,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

if len(onlineDDLTableNames) > 0 {
Expand Down
36 changes: 36 additions & 0 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,41 @@ function DM_REPLACE_ERROR_MULTIPLE() {
run_case REPLACE_ERROR_MULTIPLE "double-source-optimistic" "init_table 11 21" "clean_table" "optimistic"
}

function DM_EXEC_ERROR_SKIP_CASE() {
run_sql_source1 "insert into ${db}.${tb1} values(1,1);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2);"
run_sql_tidb "insert into ${db}.${tb} values(1,1);"
run_sql_tidb "insert into ${db}.${tb} values(2,2);"
run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 4"

run_sql_source1 "alter table ${db}.${tb1} add unique index ua(a);"
run_sql_source2 "alter table ${db}.${tb1} add unique index ua(a);"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Error 1062: Duplicate entry " 1

run_sql_tidb "insert into ${db}.${tb} values(3,3);"
run_sql_tidb "insert into ${db}.${tb} values(4,4);"

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip" \
"\"result\": true" 2

run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 6"
}

function DM_EXEC_ERROR_SKIP() {
run_case EXEC_ERROR_SKIP "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "pessimistic"
run_case EXEC_ERROR_SKIP "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "optimistic"
}

function run() {
init_cluster
init_database
Expand All @@ -377,6 +412,7 @@ function run() {
DM_REPLACE_ERROR
DM_REPLACE_ERROR_SHARDING
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP
}

cleanup_data $db
Expand Down

0 comments on commit 0e7f728

Please sign in to comment.