From 7f42fce46a9e8c2efa06b763153ca536adb4180a Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Sun, 13 Aug 2023 18:16:59 -0700 Subject: [PATCH] sink(ticdc): split RowChangeEvent if unique key is updated (#9437) close pingcap/tiflow#9430 --- cdc/model/sink.go | 152 ++++++++++++++++ cdc/model/sink_test.go | 167 ++++++++++++++++++ cdc/sink/dmlsink/event.go | 2 + cdc/sink/tablesink/table_sink_impl.go | 3 + .../http_api/util/test_case.py | 2 +- .../http_api_tls/util/test_case.py | 2 +- 6 files changed, 326 insertions(+), 2 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index bfc48bda2ee..fbbc30eb579 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -15,6 +15,7 @@ package model import ( "fmt" + "sort" "strconv" "strings" "sync" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/quotes" "github.com/pingcap/tiflow/pkg/util" @@ -47,6 +49,13 @@ const ( MessageTypeResolved ) +const ( + // the RowChangedEvent order in the same transaction + typeDelete = iota + 1 + typeUpdate + typeInsert +) + // ColumnFlagType is for encapsulating the flag operations for different flags. type ColumnFlagType util.Flag @@ -261,6 +270,11 @@ func (r *RedoLog) GetCommitTs() Ts { return 0 } +// TrySplitAndSortUpdateEvent redo log do nothing +func (r *RedoLog) TrySplitAndSortUpdateEvent() error { + return nil +} + // RedoRowChangedEvent represents the DML event used in RedoLog type RedoRowChangedEvent struct { Row *RowChangedEvent `msg:"row"` @@ -332,11 +346,43 @@ type RowChangedEvent struct { ReplicatingTs Ts `json:"-" msg:"-"` } +// txnRows represents a set of events that belong to the same transaction. +type txnRows []*RowChangedEvent + +// Len is the number of elements in the collection. 
+func (e txnRows) Len() int { + return len(e) +} + +// Less sorts the events based on the order of event type delete