From ba5add149210c30ce3f460b7ca46e6200093c33d Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Mon, 6 May 2024 18:52:07 +0800 Subject: [PATCH] This is an automated cherry-pick of #10919 Signed-off-by: ti-chi-bot --- cdc/model/kv.go | 5 + cdc/model/sink.go | 57 ++- cdc/model/sink_test.go | 132 ++++++ cdc/processor/processor.go | 24 ++ cdc/processor/sinkmanager/manager.go | 5 + cdc/processor/sourcemanager/manager.go | 102 +++++ cdc/redo/reader/reader.go | 8 + cdc/sink/dmlsink/factory/factory.go | 8 +- cdc/sink/dmlsink/txn/mysql/mysql.go | 116 ++++-- cdc/sink/dmlsink/txn/mysql/mysql_test.go | 4 + cmd/kafka-consumer/main.go | 4 +- cmd/pulsar-consumer/main.go | 17 +- cmd/storage-consumer/main.go | 4 +- errors.toml | 5 + pkg/applier/redo.go | 299 +++++++++++++- pkg/applier/redo_test.go | 389 +++++++++++++++++- pkg/errors/cdc_errors.go | 4 + pkg/errors/helper.go | 19 + pkg/sink/codec/open/open_protocol_decoder.go | 2 + .../conf/diff_config.toml | 29 ++ .../conf/workload | 13 + .../changefeed_dup_error_restart/run.sh | 54 +++ .../force_replicate_table/run.sh | 6 +- .../conf/diff_config.toml | 2 +- tests/integration_tests/run_group.sh | 5 + 25 files changed, 1242 insertions(+), 71 deletions(-) create mode 100644 tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml create mode 100644 tests/integration_tests/changefeed_dup_error_restart/conf/workload create mode 100755 tests/integration_tests/changefeed_dup_error_restart/run.sh diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 10a9857e703..201c4b4f494 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -93,6 +93,11 @@ type RawKVEntry struct { RegionID uint64 `msg:"region_id"` } +// IsUpdate checks if the event is an update event. +func (v *RawKVEntry) IsUpdate() bool { + return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil +} + func (v *RawKVEntry) String() string { // TODO: redact values. return fmt.Sprintf( diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 1e524d59f00..daadc2bd897 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -347,6 +347,30 @@ type RowChangedEvent struct { ReplicatingTs Ts `json:"-" msg:"-"` } +// ToRowChangedEvent converts RowChangedEventInRedoLog to RowChangedEvent +func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent { + cols := r.Columns + if cols == nil { + cols = r.PreColumns + } + tableInfo := BuildTableInfo( + r.Table.Schema, + r.Table.Table, + cols, + r.IndexColumns) + tableInfo.TableName.TableID = r.Table.TableID + tableInfo.TableName.IsPartition = r.Table.IsPartition + row := &RowChangedEvent{ + StartTs: r.StartTs, + CommitTs: r.CommitTs, + PhysicalTableID: r.Table.TableID, + TableInfo: tableInfo, + Columns: Columns2ColumnDatas(r.Columns, tableInfo), + PreColumns: Columns2ColumnDatas(r.PreColumns, tableInfo), + } + return row +} + // txnRows represents a set of events that belong to the same transaction. type txnRows []*RowChangedEvent @@ -796,18 +820,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { // Whether split a single update event into delete and insert events? // -// For the MySQL Sink, there is no need to split a single unique key changed update event, this -// is also to keep the backward compatibility, the same behavior as before. +// For the MySQL Sink, we don't split any update event. +// This may cause error like "duplicate entry" when sink to the downstream. 
+// This kind of error will cause the changefeed to restart, +// and then the related update rows will be splitted to insert and delete at puller side. // // For the Kafka and Storage sink, always split a single unique key changed update event, since: // 1. Avro and CSV does not output the previous column values for the update event, so it would // cause consumer missing data if the unique key changed event is not split. // 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { - if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) { - return false - } - return true + return !sink.IsMySQLCompatibleScheme(sinkScheme) } // trySplitAndSortUpdateEvent try to split update events if unique key is updated @@ -837,8 +860,8 @@ func trySplitAndSortUpdateEvent( // This indicates that it is an update event. if the pk or uk is updated, // we need to split it into two events (delete and insert). - if e.IsUpdate() && shouldSplitUpdateEvent(e) { - deleteEvent, insertEvent, err := splitUpdateEvent(e) + if e.IsUpdate() && ShouldSplitUpdateEvent(e) { + deleteEvent, insertEvent, err := SplitUpdateEvent(e) if err != nil { return nil, errors.Trace(err) } @@ -855,10 +878,22 @@ func trySplitAndSortUpdateEvent( return rowChangedEvents, nil } +<<<<<<< HEAD // shouldSplitUpdateEvent determines if the split event is needed to align the old format based on +======= +func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool { + if col != nil { + colFlag := tableInfo.ForceGetColumnFlagType(col.ColumnID) + return colFlag.IsUniqueKey() || colFlag.IsHandleKey() + } + return false +} + +// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) // whether the handle key column or unique key has been modified. // If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. -func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { +func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { return false @@ -880,8 +915,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { return false } -// splitUpdateEvent splits an update event into a delete and an insert event. -func splitUpdateEvent( +// SplitUpdateEvent splits an update event into a delete and an insert event. +func SplitUpdateEvent( updateEvent *RowChangedEvent, ) (*RowChangedEvent, *RowChangedEvent, error) { if updateEvent == nil { diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 4089db24525..a5cc02b6dd2 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -448,7 +448,132 @@ func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) { func TestTrySplitAndSortUpdateEvent(t *testing.T) { t.Parallel() +<<<<<<< HEAD // Update handle key. +======= + // Update primary key. 
+ tableInfoWithPrimaryKey := BuildTableInfo("test", "t", []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + }, + { + Name: "col2", + Flag: HandleKeyFlag | PrimaryKeyFlag, + }, + }, [][]int{{1}}) + events := []*RowChangedEvent{ + { + CommitTs: 1, + TableInfo: tableInfoWithPrimaryKey, + Columns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: HandleKeyFlag | PrimaryKeyFlag, + Value: "col2-value-updated", + }, + }, tableInfoWithPrimaryKey), + PreColumns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Value: "col1-value", + }, + { + Name: "col2", + Value: "col2-value", + }, + }, tableInfoWithPrimaryKey), + }, + } + result, err := trySplitAndSortUpdateEvent(events) + require.NoError(t, err) + require.Equal(t, 2, len(result)) + require.True(t, result[0].IsDelete()) + require.True(t, result[1].IsInsert()) + + // Update unique key. + tableInfoWithUniqueKey := BuildTableInfo("test", "t", []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + }, + { + Name: "col2", + Flag: UniqueKeyFlag | NullableFlag, + }, + }, [][]int{{1}}) + events = []*RowChangedEvent{ + { + CommitTs: 1, + TableInfo: tableInfoWithUniqueKey, + Columns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Value: "col1-value-updated", + }, + { + Name: "col2", + Value: "col2-value-updated", + }, + }, tableInfoWithUniqueKey), + PreColumns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Value: "col1-value", + }, + { + Name: "col2", + Value: "col2-value", + }, + }, tableInfoWithUniqueKey), + }, + } + result, err = trySplitAndSortUpdateEvent(events) + require.NoError(t, err) + require.Equal(t, 2, len(result)) + require.True(t, result[0].IsDelete()) + require.True(t, result[0].IsDelete()) + require.True(t, result[1].IsInsert()) + + // Update non-handle key. 
+ events = []*RowChangedEvent{ + { + CommitTs: 1, + TableInfo: tableInfoWithPrimaryKey, + Columns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Value: "col1-value-updated", + }, + { + Name: "col2", + Value: "col2-value", + }, + }, tableInfoWithPrimaryKey), + PreColumns: Columns2ColumnDatas([]*Column{ + { + Name: "col1", + Value: "col1-value", + }, + { + Name: "col2", + Value: "col2-value", + }, + }, tableInfoWithPrimaryKey), + }, + } + result, err = trySplitAndSortUpdateEvent(events) + require.NoError(t, err) + require.Equal(t, 1, len(result)) +} + +func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) { +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) columns := []*Column{ { Name: "col1", @@ -608,4 +733,11 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) require.NoError(t, err) require.Len(t, txn.Rows, 1) + + txn2 := &SingleTableTxn{ + Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, + } + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b078887fc22..00b30715952 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "net/url" "strconv" "sync" "time" @@ -40,6 +41,7 @@ import ( "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -470,6 +472,18 @@ func isProcessorIgnorableError(err error) bool { return false } +// needPullerSafeModeAtStart returns true if the scheme is mysql compatible. +// pullerSafeMode means to split all update kv entries whose commitTS +// is older then the start time of this changefeed. +func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + return sink.IsMySQLCompatibleScheme(scheme), nil +} + // Tick implements the `orchestrator.State` interface // the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot. // the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot. 
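
Context for the helper above: needPullerSafeModeAtStart only inspects the sink URI scheme, and puller-side safe mode is enabled exactly when the sink is MySQL compatible. The following standalone sketch shows the same shape of check without the tiflow sink helpers; the function name and the scheme list are illustrative assumptions, not the authoritative set behind sink.IsMySQLCompatibleScheme.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// Illustrative scheme set; the real list lives in pkg/sink and may differ.
var mysqlCompatibleSchemes = map[string]bool{
	"mysql":     true,
	"mysql+ssl": true,
	"tidb":      true,
	"tidb+ssl":  true,
}

// needPullerSafeModeAtStartSketch mirrors the helper above: parse the sink
// URI and report whether its scheme is MySQL compatible, which is when old
// update kv entries must be split at the puller after a restart.
func needPullerSafeModeAtStartSketch(sinkURI string) (bool, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return false, fmt.Errorf("invalid sink URI %q: %w", sinkURI, err)
	}
	return mysqlCompatibleSchemes[strings.ToLower(u.Scheme)], nil
}

func main() {
	for _, uri := range []string{
		"mysql://root@127.0.0.1:3306/",
		"kafka://127.0.0.1:9092/ticdc-topic",
	} {
		ok, err := needPullerSafeModeAtStartSketch(uri)
		fmt.Println(uri, ok, err)
	}
}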
@@ -619,9 +633,19 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { return errors.Trace(err) } + pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, +<<<<<<< HEAD sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode)) +======= + sortEngine, util.GetOrZero(cfConfig.BDRMode), + util.GetOrZero(cfConfig.EnableTableMonitor), + pullerSafeModeAtStart) +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 7e17ba9e600..4e3a14042bd 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Duration("cost", time.Since(start))) + + // For duplicate entry error, we fast fail to restart changefeed. + if cerror.IsDupEntryError(err) { + return errors.Trace(err) + } } // If the error is retryable, we should retry to re-establish the internal resources. diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 3035076993f..e502ffb5ee0 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -17,6 +17,7 @@ import ( "context" "time" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/kv" @@ -28,10 +29,17 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/pkg/config" +<<<<<<< HEAD "github.com/pingcap/tiflow/pkg/spanz" +======= + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) "github.com/pingcap/tiflow/pkg/txnutil" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -71,6 +79,7 @@ type SourceManager struct { engine engine.SortEngine // Used to indicate whether the changefeed is in BDR mode. bdrMode bool + startTs model.Ts // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. @@ -86,9 +95,16 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, +<<<<<<< HEAD ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) +======= + enableTableMonitor bool, + safeModeAtStart bool, +) *SourceManager { + return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart) +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) } // NewForTest creates a new source manager for testing. 
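
The pullerSafeModeAtStart flag wired into sourcemanager.New above takes effect at the puller: an update kv entry whose commit ts is older than the changefeed start ts is split into a delete entry (old value only) followed by an insert entry (new value only), matching splitUpdateKVEntry later in this patch. A minimal sketch of that split, assuming a trimmed-down stand-in for model.RawKVEntry (the real struct carries more fields such as op type and region info).

package main

import (
	"errors"
	"fmt"
)

// rawKVEntry is a trimmed-down stand-in for model.RawKVEntry.
type rawKVEntry struct {
	Key      []byte
	Value    []byte // new value, nil for a delete
	OldValue []byte // previous value, nil for an insert
	CRTs     uint64 // commit ts
}

func (r *rawKVEntry) isUpdate() bool { return r.Value != nil && r.OldValue != nil }

// splitOldUpdate splits an update entry older than thresholdTs into a
// delete entry (old value only) and an insert entry (new value only), so the
// delete is sorted and emitted before the insert downstream.
func splitOldUpdate(raw *rawKVEntry, thresholdTs uint64) (*rawKVEntry, *rawKVEntry, error) {
	if raw == nil || !raw.isUpdate() {
		return nil, nil, errors.New("not an update entry")
	}
	if raw.CRTs >= thresholdTs {
		return nil, nil, errors.New("entry is not older than the threshold")
	}
	del, ins := *raw, *raw
	del.Value = nil    // keep only the old value -> delete
	ins.OldValue = nil // keep only the new value -> insert
	return &del, &ins, nil
}

func main() {
	raw := &rawKVEntry{Key: []byte("k"), Value: []byte("new"), OldValue: []byte("old"), CRTs: 90}
	del, ins, err := splitOldUpdate(raw, 100)
	fmt.Println(del, ins, err)
}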
@@ -102,14 +118,36 @@ func NewForTest( return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) } +func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool { + return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs +} + +func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) { + if raw == nil { + return nil, nil, errors.New("nil event cannot be split") + } + deleteKVEntry := *raw + deleteKVEntry.Value = nil + + insertKVEntry := *raw + insertKVEntry.OldValue = nil + + return &deleteKVEntry, &insertKVEntry, nil +} + func newSourceManager( changefeedID model.ChangeFeedID, up *upstream.Upstream, mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, +<<<<<<< HEAD multiplexing bool, pullerWrapperCreator pullerWrapperCreator, +======= + enableTableMonitor bool, + safeModeAtStart bool, +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) ) *SourceManager { mgr := &SourceManager{ ready: make(chan struct{}), @@ -120,9 +158,44 @@ func newSourceManager( bdrMode: bdrMode, multiplexing: multiplexing, } +<<<<<<< HEAD if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) mgr.tablePullers.pullerWrapperCreator = pullerWrapperCreator +======= + + serverConfig := config.GetGlobalServerConfig() + grpcPool := sharedconn.NewConnAndClientPool(mgr.up.SecurityConfig, kv.GetGlobalGrpcMetrics()) + client := kv.NewSharedClient( + mgr.changefeedID, serverConfig, mgr.bdrMode, + mgr.up.PDClient, grpcPool, mgr.up.RegionCache, mgr.up.PDClock, + txnutil.NewLockerResolver(mgr.up.KVStorage.(tikv.Storage), mgr.changefeedID), + ) + + // consume add raw kv entry to the engine. + // It will be called by the puller when new raw kv entry is received. + consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span) error { + if len(spans) > 1 { + log.Panic("DML puller subscribes multiple spans", + zap.String("namespace", mgr.changefeedID.Namespace), + zap.String("changefeed", mgr.changefeedID.ID)) + } + if raw != nil { + if safeModeAtStart && isOldUpdateKVEntry(raw, mgr.startTs) { + deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw) + if err != nil { + return err + } + deleteEvent := model.NewPolymorphicEvent(deleteKVEntry) + insertEvent := model.NewPolymorphicEvent(insertKVEntry) + mgr.engine.Add(spans[0], deleteEvent, insertEvent) + } else { + pEvent := model.NewPolymorphicEvent(raw) + mgr.engine.Add(spans[0], pEvent) + } + } + return nil +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) } return mgr } @@ -223,6 +296,15 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { case <-m.tablePullers.ctx.Done(): return m.tablePullers.ctx.Err() } +<<<<<<< HEAD +======= + startTs, err := getCurrentTs(ctx, m.up.PDClient) + if err != nil { + return err + } + m.startTs = startTs + return m.puller.Run(ctx) +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) } // WaitForReady implements util.Runnable. @@ -266,3 +348,23 @@ func (m *SourceManager) Close() { func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) { m.engine.Add(span, events...) 
} + +func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + var replicateTs model.Ts + err := retry.Do(ctx, func() error { + phy, logic, err := pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerrors.IsRetryableError)) + if err != nil { + return model.Ts(0), errors.Trace(err) + } + return replicateTs, nil +} diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go index 25c5ea8cbf1..ec20b1c8def 100644 --- a/cdc/redo/reader/reader.go +++ b/cdc/redo/reader/reader.go @@ -259,8 +259,16 @@ func (l *LogReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, er select { case <-ctx.Done(): return nil, errors.Trace(ctx.Err()) +<<<<<<< HEAD case row := <-l.rowCh: return row, nil +======= + case rowInRedoLog := <-l.rowCh: + if rowInRedoLog != nil { + return rowInRedoLog.ToRowChangedEvent(), nil + } + return nil, nil +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) } } diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 6fd0dd4c279..502199e191b 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -64,7 +64,7 @@ type SinkFactory struct { category Category } -// New creates a new SinkFactory by schema. +// New creates a new SinkFactory by scheme. func New( ctx context.Context, changefeedID model.ChangeFeedID, @@ -79,8 +79,8 @@ func New( } s := &SinkFactory{} - schema := sink.GetScheme(sinkURI) - switch schema { + scheme := sink.GetScheme(sinkURI) + switch scheme { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh, txn.DefaultConflictDetectorSlots) @@ -123,7 +123,7 @@ func New( s.category = CategoryMQ default: return nil, - cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) + cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) } return s, nil diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 10e88ab792c..9e8cbd9a2bb 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -357,8 +357,12 @@ func convertBinaryToString(cols []*model.Column) { func (s *mysqlBackend) groupRowsByType( event *dmlsink.TxnCallbackableEvent, +<<<<<<< HEAD tableInfo *timodel.TableInfo, spiltUpdate bool, +======= + tableInfo *model.TableInfo, +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) ) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) { preAllocateSize := len(event.Event.Rows) if preAllocateSize > s.cfg.MaxTxnRow { @@ -394,29 +398,12 @@ func (s *mysqlBackend) groupRowsByType( } if row.IsUpdate() { - if spiltUpdate { - deleteRow = append( - deleteRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) - if len(deleteRow) >= s.cfg.MaxTxnRow { - deleteRows = append(deleteRows, deleteRow) - deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - insertRow = append( - insertRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) - if len(insertRow) >= s.cfg.MaxTxnRow { - insertRows = append(insertRows, insertRow) - insertRow = 
make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - } else { - updateRow = append( - updateRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) - if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { - updateRows = append(updateRows, updateRow) - updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } + updateRow = append( + updateRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) + if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { + updateRows = append(updateRows, updateRow) + updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) } } } @@ -439,7 +426,7 @@ func (s *mysqlBackend) batchSingleTxnDmls( tableInfo *timodel.TableInfo, translateToInsert bool, ) (sqls []string, values [][]interface{}) { - insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert) + insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo) // handle delete if len(deleteRows) > 0 { @@ -591,10 +578,20 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { for _, row := range event.Event.Rows { var query string var args []interface{} +<<<<<<< HEAD // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns, s.cfg.ForceReplicate) +======= + // Update Event + if len(row.PreColumns) != 0 && len(row.Columns) != 0 { + query, args = prepareUpdate( + quoteTable, + row.GetPreColumns(), + row.GetColumns(), + s.cfg.ForceReplicate) +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -603,12 +600,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { continue } - // Case for update event or delete event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a DELETE SQL here. - // For delete event: - // It will be translated directly into a DELETE SQL. + // Delete Event if len(row.PreColumns) != 0 { query, args = prepareDelete(quoteTable, row.PreColumns, s.cfg.ForceReplicate) if query != "" { @@ -617,14 +609,10 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { } } - // Case for update event or insert event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a REPLACE SQL here. - // For insert event: + // Insert Event // It will be translated directly into a - // INSERT(old value is enabled and not in safe mode) - // or REPLACE(old value is disabled or in safe mode) SQL. + // INSERT(not in safe mode) + // or REPLACE(in safe mode) SQL. if len(row.Columns) != 0 { query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) if query != "" { @@ -669,7 +657,7 @@ func (s *mysqlBackend) multiStmtExecute( _, execError := tx.ExecContext(ctx, multiStmtSQL, multiStmtArgs...) 
if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -716,7 +704,7 @@ func (s *mysqlBackend) sequenceExecute( } if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, query, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -748,20 +736,49 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare writeTimeout += networkDriftDuration failpoint.Inject("MySQLSinkTxnRandomError", func() { - fmt.Printf("start to random error") + log.Warn("inject MySQLSinkTxnRandomError") err := logDMLTxnErr(errors.Trace(driver.ErrBadConn), start, s.changefeed, "failpoint", 0, nil) failpoint.Return(err) }) failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) }) + failpoint.Inject("MySQLDuplicateEntryError", func() { + log.Warn("inject MySQLDuplicateEntryError") + err := logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{ + Number: uint16(mysql.ErrDupEntry), + Message: "Duplicate entry", + }), start, s.changefeed, "failpoint", 0, nil) + failpoint.Return(err) + }) err := s.statistics.RecordBatchExecution(func() (int, int64, error) { tx, err := s.db.BeginTx(pctx, nil) if err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs) } +<<<<<<< HEAD +======= + // Set session variables first and then execute the transaction. + // we try to set write source for each txn, + // so we can use it to trace the data source + if err = pmysql.SetWriteSource(pctx, s.cfg, tx); err != nil { + err := logDMLTxnErr( + wrapMysqlTxnError(err), + start, s.changefeed, + fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", + s.cfg.SourceID), + dmls.rowCount, dmls.startTs) + if rbErr := tx.Rollback(); rbErr != nil { + if errors.Cause(rbErr) != context.Canceled { + log.Warn("failed to rollback txn", zap.String("changefeed", s.changefeed), zap.Error(rbErr)) + } + } + return 0, 0, err + } + +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) // If interplated SQL size exceeds maxAllowedPacket, mysql driver will // fall back to the sequantial way. // error can be ErrPrepareMulti, ErrBadConn etc. 
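
The wrapMysqlTxnError calls above (the function is defined in the next hunk) exist so that a MySQL duplicate-entry failure is surfaced as the dedicated ErrMySQLDuplicateEntry rather than the generic txn error, letting the sink manager fast-fail and restart the changefeed instead of retrying. A standalone sketch of that classification; it assumes the go-sql-driver/mysql dependency is available, and errDuplicateEntry is a hypothetical sentinel standing in for cerror.ErrMySQLDuplicateEntry.

package main

import (
	"errors"
	"fmt"

	"github.com/go-sql-driver/mysql"
)

// errDuplicateEntry is a hypothetical sentinel used in place of the
// cerror.ErrMySQLDuplicateEntry normalized error in the patch above.
var errDuplicateEntry = errors.New("MySQL duplicate entry error")

const mysqlErrDupEntry = 1062 // ER_DUP_ENTRY

// classifyTxnError wraps a duplicate-entry failure with the sentinel so
// callers can detect it and fast-fail instead of retrying the transaction.
func classifyTxnError(err error) error {
	var me *mysql.MySQLError
	if errors.As(err, &me) && me.Number == mysqlErrDupEntry {
		return fmt.Errorf("%w: %v", errDuplicateEntry, err)
	}
	return err
}

// isRetryable treats duplicate-entry errors as non-retryable, mirroring the
// intent of the isRetryableDMLError change in this patch.
func isRetryable(err error) bool {
	return err != nil && !errors.Is(err, errDuplicateEntry)
}

func main() {
	raw := &mysql.MySQLError{Number: 1062, Message: "Duplicate entry '1' for key 'PRIMARY'"}
	wrapped := classifyTxnError(raw)
	fmt.Println(wrapped, "retryable:", isRetryable(wrapped))
}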
@@ -799,7 +816,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare if err = tx.Commit(); err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs) } return dmls.rowCount, dmls.approximateSize, nil @@ -818,6 +835,18 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare retry.WithIsRetryableErr(isRetryableDMLError)) } +func wrapMysqlTxnError(err error) error { + errCode, ok := getSQLErrCode(err) + if !ok { + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + switch errCode { + case mysql.ErrDupEntry: + return cerror.WrapError(cerror.ErrMySQLDuplicateEntry, err) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) +} + func logDMLTxnErr( err error, start time.Time, changefeed string, query string, count int, startTs []model.Ts, @@ -851,7 +880,8 @@ func isRetryableDMLError(err error) bool { } switch errCode { - case mysql.ErrNoSuchTable, mysql.ErrBadDB: + // when meet dup entry error, we don't retry and report the error directly to owner to restart the changefeed. + case mysql.ErrNoSuchTable, mysql.ErrBadDB, mysql.ErrDupEntry: return false } return true diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index c53737abbfa..a0f48fb44c3 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -1871,7 +1871,11 @@ func TestGroupRowsByType(t *testing.T) { } tableInfo := model.BuildTiDBTableInfo(colums, tc.input[0].IndexColumns) ms.cfg.MaxTxnRow = tc.maxTxnRow +<<<<<<< HEAD inserts, updates, deletes := ms.groupRowsByType(event, tableInfo, false) +======= + inserts, updates, deletes := ms.groupRowsByType(event, event.Event.TableInfo) +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) for _, rows := range inserts { require.LessOrEqual(t, len(rows), tc.maxTxnRow) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 470e9a37b33..48766e0a697 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -475,7 +475,7 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) { } changefeedID := model.DefaultChangeFeedID("kafka-consumer") - f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil) if err != nil { cancel() return nil, cerror.Trace(err) @@ -492,7 +492,7 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) { cancel() }() - ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig()) + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig) if err != nil { cancel() return nil, cerror.Trace(err) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index fbc5c7e3bf9..9e055966a8b 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -37,6 +37,7 @@ import ( eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/cdc/sink/tablesink" sutil "github.com/pingcap/tiflow/cdc/sink/util" + cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/quotes" @@ -60,6 +61,9 @@ type 
ConsumerOption struct { protocol config.Protocol enableTiDBExtension bool + // the replicaConfig of the changefeed which produce data to the kafka topic + replicaConfig *config.ReplicaConfig + logPath string logLevel string timezone string @@ -86,6 +90,15 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) { o.address = strings.Split(upstreamURI.Host, ",") + replicaConfig := config.GetDefaultReplicaConfig() + if configFile != "" { + err := cmdUtil.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig) + if err != nil { + log.Panic("decode config file failed", zap.Error(err)) + } + } + o.replicaConfig = replicaConfig + s := upstreamURI.Query().Get("protocol") if s != "" { protocol, err := config.ParseSinkProtocolFromString(s) @@ -327,7 +340,7 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { } changefeedID := model.DefaultChangeFeedID("pulsar-consumer") - f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil) if err != nil { cancel() return nil, errors.Trace(err) @@ -344,7 +357,7 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { cancel() }() - ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig()) + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig) if err != nil { cancel() return nil, errors.Trace(err) diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 83b5e8a1bad..ff0e1210a06 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -191,7 +191,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { stdCtx, model.DefaultChangeFeedID(defaultChangefeedName), downstreamURIStr, - config.GetDefaultReplicaConfig(), + replicaConfig, errCh, nil, ) @@ -201,7 +201,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { } ddlSink, err := ddlfactory.New(ctx, model.DefaultChangeFeedID(defaultChangefeedName), - downstreamURIStr, config.GetDefaultReplicaConfig()) + downstreamURIStr, replicaConfig) if err != nil { log.Error("failed to create ddl sink", zap.Error(err)) return nil, err diff --git a/errors.toml b/errors.toml index 9f454de2dfc..7891af045ad 100755 --- a/errors.toml +++ b/errors.toml @@ -546,6 +546,11 @@ error = ''' MySQL connection error ''' +["CDC:ErrMySQLDuplicateEntry"] +error = ''' +MySQL duplicate entry error +''' + ["CDC:ErrMySQLInvalidConfig"] error = ''' MySQL config invalid diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 4b85db78390..cde03049bc6 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -15,12 +15,15 @@ package applier import ( "context" + "fmt" "net/url" + "os" "time" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/model/codec" "github.com/pingcap/tiflow/cdc/processor/memquota" "github.com/pingcap/tiflow/cdc/redo/reader" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -62,8 +65,9 @@ type RedoApplierConfig struct { // RedoApplier implements a redo log applier type RedoApplier struct { - cfg *RedoApplierConfig - rd reader.RedoLogReader + cfg *RedoApplierConfig + rd reader.RedoLogReader + updateSplitter *updateEventSplitter ddlSink ddlsink.Sink appliedDDLCount uint64 @@ -180,7 +184,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { return 
row.CommitTs > ddl.CommitTs } - row, err := ra.rd.ReadNextRow(ctx) + row, err := ra.updateSplitter.readNextRow(ctx) if err != nil { return err } @@ -203,7 +207,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err := ra.applyRow(row, checkpointTs); err != nil { return err } - if row, err = ra.rd.ReadNextRow(ctx); err != nil { + if row, err = ra.updateSplitter.readNextRow(ctx); err != nil { return err } } @@ -419,6 +423,292 @@ func createRedoReaderImpl(ctx context.Context, cfg *RedoApplierConfig) (reader.R return reader.NewRedoLogReader(ctx, storageType, readerCfg) } +// tempTxnInsertEventStorage is used to store insert events in the same transaction +// once you begin to read events from storage, you should read all events before you write new events +type tempTxnInsertEventStorage struct { + events []*model.RowChangedEvent + // when events num exceed flushThreshold, write all events to file + flushThreshold int + dir string + txnCommitTs model.Ts + + useFileStorage bool + // eventSizes is used to store the size of each event in file storage + eventSizes []int + writingFile *os.File + readingFile *os.File + // reading is used to indicate whether we are reading events from storage + // this is to ensure that we read all events before write new events + reading bool +} + +const ( + tempStorageFileName = "_insert_storage.tmp" + defaultFlushThreshold = 50 +) + +func newTempTxnInsertEventStorage(flushThreshold int, dir string) *tempTxnInsertEventStorage { + return &tempTxnInsertEventStorage{ + events: make([]*model.RowChangedEvent, 0), + flushThreshold: flushThreshold, + dir: dir, + txnCommitTs: 0, + + useFileStorage: false, + eventSizes: make([]int, 0), + + reading: false, + } +} + +func (t *tempTxnInsertEventStorage) initializeAddEvent(ts model.Ts) { + t.reading = false + t.useFileStorage = false + t.txnCommitTs = ts + t.writingFile = nil + t.readingFile = nil +} + +func (t *tempTxnInsertEventStorage) addEvent(event *model.RowChangedEvent) error { + // do some pre check + if !event.IsInsert() { + log.Panic("event is not insert event", zap.Any("event", event)) + } + if t.reading && t.hasEvent() { + log.Panic("should read all events before write new event") + } + if !t.hasEvent() { + t.initializeAddEvent(event.CommitTs) + } else { + if t.txnCommitTs != event.CommitTs { + log.Panic("commit ts of events should be the same", + zap.Uint64("commitTs", event.CommitTs), + zap.Uint64("txnCommitTs", t.txnCommitTs)) + } + } + + if t.useFileStorage { + return t.writeEventsToFile(event) + } + + t.events = append(t.events, event) + if len(t.events) >= t.flushThreshold { + err := t.writeEventsToFile(t.events...) 
+ if err != nil { + return err + } + t.events = t.events[:0] + } + return nil +} + +func (t *tempTxnInsertEventStorage) writeEventsToFile(events ...*model.RowChangedEvent) error { + if !t.useFileStorage { + t.useFileStorage = true + var err error + t.writingFile, err = os.Create(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return err + } + } + for _, event := range events { + redoLog := event.ToRedoLog() + data, err := codec.MarshalRedoLog(redoLog, nil) + if err != nil { + return errors.WrapError(errors.ErrMarshalFailed, err) + } + t.eventSizes = append(t.eventSizes, len(data)) + _, err = t.writingFile.Write(data) + if err != nil { + return err + } + } + return nil +} + +func (t *tempTxnInsertEventStorage) hasEvent() bool { + return len(t.events) > 0 || len(t.eventSizes) > 0 +} + +func (t *tempTxnInsertEventStorage) readFromFile() (*model.RowChangedEvent, error) { + if len(t.eventSizes) == 0 { + return nil, nil + } + if t.readingFile == nil { + var err error + t.readingFile, err = os.Open(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return nil, err + } + } + size := t.eventSizes[0] + data := make([]byte, size) + n, err := t.readingFile.Read(data) + if err != nil { + return nil, err + } + if n != size { + return nil, errors.New("read size not equal to expected size") + } + t.eventSizes = t.eventSizes[1:] + redoLog, _, err := codec.UnmarshalRedoLog(data) + if err != nil { + return nil, errors.WrapError(errors.ErrUnmarshalFailed, err) + } + return redoLog.RedoRow.Row.ToRowChangedEvent(), nil +} + +func (t *tempTxnInsertEventStorage) readNextEvent() (*model.RowChangedEvent, error) { + if !t.hasEvent() { + return nil, nil + } + t.reading = true + if t.useFileStorage { + return t.readFromFile() + } + + event := t.events[0] + t.events = t.events[1:] + return event, nil +} + +// updateEventSplitter splits an update event to a delete event and a deferred insert event +// when the update event is an update to the handle key or the non empty unique key. 
+// deferred insert event means all delete events and update events in the same transaction are emitted before this insert event +type updateEventSplitter struct { + rd reader.RedoLogReader + rdFinished bool + tempStorage *tempTxnInsertEventStorage + prevTxnCommitTs model.Ts + // pendingEvent is the event that trigger the process to emit events from tempStorage, it can be + // 1) an insert event in the same transaction(because there will be no more update and delete events in the same transaction) + // 2) a new event in the next transaction + pendingEvent *model.RowChangedEvent + // meetInsertInCurTxn is used to indicate whether we meet an insert event in the current transaction + // this is to add some check to ensure that insert events are emitted after other kinds of events in the same transaction + meetInsertInCurTxn bool +} + +func newUpdateEventSplitter(rd reader.RedoLogReader, dir string) *updateEventSplitter { + return &updateEventSplitter{ + rd: rd, + rdFinished: false, + tempStorage: newTempTxnInsertEventStorage(defaultFlushThreshold, dir), + prevTxnCommitTs: 0, + } +} + +// processEvent return (event to emit, pending event) +func processEvent( + event *model.RowChangedEvent, + prevTxnCommitTs model.Ts, + tempStorage *tempTxnInsertEventStorage, +) (*model.RowChangedEvent, *model.RowChangedEvent, error) { + if event == nil { + log.Panic("event should not be nil") + } + + // meet a new transaction + if prevTxnCommitTs != 0 && prevTxnCommitTs != event.CommitTs { + if tempStorage.hasEvent() { + // emit the insert events in the previous transaction + return nil, event, nil + } + } + if event.IsDelete() { + return event, nil, nil + } else if event.IsInsert() { + if tempStorage.hasEvent() { + // pend current event and emit the insert events in temp storage first to release memory + return nil, event, nil + } + return event, nil, nil + } else if !model.ShouldSplitUpdateEvent(event) { + return event, nil, nil + } else { + deleteEvent, insertEvent, err := model.SplitUpdateEvent(event) + if err != nil { + return nil, nil, err + } + err = tempStorage.addEvent(insertEvent) + if err != nil { + return nil, nil, err + } + return deleteEvent, nil, nil + } +} + +func (u *updateEventSplitter) checkEventOrder(event *model.RowChangedEvent) { + if event == nil { + return + } + if event.CommitTs > u.prevTxnCommitTs { + u.meetInsertInCurTxn = false + return + } + if event.IsInsert() { + u.meetInsertInCurTxn = true + } else { + // delete or update events + if u.meetInsertInCurTxn { + log.Panic("insert events should be emitted after other kinds of events in the same transaction") + } + } +} + +func (u *updateEventSplitter) readNextRow(ctx context.Context) (*model.RowChangedEvent, error) { + for { + // case 1: pendingEvent is not nil, emit all events from tempStorage and then process pendingEvent + if u.pendingEvent != nil { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + var event *model.RowChangedEvent + var err error + event, u.pendingEvent, err = processEvent(u.pendingEvent, u.prevTxnCommitTs, u.tempStorage) + if err != nil { + return nil, err + } + if event == nil || u.pendingEvent != nil { + log.Panic("processEvent return wrong result for pending event", + zap.Any("event", event), + zap.Any("pendingEvent", u.pendingEvent)) + } + return event, nil + } + // case 2: no more events from RedoLogReader, emit all events from tempStorage and return nil + if u.rdFinished { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + return nil, nil + } + // case 3: 
read and process events from RedoLogReader + event, err := u.rd.ReadNextRow(ctx) + if err != nil { + return nil, err + } + if event == nil { + u.rdFinished = true + } else { + u.checkEventOrder(event) + prevTxnCommitTS := u.prevTxnCommitTs + u.prevTxnCommitTs = event.CommitTs + var err error + event, u.pendingEvent, err = processEvent(event, prevTxnCommitTS, u.tempStorage) + if err != nil { + return nil, err + } + if event != nil { + return event, nil + } + if u.pendingEvent == nil { + log.Panic("event to emit and pending event cannot all be nil") + } + } + } +} + // ReadMeta creates a new redo applier and read meta from reader func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resolvedTs uint64, err error) { rd, err := createRedoReader(ctx, ra.cfg) @@ -438,6 +728,7 @@ func (ra *RedoApplier) Apply(egCtx context.Context) (err error) { eg.Go(func() error { return ra.rd.Run(egCtx) }) + ra.updateSplitter = newUpdateEventSplitter(ra.rd, ra.cfg.Dir) ra.memQuota = memquota.NewMemQuota(model.DefaultChangeFeedID(applierChangefeed), config.DefaultChangefeedMemoryQuota, "sink") diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index ce9f87cedb5..e9f4008f3c4 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "os" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -142,11 +143,19 @@ func TestApply(t *testing.T) { }, }, }, + // update event which doesn't modify handle key { +<<<<<<< HEAD StartTs: 1200, CommitTs: resolvedTs, Table: &model.TableName{Schema: "test", Table: "t1"}, PreColumns: []*model.Column{ +======= + StartTs: 1120, + CommitTs: 1220, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) { Name: "a", Value: 1, @@ -158,6 +167,71 @@ func TestApply(t *testing.T) { }, }, Columns: []*model.Column{ + { + Name: "a", + Value: 1, + }, { + Name: "b", + Value: "3", + }, + }, tableInfo), + }, + { + StartTs: 1150, + CommitTs: 1250, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 10, + }, { + Name: "b", + Value: "20", + }, + }, tableInfo), + }, + { + StartTs: 1150, + CommitTs: 1250, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 100, + }, { + Name: "b", + Value: "200", + }, + }, tableInfo), + }, + { + StartTs: 1200, + CommitTs: resolvedTs, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 10, + }, { + Name: "b", + Value: "20", + }, + }, tableInfo), + }, + { + StartTs: 1200, + CommitTs: resolvedTs, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 1, + }, { + Name: "b", + Value: "3", + }, + }, tableInfo), + Columns: model.Columns2ColumnDatas([]*model.Column{ { Name: "a", Value: 2, @@ -169,6 +243,29 @@ func TestApply(t *testing.T) { }, }, }, + { + StartTs: 1200, + CommitTs: resolvedTs, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 100, + }, { + Name: "b", + Value: "200", + }, + }, tableInfo), + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: 200, + }, { + Name: "b", + Value: "300", + }, + }, tableInfo), + }, } for _, dml := range dmls { redoLogCh <- dml @@ -201,13 +298,200 @@ func TestApply(t *testing.T) { close(redoLogCh) close(ddlEventCh) + dir, err := 
os.Getwd() + require.Nil(t, err) cfg := &RedoApplierConfig{ SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + "&multi-stmt-enable=false", + Dir: dir, } ap := NewRedoApplier(cfg) - err := ap.Apply(ctx) + err = ap.Apply(ctx) + require.Nil(t, err) +} + +func TestApplyBigTxn(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + checkpointTs := uint64(1000) + resolvedTs := uint64(2000) + redoLogCh := make(chan *model.RowChangedEvent, 1024) + ddlEventCh := make(chan *model.DDLEvent, 1024) + createMockReader := func(ctx context.Context, cfg *RedoApplierConfig) (reader.RedoLogReader, error) { + return NewMockReader(checkpointTs, resolvedTs, redoLogCh, ddlEventCh), nil + } + + dbIndex := 0 + // DML sink and DDL sink share the same db + db := getMockDBForBigTxn(t) + mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex%2 == 0 { + testDB, err := pmysql.MockTestDB() + require.Nil(t, err) + return testDB, nil + } + return db, nil + } + + getDMLDBConnBak := txn.GetDBConnImpl + txn.GetDBConnImpl = mockGetDBConn + getDDLDBConnBak := mysqlDDL.GetDBConnImpl + mysqlDDL.GetDBConnImpl = mockGetDBConn + createRedoReaderBak := createRedoReader + createRedoReader = createMockReader + defer func() { + createRedoReader = createRedoReaderBak + txn.GetDBConnImpl = getDMLDBConnBak + mysqlDDL.GetDBConnImpl = getDDLDBConnBak + }() + + tableInfo := model.BuildTableInfo("test", "t1", []*model.Column{ + { + Name: "a", + Type: mysqlParser.TypeLong, + Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, + }, { + Name: "b", + Type: mysqlParser.TypeString, + Flag: 0, + }, + }, [][]int{{0}}) + dmls := make([]*model.RowChangedEvent, 0) + // insert some rows + for i := 1; i <= 100; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1100, + CommitTs: 1200, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + }, + }, tableInfo), + } + dmls = append(dmls, dml) + } + // update + for i := 1; i <= 100; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1200, + CommitTs: 1300, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + }, + }, tableInfo), + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i * 10, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + }, + }, tableInfo), + } + dmls = append(dmls, dml) + } + // delete and update + for i := 1; i <= 50; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i * 10, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + }, + }, tableInfo), + } + dmls = append(dmls, dml) + } + for i := 51; i <= 100; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i * 10, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + }, + }, tableInfo), + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "a", + Value: i * 100, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*100+1), + }, + }, tableInfo), + } + dmls = append(dmls, dml) + } + for _, dml := range dmls { + redoLogCh <- dml 
+ } + ddls := []*model.DDLEvent{ + { + CommitTs: checkpointTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "checkpoint", + }, + }, + Query: "create table checkpoint(id int)", + Type: timodel.ActionCreateTable, + }, + { + CommitTs: resolvedTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "resolved", + }, + }, + Query: "create table resolved(id int not null unique key)", + Type: timodel.ActionCreateTable, + }, + } + for _, ddl := range ddls { + ddlEventCh <- ddl + } + close(redoLogCh) + close(ddlEventCh) + + dir, err := os.Getwd() + require.Nil(t, err) + cfg := &RedoApplierConfig{ + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + + "&multi-stmt-enable=false", + Dir: dir, + } + ap := NewRedoApplier(cfg) + err = ap.Apply(ctx) require.Nil(t, err) } @@ -257,14 +541,117 @@ func getMockDB(t *testing.T) *sql.DB { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1"). + WithArgs(1, "3", 1). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(10, "20"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(100, "200"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + // First, apply row which commitTs equal to resolvedTs mock.ExpectBegin() +<<<<<<< HEAD mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). WithArgs(1, "2"). +======= + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ?)"). + WithArgs(10). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ?)"). + WithArgs(1). +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ?)"). + WithArgs(100). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(2, "3"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(200, "300"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + // Then, apply ddl which commitTs equal to resolvedTs + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectClose() + return db +} + +func getMockDBForBigTxn(t *testing.T) *sql.DB { + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + + // Before we write data to downstream, we need to check whether the downstream is TiDB. + // So we mock a select tidb_version() query. 
+ mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 100; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i, fmt.Sprintf("%d", i+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 100; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ?)"). + WithArgs(i). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 1; i <= 100; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*10, fmt.Sprintf("%d", i*10+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + // First, apply row which commitTs equal to resolvedTs + mock.ExpectBegin() + for i := 1; i <= 100; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ?)"). + WithArgs(i * 10). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 51; i <= 100; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*100, fmt.Sprintf("%d", i*100+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } mock.ExpectCommit() // Then, apply ddl which commitTs equal to resolvedTs diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 69bb2bcc90d..37bb069ad28 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -328,6 +328,10 @@ var ( "MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"), ) + ErrMySQLDuplicateEntry = errors.Normalize( + "MySQL duplicate entry error", + errors.RFCCodeText("CDC:ErrMySQLDuplicateEntry"), + ) ErrMySQLQueryError = errors.Normalize( "MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"), diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 7b36ecdee8c..21b3859f60e 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -122,6 +122,25 @@ func RFCCode(err error) (errors.RFCErrorCode, bool) { return RFCCode(cause) } +// IsDupEntryError checks if an error is a duplicate entry error. 
+func IsDupEntryError(err error) bool { + if err == nil { + return false + } + if ErrMySQLDuplicateEntry.Equal(err) { + return true + } + if code, ok := RFCCode(err); ok { + if code == ErrMySQLDuplicateEntry.RFCCode() { + return true + } + } + if strings.Contains(err.Error(), string(ErrMySQLDuplicateEntry.RFCCode())) { + return true + } + return false +} + // IsRetryableError check the error is safe or worth to retry func IsRetryableError(err error) bool { if err == nil { diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 4e31a4ae6de..599e2a28f7e 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -234,6 +234,8 @@ func (b *BatchDecoder) buildColumns( switch mysqlType { case mysql.TypeJSON: value = string(value.([]uint8)) + case mysql.TypeBit: + value = common.MustBinaryLiteralToInt(value.([]uint8)) } column := &model.Column{ diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml new file mode 100644 index 00000000000..d8b18e0ec27 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/changefeed_dup_error_restart/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["changefeed_dup_error_restart.usertable"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/workload b/tests/integration_tests/changefeed_dup_error_restart/conf/workload new file mode 100644 index 00000000000..7649fd9afb7 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/workload @@ -0,0 +1,13 @@ +threadcount=2 +recordcount=10 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/changefeed_dup_error_restart/run.sh b/tests/integration_tests/changefeed_dup_error_restart/run.sh new file mode 100755 index 00000000000..e0756731a91 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_sql "CREATE DATABASE changefeed_dup_error_restart;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLDuplicateEntryError=5%return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-changefeed-dup-error-restart-test-$RANDOM" + case $SINK_TYPE in + *) 
SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_1 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_2 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/force_replicate_table/run.sh b/tests/integration_tests/force_replicate_table/run.sh index 5fc5a55c3bb..3ea4e6a7a6b 100755 --- a/tests/integration_tests/force_replicate_table/run.sh +++ b/tests/integration_tests/force_replicate_table/run.sh @@ -68,9 +68,13 @@ function run() { cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;; + kafka) run_kafka_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ;; storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;; +<<<<<<< HEAD pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;; +======= + pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config $CUR/conf/changefeed.toml ;; +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) esac run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml index df222cf6fa2..8ef310dcd8c 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml +++ b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] -output-dir = "/tmp/tidb_cdc_test/simple_handle_key_only_avro/output" +output-dir = "/tmp/tidb_cdc_test/kafka_simple_handle_key_only_avro/output" source-instances = ["mysql1"] diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 4d72681a771..52b81f2ed9d 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,8 +10,13 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table +<<<<<<< HEAD mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility" mysql_only_http="http_api http_api_tls api_v2" +======= +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart" +mysql_only_http="http_api 
http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth" +>>>>>>> c710066a51 (*(ticdc): split old update kv entry after restarting changefeed (#10919)) mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback mq_sink_dispatcher kafka_column_selector kafka_column_selector_avro"
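
A closing note on the redo applier changes in pkg/applier/redo.go above: updateEventSplitter emits the delete half of each split update immediately and defers the insert half until the rest of the transaction has been read (spilling buffered inserts to a temp file past a threshold), so replayed DELETEs cannot collide with re-inserted unique keys. A much-simplified, in-memory sketch of that ordering; it uses a stand-in event type and splits every update for brevity, whereas the real code only splits when ShouldSplitUpdateEvent reports a handle or unique key change.

package main

import "fmt"

// rowEvent is a minimal stand-in for model.RowChangedEvent.
type rowEvent struct {
	CommitTs uint64
	Pre      map[string]any // old values; nil for an insert
	Cols     map[string]any // new values; nil for a delete
}

func (e *rowEvent) isInsert() bool { return e.Pre == nil && e.Cols != nil }
func (e *rowEvent) isUpdate() bool { return e.Pre != nil && e.Cols != nil }

// splitAndDeferInserts emits deletes (including the delete half of each split
// update) in their original order and appends all inserts at the end of the
// transaction, mirroring what updateEventSplitter plus
// tempTxnInsertEventStorage achieve for a single commit ts.
func splitAndDeferInserts(txn []*rowEvent) []*rowEvent {
	out := make([]*rowEvent, 0, len(txn))
	deferred := make([]*rowEvent, 0, len(txn))
	for _, e := range txn {
		switch {
		case e.isUpdate():
			out = append(out, &rowEvent{CommitTs: e.CommitTs, Pre: e.Pre})            // delete half
			deferred = append(deferred, &rowEvent{CommitTs: e.CommitTs, Cols: e.Cols}) // insert half
		case e.isInsert():
			deferred = append(deferred, e)
		default: // plain delete
			out = append(out, e)
		}
	}
	return append(out, deferred...)
}

func main() {
	txn := []*rowEvent{
		{CommitTs: 200, Pre: map[string]any{"a": 1}, Cols: map[string]any{"a": 2}}, // update
		{CommitTs: 200, Cols: map[string]any{"a": 1}},                              // insert
	}
	for _, e := range splitAndDeferInserts(txn) {
		fmt.Printf("%+v\n", e)
	}
}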