Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10919
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lidezhu authored and ti-chi-bot committed May 7, 2024
1 parent c56e6ba commit 8845ae6
Show file tree
Hide file tree
Showing 25 changed files with 2,120 additions and 62 deletions.
5 changes: 5 additions & 0 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type RawKVEntry struct {
RegionID uint64 `msg:"region_id"`
}

// IsUpdate checks if the event is an update event.
func (v *RawKVEntry) IsUpdate() bool {
return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil
}

func (v *RawKVEntry) String() string {
// TODO: redact values.
return fmt.Sprintf(
Expand Down
57 changes: 46 additions & 11 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,30 @@ type RowChangedEvent struct {
ReplicatingTs Ts `json:"-" msg:"-"`
}

// ToRowChangedEvent converts RowChangedEventInRedoLog to RowChangedEvent
func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent {
cols := r.Columns
if cols == nil {
cols = r.PreColumns
}
tableInfo := BuildTableInfo(
r.Table.Schema,
r.Table.Table,
cols,
r.IndexColumns)
tableInfo.TableName.TableID = r.Table.TableID
tableInfo.TableName.IsPartition = r.Table.IsPartition
row := &RowChangedEvent{
StartTs: r.StartTs,
CommitTs: r.CommitTs,
PhysicalTableID: r.Table.TableID,
TableInfo: tableInfo,
Columns: Columns2ColumnDatas(r.Columns, tableInfo),
PreColumns: Columns2ColumnDatas(r.PreColumns, tableInfo),
}
return row
}

// txnRows represents a set of events that belong to the same transaction.
type txnRows []*RowChangedEvent

Expand Down Expand Up @@ -809,18 +833,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, there is no need to split a single unique key changed update event, this
// is also to keep the backward compatibility, the same behavior as before.
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
return false
}
return true
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
Expand Down Expand Up @@ -850,8 +873,8 @@ func trySplitAndSortUpdateEvent(

// This indicates that it is an update event. if the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := SplitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -868,10 +891,22 @@ func trySplitAndSortUpdateEvent(
return rowChangedEvents, nil
}

<<<<<<< HEAD
// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
=======
func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool {
if col != nil {
colFlag := tableInfo.ForceGetColumnFlagType(col.ColumnID)
return colFlag.IsUniqueKey() || colFlag.IsHandleKey()
}
return false
}

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919))
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
Expand All @@ -893,8 +928,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
// SplitUpdateEvent splits an update event into a delete and an insert event.
func SplitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
Expand Down
149 changes: 149 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,132 @@ func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
func TestTrySplitAndSortUpdateEvent(t *testing.T) {
t.Parallel()

<<<<<<< HEAD
// Update handle key.
=======
// Update primary key.
tableInfoWithPrimaryKey := BuildTableInfo("test", "t", []*Column{
{
Name: "col1",
Flag: BinaryFlag,
},
{
Name: "col2",
Flag: HandleKeyFlag | PrimaryKeyFlag,
},
}, [][]int{{1}})
events := []*RowChangedEvent{
{
CommitTs: 1,
TableInfo: tableInfoWithPrimaryKey,
Columns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: HandleKeyFlag | PrimaryKeyFlag,
Value: "col2-value-updated",
},
}, tableInfoWithPrimaryKey),
PreColumns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Value: "col1-value",
},
{
Name: "col2",
Value: "col2-value",
},
}, tableInfoWithPrimaryKey),
},
}
result, err := trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 2, len(result))
require.True(t, result[0].IsDelete())
require.True(t, result[1].IsInsert())

// Update unique key.
tableInfoWithUniqueKey := BuildTableInfo("test", "t", []*Column{
{
Name: "col1",
Flag: BinaryFlag,
},
{
Name: "col2",
Flag: UniqueKeyFlag | NullableFlag,
},
}, [][]int{{1}})
events = []*RowChangedEvent{
{
CommitTs: 1,
TableInfo: tableInfoWithUniqueKey,
Columns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Value: "col1-value-updated",
},
{
Name: "col2",
Value: "col2-value-updated",
},
}, tableInfoWithUniqueKey),
PreColumns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Value: "col1-value",
},
{
Name: "col2",
Value: "col2-value",
},
}, tableInfoWithUniqueKey),
},
}
result, err = trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 2, len(result))
require.True(t, result[0].IsDelete())
require.True(t, result[0].IsDelete())
require.True(t, result[1].IsInsert())

// Update non-handle key.
events = []*RowChangedEvent{
{
CommitTs: 1,
TableInfo: tableInfoWithPrimaryKey,
Columns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Value: "col1-value-updated",
},
{
Name: "col2",
Value: "col2-value",
},
}, tableInfoWithPrimaryKey),
PreColumns: Columns2ColumnDatas([]*Column{
{
Name: "col1",
Value: "col1-value",
},
{
Name: "col2",
Value: "col2-value",
},
}, tableInfoWithPrimaryKey),
},
}
result, err = trySplitAndSortUpdateEvent(events)
require.NoError(t, err)
require.Equal(t, 1, len(result))
}

func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919))
columns := []*Column{
{
Name: "col1",
Expand All @@ -531,7 +656,31 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
},
}

<<<<<<< HEAD
events := []*RowChangedEvent{
=======
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}

func TestToRedoLog(t *testing.T) {
cols := []*Column{
>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919))
{
CommitTs: 1,
Columns: columns,
Expand Down
40 changes: 40 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -568,6 +570,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
Expand Down Expand Up @@ -895,7 +909,33 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
}

<<<<<<< HEAD
p.agent, err = p.newAgent(ctx, p.liveness, p.changefeedEpoch)
=======
pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = p.changefeedID
p.sinkManager.spawn(prcCtx)

// Bind them so that sourceManager can notify sinkManager.r.
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)
p.agent, err = p.newAgent(prcCtx, p.liveness, p.changefeedEpoch, p.cfg, p.ownerCaptureInfoClient)
>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919))
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))

// For duplicate entry error, we fast fail to restart changefeed.
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
Expand Down
Loading

0 comments on commit 8845ae6

Please sign in to comment.