Skip to content

Commit

Permalink
syncer(dm): fix log error caused by "COMMIT" in QueryEvent (#7726)
Browse files Browse the repository at this point in the history
ref #7525
  • Loading branch information
liumengya94 authored Dec 1, 2022
1 parent ee78be4 commit bc176c3
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 23 deletions.
65 changes: 42 additions & 23 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,24 +2256,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
var originSQL string // show origin sql when error, only ddl now
var err2 error
var sourceTable *filter.Table
var needContinue bool
var eventType string

switch ev := e.Event.(type) {
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(ev.Rows)))
sourceTable, err2 = s.handleRowsEvent(ev, ec)
if sourceTable != nil && err2 == nil && s.cfg.EnableGTID {
if _, ok := affectedSourceTables[sourceTable.Schema]; !ok {
affectedSourceTables[sourceTable.Schema] = make(map[string]struct{})
}
affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{}
}
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
err2 = s.ddlWorker.HandleQueryEvent(ev, ec, originSQL)
case *replication.XIDEvent:
funcCommit := func() (bool, error) {
// reset eventIndex and force safeMode flag here.
eventIndex = 0
for schemaName, tableMap := range affectedSourceTables {
Expand All @@ -2287,19 +2273,52 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
shardingReSync.currLocation = endLocation

if binlog.CompareLocation(shardingReSync.currLocation, shardingReSync.latestLocation, s.cfg.EnableGTID) >= 0 {
s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync))
s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", eventType), zap.Stringer("re-shard", shardingReSync))
err = closeShardingResync()
if err != nil {
return terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation)
return false, terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation)
}
continue
return true, nil
}
}

s.tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastTxnEndLocation), log.WrapStringerField("location", endLocation))
s.tctx.L().Debug("", zap.String("event", eventType), zap.Stringer("last location", lastTxnEndLocation), log.WrapStringerField("location", endLocation))

job := newXIDJob(endLocation, startLocation, endLocation)
_, err2 = s.handleJobFunc(job)
_, err = s.handleJobFunc(job)
return false, err
}

switch ev := e.Event.(type) {
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(ev.Rows)))
sourceTable, err2 = s.handleRowsEvent(ev, ec)
if sourceTable != nil && err2 == nil && s.cfg.EnableGTID {
if _, ok := affectedSourceTables[sourceTable.Schema]; !ok {
affectedSourceTables[sourceTable.Schema] = make(map[string]struct{})
}
affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{}
}
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
if originSQL == "COMMIT" {
eventType = "COMMIT query event"
needContinue, err2 = funcCommit()
if needContinue {
continue
}
} else {
err2 = s.ddlWorker.HandleQueryEvent(ev, ec, originSQL)
}
case *replication.XIDEvent:
eventType = "XID"
needContinue, err2 = funcCommit()
if needContinue {
continue
}
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
Expand Down Expand Up @@ -2777,7 +2796,7 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex

func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (map[string]map[string]struct{}, error) {
originSQL := strings.TrimSpace(string(ev.Query))
if originSQL == "BEGIN" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) {
if originSQL == "BEGIN" || originSQL == "COMMIT" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) {
return nil, nil
}
var err error
Expand Down
63 changes: 63 additions & 0 deletions dm/tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,69 @@ function run() {

cleanup_process
done

test_COMMIT_in_QueryEvent
}

function prepare_data_MyISAM() {
run_sql 'DROP DATABASE if exists start_task;' $TIDB_PORT $TIDB_PASSWORD
run_sql 'DROP DATABASE if exists start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "CREATE TABLE start_task.t1(i TINYINT, j INT UNIQUE KEY) engine=MyISAM;" $MYSQL_PORT1 $MYSQL_PASSWORD1
for j in $(seq 10); do
run_sql "INSERT INTO start_task.t1 VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
}

function test_COMMIT_in_QueryEvent() {
echo "[$(date)] <<<<<< start test_COMMIT_in_QueryEvent >>>>>>"
cleanup_process
cleanup_data start_task
prepare_data_MyISAM

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/dm-master.toml $WORK_DIR/
cp $cur/conf/dm-worker1.toml $WORK_DIR/
cp $cur/conf/dm-task.yaml $WORK_DIR/

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $WORK_DIR/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $WORK_DIR/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# operate mysql config to worker
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $WORK_DIR/source1.yaml" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1

echo "check master alive"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"\"alive\": true" 1

echo "start task and check stage"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/dm-task.yaml --remove-meta=true" \
"\"result\": true" 2

run_sql "CREATE TABLE start_task.t2(i TINYINT, j INT UNIQUE KEY) engine=MyISAM;" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'INSERT INTO start_task.t1 VALUES (99,9999);' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'INSERT INTO start_task.t2 VALUES (99,9999);' $MYSQL_PORT1 $MYSQL_PASSWORD1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 2 \
"\"unit\": \"Sync\"" 1 \
"\"stage\": \"Running\"" 2

echo "check data"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log "originSQL: COMMIT"

echo "<<<<<< test_COMMIT_in_QueryEvent success! >>>>>>"
}

cleanup_data start_task
Expand Down

0 comments on commit bc176c3

Please sign in to comment.