diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 2b54e553618..ebd65e4015f 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -43,7 +43,6 @@ import ( "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink" - "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -586,35 +585,6 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { return sink.IsMySQLCompatibleScheme(scheme), nil } -// getPullerSplitUpdateMode returns how to split update kv entries at puller. -// -// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone -// which means don't split any update kv entries at puller; -// If the sinkURI is mysql compatible, it has the following two cases: -// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways, -// which means split all update kv entries at puller; -// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart, -// which means split update kv entries whose commitTS is older than the replicate ts of sink. -func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) { - sinkURI, err := url.Parse(sinkURIStr) - if err != nil { - return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - scheme := sink.GetScheme(sinkURI) - if !sink.IsMySQLCompatibleScheme(scheme) { - return sourcemanager.PullerSplitUpdateModeNone, nil - } - // must be mysql sink - isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config) - if err != nil { - return sourcemanager.PullerSplitUpdateModeNone, err - } - if isSinkInSafeMode { - return sourcemanager.PullerSplitUpdateModeAlways, nil - } - return sourcemanager.PullerSplitUpdateModeAtStart, nil -} - // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick. func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { if p.initialized.Load() { @@ -678,22 +648,18 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { return errors.Trace(err) } - pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig) + isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) if err != nil { return errors.Trace(err) } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, - sortEngine, pullerSplitUpdateMode, - util.GetOrZero(cfConfig.BDRMode)) + sortEngine, util.GetOrZero(cfConfig.BDRMode), + isMysqlBackend) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) - isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) - if err != nil { - return errors.Trace(err) - } p.sinkManager.r = sinkmanager.New( p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream, p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index cd116efbcc5..b1575309bd0 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sinkmanager" - "github.com/pingcap/tiflow/cdc/processor/sourcemanager" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler" @@ -42,7 +41,6 @@ import ( redoPkg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" - "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -793,73 +791,3 @@ func TestProcessorNotInitialized(t *testing.T) { p, _, _ := initProcessor4Test(cdcContext.NewContext4Test(context.Background(), true), t, &liveness, false) require.Nil(t, p.WriteDebugInfo(os.Stdout)) } - -func TestGetPullerSplitUpdateMode(t *testing.T) { - testCases := []struct { - sinkURI string - config *config.ReplicaConfig - mode sourcemanager.PullerSplitUpdateMode - }{ - { - sinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - config: nil, - mode: sourcemanager.PullerSplitUpdateModeNone, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/", - config: nil, - mode: sourcemanager.PullerSplitUpdateModeAtStart, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", - config: nil, - mode: sourcemanager.PullerSplitUpdateModeAlways, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", - config: nil, - mode: sourcemanager.PullerSplitUpdateModeAtStart, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/", - config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{ - SafeMode: util.AddressOf(true), - }, - }, - mode: sourcemanager.PullerSplitUpdateModeAlways, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/", - config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{ - SafeMode: util.AddressOf(false), - }, - }, - mode: sourcemanager.PullerSplitUpdateModeAtStart, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", - config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{ - SafeMode: util.AddressOf(false), - }, - }, - mode: sourcemanager.PullerSplitUpdateModeAlways, - }, - { - sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", - config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{ - SafeMode: util.AddressOf(true), - }, - }, - mode: sourcemanager.PullerSplitUpdateModeAlways, - }, - } - for _, tc := range testCases { - mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config) - require.Nil(t, err) - require.Equal(t, tc.mode, mode) - } -} diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 76eaf66d0fb..cd5d937c15d 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -57,16 +57,6 @@ type multiplexingPuller struct { puller *pullerwrapper.MultiplexingWrapper } -// PullerSplitUpdateMode is the mode to split update events in puller. -type PullerSplitUpdateMode int32 - -// PullerSplitUpdateMode constants. -const ( - PullerSplitUpdateModeNone PullerSplitUpdateMode = 0 - PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1 - PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2 -) - // SourceManager is the manager of the source engine and puller. type SourceManager struct { ready chan struct{} @@ -86,10 +76,9 @@ type SourceManager struct { // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. multiplexing bool + safeModeAtStart bool tablePullers tablePullers multiplexingPuller multiplexingPuller - - splitUpdateMode PullerSplitUpdateMode } // New creates a new source manager. @@ -98,11 +87,11 @@ func New( up *upstream.Upstream, mg entry.MounterGroup, engine engine.SortEngine, - splitUpdateMode PullerSplitUpdateMode, bdrMode bool, + safeModeAtStart bool, ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing - return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper) } // NewForTest creates a new source manager for testing. @@ -113,7 +102,7 @@ func NewForTest( engine engine.SortEngine, bdrMode bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, PullerSplitUpdateModeNone, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest) } func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool { @@ -125,9 +114,9 @@ func newSourceManager( up *upstream.Upstream, mg entry.MounterGroup, engine engine.SortEngine, - splitUpdateMode PullerSplitUpdateMode, bdrMode bool, multiplexing bool, + safeModeAtStart bool, pullerWrapperCreator pullerWrapperCreator, ) *SourceManager { mgr := &SourceManager{ @@ -136,9 +125,9 @@ func newSourceManager( up: up, mg: mg, engine: engine, - splitUpdateMode: splitUpdateMode, bdrMode: bdrMode, multiplexing: multiplexing, + safeModeAtStart: safeModeAtStart, } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) @@ -153,21 +142,7 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo m.engine.AddTable(span, startTs) shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { - if raw == nil || !raw.IsUpdate() { - return false - } - switch m.splitUpdateMode { - case PullerSplitUpdateModeNone: - return false - case PullerSplitUpdateModeAlways: - return true - case PullerSplitUpdateModeAtStart: - return isOldUpdateKVEntry(raw, getReplicaTs) - default: - log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode))) - } - log.Panic("Shouldn't reach here") - return false + return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs) } if m.multiplexing { diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 4b40d7afb8e..344731ecee0 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -222,26 +222,24 @@ func mergeConfig( urlParameters *urlConfig, ) (*urlConfig, error) { dest := &urlConfig{} - if replicaConfig != nil && replicaConfig.Sink != nil { - dest.SafeMode = replicaConfig.Sink.SafeMode - if replicaConfig.Sink.MySQLConfig != nil { - mConfig := replicaConfig.Sink.MySQLConfig - dest.WorkerCount = mConfig.WorkerCount - dest.MaxTxnRow = mConfig.MaxTxnRow - dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount - dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize - dest.TiDBTxnMode = mConfig.TiDBTxnMode - dest.SSLCa = mConfig.SSLCa - dest.SSLCert = mConfig.SSLCert - dest.SSLKey = mConfig.SSLKey - dest.TimeZone = mConfig.TimeZone - dest.WriteTimeout = mConfig.WriteTimeout - dest.ReadTimeout = mConfig.ReadTimeout - dest.Timeout = mConfig.Timeout - dest.EnableBatchDML = mConfig.EnableBatchDML - dest.EnableMultiStatement = mConfig.EnableMultiStatement - dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement - } + dest.SafeMode = replicaConfig.Sink.SafeMode + if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil { + mConfig := replicaConfig.Sink.MySQLConfig + dest.WorkerCount = mConfig.WorkerCount + dest.MaxTxnRow = mConfig.MaxTxnRow + dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount + dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize + dest.TiDBTxnMode = mConfig.TiDBTxnMode + dest.SSLCa = mConfig.SSLCa + dest.SSLCert = mConfig.SSLCert + dest.SSLKey = mConfig.SSLKey + dest.TimeZone = mConfig.TimeZone + dest.WriteTimeout = mConfig.WriteTimeout + dest.ReadTimeout = mConfig.ReadTimeout + dest.Timeout = mConfig.Timeout + dest.EnableBatchDML = mConfig.EnableBatchDML + dest.EnableMultiStatement = mConfig.EnableMultiStatement + dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement } if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) @@ -249,31 +247,6 @@ func mergeConfig( return dest, nil } -// IsSinkSafeMode returns whether the sink is in safe mode. -func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) { - if sinkURI == nil { - return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI") - } - - scheme := strings.ToLower(sinkURI.Scheme) - if !sink.IsMySQLCompatibleScheme(scheme) { - return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme) - } - req := &http.Request{URL: sinkURI} - urlParameter := &urlConfig{} - if err := binding.Query.Bind(req, urlParameter); err != nil { - return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) - } - var err error - if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil { - return false, err - } - if urlParameter.SafeMode == nil { - return defaultSafeMode, nil - } - return *urlParameter.SafeMode, nil -} - func getWorkerCount(values *urlConfig, workerCount *int) error { if values.WorkerCount == nil { return nil