Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller(ticdc): always split update kv entries in sink safe mode (#11224) #11656

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -587,6 +588,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode picks the strategy the puller uses to split
// update kv entries, based on the sink URI and the changefeed config.
//
// The result is one of:
//   - PullerSplitUpdateModeNone: the sink scheme is not mysql compatible, so
//     no update kv entry is split at puller;
//   - PullerSplitUpdateModeAlways: the sink is mysql compatible and
//     mysql.IsSinkSafeMode reports safe mode, so every update kv entry is
//     split at puller;
//   - PullerSplitUpdateModeAtStart: the sink is mysql compatible but not in
//     safe mode, so only update kv entries whose commitTS is older than the
//     replicate ts of sink are split.
//
// An unparsable sinkURIStr yields ErrSinkURIInvalid; an error from
// mysql.IsSinkSafeMode is returned as is. In both error cases the returned
// mode is PullerSplitUpdateModeNone.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
	parsedURI, parseErr := url.Parse(sinkURIStr)
	if parseErr != nil {
		return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
	}

	// Sinks that are not mysql compatible never split at puller.
	if mysqlCompatible := sink.IsMySQLCompatibleScheme(sink.GetScheme(parsedURI)); !mysqlCompatible {
		return sourcemanager.PullerSplitUpdateModeNone, nil
	}

	// From here on the sink is known to be mysql compatible.
	safeMode, err := mysql.IsSinkSafeMode(parsedURI, config)
	switch {
	case err != nil:
		return sourcemanager.PullerSplitUpdateModeNone, err
	case safeMode:
		return sourcemanager.PullerSplitUpdateModeAlways, nil
	default:
		return sourcemanager.PullerSplitUpdateModeAtStart, nil
	}
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
if p.initialized.Load() {
Expand Down Expand Up @@ -650,18 +680,22 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
isMysqlBackend)
sortEngine, pullerSplitUpdateMode,
util.GetOrZero(cfConfig.BDRMode))
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
Expand Down
72 changes: 72 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand All @@ -41,6 +42,7 @@ import (
redoPkg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -791,3 +793,73 @@ func TestProcessorNotInitialized(t *testing.T) {
p, _, _ := initProcessor4Test(cdcContext.NewContext4Test(context.Background(), true), t, &liveness, false)
require.Nil(t, p.WriteDebugInfo(os.Stdout))
}

// TestGetPullerSplitUpdateMode checks the split mode chosen by
// getPullerSplitUpdateMode for a non-mysql sink and for mysql sinks with every
// combination of the `safe-mode` URI parameter and the SafeMode replica
// config field.
func TestGetPullerSplitUpdateMode(t *testing.T) {
	// The user, password and host in the sink URIs are placeholders: the
	// function under test only looks at the scheme and the query string.
	// They must still form a valid URL, otherwise url.Parse rejects the
	// userinfo and every mysql case fails before reaching the mode logic.
	testCases := []struct {
		sinkURI string
		config  *config.ReplicaConfig
		mode    sourcemanager.PullerSplitUpdateMode
	}{
		// Non-mysql sink: never split.
		{
			sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
			config:  nil,
			mode:    sourcemanager.PullerSplitUpdateModeNone,
		},
		// mysql sink, safe mode not configured anywhere.
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/",
			config:  nil,
			mode:    sourcemanager.PullerSplitUpdateModeAtStart,
		},
		// mysql sink, safe mode set only through the URI.
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
			config:  nil,
			mode:    sourcemanager.PullerSplitUpdateModeAlways,
		},
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
			config:  nil,
			mode:    sourcemanager.PullerSplitUpdateModeAtStart,
		},
		// mysql sink, safe mode set only through the replica config.
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/",
			config: &config.ReplicaConfig{
				Sink: &config.SinkConfig{
					SafeMode: util.AddressOf(true),
				},
			},
			mode: sourcemanager.PullerSplitUpdateModeAlways,
		},
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/",
			config: &config.ReplicaConfig{
				Sink: &config.SinkConfig{
					SafeMode: util.AddressOf(false),
				},
			},
			mode: sourcemanager.PullerSplitUpdateModeAtStart,
		},
		// URI and replica config disagree: safe mode is expected to be on
		// whenever either side enables it.
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
			config: &config.ReplicaConfig{
				Sink: &config.SinkConfig{
					SafeMode: util.AddressOf(false),
				},
			},
			mode: sourcemanager.PullerSplitUpdateModeAlways,
		},
		{
			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
			config: &config.ReplicaConfig{
				Sink: &config.SinkConfig{
					SafeMode: util.AddressOf(true),
				},
			},
			mode: sourcemanager.PullerSplitUpdateModeAlways,
		},
	}
	for _, tc := range testCases {
		mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
		// Include the URI so a failure identifies the offending case.
		require.NoError(t, err, "sinkURI=%s", tc.sinkURI)
		require.Equal(t, tc.mode, mode, "sinkURI=%s", tc.sinkURI)
	}
}
39 changes: 32 additions & 7 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ type multiplexingPuller struct {
puller *pullerwrapper.MultiplexingWrapper
}

