Skip to content

Commit

Permalink
lightning: add PK to internal tables (#57480)
Browse files Browse the repository at this point in the history
close #57479
  • Loading branch information
lance6716 authored Nov 22, 2024
1 parent d42a36d commit eb871f8
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 140 deletions.
30 changes: 15 additions & 15 deletions lightning/tests/lightning_config_max_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} ))
remaining_row_count=$(( ${uniq_row_count} + ${duplicated_row_count}/2 ))

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

stderr_file="/tmp/${TEST_NAME}.stderr"
Expand All @@ -47,7 +47,7 @@ EOF
cat "${stderr_file}"
grep -q "${err_msg}" "${stderr_file}"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4'
# Although conflict error number exceeds the max-error limit,
# all the conflict errors are recorded,
# because recording of conflict errors are executed batch by batch (batch size 1024),
Expand All @@ -57,12 +57,12 @@ check_contains "COUNT(*): ${duplicated_row_count}"
# import a second time

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

run_lightning --backend local --config "${mydir}/normal_config.toml"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4'
check_contains "COUNT(*): ${duplicated_row_count}"

# Check remaining records in the target table
Expand All @@ -72,40 +72,40 @@ check_contains "COUNT(*): ${remaining_row_count}"
# import a third time

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

run_lightning --backend local --config "${mydir}/normal_config_old_style.toml"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v4'
check_contains "COUNT(*): ${duplicated_row_count}"

# Check remaining records in the target table
run_sql 'SELECT COUNT(*) FROM mytest.testtbl'
check_contains "COUNT(*): ${remaining_row_count}"

# import a fourth time
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records_v2'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'
! run_lightning --backend local --config "${mydir}/ignore_config.toml"
[ $? -eq 0 ]
tail -n 10 $TEST_DIR/lightning.log | grep "ERROR" | tail -n 1 | grep -Fq "[Lightning:Config:ErrInvalidConfig]conflict.strategy cannot be set to \\\"ignore\\\" when use tikv-importer.backend = \\\"local\\\""

# Check tidb backend record duplicate entry in conflict_records table
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records'
# Check tidb backend record duplicate entry in conflict_records_v2 table
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_records_v2'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'
run_lightning --backend tidb --config "${mydir}/tidb.toml"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2'
check_contains "COUNT(*): 15"
run_sql 'SELECT * FROM lightning_task_info.conflict_records WHERE offset = 149'
run_sql 'SELECT * FROM lightning_task_info.conflict_records_v2 WHERE offset = 149'
check_contains "error: Error 1062 (23000): Duplicate entry '5' for key 'testtbl.PRIMARY'"
check_contains "row_data: ('5','bbb05')"

