Skip to content

Commit

Permalink
sink(ticdc): split RowChangeEvent if unique key is updated (#9437)
Browse files Browse the repository at this point in the history
close #9430
  • Loading branch information
sdojjy authored Aug 14, 2023
1 parent dd8da8e commit 7f42fce
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 2 deletions.
152 changes: 152 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package model

import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -47,6 +49,13 @@ const (
MessageTypeResolved
)

// Sort ranks for the kinds of RowChangedEvent inside one transaction.
// A smaller rank sorts first: delete < update < insert.
const (
	typeDelete = 1
	typeUpdate = 2
	typeInsert = 3
)

// ColumnFlagType is for encapsulating the flag operations for different flags.
type ColumnFlagType util.Flag

Expand Down Expand Up @@ -261,6 +270,11 @@ func (r *RedoLog) GetCommitTs() Ts {
return 0
}

// TrySplitAndSortUpdateEvent is a no-op for a redo log: it changes nothing
// and always returns nil.
func (r *RedoLog) TrySplitAndSortUpdateEvent() error {
return nil
}

// RedoRowChangedEvent represents the DML event used in RedoLog
type RedoRowChangedEvent struct {
Row *RowChangedEvent `msg:"row"`
Expand Down Expand Up @@ -332,11 +346,43 @@ type RowChangedEvent struct {
ReplicatingTs Ts `json:"-" msg:"-"`
}

// txnRows is a set of row changed events that belong to the same transaction.
// Its Len, Less and Swap methods let it be sorted by event type in the order
// delete < update < insert.
type txnRows []*RowChangedEvent

// Len returns the number of events in the collection.
func (e txnRows) Len() int {
return len(e)
}

// Less reports whether the event at index i must be ordered before the event
// at index j. Events are ranked by type: delete < update < insert.
func (e txnRows) Less(i, j int) bool {
	left, right := getDMLOrder(e[i]), getDMLOrder(e[j])
	return left < right
}

// getDMLOrder maps an event to its sort rank: typeDelete for a delete,
// typeUpdate for an update, and typeInsert for anything else.
func getDMLOrder(event *RowChangedEvent) int {
	switch {
	case event.IsDelete():
		return typeDelete
	case event.IsUpdate():
		return typeUpdate
	default:
		return typeInsert
	}
}

// Swap exchanges the events at indexes i and j.
func (e txnRows) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}

// GetCommitTs returns the commit timestamp of this event, i.e. the value of
// its CommitTs field.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
}

// TrySplitAndSortUpdateEvent is a no-op for a single row changed event: it
// leaves the event untouched and always returns nil.
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent() error {
return nil
}

// IsDelete returns true if the row is a delete event
func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
Expand Down Expand Up @@ -720,6 +766,112 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// TrySplitAndSortUpdateEvent rewrites t.Rows so that every update event which
// changes a handle key or unique key is replaced by a delete and an insert.
// A transaction holding fewer than two rows is left exactly as it is.
// It returns the traced error from the split step, or nil on success.
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent() error {
	// Nothing is split for an empty or single-row transaction.
	if rowCount := len(t.Rows); rowCount < 2 {
		return nil
	}

	rows, err := trySplitAndSortUpdateEvent(t.Rows)
	if err != nil {
		return errors.Trace(err)
	}

	t.Rows = rows
	return nil
}

