From b976744d50f44ab1eb4451eb57803a0899a59862 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Sat, 12 Oct 2024 11:05:23 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #11224 Signed-off-by: ti-chi-bot --- cdc/processor/processor.go | 42 ++++++++++++++- cdc/processor/processor_test.go | 72 ++++++++++++++++++++++++++ cdc/processor/sourcemanager/manager.go | 61 +++++++++++++++++++++- pkg/sink/mysql/config.go | 63 +++++++++++++++------- 4 files changed, 218 insertions(+), 20 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index f63b77fb583..f0f34da3944 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -587,6 +588,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { return sink.IsMySQLCompatibleScheme(scheme), nil } +// getPullerSplitUpdateMode returns how to split update kv entries at puller. +// +// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone +// which means don't split any update kv entries at puller; +// If the sinkURI is mysql compatible, it has the following two cases: +// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways, +// which means split all update kv entries at puller; +// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart, +// which means split update kv entries whose commitTS is older than the replicate ts of sink. +func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + if !sink.IsMySQLCompatibleScheme(scheme) { + return sourcemanager.PullerSplitUpdateModeNone, nil + } + // must be mysql sink + isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, err + } + if isSinkInSafeMode { + return sourcemanager.PullerSplitUpdateModeAlways, nil + } + return sourcemanager.PullerSplitUpdateModeAtStart, nil +} + // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick. func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { if p.initialized.Load() { @@ -650,18 +680,28 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { return errors.Trace(err) } - isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) + pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig) if err != nil { return errors.Trace(err) } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, +<<<<<<< HEAD sortEngine, util.GetOrZero(cfConfig.BDRMode), isMysqlBackend) +======= + sortEngine, pullerSplitUpdateMode, + util.GetOrZero(cfConfig.BDRMode), + util.GetOrZero(cfConfig.EnableTableMonitor)) +>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) + isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sinkManager.r = sinkmanager.New( p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream, p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index b1575309bd0..cd116efbcc5 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sinkmanager" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler" @@ -41,6 +42,7 @@ import ( redoPkg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -791,3 +793,73 @@ func TestProcessorNotInitialized(t *testing.T) { p, _, _ := initProcessor4Test(cdcContext.NewContext4Test(context.Background(), true), t, &liveness, false) require.Nil(t, p.WriteDebugInfo(os.Stdout)) } + +func TestGetPullerSplitUpdateMode(t *testing.T) { + testCases := []struct { + sinkURI string + config *config.ReplicaConfig + mode sourcemanager.PullerSplitUpdateMode + }{ + { + sinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeNone, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(true), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(false), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(false), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(true), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + } + for _, tc := range testCases { + mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config) + require.Nil(t, err) + require.Equal(t, tc.mode, mode) + } +} diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index cd5d937c15d..8f41dc9e25a 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -37,6 +37,7 @@ import ( const defaultMaxBatchSize = 256 +<<<<<<< HEAD type pullerWrapperCreator func( changefeed model.ChangeFeedID, span tablepb.Span, @@ -56,6 +57,17 @@ type tablePullers struct { type multiplexingPuller struct { puller *pullerwrapper.MultiplexingWrapper } +======= +// PullerSplitUpdateMode is the mode to split update events in puller. +type PullerSplitUpdateMode int32 + +// PullerSplitUpdateMode constants. +const ( + PullerSplitUpdateModeNone PullerSplitUpdateMode = 0 + PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1 + PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2 +) +>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) // SourceManager is the manager of the source engine and puller. type SourceManager struct { @@ -73,12 +85,19 @@ type SourceManager struct { // Used to indicate whether the changefeed is in BDR mode. bdrMode bool +<<<<<<< HEAD // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. multiplexing bool safeModeAtStart bool tablePullers tablePullers multiplexingPuller multiplexingPuller +======= + splitUpdateMode PullerSplitUpdateMode + + enableTableMonitor bool + puller *puller.MultiplexingPuller +>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) } // New creates a new source manager. @@ -86,12 +105,21 @@ func New( changefeedID model.ChangeFeedID, up *upstream.Upstream, mg entry.MounterGroup, +<<<<<<< HEAD engine engine.SortEngine, bdrMode bool, safeModeAtStart bool, ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper) +======= + engine sorter.SortEngine, + splitUpdateMode PullerSplitUpdateMode, + bdrMode bool, + enableTableMonitor bool, +) *SourceManager { + return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor) +>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) } // NewForTest creates a new source manager for testing. @@ -113,6 +141,7 @@ func newSourceManager( changefeedID model.ChangeFeedID, up *upstream.Upstream, mg entry.MounterGroup, +<<<<<<< HEAD engine engine.SortEngine, bdrMode bool, multiplexing bool, @@ -128,6 +157,22 @@ func newSourceManager( bdrMode: bdrMode, multiplexing: multiplexing, safeModeAtStart: safeModeAtStart, +======= + engine sorter.SortEngine, + splitUpdateMode PullerSplitUpdateMode, + bdrMode bool, + enableTableMonitor bool, +) *SourceManager { + mgr := &SourceManager{ + ready: make(chan struct{}), + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + splitUpdateMode: splitUpdateMode, + bdrMode: bdrMode, + enableTableMonitor: enableTableMonitor, +>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) @@ -142,7 +187,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo m.engine.AddTable(span, startTs) shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { - return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs) + if raw == nil || !raw.IsUpdate() { + return false + } + switch m.splitUpdateMode { + case PullerSplitUpdateModeNone: + return false + case PullerSplitUpdateModeAlways: + return true + case PullerSplitUpdateModeAtStart: + return isOldUpdateKVEntry(raw, getReplicaTs) + default: + log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode))) + } + log.Panic("Shouldn't reach here") + return false } if m.multiplexing { diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 344731ecee0..4b40d7afb8e 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -222,24 +222,26 @@ func mergeConfig( urlParameters *urlConfig, ) (*urlConfig, error) { dest := &urlConfig{} - dest.SafeMode = replicaConfig.Sink.SafeMode - if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil { - mConfig := replicaConfig.Sink.MySQLConfig - dest.WorkerCount = mConfig.WorkerCount - dest.MaxTxnRow = mConfig.MaxTxnRow - dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount - dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize - dest.TiDBTxnMode = mConfig.TiDBTxnMode - dest.SSLCa = mConfig.SSLCa - dest.SSLCert = mConfig.SSLCert - dest.SSLKey = mConfig.SSLKey - dest.TimeZone = mConfig.TimeZone - dest.WriteTimeout = mConfig.WriteTimeout - dest.ReadTimeout = mConfig.ReadTimeout - dest.Timeout = mConfig.Timeout - dest.EnableBatchDML = mConfig.EnableBatchDML - dest.EnableMultiStatement = mConfig.EnableMultiStatement - dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement + if replicaConfig != nil && replicaConfig.Sink != nil { + dest.SafeMode = replicaConfig.Sink.SafeMode + if replicaConfig.Sink.MySQLConfig != nil { + mConfig := replicaConfig.Sink.MySQLConfig + dest.WorkerCount = mConfig.WorkerCount + dest.MaxTxnRow = mConfig.MaxTxnRow + dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount + dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize + dest.TiDBTxnMode = mConfig.TiDBTxnMode + dest.SSLCa = mConfig.SSLCa + dest.SSLCert = mConfig.SSLCert + dest.SSLKey = mConfig.SSLKey + dest.TimeZone = mConfig.TimeZone + dest.WriteTimeout = mConfig.WriteTimeout + dest.ReadTimeout = mConfig.ReadTimeout + dest.Timeout = mConfig.Timeout + dest.EnableBatchDML = mConfig.EnableBatchDML + dest.EnableMultiStatement = mConfig.EnableMultiStatement + dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement + } } if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) @@ -247,6 +249,31 @@ func mergeConfig( return dest, nil } +// IsSinkSafeMode returns whether the sink is in safe mode. +func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) { + if sinkURI == nil { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI") + } + + scheme := strings.ToLower(sinkURI.Scheme) + if !sink.IsMySQLCompatibleScheme(scheme) { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme) + } + req := &http.Request{URL: sinkURI} + urlParameter := &urlConfig{} + if err := binding.Query.Bind(req, urlParameter); err != nil { + return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + var err error + if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil { + return false, err + } + if urlParameter.SafeMode == nil { + return defaultSafeMode, nil + } + return *urlParameter.SafeMode, nil +} + func getWorkerCount(values *urlConfig, workerCount *int) error { if values.WorkerCount == nil { return nil From c4083d3c5ec947026a93269b410ee7eaa89bd5e5 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 12 Oct 2024 11:34:39 +0800 Subject: [PATCH 2/2] fix conflict --- cdc/processor/processor.go | 8 +---- cdc/processor/sourcemanager/manager.go | 48 ++++---------------------- 2 files changed, 8 insertions(+), 48 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index f0f34da3944..0c04016fe0d 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -686,14 +686,8 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, -<<<<<<< HEAD - sortEngine, util.GetOrZero(cfConfig.BDRMode), - isMysqlBackend) -======= sortEngine, pullerSplitUpdateMode, - util.GetOrZero(cfConfig.BDRMode), - util.GetOrZero(cfConfig.EnableTableMonitor)) ->>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) + util.GetOrZero(cfConfig.BDRMode)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 8f41dc9e25a..76eaf66d0fb 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -37,7 +37,6 @@ import ( const defaultMaxBatchSize = 256 -<<<<<<< HEAD type pullerWrapperCreator func( changefeed model.ChangeFeedID, span tablepb.Span, @@ -57,7 +56,7 @@ type tablePullers struct { type multiplexingPuller struct { puller *pullerwrapper.MultiplexingWrapper } -======= + // PullerSplitUpdateMode is the mode to split update events in puller. type PullerSplitUpdateMode int32 @@ -67,7 +66,6 @@ const ( PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1 PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2 ) ->>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) // SourceManager is the manager of the source engine and puller. type SourceManager struct { @@ -85,19 +83,13 @@ type SourceManager struct { // Used to indicate whether the changefeed is in BDR mode. bdrMode bool -<<<<<<< HEAD // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. multiplexing bool - safeModeAtStart bool tablePullers tablePullers multiplexingPuller multiplexingPuller -======= - splitUpdateMode PullerSplitUpdateMode - enableTableMonitor bool - puller *puller.MultiplexingPuller ->>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) + splitUpdateMode PullerSplitUpdateMode } // New creates a new source manager. @@ -105,21 +97,12 @@ func New( changefeedID model.ChangeFeedID, up *upstream.Upstream, mg entry.MounterGroup, -<<<<<<< HEAD engine engine.SortEngine, - bdrMode bool, - safeModeAtStart bool, -) *SourceManager { - multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing - return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper) -======= - engine sorter.SortEngine, splitUpdateMode PullerSplitUpdateMode, bdrMode bool, - enableTableMonitor bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor) ->>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) + multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing + return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) } // NewForTest creates a new source manager for testing. @@ -130,7 +113,7 @@ func NewForTest( engine engine.SortEngine, bdrMode bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest) + return newSourceManager(changefeedID, up, mg, engine, PullerSplitUpdateModeNone, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) } func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool { @@ -141,11 +124,10 @@ func newSourceManager( changefeedID model.ChangeFeedID, up *upstream.Upstream, mg entry.MounterGroup, -<<<<<<< HEAD engine engine.SortEngine, + splitUpdateMode PullerSplitUpdateMode, bdrMode bool, multiplexing bool, - safeModeAtStart bool, pullerWrapperCreator pullerWrapperCreator, ) *SourceManager { mgr := &SourceManager{ @@ -154,25 +136,9 @@ func newSourceManager( up: up, mg: mg, engine: engine, + splitUpdateMode: splitUpdateMode, bdrMode: bdrMode, multiplexing: multiplexing, - safeModeAtStart: safeModeAtStart, -======= - engine sorter.SortEngine, - splitUpdateMode PullerSplitUpdateMode, - bdrMode bool, - enableTableMonitor bool, -) *SourceManager { - mgr := &SourceManager{ - ready: make(chan struct{}), - changefeedID: changefeedID, - up: up, - mg: mg, - engine: engine, - splitUpdateMode: splitUpdateMode, - bdrMode: bdrMode, - enableTableMonitor: enableTableMonitor, ->>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224)) } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16)