diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 619830db52c..f8724d5e182 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "net/url" "strconv" "sync" "time" @@ -41,6 +42,7 @@ import ( "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -486,6 +488,18 @@ func isProcessorIgnorableError(err error) bool { return false } +// needPullerSafeModeAtStart returns true if the scheme is mysql compatible. +// pullerSafeMode means to split all update kv entries whose commitTS +// is older then the start time of this changefeed. +func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + return sink.IsMySQLCompatibleScheme(scheme), nil +} + // Tick implements the `orchestrator.State` interface // the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot. // the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot. @@ -641,10 +655,15 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { return errors.Trace(err) } + pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode), - util.GetOrZero(p.latestInfo.Config.EnableTableMonitor)) + util.GetOrZero(p.latestInfo.Config.EnableTableMonitor), + pullerSafeModeAtStart) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 9a38b4e9d98..35226a65cbf 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -69,8 +69,9 @@ func New( engine sorter.SortEngine, bdrMode bool, enableTableMonitor bool, + safeModeAtStart bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart) } // NewForTest creates a new source manager for testing. @@ -115,6 +116,7 @@ func newSourceManager( engine sorter.SortEngine, bdrMode bool, enableTableMonitor bool, + safeModeAtStart bool, ) *SourceManager { mgr := &SourceManager{ ready: make(chan struct{}), @@ -143,7 +145,7 @@ func newSourceManager( zap.String("changefeed", mgr.changefeedID.ID)) } if raw != nil { - if isOldUpdateKVEntry(raw, mgr.startTs) { + if safeModeAtStart && isOldUpdateKVEntry(raw, mgr.startTs) { deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw) if err != nil { return err diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 6fd0dd4c279..502199e191b 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -64,7 +64,7 @@ type SinkFactory struct { category Category } -// New creates a new SinkFactory by schema. +// New creates a new SinkFactory by scheme. func New( ctx context.Context, changefeedID model.ChangeFeedID, @@ -79,8 +79,8 @@ func New( } s := &SinkFactory{} - schema := sink.GetScheme(sinkURI) - switch schema { + scheme := sink.GetScheme(sinkURI) + switch scheme { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh, txn.DefaultConflictDetectorSlots) @@ -123,7 +123,7 @@ func New( s.category = CategoryMQ default: return nil, - cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) + cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) } return s, nil