Skip to content

Commit

Permalink
sink(ticdc): fix duplicated replace when batch-replace is disabled (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Feb 7, 2022
1 parent 596f558 commit f710abb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
2 changes: 0 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,6 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
}
} else {
query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert)
sqls = append(sqls, query)
values = append(values, args)
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
Expand Down
85 changes: 56 additions & 29 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,68 @@ func TestPrepareDML(t *testing.T) {
testCases := []struct {
input []*model.RowChangedEvent
expected *preparedDMLs
}{{
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}},
}, {
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813514,
CommitTs: 418658114257813515,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}},
IndexColumns: [][]int{{1, 2}},
}{
{
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}},
}, {
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813514,
CommitTs: 418658114257813515,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
PreColumns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
},
}, {
input: []*model.RowChangedEvent{
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 2,
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{2, 2}},
rowCount: 1,
},
},
expected: &preparedDMLs{
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
},
}}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ms := newMySQLSink4Test(ctx, t)
for i, tc := range testCases {
for _, tc := range testCases {
dmls := ms.prepareDMLs(tc.input, 0, 0)
require.Equal(t, tc.expected, dmls, tc.expected, fmt.Sprintf("%d", i))
require.Equal(t, tc.expected, dmls)
}
}

Expand Down

0 comments on commit f710abb

Please sign in to comment.