Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl(dm): make skipped ddl pass SplitDDL() (#4176) #4227

Merged
33 changes: 22 additions & 11 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2159,26 +2159,37 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
appliedDDLs: make([]string, 0),
sourceTbls: make(map[string]map[string]struct{}),
}
qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
}

stmt, err := parseOneStmt(qec)
if err != nil {
// return error if parse fail and filter fail
defer func() {
if err == nil {
return
}
// why not `skipSQLByPattern` at beginning, but at defer?
// it is in order to track every ddl except for the one that will cause error.
// if `skipSQLByPattern` at beginning, some ddl should be tracked may be skipped.
needSkip, err2 := s.skipSQLByPattern(qec.originSQL)
if err2 != nil {
return err2
err = err2
return
}
if !needSkip {
return err
return
}
// don't return error if parse fail and filter success
// don't return error if filter success
metrics.SkipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.Stringer("query event context", qec))
*ec.lastLocation = *ec.currentLocation // before record skip location, update lastLocation
return s.recordSkipSQLsLocation(&ec)
err = s.recordSkipSQLsLocation(&ec)
}()

qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
}

stmt, err := parseOneStmt(qec)
if err != nil {
return err
}

if node, ok := stmt.(ast.DMLNode); ok {
Expand Down
10 changes: 9 additions & 1 deletion dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,15 @@ function run() {
# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check create view(should be skipped by func `skipSQLByPattern`) will not stop sync task
run_sql_source1 "create view all_mode.t1_v as select * from all_mode.t1 where id=0;"
sleep 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"\"result\": true" 2 \
"\"unit\": \"Sync\"" 1 \
"\"stage\": \"Running\"" 2

run_sql_source1 "SHOW SLAVE HOSTS;"
check_contains 'Slave_UUID'

Expand Down Expand Up @@ -498,7 +507,6 @@ function run() {
check_log_not_contains $WORK_DIR/worker2/log/dm-worker.log "Error .* Table .* doesn't exist"

# test Db not exists should be reported

run_sql_tidb "drop database all_mode"
run_sql_source1 "create table all_mode.db_error (c int primary key);"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down