// trySplitAndSortUpdateEvent replaces every update event that modifies a
// handle key or unique key column with a delete event and an insert event.
//
// Nil events, and events that have neither Columns nor PreColumns, are dropped
// with a warning. If at least one update was split, the resulting events are
// sorted by type in the order delete < update < insert. The sort is stable, so
// events of the same type keep their original relative order. If nothing was
// split, the remaining events are returned in their original order.
//
// It returns the new event slice, or a traced error if splitting an event
// fails.
func trySplitAndSortUpdateEvent(
	events []*RowChangedEvent,
) ([]*RowChangedEvent, error) {
	rowChangedEvents := make([]*RowChangedEvent, 0, len(events))
	split := false
	for _, e := range events {
		if e == nil {
			log.Warn("skip emit nil event",
				zap.Any("event", e))
			continue
		}

		// Some transactions could generate empty row change event, such as
		// begin; insert into t (id) values (1); delete from t where id=1; commit;
		// Just ignore these row changed events.
		if len(e.Columns) == 0 && len(e.PreColumns) == 0 {
			log.Warn("skip emit empty row event",
				zap.Any("event", e))
			continue
		}

		// Inserts, deletes and updates that leave every key column untouched
		// are passed through unchanged.
		if !e.IsUpdate() || !shouldSplitUpdateEvent(e) {
			rowChangedEvents = append(rowChangedEvents, e)
			continue
		}

		// The pk or uk is updated: split the update into delete + insert.
		deleteEvent, insertEvent, err := splitUpdateEvent(e)
		if err != nil {
			return nil, errors.Trace(err)
		}
		split = true
		rowChangedEvents = append(rowChangedEvents, deleteEvent, insertEvent)
	}

	// Sorting is only needed when a split introduced new delete/insert events.
	// sort.Stable is used instead of sort.Sort so that events sharing the same
	// type are never reordered relative to each other.
	if split {
		sort.Stable(txnRows(rowChangedEvents))
	}
	return rowChangedEvents, nil
}

// shouldSplitUpdateEvent reports whether updateEvent changes the value of a
// column that is flagged as a handle key or unique key in both its new image
// (Columns) and its previous image (PreColumns). Values are compared through
// their ColumnValueString representation. When it returns true, the caller is
// expected to use splitUpdateEvent to turn the update into a delete and an
// insert event. A nil event is never split.
//
// Columns and PreColumns are matched by index. Only positions present in both
// slices are inspected, so an event whose PreColumns is shorter than its
// Columns cannot trigger an index-out-of-range panic.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
	// nil event will never be split.
	if updateEvent == nil {
		return false
	}

	preColumns := updateEvent.PreColumns
	for i, col := range updateEvent.Columns {
		if i >= len(preColumns) {
			// No previous image left to compare against.
			break
		}
		preCol := preColumns[i]
		if col == nil || preCol == nil {
			continue
		}
		if !(col.Flag.IsUniqueKey() || col.Flag.IsHandleKey()) {
			continue
		}
		if !(preCol.Flag.IsUniqueKey() || preCol.Flag.IsHandleKey()) {
			continue
		}
		// If one key column is updated, the event row needs to be split.
		if ColumnValueString(col.Value) != ColumnValueString(preCol.Value) {
			return true
		}
	}
	return false
}

// splitUpdateEvent turns one update event into a delete event and an insert
// event, returned in that order. The delete event has its Columns cleared and
// the insert event has its PreColumns cleared; every other field is copied
// from updateEvent. It returns an error when updateEvent is nil.
func splitUpdateEvent(
	updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
	if updateEvent == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}

	// Shallow copies are sufficient here: the delete half only needs
	// PreColumns and the insert half only needs Columns, so a full deep copy
	// would just waste memory.
	deleteEvent, insertEvent := *updateEvent, *updateEvent
	deleteEvent.Columns = nil
	insertEvent.PreColumns = nil

	return &deleteEvent, &insertEvent, nil
}

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
Expand Down
167 changes: 167 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package model

