Skip to content

Commit

Permalink
add teset and fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Apr 20, 2024
1 parent 53928fa commit 119a56c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
11 changes: 5 additions & 6 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,18 +1125,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, there is no need to split a single unique key changed update event, this
// is also to keep the backward compatibility, the same behavior as before.
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
return false
}
return true
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
Expand Down
9 changes: 8 additions & 1 deletion cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.Equal(t, 1, len(result))
}

func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
columns := []*Column{
{
Name: "col1",
Expand Down Expand Up @@ -614,6 +614,13 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}

func TestToRedoLog(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions cdc/sink/tablesink/table_sink_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err

resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents))
for _, ev := range resolvedEvents {
if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil {
return SinkInternalError{err}
}
// We have to record the event ID for the callback.
postEventFlushFunc := e.progressTracker.addEvent()
evCommitTs := ev.GetCommitTs()
Expand Down

0 comments on commit 119a56c

Please sign in to comment.