Skip to content

Commit

Permalink
only split update kv entries for mysql sink
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Apr 22, 2024
1 parent 5b60c14 commit cf514b1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
21 changes: 20 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -486,6 +488,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
// the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
Expand Down Expand Up @@ -641,10 +655,15 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode),
util.GetOrZero(p.latestInfo.Config.EnableTableMonitor))
util.GetOrZero(p.latestInfo.Config.EnableTableMonitor),
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)
Expand Down
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ func New(
engine sorter.SortEngine,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor)
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart)
}

// NewForTest creates a new source manager for testing.
Expand Down Expand Up @@ -115,6 +116,7 @@ func newSourceManager(
engine sorter.SortEngine,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
Expand Down Expand Up @@ -143,7 +145,7 @@ func newSourceManager(
zap.String("changefeed", mgr.changefeedID.ID))
}
if raw != nil {
if isOldUpdateKVEntry(raw, mgr.startTs) {
if safeModeAtStart && isOldUpdateKVEntry(raw, mgr.startTs) {
deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type SinkFactory struct {
category Category
}

// New creates a new SinkFactory by schema.
// New creates a new SinkFactory by scheme.
func New(
ctx context.Context,
changefeedID model.ChangeFeedID,
Expand All @@ -79,8 +79,8 @@ func New(
}

s := &SinkFactory{}
schema := sink.GetScheme(sinkURI)
switch schema {
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh,
txn.DefaultConflictDetectorSlots)
Expand Down Expand Up @@ -123,7 +123,7 @@ func New(
s.category = CategoryMQ
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme)
}

return s, nil
Expand Down

0 comments on commit cf514b1

Please sign in to comment.