diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 10a9857e703..201c4b4f494 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -93,6 +93,11 @@ type RawKVEntry struct { RegionID uint64 `msg:"region_id"` } +// IsUpdate checks if the event is an update event. +func (v *RawKVEntry) IsUpdate() bool { + return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil +} + func (v *RawKVEntry) String() string { // TODO: redact values. return fmt.Sprintf( diff --git a/cdc/model/sink.go b/cdc/model/sink.go index d1e685f2768..64c8244a7c6 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -772,18 +772,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { // Whether split a single update event into delete and insert events? // -// For the MySQL Sink, there is no need to split a single unique key changed update event, this -// is also to keep the backward compatibility, the same behavior as before. +// For the MySQL Sink, we don't split any update event. +// This may cause error like "duplicate entry" when sink to the downstream. +// This kind of error will cause the changefeed to restart, +// and then the related update rows will be splitted to insert and delete at puller side. // // For the Kafka and Storage sink, always split a single unique key changed update event, since: // 1. Avro and CSV does not output the previous column values for the update event, so it would // cause consumer missing data if the unique key changed event is not split. // 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. func (t *SingleTableTxn) dontSplitUpdateEvent(scheme string) bool { - if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(scheme) { - return true - } - return false + return sink.IsMySQLCompatibleScheme(scheme) } // trySplitAndSortUpdateEvent try to split update events if unique key is updated @@ -813,8 +812,8 @@ func trySplitAndSortUpdateEvent( // This indicates that it is an update event. if the pk or uk is updated, // we need to split it into two events (delete and insert). - if e.IsUpdate() && shouldSplitUpdateEvent(e) { - deleteEvent, insertEvent, err := splitUpdateEvent(e) + if e.IsUpdate() && ShouldSplitUpdateEvent(e) { + deleteEvent, insertEvent, err := SplitUpdateEvent(e) if err != nil { return nil, errors.Trace(err) } @@ -831,10 +830,10 @@ func trySplitAndSortUpdateEvent( return rowChangedEvents, nil } -// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on +// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on // whether the handle key column or unique key has been modified. // If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. -func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { +func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { return false @@ -856,8 +855,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { return false } -// splitUpdateEvent splits an update event into a delete and an insert event. -func splitUpdateEvent( +// SplitUpdateEvent splits an update event into a delete and an insert event. +func SplitUpdateEvent( updateEvent *RowChangedEvent, ) (*RowChangedEvent, *RowChangedEvent, error) { if updateEvent == nil { diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index e0a0aed9d14..31f05b8d679 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -20,6 +20,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tiflow/pkg/sink" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -563,3 +564,55 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(result)) } + +var ukUpdatedEvent = &RowChangedEvent{ + PreColumns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value", + }, + }, + + Columns: []*Column{ + { + Name: "col1", + Flag: BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: HandleKeyFlag | UniqueKeyFlag, + Value: "col2-value-updated", + }, + }, +} + +func TestTrySplitAndSortUpdateEventOne(t *testing.T) { + txn := &SingleTableTxn{ + Rows: []*RowChangedEvent{ukUpdatedEvent}, + } + + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) + require.NoError(t, err) + require.Len(t, txn.Rows, 2) + + txn = &SingleTableTxn{ + Rows: []*RowChangedEvent{ukUpdatedEvent}, + } + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + + txn2 := &SingleTableTxn{ + Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, + } + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index cb8a79a4a00..251839dc8f2 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "net/url" "strconv" "sync" "time" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -445,6 +447,18 @@ func isProcessorIgnorableError(err error) bool { return false } +// needPullerSafeModeAtStart returns true if the scheme is mysql compatible. +// pullerSafeMode means to split all update kv entries whose commitTS +// is older then the start time of this changefeed. +func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + return sink.IsMySQLCompatibleScheme(scheme), nil +} + // Tick implements the `orchestrator.State` interface // the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd // The main logic of processor is in this function, including the calculation of many kinds of ts, @@ -684,9 +698,14 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { return errors.Trace(err) } + pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, - sortEngine, p.changefeed.Info.Config.BDRMode) + sortEngine, p.changefeed.Info.Config.BDRMode, + pullerSafeModeAtStart) p.sourceManager.name = "SourceManager" p.sourceManager.spawn(stdCtx) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 830797f5254..edaec1c7db9 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Duration("cost", time.Since(start))) + + // For duplicate entry error, we fast fail to restart changefeed. + if cerror.IsDupEntryError(err) { + return errors.Trace(err) + } } // If the error is retryable, we should retry to re-establish the internal resources. diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go index f0dda7f60b5..b7cce3775cb 100644 --- a/cdc/processor/sinkmanager/redo_log_worker_test.go +++ b/cdc/processor/sinkmanager/redo_log_worker_test.go @@ -62,7 +62,7 @@ func (suite *redoLogWorkerSuite) createWorker( ) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) { sortEngine := memory.New(context.Background()) sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}), - &entry.MockMountGroup{}, sortEngine, false) + &entry.MockMountGroup{}, sortEngine, false, false) go func() { _ = sm.Run(ctx) }() // To avoid refund or release panics. diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 0bb42a1f9ee..586ce0c138c 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -129,7 +129,7 @@ func (suite *tableSinkWorkerSuite) createWorker( ) (*sinkWorker, engine.SortEngine) { sortEngine := memory.New(context.Background()) sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}), - &entry.MockMountGroup{}, sortEngine, false) + &entry.MockMountGroup{}, sortEngine, false, false) go func() { sm.Run(ctx) }() // To avoid refund or release panics. diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 1ef797dda19..c6bd640ec3d 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -17,6 +17,7 @@ import ( "context" "time" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" @@ -25,8 +26,12 @@ import ( pullerwrapper "github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/puller" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -53,6 +58,9 @@ type SourceManager struct { // Used to indicate whether the changefeed is in BDR mode. bdrMode bool + safeModeAtStart bool + startTs model.Ts + // pullerWrapperCreator is used to create a puller wrapper. // Only used for testing. pullerWrapperCreator func(changefeed model.ChangeFeedID, @@ -60,6 +68,8 @@ type SourceManager struct { tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry, + splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry, ) pullerwrapper.Wrapper } @@ -70,6 +80,7 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, + safeModeAtStart bool, ) *SourceManager { return &SourceManager{ ready: make(chan struct{}), @@ -79,6 +90,7 @@ func New( engine: engine, errChan: make(chan error, 16), bdrMode: bdrMode, + safeModeAtStart: safeModeAtStart, pullerWrapperCreator: pullerwrapper.NewPullerWrapper, } } @@ -103,11 +115,31 @@ func NewForTest( } } +func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool { + return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs +} + +func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) { + if raw == nil { + return nil, nil, errors.New("nil event cannot be split") + } + deleteKVEntry := *raw + deleteKVEntry.Value = nil + + insertKVEntry := *raw + insertKVEntry.OldValue = nil + + return &deleteKVEntry, &insertKVEntry, nil +} + // AddTable adds a table to the source manager. Start puller and register table to the engine. func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) { // Add table to the engine first, so that the engine can receive the events from the puller. m.engine.AddTable(span, startTs) - p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) + shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { + return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs) + } + p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry) p.Start(m.ctx, m.up, m.engine, m.errChan) m.pullers.Store(span, p) } @@ -159,6 +191,11 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats // Run implements util.Runnable. func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { + startTs, err := getCurrentTs(ctx, m.up.PDClient) + if err != nil { + return err + } + m.startTs = startTs m.ctx = ctx close(m.ready) select { @@ -208,3 +245,23 @@ func (m *SourceManager) Close() { func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) { m.engine.Add(span, events...) } + +func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) { + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + var replicateTs model.Ts + err := retry.Do(ctx, func() error { + phy, logic, err := pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerrors.IsRetryableError)) + if err != nil { + return model.Ts(0), errors.Trace(err) + } + return replicateTs, nil +} diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go index 8607184a9a3..847eb171594 100644 --- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go @@ -34,6 +34,8 @@ func NewPullerWrapperForTest( tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry ShouldSplitKVEntry, + splitUpdateKVEntry SplitUpdateKVEntry, ) Wrapper { return &dummyPullerWrapper{} } diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 8861dacccac..0cc2863bd50 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -27,6 +27,12 @@ import ( "golang.org/x/sync/errgroup" ) +// ShouldSplitKVEntry checks whether the raw kv entry should be splitted. +type ShouldSplitKVEntry func(raw *model.RawKVEntry) bool + +// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry. +type SplitUpdateKVEntry func(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) + // Wrapper is a wrapper of puller used by source manager. type Wrapper interface { // Start the puller and send internal errors into `errChan`. @@ -49,6 +55,9 @@ type WrapperImpl struct { startTs model.Ts bdrMode bool + shouldSplitKVEntry ShouldSplitKVEntry + splitUpdateKVEntry SplitUpdateKVEntry + // cancel is used to cancel the puller when remove or close the table. cancel context.CancelFunc // eg is used to wait the puller to exit. @@ -62,13 +71,17 @@ func NewPullerWrapper( tableName string, startTs model.Ts, bdrMode bool, + shouldSplitKVEntry ShouldSplitKVEntry, + splitUpdateKVEntry SplitUpdateKVEntry, ) Wrapper { return &WrapperImpl{ - changefeed: changefeed, - span: span, - tableName: tableName, - startTs: startTs, - bdrMode: bdrMode, + changefeed: changefeed, + span: span, + tableName: tableName, + startTs: startTs, + bdrMode: bdrMode, + shouldSplitKVEntry: shouldSplitKVEntry, + splitUpdateKVEntry: splitUpdateKVEntry, } } @@ -127,8 +140,18 @@ func (n *WrapperImpl) Start( if rawKV == nil { continue } - pEvent := model.NewPolymorphicEvent(rawKV) - eventSortEngine.Add(n.span, pEvent) + if n.shouldSplitKVEntry(rawKV) { + deleteKVEntry, insertKVEntry, err := n.splitUpdateKVEntry(rawKV) + if err != nil { + return err + } + deleteEvent := model.NewPolymorphicEvent(deleteKVEntry) + insertEvent := model.NewPolymorphicEvent(insertKVEntry) + eventSortEngine.Add(n.span, deleteEvent, insertEvent) + } else { + pEvent := model.NewPolymorphicEvent(rawKV) + eventSortEngine.Add(n.span, pEvent) + } } } }) diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index a78658a1a41..9138ccb9cd4 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -62,7 +62,7 @@ type SinkFactory struct { category Category } -// New creates a new SinkFactory by schema. +// New creates a new SinkFactory by scheme. func New( ctx context.Context, sinkURIStr string, @@ -76,8 +76,8 @@ func New( } s := &SinkFactory{} - schema := strings.ToLower(sinkURI.Scheme) - switch schema { + scheme := strings.ToLower(sinkURI.Scheme) + switch scheme { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: txnSink, err := txn.NewMySQLSink(ctx, sinkURI, cfg, errCh, txn.DefaultConflictDetectorSlots) if err != nil { @@ -110,7 +110,7 @@ func New( s.category = CategoryBlackhole default: return nil, - cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) + cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) } return s, nil diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 61a8178a050..ae1454ed09a 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -360,7 +360,6 @@ func convertBinaryToString(cols []*model.Column) { func (s *mysqlBackend) groupRowsByType( event *dmlsink.TxnCallbackableEvent, tableInfo *timodel.TableInfo, - spiltUpdate bool, ) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) { preAllocateSize := len(event.Event.Rows) if preAllocateSize > s.cfg.MaxTxnRow { @@ -396,29 +395,12 @@ func (s *mysqlBackend) groupRowsByType( } if row.IsUpdate() { - if spiltUpdate { - deleteRow = append( - deleteRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) - if len(deleteRow) >= s.cfg.MaxTxnRow { - deleteRows = append(deleteRows, deleteRow) - deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - insertRow = append( - insertRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) - if len(insertRow) >= s.cfg.MaxTxnRow { - insertRows = append(insertRows, insertRow) - insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } - } else { - updateRow = append( - updateRow, - convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) - if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { - updateRows = append(updateRows, updateRow) - updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) - } + updateRow = append( + updateRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) + if len(updateRow) >= s.cfg.MaxMultiUpdateRowCount { + updateRows = append(updateRows, updateRow) + updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) } } } @@ -441,7 +423,7 @@ func (s *mysqlBackend) batchSingleTxnDmls( tableInfo *timodel.TableInfo, translateToInsert bool, ) (sqls []string, values [][]interface{}) { - insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert) + insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo) // handle delete if len(deleteRows) > 0 { @@ -597,9 +579,8 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { for _, row := range event.Event.Rows { var query string var args []interface{} - // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. - // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. - if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 { + // Update Event + if len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) @@ -609,12 +590,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { continue } - // Case for update event or delete event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a DELETE SQL here. - // For delete event: - // It will be translated directly into a DELETE SQL. + // Delete Event if len(row.PreColumns) != 0 { query, args = prepareDelete(quoteTable, row.PreColumns, s.cfg.ForceReplicate) if query != "" { @@ -623,14 +599,10 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { } } - // Case for update event or insert event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a REPLACE SQL here. - // For insert event: + // Insert Event // It will be translated directly into a - // INSERT(old value is enabled and not in safe mode) - // or REPLACE(old value is disabled or in safe mode) SQL. + // INSERT(not in safe mode) + // or REPLACE(in safe mode) SQL. if len(row.Columns) != 0 { query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) if query != "" { @@ -675,7 +647,7 @@ func (s *mysqlBackend) multiStmtExecute( _, execError := tx.ExecContext(ctx, multiStmtSQL, multiStmtArgs...) if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, multiStmtSQL, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -722,7 +694,7 @@ func (s *mysqlBackend) sequenceExecute( } if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, execError), + wrapMysqlTxnError(execError), start, s.changefeed, query, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { @@ -754,17 +726,25 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare writeTimeout += networkDriftDuration failpoint.Inject("MySQLSinkTxnRandomError", func() { - fmt.Printf("start to random error") + log.Warn("inject MySQLSinkTxnRandomError") err := logDMLTxnErr(errors.Trace(driver.ErrBadConn), start, s.changefeed, "failpoint", 0, nil) failpoint.Return(err) }) failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) }) + failpoint.Inject("MySQLDuplicateEntryError", func() { + log.Warn("inject MySQLDuplicateEntryError") + err := logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{ + Number: uint16(mysql.ErrDupEntry), + Message: "Duplicate entry", + }), start, s.changefeed, "failpoint", 0, nil) + failpoint.Return(err) + }) err := s.statistics.RecordBatchExecution(func() (int, int64, error) { tx, err := s.db.BeginTx(pctx, nil) if err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs) } @@ -805,7 +785,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare if err = tx.Commit(); err != nil { return 0, 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + wrapMysqlTxnError(err), start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs) } return dmls.rowCount, dmls.approximateSize, nil @@ -824,6 +804,18 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare retry.WithIsRetryableErr(isRetryableDMLError)) } +func wrapMysqlTxnError(err error) error { + errCode, ok := getSQLErrCode(err) + if !ok { + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + switch errCode { + case mysql.ErrDupEntry: + return cerror.WrapError(cerror.ErrMySQLDuplicateEntry, err) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) +} + func logDMLTxnErr( err error, start time.Time, changefeed string, query string, count int, startTs []model.Ts, @@ -857,7 +849,8 @@ func isRetryableDMLError(err error) bool { } switch errCode { - case mysql.ErrNoSuchTable, mysql.ErrBadDB: + // when meet dup entry error, we don't retry and report the error directly to owner to restart the changefeed. + case mysql.ErrNoSuchTable, mysql.ErrBadDB, mysql.ErrDupEntry: return false } return true diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 78dc4b58770..a2a5072f0aa 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -1864,7 +1864,7 @@ func TestGroupRowsByType(t *testing.T) { } tableInfo := model.BuildTiDBTableInfo(colums, tc.input[0].IndexColumns) ms.cfg.MaxTxnRow = tc.maxTxnRow - inserts, updates, deletes := ms.groupRowsByType(event, tableInfo, false) + inserts, updates, deletes := ms.groupRowsByType(event, tableInfo) for _, rows := range inserts { require.LessOrEqual(t, len(rows), tc.maxTxnRow) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 86f6d2a2d13..bf64633f05c 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -437,7 +437,6 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { return nil, errors.Trace(err) } c.eventRouter = eventRouter - } c.sinks = make([]*partitionSinks, kafkaPartitionNum) @@ -449,10 +448,14 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { partitionNo: i, } } + sinkReplicaConfig := replicaConfig + if sinkReplicaConfig == nil { + sinkReplicaConfig = config.GetDefaultReplicaConfig() + } f, err := eventsinkfactory.New( ctx, downstreamURIStr, - config.GetDefaultReplicaConfig(), + sinkReplicaConfig, errChan, nil, ) @@ -475,7 +478,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { ddlSink, err := ddlsinkfactory.New( ctx, downstreamURIStr, - config.GetDefaultReplicaConfig(), + sinkReplicaConfig, ) if err != nil { cancel() diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index aa95ec46238..93e6b4a839b 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -188,7 +188,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { sinkFactory, err := dmlfactory.New( stdCtx, downstreamURIStr, - config.GetDefaultReplicaConfig(), + replicaConfig, errCh, nil, ) @@ -197,7 +197,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { return nil, err } - ddlSink, err := ddlfactory.New(ctx, downstreamURIStr, config.GetDefaultReplicaConfig()) + ddlSink, err := ddlfactory.New(ctx, downstreamURIStr, replicaConfig) if err != nil { log.Error("failed to create ddl sink", zap.Error(err)) return nil, err diff --git a/errors.toml b/errors.toml index e52ac7a2d95..5ca42ababef 100755 --- a/errors.toml +++ b/errors.toml @@ -531,6 +531,11 @@ error = ''' MySQL connection error ''' +["CDC:ErrMySQLDuplicateEntry"] +error = ''' +MySQL duplicate entry error +''' + ["CDC:ErrMySQLInvalidConfig"] error = ''' MySQL config invalid diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 27741af4a47..92d0d532e73 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -15,13 +15,16 @@ package applier import ( "context" + "fmt" "net/url" + "os" "time" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/model/codec" "github.com/pingcap/tiflow/cdc/processor/memquota" "github.com/pingcap/tiflow/cdc/redo/reader" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -64,8 +67,9 @@ type RedoApplierConfig struct { // RedoApplier implements a redo log applier type RedoApplier struct { - cfg *RedoApplierConfig - rd reader.RedoLogReader + cfg *RedoApplierConfig + rd reader.RedoLogReader + updateSplitter *updateEventSplitter ddlSink ddlsink.Sink appliedDDLCount uint64 @@ -178,7 +182,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { return row.CommitTs > ddl.CommitTs } - row, err := ra.rd.ReadNextRow(ctx) + row, err := ra.updateSplitter.readNextRow(ctx) if err != nil { return err } @@ -201,7 +205,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err := ra.applyRow(row, checkpointTs); err != nil { return err } - if row, err = ra.rd.ReadNextRow(ctx); err != nil { + if row, err = ra.updateSplitter.readNextRow(ctx); err != nil { return err } } @@ -417,6 +421,292 @@ func createRedoReaderImpl(ctx context.Context, cfg *RedoApplierConfig) (reader.R return reader.NewRedoLogReader(ctx, storageType, readerCfg) } +// tempTxnInsertEventStorage is used to store insert events in the same transaction +// once you begin to read events from storage, you should read all events before you write new events +type tempTxnInsertEventStorage struct { + events []*model.RowChangedEvent + // when events num exceed flushThreshold, write all events to file + flushThreshold int + dir string + txnCommitTs model.Ts + + useFileStorage bool + // eventSizes is used to store the size of each event in file storage + eventSizes []int + writingFile *os.File + readingFile *os.File + // reading is used to indicate whether we are reading events from storage + // this is to ensure that we read all events before write new events + reading bool +} + +const ( + tempStorageFileName = "_insert_storage.tmp" + defaultFlushThreshold = 50 +) + +func newTempTxnInsertEventStorage(flushThreshold int, dir string) *tempTxnInsertEventStorage { + return &tempTxnInsertEventStorage{ + events: make([]*model.RowChangedEvent, 0), + flushThreshold: flushThreshold, + dir: dir, + txnCommitTs: 0, + + useFileStorage: false, + eventSizes: make([]int, 0), + + reading: false, + } +} + +func (t *tempTxnInsertEventStorage) initializeAddEvent(ts model.Ts) { + t.reading = false + t.useFileStorage = false + t.txnCommitTs = ts + t.writingFile = nil + t.readingFile = nil +} + +func (t *tempTxnInsertEventStorage) addEvent(event *model.RowChangedEvent) error { + // do some pre check + if !event.IsInsert() { + log.Panic("event is not insert event", zap.Any("event", event)) + } + if t.reading && t.hasEvent() { + log.Panic("should read all events before write new event") + } + if !t.hasEvent() { + t.initializeAddEvent(event.CommitTs) + } else { + if t.txnCommitTs != event.CommitTs { + log.Panic("commit ts of events should be the same", + zap.Uint64("commitTs", event.CommitTs), + zap.Uint64("txnCommitTs", t.txnCommitTs)) + } + } + + if t.useFileStorage { + return t.writeEventsToFile(event) + } + + t.events = append(t.events, event) + if len(t.events) >= t.flushThreshold { + err := t.writeEventsToFile(t.events...) + if err != nil { + return err + } + t.events = t.events[:0] + } + return nil +} + +func (t *tempTxnInsertEventStorage) writeEventsToFile(events ...*model.RowChangedEvent) error { + if !t.useFileStorage { + t.useFileStorage = true + var err error + t.writingFile, err = os.Create(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return err + } + } + for _, event := range events { + redoLog := event.ToRedoLog() + data, err := codec.MarshalRedoLog(redoLog, nil) + if err != nil { + return errors.WrapError(errors.ErrMarshalFailed, err) + } + t.eventSizes = append(t.eventSizes, len(data)) + _, err = t.writingFile.Write(data) + if err != nil { + return err + } + } + return nil +} + +func (t *tempTxnInsertEventStorage) hasEvent() bool { + return len(t.events) > 0 || len(t.eventSizes) > 0 +} + +func (t *tempTxnInsertEventStorage) readFromFile() (*model.RowChangedEvent, error) { + if len(t.eventSizes) == 0 { + return nil, nil + } + if t.readingFile == nil { + var err error + t.readingFile, err = os.Open(fmt.Sprintf("%s/%s", t.dir, tempStorageFileName)) + if err != nil { + return nil, err + } + } + size := t.eventSizes[0] + data := make([]byte, size) + n, err := t.readingFile.Read(data) + if err != nil { + return nil, err + } + if n != size { + return nil, errors.New("read size not equal to expected size") + } + t.eventSizes = t.eventSizes[1:] + redoLog, _, err := codec.UnmarshalRedoLog(data) + if err != nil { + return nil, errors.WrapError(errors.ErrUnmarshalFailed, err) + } + return redoLog.RedoRow.Row, nil +} + +func (t *tempTxnInsertEventStorage) readNextEvent() (*model.RowChangedEvent, error) { + if !t.hasEvent() { + return nil, nil + } + t.reading = true + if t.useFileStorage { + return t.readFromFile() + } + + event := t.events[0] + t.events = t.events[1:] + return event, nil +} + +// updateEventSplitter splits an update event to a delete event and a deferred insert event +// when the update event is an update to the handle key or the non empty unique key. +// deferred insert event means all delete events and update events in the same transaction are emitted before this insert event +type updateEventSplitter struct { + rd reader.RedoLogReader + rdFinished bool + tempStorage *tempTxnInsertEventStorage + prevTxnCommitTs model.Ts + // pendingEvent is the event that trigger the process to emit events from tempStorage, it can be + // 1) an insert event in the same transaction(because there will be no more update and delete events in the same transaction) + // 2) a new event in the next transaction + pendingEvent *model.RowChangedEvent + // meetInsertInCurTxn is used to indicate whether we meet an insert event in the current transaction + // this is to add some check to ensure that insert events are emitted after other kinds of events in the same transaction + meetInsertInCurTxn bool +} + +func newUpdateEventSplitter(rd reader.RedoLogReader, dir string) *updateEventSplitter { + return &updateEventSplitter{ + rd: rd, + rdFinished: false, + tempStorage: newTempTxnInsertEventStorage(defaultFlushThreshold, dir), + prevTxnCommitTs: 0, + } +} + +// processEvent return (event to emit, pending event) +func processEvent( + event *model.RowChangedEvent, + prevTxnCommitTs model.Ts, + tempStorage *tempTxnInsertEventStorage, +) (*model.RowChangedEvent, *model.RowChangedEvent, error) { + if event == nil { + log.Panic("event should not be nil") + } + + // meet a new transaction + if prevTxnCommitTs != 0 && prevTxnCommitTs != event.CommitTs { + if tempStorage.hasEvent() { + // emit the insert events in the previous transaction + return nil, event, nil + } + } + if event.IsDelete() { + return event, nil, nil + } else if event.IsInsert() { + if tempStorage.hasEvent() { + // pend current event and emit the insert events in temp storage first to release memory + return nil, event, nil + } + return event, nil, nil + } else if !model.ShouldSplitUpdateEvent(event) { + return event, nil, nil + } else { + deleteEvent, insertEvent, err := model.SplitUpdateEvent(event) + if err != nil { + return nil, nil, err + } + err = tempStorage.addEvent(insertEvent) + if err != nil { + return nil, nil, err + } + return deleteEvent, nil, nil + } +} + +func (u *updateEventSplitter) checkEventOrder(event *model.RowChangedEvent) { + if event == nil { + return + } + if event.CommitTs > u.prevTxnCommitTs { + u.meetInsertInCurTxn = false + return + } + if event.IsInsert() { + u.meetInsertInCurTxn = true + } else { + // delete or update events + if u.meetInsertInCurTxn { + log.Panic("insert events should be emitted after other kinds of events in the same transaction") + } + } +} + +func (u *updateEventSplitter) readNextRow(ctx context.Context) (*model.RowChangedEvent, error) { + for { + // case 1: pendingEvent is not nil, emit all events from tempStorage and then process pendingEvent + if u.pendingEvent != nil { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + var event *model.RowChangedEvent + var err error + event, u.pendingEvent, err = processEvent(u.pendingEvent, u.prevTxnCommitTs, u.tempStorage) + if err != nil { + return nil, err + } + if event == nil || u.pendingEvent != nil { + log.Panic("processEvent return wrong result for pending event", + zap.Any("event", event), + zap.Any("pendingEvent", u.pendingEvent)) + } + return event, nil + } + // case 2: no more events from RedoLogReader, emit all events from tempStorage and return nil + if u.rdFinished { + if u.tempStorage.hasEvent() { + return u.tempStorage.readNextEvent() + } + return nil, nil + } + // case 3: read and process events from RedoLogReader + event, err := u.rd.ReadNextRow(ctx) + if err != nil { + return nil, err + } + if event == nil { + u.rdFinished = true + } else { + u.checkEventOrder(event) + prevTxnCommitTS := u.prevTxnCommitTs + u.prevTxnCommitTs = event.CommitTs + var err error + event, u.pendingEvent, err = processEvent(event, prevTxnCommitTS, u.tempStorage) + if err != nil { + return nil, err + } + if event != nil { + return event, nil + } + if u.pendingEvent == nil { + log.Panic("event to emit and pending event cannot all be nil") + } + } + } +} + // ReadMeta creates a new redo applier and read meta from reader func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resolvedTs uint64, err error) { rd, err := createRedoReader(ctx, ra.cfg) @@ -437,6 +727,7 @@ func (ra *RedoApplier) Apply(egCtx context.Context) (err error) { eg.Go(func() error { return ra.rd.Run(egCtx) }) + ra.updateSplitter = newUpdateEventSplitter(ra.rd, ra.cfg.Dir) ra.memQuota = memquota.NewMemQuota(model.DefaultChangeFeedID(applierChangefeed), config.DefaultChangefeedMemoryQuota, "sink") diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 7262c967258..c12d55bd674 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "os" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -142,6 +143,82 @@ func TestApply(t *testing.T) { }, }, }, + // update event which doesn't modify handle key + { + StartTs: 1120, + CommitTs: 1220, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 1, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "2", + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: 1, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "3", + Flag: 0, + }, + }, + }, + { + StartTs: 1150, + CommitTs: 1250, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "20", + Flag: 0, + }, + }, + }, + { + StartTs: 1150, + CommitTs: 1250, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "200", + Flag: 0, + }, + }, + }, + { + StartTs: 1200, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "20", + Flag: 0, + }, + }, + }, { StartTs: 1200, CommitTs: resolvedTs, @@ -153,7 +230,7 @@ func TestApply(t *testing.T) { Flag: model.HandleKeyFlag, }, { Name: "b", - Value: "2", + Value: "3", Flag: 0, }, }, @@ -169,6 +246,218 @@ func TestApply(t *testing.T) { }, }, }, + { + StartTs: 1200, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "200", + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: 200, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: "300", + Flag: 0, + }, + }, + }, + } + for _, dml := range dmls { + redoLogCh <- dml + } + ddls := []*model.DDLEvent{ + { + CommitTs: checkpointTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "checkpoint", + }, + }, + Query: "create table checkpoint(id int)", + Type: timodel.ActionCreateTable, + }, + { + CommitTs: resolvedTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "resolved", + }, + }, + Query: "create table resolved(id int not null unique key)", + Type: timodel.ActionCreateTable, + }, + } + for _, ddl := range ddls { + ddlEventCh <- ddl + } + close(redoLogCh) + close(ddlEventCh) + + dir, err := os.Getwd() + require.Nil(t, err) + cfg := &RedoApplierConfig{ + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + + "&multi-stmt-enable=false", + Dir: dir, + } + ap := NewRedoApplier(cfg) + err = ap.Apply(ctx) + require.Nil(t, err) +} + +func TestApplyBigTxn(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + checkpointTs := uint64(1000) + resolvedTs := uint64(2000) + redoLogCh := make(chan *model.RowChangedEvent, 1024) + ddlEventCh := make(chan *model.DDLEvent, 1024) + createMockReader := func(ctx context.Context, cfg *RedoApplierConfig) (reader.RedoLogReader, error) { + return NewMockReader(checkpointTs, resolvedTs, redoLogCh, ddlEventCh), nil + } + + dbIndex := 0 + // DML sink and DDL sink share the same db + db := getMockDBForBigTxn(t) + mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex%2 == 0 { + testDB, err := pmysql.MockTestDB(true) + require.Nil(t, err) + return testDB, nil + } + return db, nil + } + + getDMLDBConnBak := txn.GetDBConnImpl + txn.GetDBConnImpl = mockGetDBConn + getDDLDBConnBak := mysqlDDL.GetDBConnImpl + mysqlDDL.GetDBConnImpl = mockGetDBConn + createRedoReaderBak := createRedoReader + createRedoReader = createMockReader + defer func() { + createRedoReader = createRedoReaderBak + txn.GetDBConnImpl = getDMLDBConnBak + mysqlDDL.GetDBConnImpl = getDDLDBConnBak + }() + + dmls := make([]*model.RowChangedEvent, 0) + // insert some rows + for i := 1; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1100, + CommitTs: 1200, + Table: &model.TableName{Schema: "test", Table: "t1"}, + Columns: []*model.Column{ + { + Name: "a", + Value: i, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + // update + for i := 1; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1200, + CommitTs: 1300, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i+1), + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: i * 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + // delete and update + for i := 1; i <= 30; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i * 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) + } + for i := 31; i <= 60; i++ { + dml := &model.RowChangedEvent{ + StartTs: 1300, + CommitTs: resolvedTs, + Table: &model.TableName{Schema: "test", Table: "t1"}, + PreColumns: []*model.Column{ + { + Name: "a", + Value: i * 10, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*10+1), + Flag: 0, + }, + }, + Columns: []*model.Column{ + { + Name: "a", + Value: i * 100, + Flag: model.HandleKeyFlag, + }, { + Name: "b", + Value: fmt.Sprintf("%d", i*100+1), + Flag: 0, + }, + }, + } + dmls = append(dmls, dml) } for _, dml := range dmls { redoLogCh <- dml @@ -191,7 +480,7 @@ func TestApply(t *testing.T) { Schema: "test", Table: "resolved", }, }, - Query: "create table resolved(id int)", + Query: "create table resolved(id int not null unique key)", Type: timodel.ActionCreateTable, }, } @@ -201,13 +490,16 @@ func TestApply(t *testing.T) { close(redoLogCh) close(ddlEventCh) + dir, err := os.Getwd() + require.Nil(t, err) cfg := &RedoApplierConfig{ SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false" + "&multi-stmt-enable=false", + Dir: dir, } ap := NewRedoApplier(cfg) - err := ap.Apply(ctx) + err = ap.Apply(ctx) require.Nil(t, err) } @@ -257,20 +549,114 @@ func getMockDB(t *testing.T) *sql.DB { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1"). + WithArgs(1, "3", 1). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(10, "20"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(100, "200"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + // First, apply row which commitTs equal to resolvedTs mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`t1` WHERE `a` = ? LIMIT 1"). - WithArgs(1). + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). + WithArgs(10, "20"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). + WithArgs(1, "3"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). + WithArgs(100, "200"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(2, "3"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(200, "300"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + // Then, apply ddl which commitTs equal to resolvedTs + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectClose() + return db +} + +func getMockDBForBigTxn(t *testing.T) *sql.DB { + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + + // Before we write data to downstream, we need to check whether the downstream is TiDB. + // So we mock a select tidb_version() query. + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i, fmt.Sprintf("%d", i+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). + WithArgs(i, fmt.Sprintf("%d", i+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 1; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*10, fmt.Sprintf("%d", i*10+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + mock.ExpectCommit() + + // First, apply row which commitTs equal to resolvedTs + mock.ExpectBegin() + for i := 1; i <= 60; i++ { + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). + WithArgs(i*10, fmt.Sprintf("%d", i*10+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } + for i := 31; i <= 60; i++ { + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). + WithArgs(i*100, fmt.Sprintf("%d", i*100+1)). + WillReturnResult(sqlmock.NewResult(1, 1)) + } mock.ExpectCommit() // Then, apply ddl which commitTs equal to resolvedTs mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("create table resolved(id int)").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() mock.ExpectClose() diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 070cb84aea2..27045928b90 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -260,6 +260,10 @@ var ( "MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"), ) + ErrMySQLDuplicateEntry = errors.Normalize( + "MySQL duplicate entry error", + errors.RFCCodeText("CDC:ErrMySQLDuplicateEntry"), + ) ErrMySQLQueryError = errors.Normalize( "MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"), diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 48fe2b54578..4450027dcfa 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -121,6 +121,25 @@ func RFCCode(err error) (errors.RFCErrorCode, bool) { return RFCCode(cause) } +// IsDupEntryError checks if an error is a duplicate entry error. +func IsDupEntryError(err error) bool { + if err == nil { + return false + } + if ErrMySQLDuplicateEntry.Equal(err) { + return true + } + if code, ok := RFCCode(err); ok { + if code == ErrMySQLDuplicateEntry.RFCCode() { + return true + } + } + if strings.Contains(err.Error(), string(ErrMySQLDuplicateEntry.RFCCode())) { + return true + } + return false +} + // IsRetryableError check the error is safe or worth to retry func IsRetryableError(err error) bool { if err == nil { diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 112d9fcd064..dafe14560cc 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -192,6 +192,8 @@ func (b *BatchDecoder) buildColumns( switch mysqlType { case mysql.TypeJSON: value = string(value.([]uint8)) + case mysql.TypeBit: + value, _ = common.BinaryLiteralToInt(value.([]uint8)) } column := &model.Column{ diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go index 1d5f8ee991d..424c91e9897 100644 --- a/pkg/sink/sink_type.go +++ b/pkg/sink/sink_type.go @@ -13,6 +13,11 @@ package sink +import ( + "net/url" + "strings" +) + // Type is the type of sink. type Type int @@ -81,3 +86,8 @@ func IsStorageScheme(scheme string) bool { return scheme == FileScheme || scheme == S3Scheme || scheme == GCSScheme || scheme == GSScheme || scheme == AzblobScheme || scheme == AzureScheme || scheme == CloudStorageNoopScheme } + +// GetScheme returns the scheme of the url. +func GetScheme(url *url.URL) string { + return strings.ToLower(url.Scheme) +} diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml new file mode 100644 index 00000000000..d8b18e0ec27 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/changefeed_dup_error_restart/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["changefeed_dup_error_restart.usertable"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/changefeed_dup_error_restart/conf/workload b/tests/integration_tests/changefeed_dup_error_restart/conf/workload new file mode 100644 index 00000000000..7649fd9afb7 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/conf/workload @@ -0,0 +1,13 @@ +threadcount=2 +recordcount=10 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/changefeed_dup_error_restart/run.sh b/tests/integration_tests/changefeed_dup_error_restart/run.sh new file mode 100755 index 00000000000..e0756731a91 --- /dev/null +++ b/tests/integration_tests/changefeed_dup_error_restart/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_sql "CREATE DATABASE changefeed_dup_error_restart;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLDuplicateEntryError=5%return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-changefeed-dup-error-restart-test-$RANDOM" + case $SINK_TYPE in + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_1 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart + run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_2 (a int primary key);" + sleep 30 + check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/force_replicate_table/run.sh b/tests/integration_tests/force_replicate_table/run.sh index f386ee5bd86..04e6372d64f 100755 --- a/tests/integration_tests/force_replicate_table/run.sh +++ b/tests/integration_tests/force_replicate_table/run.sh @@ -67,7 +67,7 @@ function run() { cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/changefeed.toml case $SINK_TYPE in - kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $CUR/conf/changefeed.toml ;; + kafka) run_kafka_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ;; storage) run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "" ;; esac diff --git a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml new file mode 100644 index 00000000000..8ef310dcd8c --- /dev/null +++ b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/kafka_simple_handle_key_only_avro/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index a93653d8ef6..36988c6a3f8 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,7 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide changefeed_dup_error_restart" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"