diff --git a/cdc/model/kv.go b/cdc/model/kv.go
index 10a9857e703..201c4b4f494 100644
--- a/cdc/model/kv.go
+++ b/cdc/model/kv.go
@@ -93,6 +93,11 @@ type RawKVEntry struct {
 	RegionID uint64 `msg:"region_id"`
 }
 
+// IsUpdate checks if the event is an update event.
+func (v *RawKVEntry) IsUpdate() bool {
+	return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil
+}
+
 func (v *RawKVEntry) String() string {
 	// TODO: redact values.
 	return fmt.Sprintf(
diff --git a/cdc/model/sink.go b/cdc/model/sink.go
index d3e0fc70cb1..a3b0f3fd9f3 100644
--- a/cdc/model/sink.go
+++ b/cdc/model/sink.go
@@ -800,18 +800,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
 
 // Whether split a single update event into delete and insert events?
 //
-// For the MySQL Sink, there is no need to split a single unique key changed update event, this
-// is also to keep the backward compatibility, the same behavior as before.
+// For the MySQL Sink, we don't split any update event.
+// This may cause errors like "duplicate entry" when sinking to the downstream.
+// This kind of error will cause the changefeed to restart,
+// and then the related update rows will be split into insert and delete events at the puller side.
 //
 // For the Kafka and Storage sink, always split a single unique key changed update event, since:
 // 1. Avro and CSV does not output the previous column values for the update event, so it would
 // cause consumer missing data if the unique key changed event is not split.
 // 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
 func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
-	if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(sinkScheme) {
-		return false
-	}
-	return true
+	return !sink.IsMySQLCompatibleScheme(sinkScheme)
 }
 
 // trySplitAndSortUpdateEvent try to split update events if unique key is updated
@@ -841,8 +840,8 @@ func trySplitAndSortUpdateEvent(
 
 		// This indicates that it is an update event. if the pk or uk is updated,
 		// we need to split it into two events (delete and insert).
-		if e.IsUpdate() && shouldSplitUpdateEvent(e) {
-			deleteEvent, insertEvent, err := splitUpdateEvent(e)
+		if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
+			deleteEvent, insertEvent, err := SplitUpdateEvent(e)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
@@ -859,10 +858,10 @@ func trySplitAndSortUpdateEvent(
 	return rowChangedEvents, nil
 }
 
-// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
+// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
 // whether the handle key column or unique key has been modified.
 // If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
-func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
+func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
 	// nil event will never be split.
 	if updateEvent == nil {
 		return false
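Exporting the split helpers is what allows code outside this package, such as the redo applier further down in this patch, to reuse the same row-level split that the sink path performs. A minimal caller-side sketch from outside package model, mirroring trySplitAndSortUpdateEvent above; the events slice and the surrounding error handling are illustrative only:

	out := make([]*model.RowChangedEvent, 0, len(events))
	for _, e := range events {
		// Only updates that modify the handle key or a non-empty unique key are
		// split; other updates pass through unchanged.
		if e.IsUpdate() && model.ShouldSplitUpdateEvent(e) {
			deleteEvent, insertEvent, err := model.SplitUpdateEvent(e)
			if err != nil {
				return nil, errors.Trace(err)
			}
			out = append(out, deleteEvent, insertEvent)
			continue
		}
		out = append(out, e)
	}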
@@ -884,8 +883,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
 	return false
 }
 
-// splitUpdateEvent splits an update event into a delete and an insert event.
-func splitUpdateEvent(
+// SplitUpdateEvent splits an update event into a delete and an insert event.
+func SplitUpdateEvent(
 	updateEvent *RowChangedEvent,
 ) (*RowChangedEvent, *RowChangedEvent, error) {
 	if updateEvent == nil {
diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go
index 4089db24525..3f4a03cf2b0 100644
--- a/cdc/model/sink_test.go
+++ b/cdc/model/sink_test.go
@@ -445,7 +445,7 @@ func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
 	require.Equal(t, 0, len(result))
 }
 
-func TestTrySplitAndSortUpdateEvent(t *testing.T) {
+func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
 	t.Parallel()
 
 	// Update handle key.
@@ -608,4 +608,11 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
 	err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
 	require.NoError(t, err)
 	require.Len(t, txn.Rows, 1)
+
+	txn2 := &SingleTableTxn{
+		Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
+	}
+	err = txn2.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
+	require.NoError(t, err)
+	require.Len(t, txn2.Rows, 2)
 }
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index b078887fc22..7b44d3c078e 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"net/url"
 	"strconv"
 	"sync"
 	"time"
@@ -40,6 +41,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/filter"
 	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/retry"
+	"github.com/pingcap/tiflow/pkg/sink"
 	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
@@ -470,6 +472,18 @@ func isProcessorIgnorableError(err error) bool {
 	return false
 }
 
+// needPullerSafeModeAtStart returns true if the scheme is MySQL compatible.
+// pullerSafeMode means to split all update kv entries whose commitTS
+// is older than the start time of this changefeed.
+func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
+	sinkURI, err := url.Parse(sinkURIStr)
+	if err != nil {
+		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+	}
+	scheme := sink.GetScheme(sinkURI)
+	return sink.IsMySQLCompatibleScheme(scheme), nil
+}
+
 // Tick implements the `orchestrator.State` interface
 // the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
 // the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
@@ -619,9 +633,14 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
 		return errors.Trace(err)
 	}
 
+	pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	p.sourceManager.r = sourcemanager.New(
 		p.changefeedID, p.upstream, p.mg.r,
-		sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode))
+		sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode),
+		pullerSafeModeAtStart)
 	p.sourceManager.name = "SourceManager"
 	p.sourceManager.changefeedID = p.changefeedID
 	p.sourceManager.spawn(prcCtx)
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 7e17ba9e600..4e3a14042bd 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Duration("cost", time.Since(start)))
+
+		// For duplicate entry error, we fail fast to restart the changefeed.
+ if cerror.IsDupEntryError(err) { + return errors.Trace(err) + } } // If the error is retryable, we should retry to re-establish the internal resources. diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 3035076993f..f2765e0f596 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -17,6 +17,7 @@ import ( "context" "time" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/kv" @@ -28,10 +29,14 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/pkg/config" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/txnutil" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -43,6 +48,8 @@ type pullerWrapperCreator func( tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry, + splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry, ) pullerwrapper.Wrapper type tablePullers struct { @@ -71,10 +78,13 @@ type SourceManager struct { engine engine.SortEngine // Used to indicate whether the changefeed is in BDR mode. bdrMode bool + // startTs is the timestamp when SourceManager starts. + startTs model.Ts // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. multiplexing bool + safeModeAtStart bool tablePullers tablePullers multiplexingPuller multiplexingPuller } @@ -86,9 +96,10 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, + safeModeAtStart bool, ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing - return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper) } // NewForTest creates a new source manager for testing. 
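The hunk below adds the KV-level helpers behind the new safeModeAtStart flag. In short, an entry is split only when the sink is MySQL compatible and the entry is an "old" update, i.e. a Put that still carries both OldValue and Value and whose commit ts is below the ts fetched from PD when the SourceManager started. A condensed sketch of that predicate, for a SourceManager m with the safeModeAtStart and startTs fields this patch adds (the actual closure lives in AddTable and Run):

	// Split only old updates: both values present (RawKVEntry.IsUpdate from kv.go)
	// and committed before this SourceManager instance started.
	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
		return m.safeModeAtStart && raw != nil && raw.IsUpdate() && raw.CRTs < m.startTs
	}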
@@ -99,7 +110,24 @@ func NewForTest( engine engine.SortEngine, bdrMode bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest) +} + +func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool { + return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs +} + +func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) { + if raw == nil { + return nil, nil, errors.New("nil event cannot be split") + } + deleteKVEntry := *raw + deleteKVEntry.Value = nil + + insertKVEntry := *raw + insertKVEntry.OldValue = nil + + return &deleteKVEntry, &insertKVEntry, nil } func newSourceManager( @@ -109,16 +137,18 @@ func newSourceManager( engine engine.SortEngine, bdrMode bool, multiplexing bool, + safeModeAtStart bool, pullerWrapperCreator pullerWrapperCreator, ) *SourceManager { mgr := &SourceManager{ - ready: make(chan struct{}), - changefeedID: changefeedID, - up: up, - mg: mg, - engine: engine, - bdrMode: bdrMode, - multiplexing: multiplexing, + ready: make(chan struct{}), + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + bdrMode: bdrMode, + multiplexing: multiplexing, + safeModeAtStart: safeModeAtStart, } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) @@ -137,7 +167,10 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo return } - p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) + shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { + return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs) + } + p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry) p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan) m.tablePullers.Store(span, p) } @@ -198,6 +231,11 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats // Run implements util.Runnable. func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { + startTs, err := getCurrentTs(ctx, m.up.PDClient) + if err != nil { + return err + } + m.startTs = startTs if m.multiplexing { serverConfig := config.GetGlobalServerConfig() grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig, kv.GetGlobalGrpcMetrics()) @@ -206,9 +244,14 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock, txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID), ) + shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { + return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs) + } m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper( m.changefeedID, client, m.engine, int(serverConfig.KVClient.FrontierConcurrent), + shouldSplitKVEntry, + splitUpdateKVEntry, ) close(m.ready) @@ -266,3 +309,23 @@ func (m *SourceManager) Close() { func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) { m.engine.Add(span, events...) 
} + +func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + var replicateTs model.Ts + err := retry.Do(ctx, func() error { + phy, logic, err := pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerrors.IsRetryableError)) + if err != nil { + return model.Ts(0), errors.Trace(err) + } + return replicateTs, nil +} diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go index 8607184a9a3..847eb171594 100644 --- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go @@ -34,6 +34,8 @@ func NewPullerWrapperForTest( tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry ShouldSplitKVEntry, + splitUpdateKVEntry SplitUpdateKVEntry, ) Wrapper { return &dummyPullerWrapper{} } diff --git a/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go b/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go index c02759277ae..115f02b8297 100644 --- a/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go @@ -37,6 +37,8 @@ func NewMultiplexingPullerWrapper( client *kv.SharedClient, eventSortEngine engine.SortEngine, frontiers int, + shouldSplitKVEntry ShouldSplitKVEntry, + splitUpdateKVEntry SplitUpdateKVEntry, ) *MultiplexingWrapper { consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span) error { if len(spans) > 1 { @@ -45,8 +47,18 @@ func NewMultiplexingPullerWrapper( zap.String("changefeed", changefeed.ID)) } if raw != nil { - pEvent := model.NewPolymorphicEvent(raw) - eventSortEngine.Add(spans[0], pEvent) + if shouldSplitKVEntry(raw) { + deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw) + if err != nil { + return err + } + deleteEvent := model.NewPolymorphicEvent(deleteKVEntry) + insertEvent := model.NewPolymorphicEvent(insertKVEntry) + eventSortEngine.Add(spans[0], deleteEvent, insertEvent) + } else { + pEvent := model.NewPolymorphicEvent(raw) + eventSortEngine.Add(spans[0], pEvent) + } } return nil } diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 06749fab252..1fa332e0dc3 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -40,6 +40,12 @@ type Wrapper interface { Close() } +// ShouldSplitKVEntry checks whether the raw kv entry should be splitted. +type ShouldSplitKVEntry func(raw *model.RawKVEntry) bool + +// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry. +type SplitUpdateKVEntry func(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) + // WrapperImpl is a wrapper of puller used by source manager. type WrapperImpl struct { changefeed model.ChangeFeedID @@ -49,6 +55,9 @@ type WrapperImpl struct { startTs model.Ts bdrMode bool + shouldSplitKVEntry ShouldSplitKVEntry + splitUpdateKVEntry SplitUpdateKVEntry + // cancel is used to cancel the puller when remove or close the table. cancel context.CancelFunc // eg is used to wait the puller to exit. 
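Both the table puller wrapper and the multiplexing wrapper receive these two hooks. At the KV level the split is deliberately simple: the delete half keeps only OldValue and the insert half keeps only Value, so after mounting they become an ordinary delete followed by an insert for the same key. A rough sketch of the consume path, assuming a Put entry raw for which shouldSplitKVEntry returns true (this mirrors the Start and consume changes in this file and in the multiplexing wrapper):

	deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw)
	if err != nil {
		return err
	}
	// deleteKVEntry: OldValue kept, Value nil  -> mounted as a delete
	// insertKVEntry: Value kept, OldValue nil  -> mounted as an insert
	eventSortEngine.Add(span,
		model.NewPolymorphicEvent(deleteKVEntry),
		model.NewPolymorphicEvent(insertKVEntry))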
@@ -62,13 +71,17 @@ func NewPullerWrapper( tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry ShouldSplitKVEntry, + splitUpdateKVEntry SplitUpdateKVEntry, ) Wrapper { return &WrapperImpl{ - changefeed: changefeed, - span: span, - tableName: tableName, - startTs: startTs, - bdrMode: bdrMode, + changefeed: changefeed, + span: span, + tableName: tableName, + startTs: startTs, + bdrMode: bdrMode, + shouldSplitKVEntry: shouldSplitKVEntry, + splitUpdateKVEntry: splitUpdateKVEntry, } } @@ -126,8 +139,18 @@ func (n *WrapperImpl) Start( if rawKV == nil { continue } - pEvent := model.NewPolymorphicEvent(rawKV) - eventSortEngine.Add(n.span, pEvent) + if n.shouldSplitKVEntry(rawKV) { + deleteKVEntry, insertKVEntry, err := n.splitUpdateKVEntry(rawKV) + if err != nil { + return err + } + deleteEvent := model.NewPolymorphicEvent(deleteKVEntry) + insertEvent := model.NewPolymorphicEvent(insertKVEntry) + eventSortEngine.Add(n.span, deleteEvent, insertEvent) + } else { + pEvent := model.NewPolymorphicEvent(rawKV) + eventSortEngine.Add(n.span, pEvent) + } } } }) diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 6fd0dd4c279..502199e191b 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -64,7 +64,7 @@ type SinkFactory struct { category Category } -// New creates a new SinkFactory by schema. +// New creates a new SinkFactory by scheme. func New( ctx context.Context, changefeedID model.ChangeFeedID, @@ -79,8 +79,8 @@ func New( } s := &SinkFactory{} - schema := sink.GetScheme(sinkURI) - switch schema { + scheme := sink.GetScheme(sinkURI) + switch scheme { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh, txn.DefaultConflictDetectorSlots) @@ -123,7 +123,7 @@ func New( s.category = CategoryMQ default: return nil, - cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) + cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) } return s, nil diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index fd57b780443..3271db2945c 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -358,7 +358,6 @@ func convertBinaryToString(cols []*model.Column) { func (s *mysqlBackend) groupRowsByType( event *dmlsink.TxnCallbackableEvent, tableInfo *timodel.TableInfo, - spiltUpdate bool, ) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) { preAllocateSize := len(event.Event.Rows) if preAllocateSize > s.cfg.MaxTxnRow { @@ -394,29 +393,12 @@ func (s *mysqlBackend) groupRowsByType( } if row.IsUpdate() { - if spiltUpdate { - deleteRow = append( - deleteRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) - if len(deleteRow) >= s.cfg.MaxTxnRow { - deleteRows = append(deleteRows, deleteRow) - deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - insertRow = append( - insertRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) - if len(insertRow) >= s.cfg.MaxTxnRow { - insertRows = append(insertRows, insertRow) - insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - } else { - updateRow = append( - updateRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) - if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { - updateRows = append(updateRows, updateRow) - updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } 
+ updateRow = append( + updateRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) + if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { + updateRows = append(updateRows, updateRow) + updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) } } } @@ -439,7 +421,7 @@ func (s *mysqlBackend) batchSingleTxnDmls( tableInfo *timodel.TableInfo, translateToInsert bool, ) (sqls []string, values [][]interface{}) { - insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert) + insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo) // handle delete if len(deleteRows) > 0 { @@ -593,10 +575,13 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { for _, row := range event.Event.Rows { var query string var args []interface{} - // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. - // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. - if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 { - query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns, s.cfg.ForceReplicate) + // Update Event + if len(row.PreColumns) != 0 && len(row.Columns) != 0 { + query, args = prepareUpdate( + quoteTable, + row.PreColumns, + row.Columns, + s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -605,12 +590,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { continue } - // Case for update event or delete event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a DELETE SQL here. - // For delete event: - // It will be translated directly into a DELETE SQL. + // Delete Event if len(row.PreColumns) != 0 { query, args = prepareDelete(quoteTable, row.PreColumns, s.cfg.ForceReplicate) if query != "" { @@ -619,14 +599,10 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { } } - // Case for update event or insert event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a REPLACE SQL here. - // For insert event: + // Insert Event // It will be translated directly into a - // INSERT(old value is enabled and not in safe mode) - // or REPLACE(old value is disabled or in safe mode) SQL. + // INSERT(not in safe mode) + // or REPLACE(in safe mode) SQL. if len(row.Columns) != 0 { query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) if query != "" { @@ -671,7 +647,7 @@ func (s *mysqlBackend) multiStmtExecute( _, execError := tx.ExecContext(ctx, multiStmtSQL, multiStmtArgs...) 
if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -718,7 +694,7 @@ func (s *mysqlBackend) sequenceExecute( } if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, query, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -750,17 +726,25 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare writeTimeout += networkDriftDuration failpoint.Inject("MySQLSinkTxnRandomError", func() { - fmt.Printf("start to random error") + log.Warn("inject MySQLSinkTxnRandomError") err := logDMLTxnErr(errors.Trace(driver.ErrBadConn), start, s.changefeed, "failpoint", 0, nil) failpoint.Return(err) }) failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) }) + failpoint.Inject("MySQLDuplicateEntryError", func() { + log.Warn("inject MySQLDuplicateEntryError") + err := logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{ + Number: uint16(mysql.ErrDupEntry), + Message: "Duplicate entry", + }), start, s.changefeed, "failpoint", 0, nil) + failpoint.Return(err) + }) err := s.statistics.RecordBatchExecution(func() (int, int64, error) { tx, err := s.db.BeginTx(pctx, nil) if err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs) } @@ -801,7 +785,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare if err = tx.Commit(); err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs) } return dmls.rowCount, dmls.approximateSize, nil @@ -820,6 +804,18 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare retry.WithIsRetryableErr(isRetryableDMLError)) } +func wrapMysqlTxnError(err error) error { + errCode, ok := getSQLErrCode(err) + if !ok { + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + switch errCode { + case mysql.ErrDupEntry: + return cerror.WrapError(cerror.ErrMySQLDuplicateEntry, err) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) +} + func logDMLTxnErr( err error, start time.Time, changefeed string, query string, count int, startTs []model.Ts, @@ -853,7 +849,8 @@ func isRetryableDMLError(err error) bool { } switch errCode { - case mysql.ErrNoSuchTable, mysql.ErrBadDB: + // when meet dup entry error, we don't retry and report the error directly to owner to restart the changefeed. 
+ case mysql.ErrNoSuchTable, mysql.ErrBadDB, mysql.ErrDupEntry: return false } return true diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 45f9f5be48e..e60adf9e962 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -1872,7 +1872,7 @@ func TestGroupRowsByType(t *testing.T) { } tableInfo := model.BuildTiDBTableInfo(colums, tc.input[0].IndexColumns) ms.cfg.MaxTxnRow = tc.maxTxnRow - inserts, updates, deletes := ms.groupRowsByType(event, tableInfo, false) + inserts, updates, deletes := ms.groupRowsByType(event, tableInfo) for _, rows := range inserts { require.LessOrEqual(t, len(rows), tc.maxTxnRow) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 470e9a37b33..48766e0a697 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -475,7 +475,7 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) { } changefeedID := model.DefaultChangeFeedID("kafka-consumer") - f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil) if err != nil { cancel() return nil, cerror.Trace(err) @@ -492,7 +492,7 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) { cancel() }() - ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig()) + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig) if err != nil { cancel() return nil, cerror.Trace(err) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index fbc5c7e3bf9..9e055966a8b 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -37,6 +37,7 @@ import ( eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/cdc/sink/tablesink" sutil "github.com/pingcap/tiflow/cdc/sink/util" + cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/quotes" @@ -60,6 +61,9 @@ type ConsumerOption struct { protocol config.Protocol enableTiDBExtension bool + // the replicaConfig of the changefeed which produce data to the kafka topic + replicaConfig *config.ReplicaConfig + logPath string logLevel string timezone string @@ -86,6 +90,15 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) { o.address = strings.Split(upstreamURI.Host, ",") + replicaConfig := config.GetDefaultReplicaConfig() + if configFile != "" { + err := cmdUtil.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig) + if err != nil { + log.Panic("decode config file failed", zap.Error(err)) + } + } + o.replicaConfig = replicaConfig + s := upstreamURI.Query().Get("protocol") if s != "" { protocol, err := config.ParseSinkProtocolFromString(s) @@ -327,7 +340,7 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { } changefeedID := model.DefaultChangeFeedID("pulsar-consumer") - f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil) if err != nil { cancel() return nil, errors.Trace(err) @@ -344,7 +357,7 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { cancel() }() - 
ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig()) + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig) if err != nil { cancel() return nil, errors.Trace(err) diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 83b5e8a1bad..ff0e1210a06 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -191,7 +191,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { stdCtx, model.DefaultChangeFeedID(defaultChangefeedName), downstreamURIStr, - config.GetDefaultReplicaConfig(), + replicaConfig, errCh, nil, ) @@ -201,7 +201,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { } ddlSink, err := ddlfactory.New(ctx, model.DefaultChangeFeedID(defaultChangefeedName), - downstreamURIStr, config.GetDefaultReplicaConfig()) + downstreamURIStr, replicaConfig) if err != nil { log.Error("failed to create ddl sink", zap.Error(err)) return nil, err diff --git a/errors.toml b/errors.toml index 9f454de2dfc..7891af045ad 100755 --- a/errors.toml +++ b/errors.toml @@ -546,6 +546,11 @@ error = ''' MySQL connection error ''' +["CDC:ErrMySQLDuplicateEntry"] +error = ''' +MySQL duplicate entry error +''' + ["CDC:ErrMySQLInvalidConfig"] error = ''' MySQL config invalid diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 4b85db78390..51859757f2f 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -15,12 +15,15 @@ package applier import ( "context" + "fmt" "net/url" + "os" "time" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/model/codec" "github.com/pingcap/tiflow/cdc/processor/memquota" "github.com/pingcap/tiflow/cdc/redo/reader" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -62,8 +65,9 @@ type RedoApplierConfig struct { // RedoApplier implements a redo log applier type RedoApplier struct { - cfg *RedoApplierConfig - rd reader.RedoLogReader + cfg *RedoApplierConfig + rd reader.RedoLogReader + updateSplitter *updateEventSplitter ddlSink ddlsink.Sink appliedDDLCount uint64 @@ -180,7 +184,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { return row.CommitTs > ddl.CommitTs } - row, err := ra.rd.ReadNextRow(ctx) + row, err := ra.updateSplitter.readNextRow(ctx) if err != nil { return err } @@ -203,7 +207,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err := ra.applyRow(row, checkpointTs); err != nil { return err } - if row, err = ra.rd.ReadNextRow(ctx); err != nil { + if row, err = ra.updateSplitter.readNextRow(ctx); err != nil { return err } } @@ -256,6 +260,7 @@ func (ra *RedoApplier) resetQuota(rowSize uint64) error { } else if ra.pendingQuota < 64*1024 { ra.pendingQuota = 64 * 1024 } + log.Info("reset quota", zap.Uint64("rowSize", rowSize), zap.Uint64("newQuota", ra.pendingQuota)) return ra.memQuota.BlockAcquire(ra.pendingQuota - oldQuota) } @@ -419,6 +424,292 @@ func createRedoReaderImpl(ctx context.Context, cfg *RedoApplierConfig) (reader.R return reader.NewRedoLogReader(ctx, storageType, readerCfg) } +// tempTxnInsertEventStorage is used to store insert events in the same transaction +// once you begin to read events from storage, you should read all events before you write new events +type tempTxnInsertEventStorage struct { + events []*model.RowChangedEvent + // when events num exceed flushThreshold, write all events to file + flushThreshold int + dir string + txnCommitTs model.Ts + + useFileStorage 
bool + // eventSizes is used to store the size of each event in file storage + eventSizes []int + writingFile *os.File + readingFile *os.File + // reading is used to indicate whether we are reading events from storage + // this is to ensure that we read all events before write new events + reading bool +} + +const ( + tempStorageFileName = "_insert_storage.tmp" + defaultFlushThreshold = 50 +) + +func newTempTxnInsertEventStorage(flushThreshold int, dir string) *tempTxnInsertEventStorage { + return &tempTxnInsertEventStorage{ + events: make([]*model.RowChangedEvent, 0), + flushThreshold: flushThreshold, + dir: dir, + txnCommitTs: 0, + + useFileStorage: false, + eventSizes: make([]int, 0), + + reading: false, + } +} + +func (t *tempTxnInsertEventStorage) initializeAddEvent(ts model.Ts) { + t.reading = false + t.useFileStorage = false + t.txnCommitTs = ts + t.writingFile = nil + t.readingFile = nil +} + +func (t *tempTxnInsertEventStorage) addEvent(event *model.RowChangedEvent) error { + // do some pre check + if !event.IsInsert() { + log.Panic("event is not insert event", zap.Any("event", event)) + } + if t.reading && t.hasEvent() { + log.Panic("should read all events before write new event") + } + if !t.hasEvent() { + t.initializeAddEvent(event.CommitTs) + } else { + if t.txnCommitTs != event.CommitTs { + log.Panic("commit ts of events should be the same", + zap.Uint64("commitTs", event.CommitTs), + zap.Uint64("txnCommitTs", t.txnCommitTs)) + } + } + + if t.useFileStorage { + return t.writeEventsToFile(event) + } + + t.events = append(t.events, event) + if len(t.events) >= t.flushThreshold { + err := t.writeEventsToFile(t.events...) + if err != nil { + return err + } + t.events = t.events[:0] + } + return nil +} + +func (t *tempTxnInsertEventStorage) writeEventsToFile(events ...*model.RowChangedEvent) error { + if !t.useFileStorage { + t.useFileStorage = true + var err error + t.writingFile, err = os.Create(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return err + } + } + for _, event := range events { + redoLog := event.ToRedoLog() + data, err := codec.MarshalRedoLog(redoLog, nil) + if err != nil { + return errors.WrapError(errors.ErrMarshalFailed, err) + } + t.eventSizes = append(t.eventSizes, len(data)) + _, err = t.writingFile.Write(data) + if err != nil { + return err + } + } + return nil +} + +func (t *tempTxnInsertEventStorage) hasEvent() bool { + return len(t.events) > 0 || len(t.eventSizes) > 0 +} + +func (t *tempTxnInsertEventStorage) readFromFile() (*model.RowChangedEvent, error) { + if len(t.eventSizes) == 0 { + return nil, nil + } + if t.readingFile == nil { + var err error + t.readingFile, err = os.Open(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return nil, err + } + } + size := t.eventSizes[0] + data := make([]byte, size) + n, err := t.readingFile.Read(data) + if err != nil { + return nil, err + } + if n != size { + return nil, errors.New("read size not equal to expected size") + } + t.eventSizes = t.eventSizes[1:] + redoLog, _, err := codec.UnmarshalRedoLog(data) + if err != nil { + return nil, errors.WrapError(errors.ErrUnmarshalFailed, err) + } + return redoLog.RedoRow.Row, nil +} + +func (t *tempTxnInsertEventStorage) readNextEvent() (*model.RowChangedEvent, error) { + if !t.hasEvent() { + return nil, nil + } + t.reading = true + if t.useFileStorage { + return t.readFromFile() + } + + event := t.events[0] + t.events = t.events[1:] + return event, nil +} + +// updateEventSplitter splits an update event to a delete event 
and a deferred insert event +// when the update event is an update to the handle key or the non empty unique key. +// deferred insert event means all delete events and update events in the same transaction are emitted before this insert event +type updateEventSplitter struct { + rd reader.RedoLogReader + rdFinished bool + tempStorage *tempTxnInsertEventStorage + prevTxnCommitTs model.Ts + // pendingEvent is the event that trigger the process to emit events from tempStorage, it can be + // 1) an insert event in the same transaction(because there will be no more update and delete events in the same transaction) + // 2) a new event in the next transaction + pendingEvent *model.RowChangedEvent + // meetInsertInCurTxn is used to indicate whether we meet an insert event in the current transaction + // this is to add some check to ensure that insert events are emitted after other kinds of events in the same transaction + meetInsertInCurTxn bool +} + +func newUpdateEventSplitter(rd reader.RedoLogReader, dir string) *updateEventSplitter { + return &updateEventSplitter{ + rd: rd, + rdFinished: false, + tempStorage: newTempTxnInsertEventStorage(defaultFlushThreshold, dir), + prevTxnCommitTs: 0, + } +} + +// processEvent return (event to emit, pending event) +func processEvent( + event *model.RowChangedEvent, + prevTxnCommitTs model.Ts, + tempStorage *tempTxnInsertEventStorage, +) (*model.RowChangedEvent, *model.RowChangedEvent, error) { + if event == nil { + log.Panic("event should not be nil") + } + + // meet a new transaction + if prevTxnCommitTs != 0 && prevTxnCommitTs != event.CommitTs { + if tempStorage.hasEvent() { + // emit the insert events in the previous transaction + return nil, event, nil + } + } + if event.IsDelete() { + return event, nil, nil + } else if event.IsInsert() { + if tempStorage.hasEvent() { + // pend current event and emit the insert events in temp storage first to release memory + return nil, event, nil + } + return event, nil, nil + } else if !model.ShouldSplitUpdateEvent(event) { + return event, nil, nil + } else { + deleteEvent, insertEvent, err := model.SplitUpdateEvent(event) + if err != nil { + return nil, nil, err + } + err = tempStorage.addEvent(insertEvent) + if err != nil { + return nil, nil, err + } + return deleteEvent, nil, nil + } +} + +func (u *updateEventSplitter) checkEventOrder(event *model.RowChangedEvent) { + if event == nil { + return + } + if event.CommitTs > u.prevTxnCommitTs { + u.meetInsertInCurTxn = false + return + } + if event.IsInsert() { + u.meetInsertInCurTxn = true + } else { + // delete or update events + if u.meetInsertInCurTxn { + log.Panic("insert events should be emitted after other kinds of events in the same transaction") + } + } +} + +func (u *updateEventSplitter) readNextRow(ctx context.Context) (*model.RowChangedEvent, error) { + for { + // case 1: pendingEvent is not nil, emit all events from tempStorage and then process pendingEvent + if u.pendingEvent != nil { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + var event *model.RowChangedEvent + var err error + event, u.pendingEvent, err = processEvent(u.pendingEvent, u.prevTxnCommitTs, u.tempStorage) + if err != nil { + return nil, err + } + if event == nil || u.pendingEvent != nil { + log.Panic("processEvent return wrong result for pending event", + zap.Any("event", event), + zap.Any("pendingEvent", u.pendingEvent)) + } + return event, nil + } + // case 2: no more events from RedoLogReader, emit all events from tempStorage and return nil + if 
u.rdFinished { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + return nil, nil + } + // case 3: read and process events from RedoLogReader + event, err := u.rd.ReadNextRow(ctx) + if err != nil { + return nil, err + } + if event == nil { + u.rdFinished = true + } else { + u.checkEventOrder(event) + prevTxnCommitTS := u.prevTxnCommitTs + u.prevTxnCommitTs = event.CommitTs + var err error + event, u.pendingEvent, err = processEvent(event, prevTxnCommitTS, u.tempStorage) + if err != nil { + return nil, err + } + if event != nil { + return event, nil + } + if u.pendingEvent == nil { + log.Panic("event to emit and pending event cannot all be nil") + } + } + } +} + // ReadMeta creates a new redo applier and read meta from reader func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resolvedTs uint64, err error) { rd, err := createRedoReader(ctx, ra.cfg) @@ -438,6 +729,7 @@ func (ra *RedoApplier) Apply(egCtx context.Context) (err error) { eg.Go(func() error { return ra.rd.Run(egCtx) }) + ra.updateSplitter = newUpdateEventSplitter(ra.rd, ra.cfg.Dir) ra.memQuota = memquota.NewMemQuota(model.DefaultChangeFeedID(applierChangefeed), config.DefaultChangefeedMemoryQuota, "sink") diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 8687e3fea02..b252ec07f77 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "os" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -142,6 +143,82 @@ func TestApply(t *testing.T) { }, }, }, + // update event which doesn't modify handle key + { + StartTs: 1120, + CommitTs: 1220, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 1, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "2", + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: 1, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "3", + Flag: 0, + }, + }, + }, + { + StartTs: 1150, + CommitTs: 1250, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "20", + Flag: 0, + }, + }, + }, + { + StartTs: 1150, + CommitTs: 1250, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "200", + Flag: 0, + }, + }, + }, + { + StartTs: 1200, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "20", + Flag: 0, + }, + }, + }, { StartTs: 1200, CommitTs: resolvedTs, @@ -153,7 +230,7 @@ func TestApply(t *testing.T) { Flag: model.HandleKeyFlag, }, { Name: "b", - Value: "2", + Value: "3", Flag: 0, }, }, @@ -169,6 +246,218 @@ func TestApply(t *testing.T) { }, }, }, + { + StartTs: 1200, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "200", + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: 200, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "300", + Flag: 0, + }, + }, + }, + } + for _, dml := range dmls { + redoLogCh <- dml + } + ddls := []*model.DDLEvent{ + { + CommitTs: checkpointTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ 
+ Schema: "test", Table: "checkpoint", + }, + }, + Query: "create table checkpoint(id int)", + Type: timodel.ActionCreateTable, + }, + { + CommitTs: resolvedTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "resolved", + }, + }, + Query: "create table resolved(id int not null unique key)", + Type: timodel.ActionCreateTable, + }, + } + for _, ddl := range ddls { + ddlEventCh <- ddl + } + close(redoLogCh) + close(ddlEventCh) + + dir, err := os.Getwd() + require.Nil(t, err) + cfg := &RedoApplierConfig{ + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + + "&multi-stmt-enable=false", + Dir: dir, + } + ap := NewRedoApplier(cfg) + err = ap.Apply(ctx) + require.Nil(t, err) +} + +func TestApplyBigTxn(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + checkpointTs := uint64(1000) + resolvedTs := uint64(2000) + redoLogCh := make(chan *model.RowChangedEvent, 1024) + ddlEventCh := make(chan *model.DDLEvent, 1024) + createMockReader := func(ctx context.Context, cfg *RedoApplierConfig) (reader.RedoLogReader, error) { + return NewMockReader(checkpointTs, resolvedTs, redoLogCh, ddlEventCh), nil + } + + dbIndex := 0 + // DML sink and DDL sink share the same db + db := getMockDBForBigTxn(t) + mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex%2 == 0 { + testDB, err := pmysql.MockTestDB(true) + require.Nil(t, err) + return testDB, nil + } + return db, nil + } + + getDMLDBConnBak := txn.GetDBConnImpl + txn.GetDBConnImpl = mockGetDBConn + getDDLDBConnBak := mysqlDDL.GetDBConnImpl + mysqlDDL.GetDBConnImpl = mockGetDBConn + createRedoReaderBak := createRedoReader + createRedoReader = createMockReader + defer func() { + createRedoReader = createRedoReaderBak + txn.GetDBConnImpl = getDMLDBConnBak + mysqlDDL.GetDBConnImpl = getDDLDBConnBak + }() + + dmls := make([]*model.RowChangedEvent, 0) + // insert some rows + for i := 1; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1100, + CommitTs: 1200, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: i, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + // update + for i := 1; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1200, + CommitTs: 1300, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: i * 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + // delete and update + for i := 1; i <= 30; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i * 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + for i := 31; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i * 
10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: i * 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*100+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) } for _, dml := range dmls { redoLogCh <- dml @@ -191,7 +480,7 @@ func TestApply(t *testing.T) { Schema: "test", Table: "resolved", }, }, - Query: "create table resolved(id int)", + Query: "create table resolved(id int not null unique key)", Type: timodel.ActionCreateTable, }, } @@ -201,13 +490,16 @@ func TestApply(t *testing.T) { close(redoLogCh) close(ddlEventCh) + dir, err := os.Getwd() + require.Nil(t, err) cfg := &RedoApplierConfig{ SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + "&multi-stmt-enable=false", + Dir: dir, } ap := NewRedoApplier(cfg) - err := ap.Apply(ctx) + err = ap.Apply(ctx) require.Nil(t, err) } @@ -257,20 +549,114 @@ func getMockDB(t *testing.T) *sql.DB { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1"). + WithArgs(1, "3", 1). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(10, "20"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(100, "200"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + // First, apply row which commitTs equal to resolvedTs mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`t1` WHERE `a` = ? LIMIT 1"). - WithArgs(1). + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). + WithArgs(10, "20"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). + WithArgs(1, "3"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). + WithArgs(100, "200"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(2, "3"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(200, "300"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + // Then, apply ddl which commitTs equal to resolvedTs + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectClose() + return db +} + +func getMockDBForBigTxn(t *testing.T) *sql.DB { + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + + // Before we write data to downstream, we need to check whether the downstream is TiDB. + // So we mock a select tidb_version() query. 
+ mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i, fmt.Sprintf("%d", i+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). + WithArgs(i, fmt.Sprintf("%d", i+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 1; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*10, fmt.Sprintf("%d", i*10+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + // First, apply row which commitTs equal to resolvedTs + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a` = ? AND `b` = ?)"). + WithArgs(i*10, fmt.Sprintf("%d", i*10+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 31; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*100, fmt.Sprintf("%d", i*100+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } mock.ExpectCommit() // Then, apply ddl which commitTs equal to resolvedTs mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("create table resolved(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() mock.ExpectClose() diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 69bb2bcc90d..37bb069ad28 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -328,6 +328,10 @@ var ( "MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"), ) + ErrMySQLDuplicateEntry = errors.Normalize( + "MySQL duplicate entry error", + errors.RFCCodeText("CDC:ErrMySQLDuplicateEntry"), + ) ErrMySQLQueryError = errors.Normalize( "MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"), diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 7b36ecdee8c..21b3859f60e 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -122,6 +122,25 @@ func RFCCode(err error) (errors.RFCErrorCode, bool) { return RFCCode(cause) } +// IsDupEntryError checks if an error is a duplicate entry error. 
+func IsDupEntryError(err error) bool { + if err == nil { + return false + } + if ErrMySQLDuplicateEntry.Equal(err) { + return true + } + if code, ok := RFCCode(err); ok { + if code == ErrMySQLDuplicateEntry.RFCCode() { + return true + } + } + if strings.Contains(err.Error(), string(ErrMySQLDuplicateEntry.RFCCode())) { + return true + } + return false +} + // IsRetryableError check the error is safe or worth to retry func IsRetryableError(err error) bool { if err == nil { diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 4e31a4ae6de..599e2a28f7e 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -234,6 +234,8 @@ func (b *BatchDecoder) buildColumns( switch mysqlType { case mysql.TypeJSON: value = string(value.([]uint8)) + case mysql.TypeBit: + value = common.MustBinaryLiteralToInt(value.([]uint8)) } column := &model.Column{ diff --git a/tests/integration_tests/_utils/run_pulsar_consumer b/tests/integration_tests/_utils/run_pulsar_consumer index 6e305689fec..aae219aa8a9 100755 --- a/tests/integration_tests/_utils/run_pulsar_consumer +++ b/tests/integration_tests/_utils/run_pulsar_consumer @@ -8,7 +8,8 @@ set -e workdir=$1 changefeed_sink_uri=$2 -log_suffix=$3 +consumer_replica_config=$3 +log_suffix=$4 pwd=$pwd echo "[$(date)] <<<<<< START Pulsar consumer in $TEST_NAME case >>>>>>" @@ -18,7 +19,11 @@ cd $workdir # we need to set `batch-dml-enable` to false to avoid data inconsistency. downstream_uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" +consumer_replica_config_args= +if [ "$consumer_replica_config" != "" ]; then + consumer_replica_config_args=" --config $consumer_replica_config" +fi # output debug log to allow us to check the consumer's behavior when it encounters errors -cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --upstream-uri $changefeed_sink_uri --downstream-uri ${downstream_uri} >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 & +cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --upstream-uri $changefeed_sink_uri --downstream-uri ${downstream_uri} $consumer_replica_config_args >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 & cd $pwd diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml new file mode 100644 index 00000000000..d8b18e0ec27 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
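+# This file is consumed by check_sync_diff in the new changefeed_dup_error_restart case:
+# it compares changefeed_dup_error_restart.usertable between the upstream TiDB
+# (mysql1, 127.0.0.1:4000) and the downstream (tidb0, 127.0.0.1:3306) once the
+# failpoint-triggered changefeed restarts have settled.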
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/changefeed_dup_error_restart/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["changefeed_dup_error_restart.usertable"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/workload b/tests/integration_tests/changefeed_dup_error_restart/conf/workload new file mode 100644 index 00000000000..7649fd9afb7 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/workload @@ -0,0 +1,13 @@ +threadcount=2 +recordcount=10 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/changefeed_dup_error_restart/run.sh b/tests/integration_tests/changefeed_dup_error_restart/run.sh new file mode 100755 index 00000000000..e0756731a91 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_sql "CREATE DATABASE changefeed_dup_error_restart;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLDuplicateEntryError=5%return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-changefeed-dup-error-restart-test-$RANDOM" + case $SINK_TYPE in + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_1 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_2 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/integration_tests/force_replicate_table/run.sh b/tests/integration_tests/force_replicate_table/run.sh index 5fc5a55c3bb..d124997c58e 100755 --- a/tests/integration_tests/force_replicate_table/run.sh +++ b/tests/integration_tests/force_replicate_table/run.sh @@ -68,9 +68,9 @@ function run() { cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;; + kafka) run_kafka_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ;; storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;; - pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;; + pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ;; esac run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff --git a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml index df222cf6fa2..8ef310dcd8c 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml +++ b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] -output-dir = "/tmp/tidb_cdc_test/simple_handle_key_only_avro/output" +output-dir = "/tmp/tidb_cdc_test/kafka_simple_handle_key_only_avro/output" source-instances = ["mysql1"] diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 4d72681a771..3ea5514a956 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,7 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"
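The changefeed_dup_error_restart case above ties the pieces together: the MySQLDuplicateEntryError failpoint makes roughly 5% of transactions fail with a duplicate-entry error, which is expected to restart the changefeed rather than be retried. A compact sketch of the classification this patch relies on, reusing the helpers added in mysql.go and pkg/errors; the sample message is illustrative, and dmysql, mysql, and cerror are the aliases used in mysql.go:

	dupErr := &dmysql.MySQLError{
		Number:  uint16(mysql.ErrDupEntry),
		Message: "Duplicate entry '1' for key 'PRIMARY'", // illustrative message
	}
	wrapped := wrapMysqlTxnError(dupErr)  // classified as CDC:ErrMySQLDuplicateEntry
	_ = isRetryableDMLError(dupErr)       // false: the MySQL backend gives up instead of retrying
	_ = cerror.IsDupEntryError(wrapped)   // true: SinkManager.Run returns the error, the changefeed
	                                      // restarts, and updates older than the new start ts are
	                                      // split into delete+insert at the puller side.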