Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller(ticdc): always split update kv entries in sink safe mode (#11224) #11657

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -598,6 +599,35 @@
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode returns how to split update kv entries at puller.
//
// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
// which means don't split any update kv entries at puller;
// If the sinkURI is mysql compatible, it has the following two cases:
// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
// which means split all update kv entries at puller;
// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
// which means split update kv entries whose commitTS is older than the replicate ts of sink.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

Check warning on line 615 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L614-L615

Added lines #L614 - L615 were not covered by tests
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
return sourcemanager.PullerSplitUpdateModeNone, nil
}
// must be mysql sink
isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, err
}

Check warning on line 624 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L623-L624

Added lines #L623 - L624 were not covered by tests
if isSinkInSafeMode {
return sourcemanager.PullerSplitUpdateModeAlways, nil
}
return sourcemanager.PullerSplitUpdateModeAtStart, nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if p.initialized.Load() {
Expand Down Expand Up @@ -657,19 +687,23 @@
return errors.Trace(err)
}

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig)

Check warning on line 690 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L690

Added line #L690 was not covered by tests
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
isMysqlBackend)
sortEngine, pullerSplitUpdateMode,
util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor))

Check warning on line 698 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L696-L698

Added lines #L696 - L698 were not covered by tests
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}

Check warning on line 706 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L703-L706

Added lines #L703 - L706 were not covered by tests
p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
Expand Down
72 changes: 72 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand All @@ -41,6 +42,7 @@ import (
redoPkg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -803,3 +805,73 @@ func TestProcessorNotInitialized(t *testing.T) {
p, _, _ := initProcessor4Test(t, &liveness, false, globalVars, changefeedVars)
require.Nil(t, p.WriteDebugInfo(os.Stdout))
}

func TestGetPullerSplitUpdateMode(t *testing.T) {
testCases := []struct {
sinkURI string
config *config.ReplicaConfig
mode sourcemanager.PullerSplitUpdateMode
}{
{
sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeNone,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=true",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=false",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=true",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=false",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
}
for _, tc := range testCases {
mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
require.Nil(t, err)
require.Equal(t, tc.mode, mode)
}
}
36 changes: 30 additions & 6 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ import (

const defaultMaxBatchSize = 256

// PullerSplitUpdateMode is the mode to split update events in puller.
type PullerSplitUpdateMode int32

// PullerSplitUpdateMode constants.
const (
PullerSplitUpdateModeNone PullerSplitUpdateMode = 0
PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
ready chan struct{}
Expand All @@ -51,7 +61,7 @@ type SourceManager struct {
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool
splitUpdateMode PullerSplitUpdateMode

enableTableMonitor bool
puller *puller.MultiplexingPuller
Expand All @@ -63,11 +73,11 @@ func New(
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart)
return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor)
}

// NewForTest creates a new source manager for testing.
Expand Down Expand Up @@ -97,19 +107,19 @@ func newSourceManager(
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
enableTableMonitor: enableTableMonitor,
safeModeAtStart: safeModeAtStart,
}

serverConfig := config.GetGlobalServerConfig()
Expand Down Expand Up @@ -164,7 +174,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
m.engine.AddTable(span, startTs)

shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}

// Only nil in unit tests.
Expand Down
63 changes: 45 additions & 18 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,31 +223,58 @@
urlParameters *urlConfig,
) (*urlConfig, error) {
dest := &urlConfig{}
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
if replicaConfig != nil && replicaConfig.Sink != nil {
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
}
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
return dest, nil
}

// IsSinkSafeMode returns whether the sink is in safe mode.
func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
if sinkURI == nil {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
}

Check warning on line 257 in pkg/sink/mysql/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/mysql/config.go#L254-L257

Added lines #L254 - L257 were not covered by tests

scheme := strings.ToLower(sinkURI.Scheme)
if !sink.IsMySQLCompatibleScheme(scheme) {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
}
req := &http.Request{URL: sinkURI}
urlParameter := &urlConfig{}
if err := binding.Query.Bind(req, urlParameter); err != nil {
return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
var err error
if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil {
return false, err
}
if urlParameter.SafeMode == nil {
return defaultSafeMode, nil
}
return *urlParameter.SafeMode, nil

Check warning on line 275 in pkg/sink/mysql/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/mysql/config.go#L259-L275

Added lines #L259 - L275 were not covered by tests
}

func getWorkerCount(values *urlConfig, workerCount *int) error {
if values.WorkerCount == nil {
return nil
Expand Down
Loading