Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

ddl: pause subtask when exec ddl err #1055

Merged
merged 4 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ func (s *Syncer) handleQueryEventOptimistic(

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

for _, table := range onlineDDLTableNames {
Expand Down
14 changes: 12 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,11 +931,18 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
}
}
if err != nil {
s.jobWg.Done()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, &sqlJob.startLocation, &sqlJob.currentLocation)
s.runFatalChan <- unit.NewProcessError(err)
}
s.appendExecErrors(&ExecErrorContext{
err: err,
location: sqlJob.currentLocation.Clone(),
jobs: fmt.Sprintf("%v", sqlJob.ddls),
})
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forward: "can we just rollback checkpoint as we did before?" #1032 (comment)

I think that's no need, we could use caller's rollback

}

switch s.cfg.ShardMode {
Expand Down Expand Up @@ -1884,9 +1891,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
// return nil here to avoid duplicate error message
err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation))
Expand Down Expand Up @@ -2069,7 +2078,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

err = s.execError.Get()
if err != nil {
return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query)
s.tctx.L().Error("error detected when executing SQL job", log.ShortError(err))
return nil
}

if len(onlineDDLTableNames) > 0 {
Expand Down
36 changes: 36 additions & 0 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,41 @@ function DM_REPLACE_ERROR_MULTIPLE() {
run_case REPLACE_ERROR_MULTIPLE "double-source-optimistic" "init_table 11 21" "clean_table" "optimistic"
}

function DM_EXEC_ERROR_SKIP_CASE() {
run_sql_source1 "insert into ${db}.${tb1} values(1,1);"
run_sql_source2 "insert into ${db}.${tb1} values(2,2);"
run_sql_tidb "insert into ${db}.${tb} values(1,1);"
run_sql_tidb "insert into ${db}.${tb} values(2,2);"
run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 4"

run_sql_source1 "alter table ${db}.${tb1} add unique index ua(a);"
run_sql_source2 "alter table ${db}.${tb1} add unique index ua(a);"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Error 1062: Duplicate entry " 1

run_sql_tidb "insert into ${db}.${tb} values(3,3);"
run_sql_tidb "insert into ${db}.${tb} values(4,4);"

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip" \
"\"result\": true" 2

run_sql_tidb_with_retry "select count(1) from ${db}.${tb};" "count(1): 6"
}

function DM_EXEC_ERROR_SKIP() {
run_case EXEC_ERROR_SKIP "double-source-pessimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "pessimistic"
run_case EXEC_ERROR_SKIP "double-source-optimistic" \
"run_sql_source1 \"create table ${db}.${tb1} (a int, b int);\"; \
run_sql_source2 \"create table ${db}.${tb1} (a int, b int);\"" \
"clean_table" "optimistic"
}

function run() {
init_cluster
init_database
Expand All @@ -377,6 +412,7 @@ function run() {
DM_REPLACE_ERROR
DM_REPLACE_ERROR_SHARDING
DM_REPLACE_ERROR_MULTIPLE
DM_EXEC_ERROR_SKIP
}

cleanup_data $db
Expand Down