Skip to content

Commit

Permalink
puller(ticdc): always split update kv entries in sink safe mode (#11224
Browse files Browse the repository at this point in the history
…) (#11654)

close #11231
  • Loading branch information
ti-chi-bot authored Oct 12, 2024
1 parent bcb2bd3 commit 19fb23f
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 9 deletions.
39 changes: 37 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -754,6 +755,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode returns how to split update kv entries at puller.
//
// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
// which means don't split any update kv entries at puller;
// If the sinkURI is mysql compatible, it has the following two cases:
// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
// which means split all update kv entries at puller;
// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
// which means split update kv entries whose commitTS is older than the replicate ts of sink.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
return sourcemanager.PullerSplitUpdateModeNone, nil
}
// must be mysql sink
isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, err
}
if isSinkInSafeMode {
return sourcemanager.PullerSplitUpdateModeAlways, nil
}
return sourcemanager.PullerSplitUpdateModeAtStart, nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if p.initialized {
Expand Down Expand Up @@ -844,12 +874,17 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)

pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.changefeed.Info.SinkURI, p.changefeed.Info.Config)
if err != nil {
return errors.Trace(err)
}
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg,
sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, isMysqlBackend)
sortEngine, p.errCh, pullerSplitUpdateMode, p.changefeed.Info.Config.BDRMode)
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID,
p.changefeed.Info, p.upstream, p.schemaStorage,
p.redoDMLMgr, p.sourceManager,
Expand Down
36 changes: 36 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
mocksink "github.com/pingcap/tiflow/cdc/sink/mock"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
Expand Down Expand Up @@ -627,3 +629,37 @@ func TestProcessorLiveness(t *testing.T) {
*p.agent.(*mockAgent).liveness = model.LivenessCaptureAlive
require.Equal(t, model.LivenessCaptureAlive, p.liveness.Load())
}

func TestGetPullerSplitUpdateMode(t *testing.T) {
testCases := []struct {
sinkURI string
config *config.ReplicaConfig
mode sourcemanager.PullerSplitUpdateMode
}{
{
sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeNone,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=true",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=false",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
}
for _, tc := range testCases {
mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
require.Nil(t, err)
require.Equal(t, tc.mode, mode)
}
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func createManagerWithMemEngine(
) (*SinkManager, engine.SortEngine) {
sortEngine := memory.New(context.Background())
up := upstream.NewUpstream4Test(&mockPD{})
sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false, false)
sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, sourcemanager.PullerSplitUpdateModeNone, false)
manager, err := New(
ctx, changefeedID, changefeedInfo, up,
&entry.MockSchemaStorage{Resolved: math.MaxUint64},
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func createWorker(
) (*sinkWorker, engine.SortEngine) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(changefeedID, upstream.NewUpstream4Test(&mockPD{}),
&entry.MockMountGroup{}, sortEngine, make(chan error, 1), false, false)
&entry.MockMountGroup{}, sortEngine, make(chan error, 1), sourcemanager.PullerSplitUpdateModeNone, false)

// To avoid refund or release panics.
quota := memquota.NewMemQuota(changefeedID, memQuota+1024*1024*1024, "")
Expand Down
34 changes: 29 additions & 5 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ import (

const defaultMaxBatchSize = 256

// PullerSplitUpdateMode is the mode to split update events in puller.
type PullerSplitUpdateMode int32

// PullerSplitUpdateMode constants.
const (
PullerSplitUpdateModeNone PullerSplitUpdateMode = 0
PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
// changefeedID is the changefeed ID.
Expand All @@ -47,10 +57,10 @@ type SourceManager struct {
pullers sync.Map
// Used to report the error to the processor.
errChan chan error
// Used to specify the behavior of splitting update events in puller.
splitUpdateMode PullerSplitUpdateMode
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool
}

// New creates a new source manager.
Expand All @@ -60,17 +70,17 @@ func New(
mg entry.MounterGroup,
engine engine.SortEngine,
errChan chan error,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
return &SourceManager{
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
errChan: errChan,
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
safeModeAtStart: safeModeAtStart,
}
}

Expand All @@ -83,7 +93,21 @@ func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID,
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(tableID)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}
p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
p.Start(ctx, m.up, m.engine, m.errChan)
Expand Down
18 changes: 18 additions & 0 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,24 @@ func (c *Config) Apply(
return nil
}

// IsSinkSafeMode returns whether the sink is in safe mode.
func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
if sinkURI == nil {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
}

scheme := strings.ToLower(sinkURI.Scheme)
if !sink.IsMySQLCompatibleScheme(scheme) {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
}
query := sinkURI.Query()
var safeMode bool
if err := getSafeMode(query, &safeMode); err != nil {
return false, err
}
return safeMode, nil
}

func getWorkerCount(values url.Values, workerCount *int) error {
s := values.Get("worker-count")
if len(s) == 0 {
Expand Down

0 comments on commit 19fb23f

Please sign in to comment.