diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 43fc79af6fe..5a360875123 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -1726,7 +1726,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 						// revert currentLocation to startLocation
 						currentLocation = startLocation
 					} else if op == pb.ErrorOp_Skip {
+						ec := eventContext{
+							tctx:            tctx,
+							header:          e.Header,
+							startLocation:   &startLocation,
+							currentLocation: &currentLocation,
+							lastLocation:    &lastLocation,
+						}
+						var sourceTbls map[string]map[string]struct{}
+						sourceTbls, err = s.trackOriginDDL(ev, ec)
+						if err != nil {
+							tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query))
+						}
+
 						s.saveGlobalPoint(currentLocation)
+						for sourceSchema, tableMap := range sourceTbls {
+							if sourceSchema == "" {
+								continue
+							}
+							for sourceTable := range tableMap {
+								s.saveTablePoint(&filter.Table{Schema: sourceSchema, Name: sourceTable}, currentLocation)
+							}
+						}
 						err = s.flushJobs()
 						if err != nil {
 							tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err))
@@ -2774,6 +2795,78 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex
 	return nil
 }
 
+// trackOriginDDL feeds the DDL of a skipped query event to trackDDL and
+// reports which source tables that DDL touched.
+//
+// It returns (nil, nil) for BEGIN, an empty query, a query matched by
+// utils.IsBuildInSkipDDL or a statement that is not a DDL. Otherwise the
+// statement is split into single DDLs and the result maps source schema -> set
+// of source table names; a later DROP TABLE / DROP DATABASE in the same
+// statement removes the matching entries again. Any parse, split or track error is returned.
+func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (map[string]map[string]struct{}, error) {
+	originSQL := strings.TrimSpace(string(ev.Query))
+	if originSQL == "BEGIN" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) {
+		return nil, nil
+	}
+	var err error
+	qec := &queryEventContext{
+		eventContext: &ec,
+		ddlSchema:    string(ev.Schema),
+		originSQL:    utils.TrimCtrlChars(originSQL),
+		splitDDLs:    make([]string, 0),
+		appliedDDLs:  make([]string, 0),
+		sourceTbls:   make(map[string]map[string]struct{}),
+	}
+	qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
+	if err != nil {
+		log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
+	}
+	stmt, err := parseOneStmt(qec)
+	if err != nil {
+		// originSQL can't be parsed => can't be tracked by schema tracker
+		// we can use operate-schema to set a compatible schema after this
+		return nil, err
+	}
+
+	if _, ok := stmt.(ast.DDLNode); !ok {
+		return nil, nil
+	}
+
+	// TiDB can't handle multi schema change DDL, so we split it here.
+	qec.splitDDLs, err = parserpkg.SplitDDL(stmt, qec.ddlSchema)
+	if err != nil {
+		return nil, err
+	}
+
+	affectedTbls := make(map[string]map[string]struct{})
+	for _, sql := range qec.splitDDLs {
+		ddlInfo, err := s.genDDLInfo(qec.p, qec.ddlSchema, sql)
+		if err != nil {
+			return nil, err
+		}
+		sourceTable := ddlInfo.sourceTables[0]
+		switch ddlInfo.originStmt.(type) {
+		case *ast.DropDatabaseStmt:
+			delete(affectedTbls, sourceTable.Schema)
+		case *ast.DropTableStmt:
+			if affectedTable, ok := affectedTbls[sourceTable.Schema]; ok {
+				delete(affectedTable, sourceTable.Name)
+			}
+		default:
+			if _, ok := affectedTbls[sourceTable.Schema]; !ok {
+				affectedTbls[sourceTable.Schema] = make(map[string]struct{})
+			}
+			affectedTbls[sourceTable.Schema][sourceTable.Name] = struct{}{}
+		}
+		err = s.trackDDL(qec.ddlSchema, ddlInfo, qec.eventContext)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return affectedTbls, nil
+}
+
 func (s *Syncer) genRouter() error {
 	s.tableRouter, _ = router.NewTableRouter(s.cfg.CaseSensitive, []*router.TableRule{})
 	for _, rule := range s.cfg.RouteRules {
diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index fa009a003b0..c1b8cd980a1 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -148,15 +148,11 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
 	# dmls fail
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"query-status test" \
-		"Paused" 2
-	#"Error 1054: Unknown column 'a' in 'field list'" 2 // may more than 2 dml error
+		"Paused" 1 \
+		"Unknown column 'a' in 'field list'" 1
 
 	# third, set schema to be same with upstream
-	# TODO: support set schema automatically base on upstream schema
-	echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1' >${WORK_DIR}/schema1.sql
-	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"binlog-schema update -s mysql-replica-01 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
-		"\"result\": true" 2
+	echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema1.sql
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"binlog-schema update -s mysql-replica-02 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
 		"\"result\": true" 2
@@ -169,7 +165,7 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
 	# source2.table2's dml fails
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"query-status test" \
-		"Error 1054: Unknown column 'a' in 'field list'" 1
+		"Unknown column 'a' in 'field list'" 1
 
 	# WARN: set schema of source2.table2
 	# Actually it should be tb2(a,b), dml is {a: 9, b: 'iii'}
diff --git a/dm/tests/shardddl2/run.sh b/dm/tests/shardddl2/run.sh
index 2c95319431c..8f5f5985da8 100644
--- a/dm/tests/shardddl2/run.sh
+++ b/dm/tests/shardddl2/run.sh
@@ -484,16 +484,10 @@ function DM_DropAddColumn_CASE() {
 		"\"result\": true" 2 \
 		"\"source 'mysql-replica-02' has no error\"" 1
 
-	# after we skip ADD COLUMN, we should fix the table structure
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"pause-task test" \
 		"\"result\": true" 3
 
-	echo 'CREATE TABLE `tb1` ( `a` int(11) NOT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL, PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema.sql
-	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"binlog-schema update test ${shardddl1} ${tb1} ${WORK_DIR}/schema.sql -s mysql-replica-01" \
-		"\"result\": true" 2
-
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"resume-task test" \
 		"\"result\": true" 3