Skip to content

Commit

Permalink
This is an automated cherry-pick of #7027
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Sep 13, 2022
1 parent 82a688e commit 2c36d09
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 6 deletions.
65 changes: 65 additions & 0 deletions dm/syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"go.uber.org/zap"
"golang.org/x/text/encoding/charmap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
Expand Down Expand Up @@ -63,6 +64,7 @@ func (op dmlOpType) String() (str string) {

// genDMLParam stores pruned columns, data as well as the original columns, data, index.
type genDMLParam struct {
<<<<<<< HEAD
targetTableID string // as a key in map like `schema`.`table`
sourceTable *filter.Table // origin table
safeMode bool // only used in update
Expand All @@ -76,6 +78,69 @@ func extractValueFromData(data []interface{}, columns []*model.ColumnInfo) []int
value := make([]interface{}, 0, len(data))
for i := range data {
value = append(value, castUnsigned(data[i], &columns[i].FieldType))
=======
sourceTable *filter.Table // origin table
targetTable *filter.Table
safeMode bool // only used in update
originalData [][]interface{} // all data
sourceTableInfo *model.TableInfo // all table info
extendData [][]interface{} // all data include extend data
}

// latin1Decoder decodes ISO 8859-1 (latin1) encoded text. It is created once
// and reused by extractValueFromData for both []byte and string values.
var latin1Decoder = charmap.ISO8859_1.NewDecoder()

// extractValueFromData adjusts the values obtained from go-mysql so that
// - the values can be correctly converted to TiDB datum
// - the values are in the correct type that go-sql-driver/mysql uses.
//
// data and columns are read in parallel: data[i] is adjusted using columns[i].
// sourceTI supplies the table-level charset, which is consulted only when a
// column reports an empty charset. The result is a newly built slice with one
// entry per element of data.
//
// Per-value adjustments, applied after castUnsigned:
// - int8/int16/int32 are widened to int64
// - uint8/uint16/uint32/uint are widened to uint64
// - decimal.Decimal is replaced by its String() form
// - []byte in a latin1 column is decoded with latin1Decoder
// - string in a GBK column is converted to []byte
// - string in a latin1 column is decoded with latin1Decoder
// Values of any other type are appended as returned by castUnsigned.
//
// A decoding failure is only reported through DPanic logging; no error is
// returned, and whatever the decoder produced is still appended.
func extractValueFromData(data []interface{}, columns []*model.ColumnInfo, sourceTI *model.TableInfo) []interface{} {
value := make([]interface{}, 0, len(data))
// err is shared by the decoder calls in every iteration.
var err error

for i, d := range data {
d = castUnsigned(d, &columns[i].FieldType)
// && binds tighter than ||: the table charset is only a fallback for
// columns whose own charset is empty.
isLatin1 := columns[i].GetCharset() == charset.CharsetLatin1 || columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetLatin1

switch v := d.(type) {
case int8:
d = int64(v)
case int16:
d = int64(v)
case int32:
d = int64(v)
case uint8:
d = uint64(v)
case uint16:
d = uint64(v)
case uint32:
d = uint64(v)
case uint:
d = uint64(v)
case decimal.Decimal:
d = v.String()
case []byte:
// Non-latin1 byte slices are left untouched.
if isLatin1 {
d, err = latin1Decoder.Bytes(v)
if err != nil {
log.L().DPanic("can't convert latin1 to utf8", zap.ByteString("value", v), zap.Error(err))
}
}
case string:
// Same column-then-table fallback as isLatin1, but for GBK.
isGBK := columns[i].GetCharset() == charset.CharsetGBK || columns[i].GetCharset() == "" && sourceTI.Charset == charset.CharsetGBK
switch {
case isGBK:
// convert string to []byte so that go-sql-driver/mysql can use _binary'value' for DML
d = []byte(v)
case isLatin1:
// TiDB has a bug in its latin1 handling, so the value must be converted
// to utf8 within DM's scope.
// https://github.com/pingcap/tidb/issues/18955
d, err = latin1Decoder.String(v)
if err != nil {
log.L().DPanic("can't convert latin1 to utf8", zap.String("value", v), zap.Error(err))
}
}
}
value = append(value, d)
>>>>>>> 81c8e09bd (syncer(dm): fix corrupt latin1 data when replicating (#7027))
}
return value
}
Expand Down
6 changes: 5 additions & 1 deletion dm/tests/expression_filter/conf/dm-task2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mysql-instances:
- "update_old_lt_100"
- "update_new_lt_100"
- "update_old_and_new"
- "only_muller"

expression-filter:
even_c:
Expand Down Expand Up @@ -45,7 +46,10 @@ expression-filter:
table: "t5"
update-old-value-expr: "c = 1"
update-new-value-expr: "c = 2"

only_muller:
schema: "expr_filter"
table: "t6"
insert-value-expr: "name != 'Müller'"

black-white-list: # compatible with deprecated config
instance:
Expand Down
5 changes: 4 additions & 1 deletion dm/tests/expression_filter/data/db1.increment2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ update t5 set should_skip = 1, c = 2 where c = 1;
insert into t5 values (4, 1, 1); -- check this `should_skip = 1` row must be updated to `should_skip = 0` in TiDB
update t5 set should_skip = 0, c = 3 where c = 1;

insert into t6 (id, name, msg) values (1, 'Müller', 'Müller'), (2, 'X Æ A-12', 'X Æ A-12');
alter table t6 add column name2 varchar(20) character set latin1 default 'Müller';

-- trigger a flush
alter table t5 add column dummy int;
alter table t5 add column dummy int;
5 changes: 4 additions & 1 deletion dm/tests/expression_filter/data/db1.prepare2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ create table t2 (id int primary key,
should_skip int,
c int,
gen int as (id + 1)
);
);

create table t6 (id int, name varchar(20), msg text, primary key(`id`)) character set latin1;
insert into t6 (id, name, msg) values (0, 'Müller', 'Müller');
9 changes: 6 additions & 3 deletions dm/tests/expression_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ function complex_behaviour() {

dmctl_start_task_standalone $cur/conf/dm-task2.yaml

# https://github.com/pingcap/dumpling/issues/296
sleep 1
run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down Expand Up @@ -62,8 +60,13 @@ function complex_behaviour() {
run_sql_tidb "select count(8) from expr_filter.t5 where should_skip = 1"
check_contains "count(8): 0"

run_sql_tidb "select count(9) from expr_filter.t6 where name = 'Müller' and msg = 'Müller' and name2 = 'Müller'"
check_contains "count(9): 2"
run_sql_tidb "select count(10) from expr_filter.t6 where name != 'Müller'"
check_contains "count(10): 0"

insert_num=$(grep -o '"number of filtered insert"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $insert_num -eq 5 ]
[ $insert_num -eq 6 ]
update_num=$(grep -o '"number of filtered update"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $update_num -eq 3 ]

Expand Down

0 comments on commit 2c36d09

Please sign in to comment.