Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): fix corrupt latin1 data when replicating #7027

Merged
merged 9 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions dm/syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/util/filter"
"github.com/shopspring/decimal"
"go.uber.org/zap"
"golang.org/x/text/encoding/charmap"

cdcmodel "github.com/pingcap/tiflow/cdc/model"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -43,14 +44,18 @@ type genDMLParam struct {
extendData [][]interface{} // all data include extend data
}

var latin1Decoder = charmap.ISO8859_1.NewDecoder()

// extractValueFromData adjust the values obtained from go-mysql so that
// - the values can be correctly converted to TiDB datum
// - the values are in the correct type that go-sql-driver/mysql uses.
func extractValueFromData(data []interface{}, columns []*model.ColumnInfo, sourceTI *model.TableInfo) []interface{} {
value := make([]interface{}, 0, len(data))
var err error

for i, d := range data {
d = castUnsigned(d, &columns[i].FieldType)
isLatin1 := columns[i].GetCharset() == charset.CharsetLatin1 || columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetLatin1

switch v := d.(type) {
case int8:
Expand All @@ -69,12 +74,26 @@ func extractValueFromData(data []interface{}, columns []*model.ColumnInfo, sourc
d = uint64(v)
case decimal.Decimal:
d = v.String()
case []byte:
if isLatin1 {
d, err = latin1Decoder.Bytes(v)
if err != nil {
log.L().DPanic("can't convert latin1 to utf8", zap.ByteString("value", v), zap.Error(err))
}
}
case string:
// convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML
if columns[i].GetCharset() == charset.CharsetGBK {
d = []byte(v)
} else if columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetGBK {
isGBK := columns[i].GetCharset() == charset.CharsetGBK || columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetGBK
switch {
case isGBK:
// convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML
d = []byte(v)
case isLatin1:
// TiDB has bug in latin1 so we must convert it to utf8 at DM's scope
// https://github.com/pingcap/tidb/issues/18955
d, err = latin1Decoder.String(v)
if err != nil {
log.L().DPanic("can't convert latin1 to utf8", zap.String("value", v), zap.Error(err))
}
}
}
value = append(value, d)
Expand Down
6 changes: 5 additions & 1 deletion dm/tests/expression_filter/conf/dm-task2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mysql-instances:
- "update_old_lt_100"
- "update_new_lt_100"
- "update_old_and_new"
- "only_muller"

expression-filter:
even_c:
Expand Down Expand Up @@ -45,7 +46,10 @@ expression-filter:
table: "t5"
update-old-value-expr: "c = 1"
update-new-value-expr: "c = 2"

only_muller:
schema: "expr_filter"
table: "t6"
insert-value-expr: "name != 'Müller'"

black-white-list: # compatible with deprecated config
instance:
Expand Down
5 changes: 4 additions & 1 deletion dm/tests/expression_filter/data/db1.increment2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ update t5 set should_skip = 1, c = 2 where c = 1;
insert into t5 values (4, 1, 1); -- check this `should_skip = 1` row must be updated to `should_skip = 0` in TiDB
update t5 set should_skip = 0, c = 3 where c = 1;

insert into t6 (id, name, msg) values (1, 'Müller', 'Müller'), (2, 'X Æ A-12', 'X Æ A-12');
alter table t6 add column name2 varchar(20) character set latin1 default 'Müller';

-- trigger a flush
alter table t5 add column dummy int;
alter table t5 add column dummy int;
5 changes: 4 additions & 1 deletion dm/tests/expression_filter/data/db1.prepare2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ create table t2 (id int primary key,
should_skip int,
c int,
gen int as (id + 1)
);
);

create table t6 (id int, name varchar(20), msg text, primary key(`id`)) character set latin1;
insert into t6 (id, name, msg) values (0, 'Müller', 'Müller');
9 changes: 6 additions & 3 deletions dm/tests/expression_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ function complex_behaviour() {

dmctl_start_task_standalone $cur/conf/dm-task2.yaml

# https://github.com/pingcap/dumpling/issues/296
sleep 1
run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down Expand Up @@ -62,8 +60,13 @@ function complex_behaviour() {
run_sql_tidb "select count(8) from expr_filter.t5 where should_skip = 1"
check_contains "count(8): 0"

run_sql_tidb "select count(9) from expr_filter.t6 where name = 'Müller' and msg = 'Müller' and name2 = 'Müller'"
check_contains "count(9): 2"
run_sql_tidb "select count(10) from expr_filter.t6 where name != 'Müller'"
check_contains "count(10): 0"

insert_num=$(grep -o '"number of filtered insert"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $insert_num -eq 5 ]
[ $insert_num -eq 6 ]
update_num=$(grep -o '"number of filtered update"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $update_num -eq 3 ]

Expand Down