From 19fb23f644c6b233bbf2fcc0b824a7eb0c34356f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 12 Oct 2024 14:01:47 +0800 Subject: [PATCH] puller(ticdc): always split update kv entries in sink safe mode (#11224) (#11654) close pingcap/tiflow#11231 --- cdc/processor/processor.go | 39 ++++++++++++++++++- cdc/processor/processor_test.go | 36 +++++++++++++++++ cdc/processor/sinkmanager/manager_test.go | 2 +- .../sinkmanager/table_sink_worker_test.go | 2 +- cdc/processor/sourcemanager/manager.go | 34 +++++++++++++--- pkg/sink/mysql/config.go | 18 +++++++++ 6 files changed, 122 insertions(+), 9 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 865f2d72485..59ed6de8d32 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -754,6 +755,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { return sink.IsMySQLCompatibleScheme(scheme), nil } +// getPullerSplitUpdateMode returns how update kv entries should be split at the puller. +// +// If the sinkURI is not MySQL compatible, it returns PullerSplitUpdateModeNone, +// which means no update kv entry is split at the puller. +// If the sinkURI is MySQL compatible, there are two cases: +// 1. if the user enables safe mode for the sink, it returns PullerSplitUpdateModeAlways, +// which means every update kv entry is split at the puller; +// 2. if the user does not enable safe mode for the sink, it returns PullerSplitUpdateModeAtStart, +// which means only update kv entries whose commitTS is older than the replicate ts of the sink are split. 
+func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + if !sink.IsMySQLCompatibleScheme(scheme) { + return sourcemanager.PullerSplitUpdateModeNone, nil + } + // The scheme is MySQL compatible from here on, so the sink's safe-mode setting picks the mode. + isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, err + } + if isSinkInSafeMode { + return sourcemanager.PullerSplitUpdateModeAlways, nil + } + return sourcemanager.PullerSplitUpdateModeAtStart, nil +} + // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick. func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { if p.initialized { @@ -844,12 +874,17 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { zap.Duration("duration", time.Since(start))) return errors.Trace(err) } - isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI) + + pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.changefeed.Info.SinkURI, p.changefeed.Info.Config) if err != nil { return errors.Trace(err) } p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg, - sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, isMysqlBackend) + sortEngine, p.errCh, pullerSplitUpdateMode, p.changefeed.Info.Config.BDRMode) + isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.upstream, p.schemaStorage, p.redoDMLMgr, p.sourceManager, diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 3c6a02bddfe..b0860f30aea 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -25,11 +25,13 @@ 
import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" mocksink "github.com/pingcap/tiflow/cdc/sink/mock" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" @@ -627,3 +629,37 @@ func TestProcessorLiveness(t *testing.T) { *p.agent.(*mockAgent).liveness = model.LivenessCaptureAlive require.Equal(t, model.LivenessCaptureAlive, p.liveness.Load()) } + +func TestGetPullerSplitUpdateMode(t *testing.T) { + testCases := []struct { + sinkURI string + config *config.ReplicaConfig + mode sourcemanager.PullerSplitUpdateMode + }{ + { + sinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeNone, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + } + for _, tc := range testCases { + mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config) + require.Nil(t, err) + require.Equal(t, tc.mode, mode) + } +} diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 01194e6f8eb..84d67156dfc 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -57,7 +57,7 @@ func createManagerWithMemEngine( ) (*SinkManager, engine.SortEngine) { sortEngine := memory.New(context.Background()) up 
:= upstream.NewUpstream4Test(&mockPD{}) - sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false, false) + sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, sourcemanager.PullerSplitUpdateModeNone, false) manager, err := New( ctx, changefeedID, changefeedInfo, up, &entry.MockSchemaStorage{Resolved: math.MaxUint64}, diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index bf72629c302..264c5281b31 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -39,7 +39,7 @@ func createWorker( ) (*sinkWorker, engine.SortEngine) { sortEngine := memory.New(context.Background()) sm := sourcemanager.New(changefeedID, upstream.NewUpstream4Test(&mockPD{}), - &entry.MockMountGroup{}, sortEngine, make(chan error, 1), false, false) + &entry.MockMountGroup{}, sortEngine, make(chan error, 1), sourcemanager.PullerSplitUpdateModeNone, false) // To avoid refund or release panics. quota := memquota.NewMemQuota(changefeedID, memQuota+1024*1024*1024, "") diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 2ec2b205ba3..6306a612c62 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -32,6 +32,16 @@ import ( const defaultMaxBatchSize = 256 +// PullerSplitUpdateMode selects which update events are split in the puller. +type PullerSplitUpdateMode int32 + +// PullerSplitUpdateMode values: None never splits, AtStart splits only the update entries that isOldUpdateKVEntry reports as old, Always splits every update entry. +const ( + PullerSplitUpdateModeNone PullerSplitUpdateMode = 0 + PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1 + PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2 +) + // SourceManager is the manager of the source engine and puller. type SourceManager struct { // changefeedID is the changefeed ID. 
@@ -47,10 +57,10 @@ type SourceManager struct { pullers sync.Map // Used to report the error to the processor. errChan chan error + // Used to decide which update kv entries are split in the puller. + splitUpdateMode PullerSplitUpdateMode // Used to indicate whether the changefeed is in BDR mode. bdrMode bool - - safeModeAtStart bool } // New creates a new source manager. @@ -60,8 +70,8 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, errChan chan error, + splitUpdateMode PullerSplitUpdateMode, bdrMode bool, - safeModeAtStart bool, ) *SourceManager { return &SourceManager{ changefeedID: changefeedID, @@ -69,8 +79,8 @@ func New( mg: mg, engine: engine, errChan: errChan, + splitUpdateMode: splitUpdateMode, bdrMode: bdrMode, - safeModeAtStart: safeModeAtStart, } } @@ -83,7 +93,21 @@ func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, // Add table to the engine first, so that the engine can receive the events from the puller. m.engine.AddTable(tableID) shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { - return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs) + if raw == nil || !raw.IsUpdate() { + return false + } + switch m.splitUpdateMode { + case PullerSplitUpdateModeNone: + return false + case PullerSplitUpdateModeAlways: + return true + case PullerSplitUpdateModeAtStart: + return isOldUpdateKVEntry(raw, getReplicaTs) + default: + log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode))) + } + log.Panic("Shouldn't reach here") + return false } p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry) p.Start(ctx, m.up, m.engine, m.errChan) diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index c3d0e03dbef..bb1077e9433 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -190,6 +190,24 @@ func (c *Config) Apply( return nil } +// IsSinkSafeMode reports the safe-mode value that getSafeMode reads from the query of sinkURI; it returns an error if sinkURI is nil or its scheme is not MySQL compatible. The body does not use replicaConfig. 
+func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) { + if sinkURI == nil { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI") + } + + scheme := strings.ToLower(sinkURI.Scheme) + if !sink.IsMySQLCompatibleScheme(scheme) { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme) + } + query := sinkURI.Query() + var safeMode bool + if err := getSafeMode(query, &safeMode); err != nil { + return false, err + } + return safeMode, nil +} + func getWorkerCount(values url.Values, workerCount *int) error { s := values.Get("worker-count") if len(s) == 0 {