import (
"sort"
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -396,3 +398,168 @@ func TestExchangeTablePartition(t *testing.T) {
require.Equal(t, "ALTER TABLE `test1`.`t1` EXCHANGE PARTITION `p0` WITH TABLE `test2`.`t2`", event.Query)
require.Equal(t, event.Type, timodel.ActionExchangeTablePartition)
}

// TestSortRowChangedEvent checks that sorting txnRows orders events by type:
// delete first, then update, then insert.
func TestSortRowChangedEvent(t *testing.T) {
	updateEvent := &RowChangedEvent{
		PreColumns: []*Column{{}},
		Columns:    []*Column{{}},
	}
	insertEvent := &RowChangedEvent{Columns: []*Column{{}}}
	deleteEvent := &RowChangedEvent{PreColumns: []*Column{{}}}

	// Start from the order update, insert, delete.
	events := []*RowChangedEvent{updateEvent, insertEvent, deleteEvent}
	assert.True(t, events[0].IsUpdate())
	assert.True(t, events[1].IsInsert())
	assert.True(t, events[2].IsDelete())

	sort.Sort(txnRows(events))

	assert.True(t, events[0].IsDelete())
	assert.True(t, events[1].IsUpdate())
	assert.True(t, events[2].IsInsert())
}

func TestTrySplitAndSortUpdateEventNil(t *testing.T) {
t.Parallel()

events := []*RowChangedEvent{nil}
result, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 0, len(result))
}

// TestTrySplitAndSortUpdateEventEmpty checks that an event carrying neither
// Columns nor PreColumns is dropped without producing an error.
func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
	t.Parallel()

	emptyEvent := &RowChangedEvent{StartTs: 1, CommitTs: 2}
	result, err := trySplitAndSortUpdateEvent([]*RowChangedEvent{emptyEvent})
	require.NoError(t, err)
	require.Len(t, result, 0)
}

// TestTrySplitAndSortUpdateEvent feeds single update events through
// trySplitAndSortUpdateEvent. An update that changes a handle key or unique
// key column must come back as a delete followed by an insert; an update that
// only changes a non-key column must come back as a single event.
func TestTrySplitAndSortUpdateEvent(t *testing.T) {
	t.Parallel()

	// In every case col1 is a plain column whose value changes; only the flag
	// and values of col2 decide whether the event is split.
	testCases := []struct {
		name       string
		columns    []*Column
		preColumns []*Column
		wantSplit  bool
	}{
		{
			name: "update handle key",
			columns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value-updated"},
				{Name: "col2", Flag: HandleKeyFlag, Value: "col2-value-updated"},
			},
			preColumns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value"},
				{Name: "col2", Flag: HandleKeyFlag, Value: "col2-value"},
			},
			wantSplit: true,
		},
		{
			name: "update unique key",
			columns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value-updated"},
				{Name: "col2", Flag: UniqueKeyFlag, Value: "col2-value-updated"},
			},
			preColumns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value"},
				{Name: "col2", Flag: UniqueKeyFlag, Value: "col2-value"},
			},
			wantSplit: true,
		},
		{
			name: "update non-handle key",
			columns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value-updated"},
				{Name: "col2", Flag: HandleKeyFlag, Value: "col2-value"},
			},
			preColumns: []*Column{
				{Name: "col1", Flag: BinaryFlag, Value: "col1-value"},
				{Name: "col2", Flag: HandleKeyFlag, Value: "col2-value"},
			},
			wantSplit: false,
		},
	}

	for _, tc := range testCases {
		events := []*RowChangedEvent{
			{
				CommitTs:   1,
				Columns:    tc.columns,
				PreColumns: tc.preColumns,
			},
		}
		result, err := trySplitAndSortUpdateEvent(events)
		require.NoError(t, err, tc.name)

		if !tc.wantSplit {
			require.Equal(t, 1, len(result), tc.name)
			continue
		}
		require.Equal(t, 2, len(result), tc.name)
		require.True(t, result[0].IsDelete(), tc.name)
		require.True(t, result[1].IsInsert(), tc.name)
	}
}
2 changes: 2 additions & 0 deletions cdc/sink/dmlsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent splits an update into a delete and an insert
// when the unique key is updated. It returns a non-nil error if that fails.
TrySplitAndSortUpdateEvent() error
}

// CallbackFunc is the callback function for callbackable event.
Expand Down
Loading

0 comments on commit 7f42fce

Please sign in to comment.