From bc176c35e40c6dfb3a386097a4e189b37639ef4a Mon Sep 17 00:00:00 2001 From: liumengya94 <80016792+liumengya94@users.noreply.github.com> Date: Thu, 1 Dec 2022 21:42:02 +0800 Subject: [PATCH] syncer(dm): fix log error caused by "COMMIT" in QueryEvent (#7726) ref pingcap/tiflow#7525 --- dm/syncer/syncer.go | 65 ++++++++++++++++++++++++-------------- dm/tests/start_task/run.sh | 63 ++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 23 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index f53c09a0a3e..aad973ba17c 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -2256,24 +2256,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var originSQL string // show origin sql when error, only ddl now var err2 error var sourceTable *filter.Table + var needContinue bool + var eventType string - switch ev := e.Event.(type) { - case *replication.RotateEvent: - err2 = s.handleRotateEvent(ev, ec) - case *replication.RowsEvent: - eventIndex++ - s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(ev.Rows))) - sourceTable, err2 = s.handleRowsEvent(ev, ec) - if sourceTable != nil && err2 == nil && s.cfg.EnableGTID { - if _, ok := affectedSourceTables[sourceTable.Schema]; !ok { - affectedSourceTables[sourceTable.Schema] = make(map[string]struct{}) - } - affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{} - } - case *replication.QueryEvent: - originSQL = strings.TrimSpace(string(ev.Query)) - err2 = s.ddlWorker.HandleQueryEvent(ev, ec, originSQL) - case *replication.XIDEvent: + funcCommit := func() (bool, error) { // reset eventIndex and force safeMode flag here. eventIndex = 0 for schemaName, tableMap := range affectedSourceTables { @@ -2287,19 +2273,52 @@ func (s *Syncer) Run(ctx context.Context) (err error) { shardingReSync.currLocation = endLocation if binlog.CompareLocation(shardingReSync.currLocation, shardingReSync.latestLocation, s.cfg.EnableGTID) >= 0 { - s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync)) + s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", eventType), zap.Stringer("re-shard", shardingReSync)) err = closeShardingResync() if err != nil { - return terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation) + return false, terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation) } - continue + return true, nil } } - s.tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastTxnEndLocation), log.WrapStringerField("location", endLocation)) + s.tctx.L().Debug("", zap.String("event", eventType), zap.Stringer("last location", lastTxnEndLocation), log.WrapStringerField("location", endLocation)) job := newXIDJob(endLocation, startLocation, endLocation) - _, err2 = s.handleJobFunc(job) + _, err = s.handleJobFunc(job) + return false, err + } + + switch ev := e.Event.(type) { + case *replication.RotateEvent: + err2 = s.handleRotateEvent(ev, ec) + case *replication.RowsEvent: + eventIndex++ + s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(ev.Rows))) + sourceTable, err2 = s.handleRowsEvent(ev, ec) + if sourceTable != nil && err2 == nil && s.cfg.EnableGTID { + if _, ok := affectedSourceTables[sourceTable.Schema]; !ok { + affectedSourceTables[sourceTable.Schema] = make(map[string]struct{}) + } + affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{} + } + case *replication.QueryEvent: + originSQL = strings.TrimSpace(string(ev.Query)) + if originSQL == "COMMIT" { + eventType = "COMMIT query event" + needContinue, err2 = funcCommit() + if needContinue { + continue + } + } else { + err2 = s.ddlWorker.HandleQueryEvent(ev, ec, originSQL) + } + case *replication.XIDEvent: + eventType = "XID" + needContinue, err2 = funcCommit() + if needContinue { + continue + } case *replication.GenericEvent: if e.Header.EventType == replication.HEARTBEAT_EVENT { // flush checkpoint even if there are no real binlog events @@ -2777,7 +2796,7 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (map[string]map[string]struct{}, error) { originSQL := strings.TrimSpace(string(ev.Query)) - if originSQL == "BEGIN" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) { + if originSQL == "BEGIN" || originSQL == "COMMIT" || originSQL == "" || utils.IsBuildInSkipDDL(originSQL) { return nil, nil } var err error diff --git a/dm/tests/start_task/run.sh b/dm/tests/start_task/run.sh index b5205e0b442..8b814682639 100644 --- a/dm/tests/start_task/run.sh +++ b/dm/tests/start_task/run.sh @@ -240,6 +240,69 @@ function run() { cleanup_process done + + test_COMMIT_in_QueryEvent +} + +function prepare_data_MyISAM() { + run_sql 'DROP DATABASE if exists start_task;' $TIDB_PORT $TIDB_PASSWORD + run_sql 'DROP DATABASE if exists start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql 'CREATE DATABASE start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "CREATE TABLE start_task.t1(i TINYINT, j INT UNIQUE KEY) engine=MyISAM;" $MYSQL_PORT1 $MYSQL_PASSWORD1 + for j in $(seq 10); do + run_sql "INSERT INTO start_task.t1 VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + done +} + +function test_COMMIT_in_QueryEvent() { + echo "[$(date)] <<<<<< start test_COMMIT_in_QueryEvent >>>>>>" + cleanup_process + cleanup_data start_task + prepare_data_MyISAM + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/dm-master.toml $WORK_DIR/ + cp $cur/conf/dm-worker1.toml $WORK_DIR/ + cp $cur/conf/dm-task.yaml $WORK_DIR/ + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $WORK_DIR/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $WORK_DIR/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # operate mysql config to worker + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source create $WORK_DIR/source1.yaml" \ + "\"result\": true" 2 \ + "\"source\": \"$SOURCE_ID1\"" 1 + + echo "check master alive" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member" \ + "\"alive\": true" 1 + + echo "start task and check stage" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --remove-meta=true" \ + "\"result\": true" 2 + + run_sql "CREATE TABLE start_task.t2(i TINYINT, j INT UNIQUE KEY) engine=MyISAM;" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql 'INSERT INTO start_task.t1 VALUES (99,9999);' $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql 'INSERT INTO start_task.t2 VALUES (99,9999);' $MYSQL_PORT1 $MYSQL_PASSWORD1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 2 \ + "\"unit\": \"Sync\"" 1 \ + "\"stage\": \"Running\"" 2 + + echo "check data" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log "originSQL: COMMIT" + + echo "<<<<<< test_COMMIT_in_QueryEvent success! >>>>>>" } cleanup_data start_task