From 48bcbfb23aab4153a4fce6a5725c8bb850fcf8fc Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 1 Mar 2024 18:05:02 +0800 Subject: [PATCH] dm: fix update dml loss if binary column is primary key (#10685) (#10692) close pingcap/tiflow#10672 --- dm/syncer/dml.go | 11 ++++++++ dm/tests/many_tables/run.sh | 3 +-- .../conf/single-source-no-sharding.yaml | 1 + dm/tests/shardddl1_1/run.sh | 25 +++++++++++++++++++ 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/dm/syncer/dml.go b/dm/syncer/dml.go index 0270c61bf5b..531d436027e 100644 --- a/dm/syncer/dml.go +++ b/dm/syncer/dml.go @@ -97,8 +97,19 @@ func adjustValueFromBinlogData( } } case string: + isBinary := columns[i].GetType() == mysql.TypeString && mysql.HasBinaryFlag(columns[i].GetFlag()) isGBK := columns[i].GetCharset() == charset.CharsetGBK || columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetGBK switch { + case isBinary: + // convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML + d = []byte(v) + // if column is binary and value length is less than column length, we need to pad the value with 0x00 + // ref: https://dev.mysql.com/doc/refman/8.0/en/binary-varbinary.html + valLen := columns[i].FieldType.GetFlen() + if valLen != types.UnspecifiedLength && valLen > len(v) { + padding := make([]byte, valLen-len(v)) + d = append(d.([]byte), padding...) + } case isGBK: // convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML d = []byte(v) diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 61f31c92762..c5fcd01319d 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -100,8 +100,7 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml check_metric $WORKER1_PORT 'lightning_tables{result="success",source_id="mysql-replica-01",state="completed",task="test"}' 1 $(($TABLE_NUM - 1)) $(($TABLE_NUM + 1)) - run_sql_tidb "select count(*) from dm_meta.test_syncer_checkpoint" - check_contains "count(*): $(($TABLE_NUM + 1))" + run_sql_tidb_with_retry "select count(*) from dm_meta.test_syncer_checkpoint" "count(*): $(($TABLE_NUM + 1))" check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'Error 8004 (HY000): Transaction is too large' diff --git a/dm/tests/shardddl1_1/conf/single-source-no-sharding.yaml b/dm/tests/shardddl1_1/conf/single-source-no-sharding.yaml index 90987e1399b..0561ed696b1 100644 --- a/dm/tests/shardddl1_1/conf/single-source-no-sharding.yaml +++ b/dm/tests/shardddl1_1/conf/single-source-no-sharding.yaml @@ -48,3 +48,4 @@ syncers: global: worker-count: 16 batch: 100 + safe-mode-duration: 0 diff --git a/dm/tests/shardddl1_1/run.sh b/dm/tests/shardddl1_1/run.sh index 5d866ea71fc..0067ed37ed8 100644 --- a/dm/tests/shardddl1_1/run.sh +++ b/dm/tests/shardddl1_1/run.sh @@ -498,10 +498,35 @@ function DM_SAME_DDL_TWICE() { run_case SAME_DDL_TWICE "double-source-pessimistic" "init_table 111 211 212" "clean_table" "pessimistic" } +function DM_BINARY_COLUMN_CASE() { + run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values(1,0xBF500C00A2034521B819D6EB7065D200)" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_source1 "update ${shardddl1}.${tb1} set a=2 where b=0xBF500C00A2034521B819D6EB7065D200" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_source1 "delete from ${shardddl1}.${tb1} where b=0xBF500C00A2034521B819D6EB7065D200" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # padding by mysql + run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values(1,0xBF500C00A2034521B819D6EB7065D2)" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_source1 "update ${shardddl1}.${tb1} set a=2 where b=0xBF500C00A2034521B819D6EB7065D200" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_source1 "delete from ${shardddl1}.${tb1} where b=0xBF500C00A2034521B819D6EB7065D200" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 0" +} + +function DM_BINARY_COLUMN() { + run_case BINARY_COLUMN "single-source-no-sharding" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b binary(16) primary key);\"" \ + "clean_table" "" +} + function run() { init_cluster init_database DM_SAME_DDL_TWICE + DM_BINARY_COLUMN start=6 end=35 except=(024 025 029)