diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 4e01b170167..f0d58352cf5 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -603,7 +603,10 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error { return errors.Trace(err) } case timodel.ActionRenameTables: - return s.renameTables(job) + err := s.renameTables(job) + if err != nil { + return errors.Trace(err) + } case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable: err := s.createTable(getWrapTableInfo(job)) if err != nil { @@ -827,7 +830,8 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { lastSnap := s.snaps[len(s.snaps)-1] if job.BinlogInfo.FinishedTS <= lastSnap.currentTs { log.Info("ignore foregone DDL", zap.Int64("jobID", job.ID), - zap.String("DDL", job.Query), zap.String("changefeed", s.id)) + zap.String("DDL", job.Query), zap.String("changefeed", s.id), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) return nil } snap = lastSnap.Clone() @@ -837,11 +841,13 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { if err := snap.handleDDL(job); err != nil { log.Error("handle DDL failed", zap.String("DDL", job.Query), zap.Stringer("job", job), zap.Error(err), - zap.String("changefeed", s.id)) + zap.String("changefeed", s.id), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) return errors.Trace(err) } log.Info("handle DDL", zap.String("DDL", job.Query), - zap.Stringer("job", job), zap.String("changefeed", s.id)) + zap.Stringer("job", job), zap.String("changefeed", s.id), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) + s.snaps = append(s.snaps, snap) s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 77952ad8b0b..4d8765213c7 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -410,9 +410,11 @@ func TestHandleRenameTables(t *testing.T) { rawArgs, 
err := json.Marshal(args) require.Nil(t, err) var job *timodel.Job = &timodel.Job{ - Type: timodel.ActionRenameTables, - RawArgs: rawArgs, - BinlogInfo: &timodel.HistoryInfo{}, + Type: timodel.ActionRenameTables, + RawArgs: rawArgs, + BinlogInfo: &timodel.HistoryInfo{ + FinishedTS: 11112222, + }, } job.BinlogInfo.MultipleTableInfos = append(job.BinlogInfo.MultipleTableInfos, &timodel.TableInfo{ @@ -442,6 +444,7 @@ func TestHandleRenameTables(t *testing.T) { t2 := model.TableName{Schema: "db_1", Table: "y"} require.Equal(t, snap.tableNameToID[t1], int64(13)) require.Equal(t, snap.tableNameToID[t2], int64(14)) + require.Equal(t, uint64(11112222), snap.currentTs) } func testDoDDLAndCheck(t *testing.T, snap *schemaSnapshot, job *timodel.Job, isErr bool) { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 203ce5fb9b7..e70f1d96cb3 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -937,6 +937,9 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode p.sendError(err) return nil }) + + // FIXME: using GetLastSnapshot here would be confusing and get the wrong table name + // after `rename table` DDL, since `rename table` keeps the tableID unchanged var tableName *model.TableName retry.Do(ctx, func() error { //nolint:errcheck if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { diff --git a/tests/integration_tests/rename_tables/conf/diff_config.toml b/tests/integration_tests/rename_tables/conf/diff_config.toml new file mode 100644 index 00000000000..e7ed7a855c4 --- /dev/null +++ b/tests/integration_tests/rename_tables/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/rename_tables/sync_diff/output" + + source-instances = ["tidb0"] + + target-instance = "mysql1" + + target-check-tables = ["rename_tables_test.?*"] + +[data-sources] +[data-sources.tidb0] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.mysql1] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/rename_tables/data/test.sql b/tests/integration_tests/rename_tables/data/test.sql new file mode 100644 index 00000000000..5f596b11708 --- /dev/null +++ b/tests/integration_tests/rename_tables/data/test.sql @@ -0,0 +1,53 @@ +drop database if exists `rename_tables_test`; +create database `rename_tables_test`; +use `rename_tables_test`; + +create table t1 ( + value64 bigint unsigned not null, + primary key(value64) +); +insert into t1 values(17156792991891826145); +insert into t1 values(91891826145); +delete from t1 where value64=17156792991891826145; +update t1 set value64=17156792991891826; +update t1 set value64=56792991891826; + +rename table t1 to t1_1; + +create table t2 ( + value64 bigint unsigned not null, + primary key(value64) +); +insert into t2 values(17156792991891826145); +insert into t2 values(91891826145); +delete from t2 where value64=91891826145; +update t2 set value64=17156792991891826; +update t2 set value64=56792991891826; + +rename table t2 to t2_2; + +create table t1 ( + value64 bigint unsigned not null, + value32 integer not null, + primary key(value64, value32) +); + +create table t2 ( + value64 bigint unsigned not null, + value32 integer not null, + primary key(value64, value32) +); + +insert into t1 values(17156792991891826145, 1); +insert into t1 values( 9223372036854775807, 2); +insert into t2 values(17156792991891826145, 3); +insert into t2 values( 9223372036854775807, 4); + +rename table t1 to t1_7, t2 to t2_7; + +insert into t1_7 
values(91891826145, 5); +insert into t1_7 values(685477580, 6); +insert into t2_7 values(1715679991826145, 7); +insert into t2_7 values(2036854775807, 8); + +create table finish_mark(id int primary key); diff --git a/tests/integration_tests/rename_tables/run.sh b/tests/integration_tests/rename_tables/run.sh new file mode 100755 index 00000000000..4110a349cae --- /dev/null +++ b/tests/integration_tests/rename_tables/run.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-rename-tables-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + fi + run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # sync_diff can't check non-existent tables, so we check expected tables are created in downstream first + + check_table_exists rename_tables_test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + echo "check table exists success" + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60 + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case 
$TEST_NAME success! >>>>>>"