Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#11224
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lidezhu authored and ti-chi-bot committed Oct 12, 2024
1 parent bcb2bd3 commit c7b4c65
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 1 deletion.
60 changes: 60 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -754,6 +755,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode returns how to split update kv entries at puller.
//
// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
// which means don't split any update kv entries at puller;
// If the sinkURI is mysql compatible, it has the following two cases:
// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
// which means split all update kv entries at puller;
// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
// which means split update kv entries whose commitTS is older than the replicate ts of sink.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
return sourcemanager.PullerSplitUpdateModeNone, nil
}
// must be mysql sink
isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, err
}
if isSinkInSafeMode {
return sourcemanager.PullerSplitUpdateModeAlways, nil
}
return sourcemanager.PullerSplitUpdateModeAtStart, nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if p.initialized {
Expand Down Expand Up @@ -910,7 +940,37 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
}

<<<<<<< HEAD
p.agent, err = p.newAgent(ctx, p.liveness, p.changefeedEpoch)
=======
pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, pullerSplitUpdateMode,
util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor))
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(ctx)

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = p.changefeedID
p.sinkManager.spawn(ctx)

// Bind them so that sourceManager can notify sinkManager.r.
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)
p.agent, err = p.newAgent(ctx, p.liveness, p.changefeedEpoch, p.cfg, p.ownerCaptureInfoClient)
>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224))
if err != nil {
return err
}
Expand Down
76 changes: 76 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD
=======
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224))
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand All @@ -35,6 +40,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -627,3 +633,73 @@ func TestProcessorLiveness(t *testing.T) {
*p.agent.(*mockAgent).liveness = model.LivenessCaptureAlive
require.Equal(t, model.LivenessCaptureAlive, p.liveness.Load())
}

func TestGetPullerSplitUpdateMode(t *testing.T) {
testCases := []struct {
sinkURI string
config *config.ReplicaConfig
mode sourcemanager.PullerSplitUpdateMode
}{
{
sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeNone,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=true",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=false",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=true",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:[email protected]:3306/?safe-mode=false",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
}
for _, tc := range testCases {
mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
require.Nil(t, err)
require.Equal(t, tc.mode, mode)
}
}
121 changes: 120 additions & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ import (

const defaultMaxBatchSize = 256

// PullerSplitUpdateMode is the mode to split update events in puller.
type PullerSplitUpdateMode int32

// PullerSplitUpdateMode constants.
const (
PullerSplitUpdateModeNone PullerSplitUpdateMode = 0
PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
// changefeedID is the changefeed ID.
Expand All @@ -50,19 +60,45 @@ type SourceManager struct {
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

<<<<<<< HEAD
safeModeAtStart bool
=======
splitUpdateMode PullerSplitUpdateMode

enableTableMonitor bool
puller *puller.MultiplexingPuller
>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224))
}

// New creates a new source manager.
func New(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
mg entry.MounterGroup,
<<<<<<< HEAD
engine engine.SortEngine,
errChan chan error,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
=======
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor)
}

// NewForTest creates a new source manager for testing.
func NewForTest(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
bdrMode bool,
) *SourceManager {
>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224))
return &SourceManager{
changefeedID: changefeedID,
up: up,
Expand All @@ -78,12 +114,95 @@ func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) boo
return raw != nil && raw.IsUpdate() && raw.CRTs < getReplicaTs()
}

<<<<<<< HEAD
=======
func newSourceManager(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
enableTableMonitor: enableTableMonitor,
}

serverConfig := config.GetGlobalServerConfig()
grpcPool := sharedconn.NewConnAndClientPool(mgr.up.SecurityConfig, kv.GetGlobalGrpcMetrics())
client := kv.NewSharedClient(
mgr.changefeedID, serverConfig, mgr.bdrMode,
mgr.up.PDClient, grpcPool, mgr.up.RegionCache, mgr.up.PDClock,
txnutil.NewLockerResolver(mgr.up.KVStorage.(tikv.Storage), mgr.changefeedID),
)

// consume add raw kv entry to the engine.
// It will be called by the puller when new raw kv entry is received.
consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span, shouldSplitKVEntry model.ShouldSplitKVEntry) error {
if len(spans) > 1 {
log.Panic("DML puller subscribes multiple spans",
zap.String("namespace", mgr.changefeedID.Namespace),
zap.String("changefeed", mgr.changefeedID.ID))
}
if raw != nil {
if shouldSplitKVEntry(raw) {
deleteKVEntry, insertKVEntry, err := model.SplitUpdateKVEntry(raw)
if err != nil {
return err
}
deleteEvent := model.NewPolymorphicEvent(deleteKVEntry)
insertEvent := model.NewPolymorphicEvent(insertKVEntry)
mgr.engine.Add(spans[0], deleteEvent, insertEvent)
} else {
pEvent := model.NewPolymorphicEvent(raw)
mgr.engine.Add(spans[0], pEvent)
}
}
return nil
}
slots, hasher := mgr.engine.SlotsAndHasher()

mgr.puller = puller.NewMultiplexingPuller(
mgr.changefeedID,
client,
up.PDClock,
consume,
slots,
hasher,
int(serverConfig.KVClient.FrontierConcurrent))

return mgr
}

>>>>>>> f1d2ee62f8 (puller(ticdc): always split update kv entries in sink safe mode (#11224))
// AddTable adds a table to the source manager. Start puller and register table to the engine.
func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts, getReplicaTs func() model.Ts) {
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(tableID)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}
p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
p.Start(ctx, m.up, m.engine, m.errChan)
Expand Down
Loading

0 comments on commit c7b4c65

Please sign in to comment.