syncer(dm): split big transaction when flush checkpoint (#7259) (#7284)
close #5010, ref #6784, close #7159
ti-chi-bot authored Oct 8, 2022
1 parent 2310cfd commit 247ac7e
Showing 7 changed files with 86 additions and 47 deletions.
13 changes: 6 additions & 7 deletions dm/syncer/checkpoint.go
@@ -603,11 +603,11 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche

// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// This function is used to skip old binlog events. Table checkpoint is saved after dispatching a binlog event.
// - For GTID based and position based replication, DML handling is a bit different but comparison is same here.
// When using position based, each event has unique position so we have confident to skip event which is <= table checkpoint.
// When using GTID based, there may be more than one event with same GTID, but we still skip event which is <= table checkpoint,
// to make this right we only save table point for the transaction affected tables only after the whole transaction is processed
// - DDL will not have unique position or GTID, so we can always skip events <= table checkpoint.
// - For GTID based and position based replication, DML handling is a bit different but comparison is same here.
// When using position based, each event has unique position so we have confident to skip event which is <= table checkpoint.
// When using GTID based, there may be more than one event with same GTID, but we still skip event which is <= table checkpoint,
// to make this right we only save table point for the transaction affected tables only after the whole transaction is processed
// - DDL will not have unique position or GTID, so we can always skip events <= table checkpoint.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location) bool {
cp.RLock()
defer cp.RUnlock()
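The comment above describes when a binlog event can be skipped. A standalone sketch of that comparison for position based replication follows; the position type and function names are illustrative only, not DM's binlog.Location API, and GTID handling is reduced to the rule stated above (table points are saved only after the whole transaction is processed).

package locationsketch

// position is an illustrative stand-in for a binlog position.
type position struct {
	file string // binlog file name, e.g. "mysql-bin.000003"
	pos  uint32 // offset inside that file
}

// compare returns -1, 0 or 1 when a is older than, equal to, or newer than b.
func compare(a, b position) int {
	switch {
	case a.file < b.file:
		return -1
	case a.file > b.file:
		return 1
	case a.pos < b.pos:
		return -1
	case a.pos > b.pos:
		return 1
	default:
		return 0
	}
}

// isOlderThanTablePoint mirrors the documented rule: an event whose location
// is <= the saved table checkpoint has already been handled and is skipped.
func isOlderThanTablePoint(event, tablePoint position) bool {
	return compare(event, tablePoint) <= 0
}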
@@ -731,8 +731,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(
// use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update
tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration)
defer cancel()
// TODO: refine dbConn and add ExecuteSQLAutoSplit
_, err := cp.dbConn.ExecuteSQL(tctx2, sqls, args...)
err := cp.dbConn.ExecuteSQLAutoSplit(tctx2, sqls, args...)
if err != nil {
return err
}
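The hunk above keeps the flush on a context derived from context.Background() so that cancelling the syncer does not abort an in-flight checkpoint update. A compilable sketch of the same pattern, using the standard context package instead of DM's tcontext and an assumed value for maxDMLConnectionDuration:

package flushctxsketch

import (
	"context"
	"time"
)

// maxDMLConnectionDuration is an assumed value for illustration; DM defines its own.
const maxDMLConnectionDuration = time.Minute

// flushDetached runs flush under a context that is deliberately not derived
// from the syncer's context, so the syncer's cancel cannot interrupt it;
// the timeout still bounds how long the flush may take.
func flushDetached(flush func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), maxDMLConnectionDuration)
	defer cancel()
	return flush(ctx)
}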
14 changes: 14 additions & 0 deletions dm/syncer/dbconn/db.go
@@ -204,6 +204,20 @@ func (conn *DBConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ..
return conn.ExecuteSQLWithIgnore(tctx, nil, queries, args...)
}

// ExecuteSQLAutoSplit wraps BaseConn.ExecuteSQLAutoSplit.
// TODO: refine DBConn and BaseConn.
func (conn *DBConn) ExecuteSQLAutoSplit(
tctx *tcontext.Context,
queries []string,
args ...[]interface{},
) error {
if conn == nil {
// only happens in test
return nil
}
return conn.baseConn.ExecuteSQLsAutoSplit(tctx, nil, conn.cfg.Name, queries, args...)
}
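The wrapper above delegates to BaseConn.ExecuteSQLsAutoSplit. As a hedged illustration of what "auto split" buys (not the actual implementation, and with the per-statement args omitted for brevity), the sketch below batches statements by approximate size and commits each batch in its own transaction, so a checkpoint flush covering many tables stays under TiDB's txn-total-size-limit instead of failing with "Error 8004: Transaction is too large".

package autosplitsketch

import (
	"context"
	"database/sql"
)

// splitBySize groups statements so each batch stays roughly under sizeLimit bytes.
func splitBySize(stmts []string, sizeLimit int) [][]string {
	var batches [][]string
	var cur []string
	size := 0
	for _, s := range stmts {
		if size+len(s) > sizeLimit && len(cur) > 0 {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, s)
		size += len(s)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

// execAutoSplit executes each batch in its own transaction rather than
// putting every statement into one big transaction.
func execAutoSplit(ctx context.Context, db *sql.DB, stmts []string, sizeLimit int) error {
	for _, batch := range splitBySize(stmts, sizeLimit) {
		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		for _, s := range batch {
			if _, err := tx.ExecContext(ctx, s); err != nil {
				_ = tx.Rollback()
				return err
			}
		}
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}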

func (conn *DBConn) retryableFn(tctx *tcontext.Context, queries, args any) func(int, error) bool {
return func(retryTime int, err error) bool {
if retry.IsConnectionError(err) {
74 changes: 40 additions & 34 deletions dm/syncer/syncer.go
@@ -475,7 +475,9 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
// buildLowerCaseTableNamesMap build a lower case schema map and lower case table map for all tables
// Input: map of schema --> list of tables
// Output: schema names map: lower_case_schema_name --> schema_name
// tables names map: lower_case_schema_name --> lower_case_table_name --> table_name
//
// tables names map: lower_case_schema_name --> lower_case_table_name --> table_name
//
// Note: the result will skip the schemas and tables that their lower_case_name are the same.
func buildLowerCaseTableNamesMap(tables map[string][]string) (map[string]string, map[string]map[string]string) {
schemaMap := make(map[string]string)
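buildLowerCaseTableNamesMap appears in this diff only for a comment reformatting, but its doc comment is dense, so here is one possible reading of it as a standalone sketch; names are illustrative, and "skip" is taken to mean that names whose lower-cased form equals the original need no mapping entry.

package lowercasesketch

import "strings"

// buildLowerCaseMaps returns lower_case_schema_name -> schema_name and
// lower_case_schema_name -> lower_case_table_name -> table_name, recording
// only names that actually change when lower-cased.
func buildLowerCaseMaps(tables map[string][]string) (map[string]string, map[string]map[string]string) {
	schemaMap := make(map[string]string)
	tablesMap := make(map[string]map[string]string)
	for schema, tbls := range tables {
		lcSchema := strings.ToLower(schema)
		if lcSchema != schema {
			schemaMap[lcSchema] = schema
		}
		for _, tbl := range tbls {
			lcTbl := strings.ToLower(tbl)
			if lcTbl == tbl {
				continue
			}
			if tablesMap[lcSchema] == nil {
				tablesMap[lcSchema] = make(map[string]string)
			}
			tablesMap[lcSchema][lcTbl] = tbl
		}
	}
	return schemaMap, tablesMap
}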
@@ -1144,10 +1146,11 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {

// flushCheckPoints synchronously flushes previous saved checkpoint in memory to persistent storage, like TiDB
// we flush checkpoints in four cases:
// 1. DDL executed
// 2. pausing / stopping the sync (driven by `s.flushJobs`)
// 3. IsFreshTask return true
// 4. Heartbeat event received
// 1. DDL executed
// 2. pausing / stopping the sync (driven by `s.flushJobs`)
// 3. IsFreshTask return true
// 4. Heartbeat event received
//
// but when error occurred, we can not flush checkpoint, otherwise data may lost
// and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
// this should be handled when `s.Run` returned
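A minimal sketch of the flush-versus-rollback rule spelled out in the comment above; the interface and the function are hypothetical, simplified stand-ins for the checkpoint API and for the error handling done when s.Run returns.

package checkpointsketch

// checkpointer is a simplified, hypothetical view of the checkpoint API.
type checkpointer interface {
	Flush() error // persist the points saved in memory
	Rollback()    // drop the points saved since the last flush
}

// finishRun flushes only on a clean exit; after an error the saved points are
// rolled back, because persisting them could move the checkpoint past data
// that was never applied downstream.
func finishRun(cp checkpointer, runErr error) error {
	if runErr != nil {
		cp.Rollback()
		return runErr
	}
	return cp.Flush()
}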
@@ -1666,9 +1669,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// 2. then since we are confident that Load unit is done we can delete the load task etcd KV.
// TODO: we can't handle panic between 1. and 2., or fail to delete the load task etcd KV.
// 3. then we initiate schema tracker
// 4. - when it's a fresh task, load the table structure from dump files into schema tracker.
// if it's also a optimistic sharding task, also load the table structure into checkpoints because shard tables
// may not have same table structure so we can't fetch the downstream table structure for them lazily.
// 4. - when it's a fresh task, load the table structure from dump files into schema tracker,
// and flush them into checkpoint again.
// - when it's a resumed task, load the table structure from checkpoints into schema tracker.
// TODO: we can't handle failure between 1. and 4. After 1. it's not a fresh task.
// 5. finally clean the dump files
@@ -1695,9 +1697,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
s.tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err))
cleanDumpFile = false
}
if s.cfg.ShardMode == config.ShardOptimistic {
s.flushOptimisticTableInfos(s.runCtx)
} else {
err = s.flushCheckPoints()
if err != nil {
s.tctx.L().Warn("error happened when flush table structure from dump files", zap.Error(err))
cleanDumpFile = false
}
}
} else {
err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker)
@@ -3459,6 +3464,7 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
var dbs, tables []string
var tableFiles [][2]string // [db, filename]
for f := range files {
// TODO: handle db/table name escaped bu dumpling.
if db, ok := utils.GetDBFromDumpFilename(f); ok {
dbs = append(dbs, db)
continue
@@ -3488,6 +3494,13 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
firstErr = err
}
}
p, err := utils.GetParserFromSQLModeStr(s.cfg.LoaderConfig.SQLMode)
if err != nil {
logger.Error("failed to create parser from SQL Mode, will skip loadTableStructureFromDump",
zap.String("SQLMode", s.cfg.LoaderConfig.SQLMode),
zap.Error(err))
return err
}

for _, dbAndFile := range tableFiles {
db, file := dbAndFile[0], dbAndFile[1]
@@ -3507,6 +3520,17 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
if len(stmt) == 0 || bytes.HasPrefix(stmt, []byte("/*")) {
continue
}
stmtNode, err3 := p.ParseOneStmt(string(stmt), "", "")
if err3 != nil {
logger.Warn("fail to parse statement for creating table in schema tracker",
zap.String("db", db),
zap.String("path", s.cfg.LoaderConfig.Dir),
zap.String("file", file),
zap.ByteString("statement", stmt),
zap.Error(err3))
setFirstErr(err3)
continue
}
err = s.schemaTracker.Exec(ctx, db, string(stmt))
if err != nil {
logger.Warn("fail to create table for dump files",
@@ -3515,10 +3539,12 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
zap.ByteString("statement", stmt),
zap.Error(err))
setFirstErr(err)
continue
}
// TODO: we should save table checkpoint here, but considering when
// the first time of flushing checkpoint, user may encounter https://github.com/pingcap/tiflow/issues/5010
// we should fix that problem first.
s.saveTablePoint(
&filter.Table{Schema: db, Name: stmtNode.(*ast.CreateTableStmt).Table.Name.O},
s.getFlushedGlobalPoint(),
)
}
}
return firstErr
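The new code above parses every statement from the dump files and, for each CREATE TABLE, saves a table point at the flushed global point. A standalone sketch of that parsing step using the TiDB parser follows; the import paths match the TiDB releases of that era and may differ in newer versions, and plain parser.New() is used here instead of DM's utils.GetParserFromSQLModeStr helper.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser"
	"github.com/pingcap/tidb/parser/ast"
	_ "github.com/pingcap/tidb/parser/test_driver" // value types required by the parser's AST
)

func main() {
	p := parser.New()
	stmt, err := p.ParseOneStmt("CREATE TABLE `many_tables_db`.`t1` (i TINYINT, j INT UNIQUE KEY)", "", "")
	if err != nil {
		panic(err)
	}
	create, ok := stmt.(*ast.CreateTableStmt)
	if !ok {
		panic("not a CREATE TABLE statement")
	}
	// The original-case table name (.O), as used when saving the table point.
	fmt.Println(create.Table.Name.O) // prints: t1
}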
@@ -4068,26 +4094,6 @@ func calculateChanSize(queueSize, workerCount int, compact bool) int {
return chanSize
}

func (s *Syncer) flushOptimisticTableInfos(tctx *tcontext.Context) {
tbls := s.optimist.Tables()
sourceTables := make([]*filter.Table, 0, len(tbls))
tableInfos := make([]*model.TableInfo, 0, len(tbls))
for _, tbl := range tbls {
sourceTable := tbl[0]
targetTable := tbl[1]
tableInfo, err := s.getTableInfo(tctx, &sourceTable, &targetTable)
if err != nil {
tctx.L().Error("failed to get table infos", log.ShortError(err))
continue
}
sourceTables = append(sourceTables, &sourceTable)
tableInfos = append(tableInfos, tableInfo)
}
if err := s.checkpoint.FlushPointsWithTableInfos(tctx, sourceTables, tableInfos); err != nil {
tctx.L().Error("failed to flush table points with table infos", log.ShortError(err))
}
}

func (s *Syncer) setGlobalPointByTime(tctx *tcontext.Context, timeStr string) error {
// we support two layout
t, err := time.ParseInLocation(config.StartTimeFormat, timeStr, s.upstreamTZ)
2 changes: 1 addition & 1 deletion dm/tests/adjust_gtid/run.sh
@@ -15,7 +15,7 @@ function clean_gtid() {
# delete SOURCE1 checkpoint's gtid info
run_sql "update dm_meta.${TASK_NAME}_syncer_checkpoint set binlog_gtid=\"\" where id=\"$SOURCE_ID1\" and is_global=1" $TIDB_PORT $TIDB_PASSWORD
# set SOURCE2 incremental metadata without checkpoint
run_sql "delete from dm_meta.${TASK_NAME}_syncer_checkpoint where id=\"$SOURCE_ID2\" and is_global=1" $TIDB_PORT $TIDB_PASSWORD
run_sql "delete from dm_meta.${TASK_NAME}_syncer_checkpoint where id=\"$SOURCE_ID2\"" $TIDB_PORT $TIDB_PASSWORD

cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
sed -i "s/task-mode-placeholder/incremental/g" $WORK_DIR/dm-task.yaml
2 changes: 2 additions & 0 deletions dm/tests/many_tables/conf/tidb-config-small-txn.toml
@@ -0,0 +1,2 @@
[performance]
txn-total-size-limit = 800000
24 changes: 20 additions & 4 deletions dm/tests/many_tables/run.sh
@@ -12,29 +12,41 @@ function prepare_data() {
run_sql 'DROP DATABASE if exists many_tables_db;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE many_tables_db;' $MYSQL_PORT1 $MYSQL_PASSWORD1
for i in $(seq $TABLE_NUM); do
run_sql "CREATE TABLE many_tables_db.t$i(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "CREATE TABLE many_tables_db.t$i(i TINYINT, j INT UNIQUE KEY, c1 VARCHAR(20), c2 VARCHAR(20), c3 VARCHAR(20), c4 VARCHAR(20), c5 VARCHAR(20), c6 VARCHAR(20), c7 VARCHAR(20), c8 VARCHAR(20), c9 VARCHAR(20), c10 VARCHAR(20), c11 VARCHAR(20), c12 VARCHAR(20), c13 VARCHAR(20));" $MYSQL_PORT1 $MYSQL_PASSWORD1
for j in $(seq 2); do
run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO many_tables_db.t$i(i,j) VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
# to make the tables have odd number of lines before 'ALTER TABLE' command, for check_sync_diff to work correctly
run_sql "INSERT INTO many_tables_db.t$i(i,j) VALUES (9, 90009);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
}

function incremental_data() {
for j in $(seq 3 5); do
for i in $(seq $TABLE_NUM); do
run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO many_tables_db.t$i(i,j) VALUES ($j,${j}000$j),($j,${j}001$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
done
}

function incremental_data_2() {
j=6
for i in $(seq $TABLE_NUM); do
run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO many_tables_db.t$i (i, j) VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
}

function run() {
pkill -hup tidb-server 2>/dev/null || true
wait_process_exit tidb-server

# clean unistore data
rm -rf /tmp/tidb

# start a TiDB with small txn-total-size-limit
run_tidb_server 4000 $TIDB_PASSWORD $cur/conf/tidb-config-small-txn.toml
sleep 2

echo "start prepare_data"
prepare_data
echo "finish prepare_data"
@@ -59,6 +71,10 @@ function run() {
wait_until_sync $WORK_DIR "127.0.0.1:$MASTER_PORT"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
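# the checks below assert that the checkpoint table holds one row per upstream
# table plus the global checkpoint row (hence 501), and that the small
# txn-total-size-limit configured above was actually hit ("Transaction is too
# large"), i.e. the checkpoint flush succeeds only because it is split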

run_sql_tidb_with_retry_times "select count(*) from dm_meta.test_syncer_checkpoint" "count(*): 501" 60

check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'Error 8004: Transaction is too large'

# check https://github.com/pingcap/tiflow/issues/5063
check_time=20
sleep 5
4 changes: 3 additions & 1 deletion dm/tests/start_task/run.sh
@@ -55,8 +55,10 @@ function init_tracker_test() {
done
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 20

# now syncer will save all table structure from dump files at Init, so all tables
# should be loaded into schema tracker.
check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'init table info.*t50' 1
check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'init table info.*t51'
check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'init table info.*t51' 1

cleanup_process
cleanup_data start_task
