Skip to content

Commit

Permalink
ddl(dm): make skipped ddl pass SplitDDL() (pingcap#4176)
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang authored and zhaoxinyu committed Jan 20, 2022
1 parent 2a7ba40 commit c62326d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
33 changes: 22 additions & 11 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2380,26 +2380,37 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
sourceTbls: make(map[string]map[string]struct{}),
eventStatusVars: ev.StatusVars,
}
qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
}

stmt, err := parseOneStmt(qec)
if err != nil {
// return error if parse fail and filter fail
defer func() {
if err == nil {
return
}
// why not `skipSQLByPattern` at beginning, but at defer?
// it is in order to track every ddl except for the one that will cause error.
// if `skipSQLByPattern` at beginning, some ddl should be tracked may be skipped.
needSkip, err2 := s.skipSQLByPattern(qec.originSQL)
if err2 != nil {
return err2
err = err2
return
}
if !needSkip {
return err
return
}
// don't return error if parse fail and filter success
// don't return error if filter success
metrics.SkipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.Stringer("query event context", qec))
*ec.lastLocation = *ec.currentLocation // before record skip location, update lastLocation
return s.recordSkipSQLsLocation(&ec)
err = s.recordSkipSQLsLocation(&ec)
}()

qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
}

stmt, err := parseOneStmt(qec)
if err != nil {
return err
}

if node, ok := stmt.(ast.DMLNode); ok {
Expand Down
10 changes: 9 additions & 1 deletion dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@ function run() {
# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check create view(should be skipped by func `skipSQLByPattern`) will not stop sync task
run_sql_source1 "create view all_mode.t1_v as select * from all_mode.t1 where id=0;"
sleep 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"\"result\": true" 2 \
"\"unit\": \"Sync\"" 1 \
"\"stage\": \"Running\"" 2

run_sql_source1 "SHOW SLAVE HOSTS;"
check_contains 'Slave_UUID'

Expand Down Expand Up @@ -495,7 +504,6 @@ function run() {
check_log_not_contains $WORK_DIR/worker2/log/dm-worker.log "Error .* Table .* doesn't exist"

# test Db not exists should be reported

run_sql_tidb "drop database all_mode"
run_sql_source1 "create table all_mode.db_error (c int primary key);"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down

0 comments on commit c62326d

Please sign in to comment.