# Check max-error-record can limit the size of conflict_records table
# Check max-error-record can limit the size of conflict_records_v2 table
run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
run_sql 'DROP DATABASE IF EXISTS mytest'
run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_view\`" | grep -q "5"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2'
check_contains "COUNT(*): 5"

# Check conflict.threshold
Expand All @@ -121,8 +121,8 @@ run_sql 'DROP DATABASE IF EXISTS mytest'
rm "${TEST_DIR}/lightning.log"
run_lightning --backend tidb --config "${mydir}/tidb-error.toml" 2>&1 | grep -q "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'"
check_contains "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'" "${TEST_DIR}/lightning.log"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records_v2'
check_contains "COUNT(*): 1"
run_sql 'SELECT * FROM lightning_task_info.conflict_records'
run_sql 'SELECT * FROM lightning_task_info.conflict_records_v2'
check_contains "error: Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'"
check_contains "row_data: ('1','bbb01')"
2 changes: 1 addition & 1 deletion lightning/tests/lightning_duplicate_detection/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ verify_detected_rows() {
done
done
mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq)
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v3 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v4 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq)
equal=0
if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then
Expand Down
14 changes: 7 additions & 7 deletions lightning/tests/lightning_duplicate_detection_new/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks"
echo "local backend replace strategy result is not equal to tidb backend"
exit 1
fi
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2"
check_contains "count(*): 227"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error = ''"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2 WHERE error = ''"
check_contains "count(*): 0"
run_sql "SELECT * FROM lightning_task_info.conflict_records WHERE row_id = 12"
run_sql "SELECT * FROM lightning_task_info.conflict_records_v2 WHERE row_id = 12"
check_contains "(171,'yRxZE',9201592769833450947,'xs3d',5,4,283270321)"
check_contains "[kv:1062]Duplicate entry '171' for key 'dup_detect.PRIMARY'"
run_sql "SELECT * FROM lightning_task_info.conflict_records WHERE row_id = 1"
run_sql "SELECT * FROM lightning_task_info.conflict_records_v2 WHERE row_id = 1"
check_contains "(87,'nEoKu',7836621565948506759,'y6',48,0,177543185)"
check_contains "[kv:1062]Duplicate entry '0-177543185' for key 'dup_detect.uniq_col6_col7'"

Expand All @@ -61,11 +61,11 @@ expected_pks=$(run_sql "SELECT group_concat(col1 ORDER BY col1) AS pks FROM test
cleanup
run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" 2>&1 | grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details"
grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details" "$LOG_FILE"
run_sql "SELECT * FROM lightning_task_info.conflict_records"
run_sql "SELECT * FROM lightning_task_info.conflict_records_v2"
check_contains "error: duplicate key in table \`test\`.\`dup_detect\`"
run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" --enable-checkpoint=1 2>&1 | grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)"
grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE"
run_sql "SELECT * FROM lightning_task_info.conflict_records"
run_sql "SELECT * FROM lightning_task_info.conflict_records_v2"
check_contains "error: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are"
check_contains "restore table \`test\`.\`dup_detect\` failed: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE"
run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-error.toml" --checkpoint-error-destroy="\`test\`.\`dup_detect\`"
Expand All @@ -89,6 +89,6 @@ run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-rep
run_lightning --enable-checkpoint=1 --backend local --config "$CUR/local-replace.toml" --log-file "$LOG_FILE"
run_sql "SELECT count(*) FROM test.dup_detect"
check_contains "count(*): 174"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records_v2"
check_contains "count(*): 227"
check_not_contains "duplicate detection start" "$LOG_FILE"
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

! run_lightning --backend local --config "${mydir}/config.toml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

! run_lightning --backend local --config "${mydir}/config.toml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

! run_lightning --backend local --config "${mydir}/config.toml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v4'
run_sql 'DROP VIEW IF EXISTS lightning_task_info.conflict_view'

! run_lightning --backend local --config "${mydir}/config.toml"
Expand Down
4 changes: 2 additions & 2 deletions lightning/tests/lightning_duplicate_resolution_merge/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ run_sql 'admin check table dup_resolve.a'
run_sql 'select count(*) from dup_resolve.a'
check_contains 'count(*): 10'

run_sql 'select count(*) from lightning_task_info.conflict_records'
run_sql 'select count(*) from lightning_task_info.conflict_records_v2'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v3'
run_sql 'select count(*) from lightning_task_info.conflict_error_v4'
check_contains 'count(*): 4'

run_sql 'select count(*) from lightning_task_info.conflict_view'
Expand Down
10 changes: 5 additions & 5 deletions lightning/tests/lightning_sqlmode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ run_sql 'SELECT min(id), max(id) FROM sqlmodedb.t'
check_contains 'min(id): 4'
check_contains 'max(id): 4'

run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v1'
run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v2'
check_contains 'count(*): 4'

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 53'
check_contains 'cannot convert datum from unsigned bigint to type timestamp.'
check_contains "row_data: (1,9,128,'too long','x,y,z')"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 100'
check_contains "Incorrect timestamp value: '2000-00-00 00:00:00'"
check_contains "row_data: (2,'2000-00-00 00:00:00',-99999,'🤩',3)"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 149'
check_contains "Incorrect timestamp value: '9999-12-31 23:59:59'"
check_contains "row_data: (3,'9999-12-31 23:59:59','NaN',x'99','x+y')"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 237'
check_contains "Column 'a' cannot be null"
Expand Down
2 changes: 1 addition & 1 deletion lightning/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ strategy = ""
# precheck-conflict-before-import = false
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when the strategy is "replace" or "ignore". The default value is 10000. If you set a value larger than 10000, the import process might experience performance degradation.
# threshold = 10000
# Controls the maximum number of records in the `conflict_records` table. The default value is 10000.
# Controls the maximum number of records in the `conflict_records_v2` table. The default value is 10000.
# Starting from v8.1.0, there is no need to configure `max-record-rows` manually, because TiDB Lightning automatically assigns the value of `max-record-rows` with the value of `threshold`, regardless of the user input. `max-record-rows` will be deprecated in a future release.
# In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records are not recorded.
Expand Down
22 changes: 11 additions & 11 deletions pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,35 +606,35 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)").
WillReturnResult(driver.ResultNoRows)

Expand Down Expand Up @@ -671,21 +671,21 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestWriteRowsRecordOneError(t *testing.T) {
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(dupErr)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)").
WillReturnResult(driver.ResultNoRows)

Expand Down Expand Up @@ -1045,7 +1045,7 @@ func TestLogicalImportBatchPrepStmt(t *testing.T) {

// TestWriteRowsRecordOneErrorPrepStmt tests that when LogicalImportPrepStmt is true and the batch insert fails,
// it will fallback to a single row insert,
// the error will be recorded in tidb_lightning_errors.conflict_records.
// the error will be recorded in tidb_lightning_errors.conflict_records_v2.
func TestWriteRowsRecordOneErrorPrepStmt(t *testing.T) {
dupErr := &gmysql.MySQLError{Number: errno.ErrDupEntry, Message: "Duplicate entry '2' for key 'PRIMARY'"}
s := createMysqlSuite(t)
Expand All @@ -1068,7 +1068,7 @@ func TestWriteRowsRecordOneErrorPrepStmt(t *testing.T) {
WithArgs(2).
WillReturnError(dupErr)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)").
WillReturnResult(driver.ResultNoRows)

Expand Down
Loading

0 comments on commit eb871f8

Please sign in to comment.