From f710abba6417971370e0d0711d9cd8ca3fa784c4 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 7 Feb 2022 12:23:35 +0800 Subject: [PATCH] sink(ticdc): fix duplicated replace when batch-replace is disabled (#4502) close pingcap/tiflow#4501 --- cdc/sink/mysql.go | 2 - cdc/sink/mysql_test.go | 85 ++++++++++++++++++++++++++++-------------- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 1b2dc9f5602..7b81d0a24b0 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -678,8 +678,6 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, } } else { query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) - sqls = append(sqls, query) - values = append(values, args) if query != "" { sqls = append(sqls, query) values = append(values, args) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index cf42a6cb8c8..cc0b119350c 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -58,41 +58,68 @@ func TestPrepareDML(t *testing.T) { testCases := []struct { input []*model.RowChangedEvent expected *preparedDMLs - }{{ - input: []*model.RowChangedEvent{}, - expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}}, - }, { - input: []*model.RowChangedEvent{ - { - StartTs: 418658114257813514, - CommitTs: 418658114257813515, - Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, - PreColumns: []*model.Column{nil, { - Name: "a1", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 1, - }, { - Name: "a3", - Type: mysql.TypeLong, - Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, - Value: 1, - }}, - IndexColumns: [][]int{{1, 2}}, + }{ + { + input: []*model.RowChangedEvent{}, + expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}}, + }, { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"}, + values: [][]interface{}{{1, 1}}, + rowCount: 1, + }, + }, { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"}, + values: [][]interface{}{{2, 2}}, + rowCount: 1, }, }, - expected: &preparedDMLs{ - sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"}, - values: [][]interface{}{{1, 1}}, - rowCount: 1, - }, - }} + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() ms := newMySQLSink4Test(ctx, t) - for i, tc := range testCases { + for _, tc := range testCases { dmls := ms.prepareDMLs(tc.input, 0, 0) - require.Equal(t, tc.expected, dmls, tc.expected, fmt.Sprintf("%d", i)) + require.Equal(t, tc.expected, dmls) } }