From eb871f862e059832533f1abc6b9b1b3f0957a780 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 22 Nov 2024 10:29:50 +0800 Subject: [PATCH] lightning: add PK to internal tables (#57480) close pingcap/tidb#57479 --- .../tests/lightning_config_max_error/run.sh | 30 +++--- .../lightning_duplicate_detection/run.sh | 2 +- .../lightning_duplicate_detection_new/run.sh | 14 +-- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 4 +- lightning/tests/lightning_sqlmode/run.sh | 10 +- lightning/tidb-lightning.toml | 2 +- pkg/lightning/backend/tidb/tidb_test.go | 22 ++--- pkg/lightning/errormanager/errormanager.go | 24 +++-- .../errormanager/errormanager_test.go | 72 +++++++------- .../errormanager/resolveconflict_test.go | 96 +++++++++---------- 14 files changed, 144 insertions(+), 140 deletions(-) diff --git a/lightning/tests/lightning_config_max_error/run.sh b/lightning/tests/lightning_config_max_error/run.sh index d0464245a573e..74419dddae718 100755 --- a/lightning/tests/lightning_config_max_error/run.sh +++ b/lightning/tests/lightning_config_max_error/run.sh @@ -28,7 +28,7 @@ duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} )) remaining_row_count=$(( ${uniq_row_count} + ${duplicated_row_count}/2 )) run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' stderr_file="/tmp/${TEST_NAME}.stderr" @@ -47,7 +47,7 @@ EOF cat "${stderr_file}" grep -q "${err_msg}" "${stderr_file}" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4' # Although conflict error number exceeds the max-error limit, # all the conflict errors are recorded, # because recording of conflict errors are executed batch by batch (batch size 1024), @@ -57,12 +57,12 @@ check_contains "COUNT(*): ${duplicated_row_count}" # import a second time run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' run_lightning --backend local --config "${mydir}/normal_config.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4' check_contains "COUNT(*): ${duplicated_row_count}" # Check remaining records in the target table @@ -72,12 +72,12 @@ check_contains "COUNT(*): ${remaining_row_count}" # import a third time run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' run_lightning --backend local --config "${mydir}/normal_config_old_style.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4' check_contains "COUNT(*): ${duplicated_row_count}" # Check remaining records in the target table @@ -85,27 +85,27 @@ run_sql 'SELECT COUNT(*) FROM mytest.testtbl' check_contains "COUNT(*): ${remaining_row_count}" # import a fourth time -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records_v2' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' ! run_lightning --backend local --config "${mydir}/ignore_config.toml" [ $? -eq 0 ] tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Config:ErrInvalidConfig]conflict.strategy cannot be set to \\\"ignore\\\" when use tikv-importer.backend = \\\"local\\\"" -# Check tidb backend record duplicate entry in conflict_records table -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records' +# Check tidb backend record duplicate entry in conflict_records_v2 table +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records_v2' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' run_lightning --backend tidb --config "${mydir}/tidb.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2' check_contains "COUNT(*): 15" -run_sql 'SELECT * FROM lightning_task_info.conflict_records WHERE offset = 149' +run_sql 'SELECT * FROM lightning_task_info.conflict_records_v2 WHERE offset = 149' check_contains "error: Error 1062 (23000): Duplicate entry '5' for key 'testtbl.PRIMARY'" check_contains "row_data: ('5','bbb05')" -# Check max-error-record can limit the size of conflict_records table +# Check max-error-record can limit the size of conflict_records_v2 table run_sql 'DROP DATABASE IF EXISTS lightning_task_info' run_sql 'DROP DATABASE IF EXISTS mytest' run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_view\`" | grep -q "5" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2' check_contains "COUNT(*): 5" # Check conflict.threshold @@ -121,8 +121,8 @@ run_sql 'DROP DATABASE IF EXISTS mytest' rm "${TEST_DIR}/lightning.log" run_lightning --backend tidb --config "${mydir}/tidb-error.toml" 2>&1 | grep -q "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'" check_contains "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'" "${TEST_DIR}/lightning.log" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2' check_contains "COUNT(*): 1" -run_sql 'SELECT * FROM lightning_task_info.conflict_records' +run_sql 'SELECT * FROM lightning_task_info.conflict_records_v2' check_contains "error: Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'" check_contains "row_data: ('1','bbb01')" diff --git a/lightning/tests/lightning_duplicate_detection/run.sh b/lightning/tests/lightning_duplicate_detection/run.sh index ec9effd73a82d..46f8ab497d16a 100644 --- a/lightning/tests/lightning_duplicate_detection/run.sh +++ b/lightning/tests/lightning_duplicate_detection/run.sh @@ -53,7 +53,7 @@ verify_detected_rows() { done done mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq) - mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v3 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | + mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v4 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq) equal=0 if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then diff --git a/lightning/tests/lightning_duplicate_detection_new/run.sh b/lightning/tests/lightning_duplicate_detection_new/run.sh index 902d0a6b08271..8d4d6e8ca34ec 100755 --- a/lightning/tests/lightning_duplicate_detection_new/run.sh +++ b/lightning/tests/lightning_duplicate_detection_new/run.sh @@ -40,14 +40,14 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks" echo "local backend replace strategy result is not equal to tidb backend" exit 1 fi -run_sql "SELECT count(*) FROM lightning_task_info.conflict_records" +run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2" check_contains "count(*): 227" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error = ''" +run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2 WHERE error = ''" check_contains "count(*): 0" -run_sql "SELECT * FROM lightning_task_info.conflict_records WHERE row_id = 12" +run_sql "SELECT * FROM lightning_task_info.conflict_records_v2 WHERE row_id = 12" check_contains "(171,'yRxZE',9201592769833450947,'xs3d',5,4,283270321)" check_contains "[kv:1062]Duplicate entry '171' for key 'dup_detect.PRIMARY'" -run_sql "SELECT * FROM lightning_task_info.conflict_records WHERE row_id = 1" +run_sql "SELECT * FROM lightning_task_info.conflict_records_v2 WHERE row_id = 1" check_contains "(87,'nEoKu',7836621565948506759,'y6',48,0,177543185)" check_contains "[kv:1062]Duplicate entry '0-177543185' for key 'dup_detect.uniq_col6_col7'" @@ -61,11 +61,11 @@ expected_pks=$(run_sql "SELECT group_concat(col1 ORDER BY col1) AS pks FROM test cleanup run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" 2>&1 | grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details" grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details" "$LOG_FILE" -run_sql "SELECT * FROM lightning_task_info.conflict_records" +run_sql "SELECT * FROM lightning_task_info.conflict_records_v2" check_contains "error: duplicate key in table \`test\`.\`dup_detect\`" run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" --enable-checkpoint=1 2>&1 | grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE" -run_sql "SELECT * FROM lightning_task_info.conflict_records" +run_sql "SELECT * FROM lightning_task_info.conflict_records_v2" check_contains "error: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are" check_contains "restore table \`test\`.\`dup_detect\` failed: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE" run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-error.toml" --checkpoint-error-destroy="\`test\`.\`dup_detect\`" @@ -89,6 +89,6 @@ run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-rep run_lightning --enable-checkpoint=1 --backend local --config "$CUR/local-replace.toml" --log-file "$LOG_FILE" run_sql "SELECT count(*) FROM test.dup_detect" check_contains "count(*): 174" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_records" +run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2" check_contains "count(*): 227" check_not_contains "duplicate detection start" "$LOG_FILE" diff --git a/lightning/tests/lightning_duplicate_resolution_error/run.sh b/lightning/tests/lightning_duplicate_resolution_error/run.sh index bd2978802923f..f468fff3ce9c0 100644 --- a/lightning/tests/lightning_duplicate_resolution_error/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' ! run_lightning --backend local --config "${mydir}/config.toml" diff --git a/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh b/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh index e6988d22b9699..690ddbd18a871 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' ! run_lightning --backend local --config "${mydir}/config.toml" diff --git a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh index e346b3961977b..5ef374f9b1dab 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' ! run_lightning --backend local --config "${mydir}/config.toml" diff --git a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh index 02b441d5ca058..b433e3c225510 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4' run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view' ! run_lightning --backend local --config "${mydir}/config.toml" diff --git a/lightning/tests/lightning_duplicate_resolution_merge/run.sh b/lightning/tests/lightning_duplicate_resolution_merge/run.sh index 9060067362876..2891c6ab3bc03 100644 --- a/lightning/tests/lightning_duplicate_resolution_merge/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_merge/run.sh @@ -42,10 +42,10 @@ run_sql 'admin check table dup_resolve.a' run_sql 'select count(*) from dup_resolve.a' check_contains 'count(*): 10' -run_sql 'select count(*) from lightning_task_info.conflict_records' +run_sql 'select count(*) from lightning_task_info.conflict_records_v2' check_contains 'count(*): 16' -run_sql 'select count(*) from lightning_task_info.conflict_error_v3' +run_sql 'select count(*) from lightning_task_info.conflict_error_v4' check_contains 'count(*): 4' run_sql 'select count(*) from lightning_task_info.conflict_view' diff --git a/lightning/tests/lightning_sqlmode/run.sh b/lightning/tests/lightning_sqlmode/run.sh index e12e409b1b909..5c77f690d9e26 100755 --- a/lightning/tests/lightning_sqlmode/run.sh +++ b/lightning/tests/lightning_sqlmode/run.sh @@ -61,28 +61,28 @@ run_sql 'SELECT min(id), max(id) FROM sqlmodedb.t' check_contains 'min(id): 4' check_contains 'max(id): 4' -run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v1' +run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v2' check_contains 'count(*): 4' -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 53' check_contains 'cannot convert datum from unsigned bigint to type timestamp.' check_contains "row_data: (1,9,128,'too long','x,y,z')" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 100' check_contains "Incorrect timestamp value: '2000-00-00 00:00:00'" check_contains "row_data: (2,'2000-00-00 00:00:00',-99999,'🤩',3)" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 149' check_contains "Incorrect timestamp value: '9999-12-31 23:59:59'" check_contains "row_data: (3,'9999-12-31 23:59:59','NaN',x'99','x+y')" -run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";' +run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";' check_contains 'path: sqlmodedb.t.1.sql' check_contains 'offset: 237' check_contains "Column 'a' cannot be null" diff --git a/lightning/tidb-lightning.toml b/lightning/tidb-lightning.toml index df7c8842e7f59..bb5530ad99a30 100644 --- a/lightning/tidb-lightning.toml +++ b/lightning/tidb-lightning.toml @@ -103,7 +103,7 @@ strategy = "" # precheck-conflict-before-import = false # Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when the strategy is "replace" or "ignore". The default value is 10000. If you set a value larger than 10000, the import process might experience performance degradation. # threshold = 10000 -# Controls the maximum number of records in the `conflict_records` table. The default value is 10000. +# Controls the maximum number of records in the `conflict_records_v2` table. The default value is 10000. # Starting from v8.1.0, there is no need to configure `max-record-rows` manually, because TiDB Lightning automatically assigns the value of `max-record-rows` with the value of `threshold`, regardless of the user input. `max-record-rows` will be deprecated in a future release. # In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded. # In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records are not recorded. diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go index c8e3490ba7a29..2170a65df00c8 100644 --- a/pkg/lightning/backend/tidb/tidb_test.go +++ b/pkg/lightning/backend/tidb/tidb_test.go @@ -606,35 +606,35 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)"). WillReturnResult(driver.ResultNoRows) @@ -671,21 +671,21 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)"). WillReturnResult(driver.ResultNoRows) s.mockDB. ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E"). WillReturnError(nonRetryableError) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)"). WillReturnResult(driver.ResultNoRows) // the forth row will exceed the error threshold, won't record this error @@ -732,7 +732,7 @@ func TestWriteRowsRecordOneError(t *testing.T) { ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E"). WillReturnError(dupErr) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)"). WillReturnResult(driver.ResultNoRows) @@ -1045,7 +1045,7 @@ func TestLogicalImportBatchPrepStmt(t *testing.T) { // TestWriteRowsRecordOneErrorPrepStmt tests that when LogicalImportPrepStmt is true and the batch insert fails, // it will fallback to a single row insert, -// the error will be recorded in tidb_lightning_errors.conflict_records. +// the error will be recorded in tidb_lightning_errors.conflict_records_v2. func TestWriteRowsRecordOneErrorPrepStmt(t *testing.T) { dupErr := &gmysql.MySQLError{Number: errno.ErrDupEntry, Message: "Duplicate entry '2' for key 'PRIMARY'"} s := createMysqlSuite(t) @@ -1068,7 +1068,7 @@ func TestWriteRowsRecordOneErrorPrepStmt(t *testing.T) { WithArgs(2). WillReturnError(dupErr) s.mockDB. - ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records.*"). + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records_v2.*"). WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)"). WillReturnResult(driver.ResultNoRows) diff --git a/pkg/lightning/errormanager/errormanager.go b/pkg/lightning/errormanager/errormanager.go index f472f381d3b0d..cec0bc4cc352a 100644 --- a/pkg/lightning/errormanager/errormanager.go +++ b/pkg/lightning/errormanager/errormanager.go @@ -52,17 +52,18 @@ const ( CREATE SCHEMA IF NOT EXISTS %s; ` - syntaxErrorTableName = "syntax_error_v1" - typeErrorTableName = "type_error_v1" + syntaxErrorTableName = "syntax_error_v2" + typeErrorTableName = "type_error_v2" // ConflictErrorTableName is the table name for duplicate detection. - ConflictErrorTableName = "conflict_error_v3" + ConflictErrorTableName = "conflict_error_v4" // DupRecordTableName is the table name to record duplicate data that displayed to user. - DupRecordTableName = "conflict_records" + DupRecordTableName = "conflict_records_v2" // ConflictViewName is the view name for presenting the union information of ConflictErrorTable and DupRecordTable. ConflictViewName = "conflict_view" createSyntaxErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -75,6 +76,7 @@ const ( createTypeErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + typeErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -87,6 +89,7 @@ const ( createConflictErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -107,6 +110,7 @@ const ( createDupRecordTableName = ` CREATE TABLE IF NOT EXISTS %s.` + DupRecordTableName + ` ( + id bigint PRIMARY KEY AUTO_INCREMENT, task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -166,17 +170,17 @@ const ( sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?,?)" selectIndexConflictKeysReplace = ` - SELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle + SELECT id, raw_key, index_name, raw_value, raw_handle FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? - ORDER BY _tidb_rowid LIMIT ?; + WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? + ORDER BY id LIMIT ?; ` selectDataConflictKeysReplace = ` - SELECT _tidb_rowid, raw_key, raw_value + SELECT id, raw_key, raw_value FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? - ORDER BY _tidb_rowid LIMIT ?; + WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? + ORDER BY id LIMIT ?; ` deleteNullDataRow = ` diff --git a/pkg/lightning/errormanager/errormanager_test.go b/pkg/lightning/errormanager/errormanager_test.go index f823d870e9f32..d6bab3cc58a26 100644 --- a/pkg/lightning/errormanager/errormanager_test.go +++ b/pkg/lightning/errormanager/errormanager_test.go @@ -66,7 +66,7 @@ func TestInit(t *testing.T) { em.conflictV1Enabled = true mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;"). WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v3.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectExec("CREATE OR REPLACE VIEW `lightning_errors`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) @@ -78,11 +78,11 @@ func TestInit(t *testing.T) { em.remainingError.Type.Store(1) mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`.*"). WillReturnResult(sqlmock.NewResult(5, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v2.*"). WillReturnResult(sqlmock.NewResult(6, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v3.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(7, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_records.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_records_v2.*"). WillReturnResult(sqlmock.NewResult(7, 1)) mock.ExpectExec("CREATE OR REPLACE VIEW `lightning_errors`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(7, 1)) @@ -120,7 +120,7 @@ type mockRows struct { } func (r *mockRows) Columns() []string { - return []string{"_tidb_rowid", "raw_handle", "raw_row"} + return []string{"id", "raw_handle", "raw_row"} } func (r *mockRows) Close() error { return nil } @@ -129,7 +129,7 @@ func (r *mockRows) Next(dest []driver.Value) error { if r.start >= r.end { return io.EOF } - dest[0] = r.start // _tidb_rowid + dest[0] = r.start // id dest[1] = []byte{} // raw_handle dest[2] = []byte{} // raw_row r.start++ @@ -137,7 +137,7 @@ func (r *mockRows) Next(dest []driver.Value) error { } func (c mockConn) QueryContext(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { - expectedQuery := "SELECT _tidb_rowid, raw_handle, raw_row.*" + expectedQuery := "SELECT id, raw_handle, raw_row.*" if err := sqlmock.QueryMatcherRegexp.Match(expectedQuery, query); err != nil { return &mockRows{}, nil } @@ -288,22 +288,22 @@ func TestReplaceConflictOneKey(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data1RowKey, data1RowValue). AddRow(2, data1RowKey, data2RowValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() @@ -486,40 +486,40 @@ func TestReplaceConflictOneUniqueKey(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"}). AddRow(1, data1IndexKey, "uni_b", data1IndexValue, data1RowKey). AddRow(2, data1IndexKey, "uni_b", data2IndexValue, data2RowKey). AddRow(3, data3IndexKey, "uni_b", data3IndexValue, data3RowKey). AddRow(4, data3IndexKey, "uni_b", data4IndexValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v4.*"). WithArgs(0, "test", nil, nil, data2RowKey, data2RowValue, 2, 0, "test", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) } - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data1RowKey, data1RowValue). AddRow(2, data1RowKey, data3RowValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 2)) mockDB.ExpectCommit() mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() @@ -641,7 +641,7 @@ func TestErrorMgrErrorOutput(t *testing.T) { "+---+-------------+-------------+--------------------------------+\n" + "| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" + "+---+-------------+-------------+--------------------------------+\n" + - "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + + "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|\n" + "+---+-------------+-------------+--------------------------------+\n" require.Equal(t, expected, output) @@ -654,8 +654,8 @@ func TestErrorMgrErrorOutput(t *testing.T) { "+---+-------------+-------------+--------------------------------+\n" + "| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" + "+---+-------------+-------------+--------------------------------+\n" + - "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" + - "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + + "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m|\n" + + "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|\n" + "+---+-------------+-------------+--------------------------------+\n" require.Equal(t, expected, output) @@ -671,8 +671,8 @@ func TestErrorMgrErrorOutput(t *testing.T) { "+---+---------------------+-------------+--------------------------------+\n" + "| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" + "+---+---------------------+-------------+--------------------------------+\n" + - "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" + - "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + + "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m|\n" + + "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|\n" + "|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" + "|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_view` \x1b[0m|\n" + "+---+---------------------+-------------+--------------------------------+\n" @@ -686,8 +686,8 @@ func TestErrorMgrErrorOutput(t *testing.T) { "+---+---------------------+-------------+--------------------------------+\n" + "| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" + "+---+---------------------+-------------+--------------------------------+\n" + - "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" + - "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + + "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m|\n" + + "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|\n" + "|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" + "|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_view` \x1b[0m|\n" + "+---+---------------------+-------------+--------------------------------+\n" @@ -701,8 +701,8 @@ func TestErrorMgrErrorOutput(t *testing.T) { "+---+---------------------+-------------+--------------------------------+\n" + "| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |\n" + "+---+---------------------+-------------+--------------------------------+\n" + - "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" + - "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + + "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m|\n" + + "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|\n" + "|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" + "|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_view` \x1b[0m|\n" + "+---+---------------------+-------------+--------------------------------+\n" diff --git a/pkg/lightning/errormanager/resolveconflict_test.go b/pkg/lightning/errormanager/resolveconflict_test.go index c1f4edffcd840..00dca3d9cfe1e 100644 --- a/pkg/lightning/errormanager/resolveconflict_test.go +++ b/pkg/lightning/errormanager/resolveconflict_test.go @@ -167,40 +167,40 @@ func TestReplaceConflictMultipleKeysNonclusteredPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"}). AddRow(1, data2RowKey, "PRIMARY", data2RowValue, data1RowKey). AddRow(2, data2RowKey, "PRIMARY", data3NonclusteredValue, data2NonclusteredKey). AddRow(3, data6RowKey, "PRIMARY", data6RowValue, data5RowKey). AddRow(4, data6RowKey, "PRIMARY", data7NonclusteredValue, data6NonclusteredKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v4.*"). WithArgs(0, "a", nil, nil, data2NonclusteredKey, data2NonclusteredValue, 2, 0, "a", nil, nil, data6NonclusteredKey, data6NonclusteredValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) } - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data2NonclusteredKey, data2NonclusteredValue). AddRow(2, data6NonclusteredKey, data6NonclusteredValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 2)) mockDB.ExpectCommit() mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() @@ -352,36 +352,36 @@ func TestReplaceConflictOneKeyNonclusteredPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"}). AddRow(1, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey). AddRow(2, data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v4.*"). WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) } - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data4RowKey, data4RowValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 1)) mockDB.ExpectCommit() mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() @@ -534,12 +534,12 @@ func TestReplaceConflictOneUniqueKeyNonclusteredPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"}). AddRow(1, data4NonclusteredKey, "uni_b", data4NonclusteredValue, data4RowKey). AddRow(2, data4NonclusteredKey, "uni_b", data5NonclusteredValue, data5RowKey). AddRow(3, data1NonclusteredKey, "uni_b", data1NonclusteredValue, data1RowKey). @@ -547,31 +547,31 @@ func TestReplaceConflictOneUniqueKeyNonclusteredPk(t *testing.T) { AddRow(5, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey). AddRow(6, data3IndexKey, "PRIMARY", data4NonclusteredValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v4.*"). WithArgs(0, "a", nil, nil, data5RowKey, data5RowValue, 2, 0, "a", nil, nil, data2RowKey, data2RowValue, 2, 0, "a", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) } - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data5RowKey, data5RowValue). AddRow(2, data2RowKey, data2RowValue). AddRow(3, data4RowKey, data4RowValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 3)) mockDB.ExpectCommit() mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() @@ -741,12 +741,12 @@ func TestReplaceConflictOneUniqueKeyNonclusteredVarcharPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(2, 1)) mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*"). WillReturnResult(sqlmock.NewResult(3, 1)) - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"}). AddRow(1, data4NonclusteredKey, "uni_b", data4NonclusteredValue, data4RowKey). AddRow(2, data4NonclusteredKey, "uni_b", data5NonclusteredValue, data5RowKey). AddRow(3, data1NonclusteredKey, "uni_b", data1NonclusteredValue, data1RowKey). @@ -754,31 +754,31 @@ func TestReplaceConflictOneUniqueKeyNonclusteredVarcharPk(t *testing.T) { AddRow(5, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey). AddRow(6, data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v4.*"). WithArgs(0, "a", nil, nil, data5RowKey, data5RowValue, 2, 0, "a", nil, nil, data2RowKey, data2RowValue, 2, 0, "a", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type = 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "index_name", "raw_value", "raw_handle"})) } - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"}). AddRow(1, data5RowKey, data5RowValue). AddRow(2, data2RowKey, data2RowValue). AddRow(3, data4RowKey, data4RowValue)) for i := 0; i < 2; i++ { - mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). - WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + mockDB.ExpectQuery("\\QSELECT id, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v4 WHERE table_name = ? AND kv_type <> 0 AND id >= ? and id < ? ORDER BY id LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"id", "raw_key", "raw_value"})) } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 3)) mockDB.ExpectCommit() mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v4.*"). WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit()