diff --git a/cdc/model/sink.go b/cdc/model/sink.go index bfc48bda2ee..fbbc30eb579 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -15,6 +15,7 @@ package model import ( "fmt" + "sort" "strconv" "strings" "sync" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/quotes" "github.com/pingcap/tiflow/pkg/util" @@ -47,6 +49,13 @@ const ( MessageTypeResolved ) +const ( + // the RowChangedEvent order in the same transaction + typeDelete = iota + 1 + typeUpdate + typeInsert +) + // ColumnFlagType is for encapsulating the flag operations for different flags. type ColumnFlagType util.Flag @@ -261,6 +270,11 @@ func (r *RedoLog) GetCommitTs() Ts { return 0 } +// TrySplitAndSortUpdateEvent redo log do nothing +func (r *RedoLog) TrySplitAndSortUpdateEvent() error { + return nil +} + // RedoRowChangedEvent represents the DML event used in RedoLog type RedoRowChangedEvent struct { Row *RowChangedEvent `msg:"row"` @@ -332,11 +346,43 @@ type RowChangedEvent struct { ReplicatingTs Ts `json:"-" msg:"-"` } +// txnRows represents a set of events that belong to the same transaction. +type txnRows []*RowChangedEvent + +// Len is the number of elements in the collection. +func (e txnRows) Len() int { + return len(e) +} + +// Less sort the events base on the order of event type delete