// PullerSplitUpdateMode selects which update kv entries the puller splits.
type PullerSplitUpdateMode int32

// Supported PullerSplitUpdateMode values. The numeric values are 0, 1 and 2
// in declaration order.
const (
	// PullerSplitUpdateModeNone means no update kv entry is split.
	PullerSplitUpdateModeNone PullerSplitUpdateMode = iota
	// PullerSplitUpdateModeAtStart means only old update kv entries, as
	// decided by isOldUpdateKVEntry against the replica ts, are split.
	PullerSplitUpdateModeAtStart
	// PullerSplitUpdateModeAlways means every update kv entry is split.
	PullerSplitUpdateModeAlways
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
ready chan struct{}
Expand All @@ -76,9 +86,10 @@ type SourceManager struct {
// if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `multiplexingPuller`
// will be used. Otherwise `tablePullers` will be used instead.
multiplexing bool
safeModeAtStart bool
tablePullers tablePullers
multiplexingPuller multiplexingPuller

splitUpdateMode PullerSplitUpdateMode
}

// New creates a new source manager.
Expand All @@ -87,11 +98,11 @@ func New(
up *upstream.Upstream,
mg entry.MounterGroup,
engine engine.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing
return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper)
return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper)
}

// NewForTest creates a new source manager for testing.
Expand All @@ -102,7 +113,7 @@ func NewForTest(
engine engine.SortEngine,
bdrMode bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest)
return newSourceManager(changefeedID, up, mg, engine, PullerSplitUpdateModeNone, bdrMode, false, pullerwrapper.NewPullerWrapperForTest)
}

func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool {
Expand All @@ -114,9 +125,9 @@ func newSourceManager(
up *upstream.Upstream,
mg entry.MounterGroup,
engine engine.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
multiplexing bool,
safeModeAtStart bool,
pullerWrapperCreator pullerWrapperCreator,
) *SourceManager {
mgr := &SourceManager{
Expand All @@ -125,9 +136,9 @@ func newSourceManager(
up: up,
mg: mg,
engine: engine,
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
multiplexing: multiplexing,
safeModeAtStart: safeModeAtStart,
}
if !multiplexing {
mgr.tablePullers.errChan = make(chan error, 16)
Expand All @@ -142,7 +153,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
m.engine.AddTable(span, startTs)

shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}

if m.multiplexing {
Expand Down
63 changes: 45 additions & 18 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,31 +222,58 @@ func mergeConfig(
urlParameters *urlConfig,
) (*urlConfig, error) {
dest := &urlConfig{}
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
if replicaConfig != nil && replicaConfig.Sink != nil {
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
}
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
return dest, nil
}

// IsSinkSafeMode reports whether a MySQL-compatible sink runs in safe mode.
//
// The query parameters of sinkURI are bound into a urlConfig and combined
// with replicaConfig through mergeConfig. If the merged SafeMode is unset,
// defaultSafeMode is returned.
//
// It returns ErrMySQLInvalidConfig when sinkURI is nil, when its scheme is
// not MySQL compatible, or when the query parameters cannot be bound; an
// error from mergeConfig is returned unchanged.
func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
	if sinkURI == nil {
		return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
	}

	if scheme := strings.ToLower(sinkURI.Scheme); !sink.IsMySQLCompatibleScheme(scheme) {
		return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
	}

	// Wrap the URI in a request so the query binder can read its parameters.
	fromURL := &urlConfig{}
	if err := binding.Query.Bind(&http.Request{URL: sinkURI}, fromURL); err != nil {
		return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
	}

	merged, err := mergeConfig(replicaConfig, fromURL)
	if err != nil {
		return false, err
	}

	if safeMode := merged.SafeMode; safeMode != nil {
		return *safeMode, nil
	}
	return defaultSafeMode, nil
}

func getWorkerCount(values *urlConfig, workerCount *int) error {
if values.WorkerCount == nil {
return nil
Expand Down
Loading