changefeed,kvClient, sink (ticdc): support bidirectional replication #7630

Merged
Changes from 13 commits
Commits (27):
6c9a849
add bidirectional function
asddongmen Nov 17, 2022
0cb6ebf
fix error
asddongmen Nov 18, 2022
50d2d9c
get source id from pd
asddongmen Nov 28, 2022
91fcec4
fix error
asddongmen Nov 28, 2022
8c17d8d
Merge branch 'master' into dongmen/bidirectional_replicating
asddongmen Nov 28, 2022
b4ae99f
remove useless log
asddongmen Nov 28, 2022
0304a03
add precheck and integration test
asddongmen Nov 28, 2022
5012be3
add integration test
asddongmen Nov 28, 2022
45de47d
fix unit test
asddongmen Nov 29, 2022
e65029a
fix unit test
asddongmen Nov 29, 2022
995a8d1
address comment
asddongmen Nov 29, 2022
d72049e
update kvproto
asddongmen Nov 29, 2022
5d19ec1
Merge branch 'master' into dongmen/bidirectional_replicating
asddongmen Nov 29, 2022
269f3fb
fix check
asddongmen Nov 29, 2022
7f3b885
Merge branch 'dongmen/bidirectional_replicating' of github.com:asddon…
asddongmen Nov 29, 2022
3994ea0
fix lint check and address comments
asddongmen Nov 30, 2022
055861e
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
acebf50
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
4c54851
fix lint check
asddongmen Nov 30, 2022
7b9dcf6
Merge branch 'dongmen/bidirectional_replicating' of github.com:asddon…
asddongmen Nov 30, 2022
5edc9ad
fix it
asddongmen Nov 30, 2022
ab7b148
fix it
asddongmen Nov 30, 2022
21fff7d
fmt
asddongmen Nov 30, 2022
67dde92
fix it
asddongmen Nov 30, 2022
fdef274
fix it
asddongmen Nov 30, 2022
6582b3f
fix it error
asddongmen Nov 30, 2022
251c56b
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -95,6 +95,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
BDRMode bool `json:"bdr_mode"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
@@ -113,6 +114,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention
res.BDRMode = c.BDRMode

if c.Filter != nil {
var mySQLReplicationRules *filter.MySQLReplicationRules
@@ -221,6 +223,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
BDRMode: cloned.BDRMode,
}

if cloned.Filter != nil {
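Aside (not part of the diff): the new option travels over the open API v2 as a JSON field named bdr_mode, as implied by the struct tag added above. The following stand-alone sketch uses a local stand-in struct rather than the real cdc/api/v2.ReplicaConfig type, purely to show the resulting wire format:

package main

import (
	"encoding/json"
	"fmt"
)

// replicaConfigSubset mirrors only the JSON tags visible in the diff above;
// it is an illustrative stand-in, not the project's type.
type replicaConfigSubset struct {
	CheckGCSafePoint bool `json:"check_gc_safe_point"`
	EnableSyncPoint  bool `json:"enable_sync_point"`
	BDRMode          bool `json:"bdr_mode"` // new field added by this PR
}

func main() {
	body, _ := json.Marshal(replicaConfigSubset{CheckGCSafePoint: true, BDRMode: true})
	fmt.Println(string(body))
	// {"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":true}
}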
6 changes: 6 additions & 0 deletions cdc/kv/client.go
@@ -234,6 +234,9 @@ type CDCClient struct {
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts
// filterLoop is used in BDR mode. When it is true, the TiKV CDC component
// filters out data that was written by another TiCDC, so changes do not loop back.
filterLoop bool
}

// NewCDCClient creates a CDCClient instance
@@ -247,6 +250,7 @@ func NewCDCClient(
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
filterLoop bool,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

@@ -268,6 +272,7 @@
}{
counts: list.New(),
},
filterLoop: filterLoop,
}
return
}
@@ -674,6 +679,7 @@ func (s *eventFeedSession) requestRegionToStore(
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
FilterLoop: s.client.filterLoop,
}

failpoint.Inject("kvClientPendingRegionDelay", nil)
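Aside (not part of the diff): the flag is captured once on the CDCClient and then stamped onto every per-region ChangeDataRequest sent to TiKV, which asks the TiKV CDC component to drop rows written by another TiCDC. A stand-alone sketch of that propagation, using simplified stand-in types rather than the real kv/cdcpb types:

package main

import "fmt"

// changeDataRequest is a stand-in for the relevant cdcpb request fields.
type changeDataRequest struct {
	RegionID   uint64
	FilterLoop bool // true in BDR mode: TiKV filters rows written by another TiCDC
}

// cdcClient is a stand-in for the kv.CDCClient shown in the diff.
type cdcClient struct {
	filterLoop bool
}

// requestRegionToStore mirrors how the real client copies the flag onto each request.
func (c *cdcClient) requestRegionToStore(regionID uint64) changeDataRequest {
	return changeDataRequest{RegionID: regionID, FilterLoop: c.filterLoop}
}

func main() {
	// filterLoop is derived from the changefeed's ReplicaConfig.BDRMode
	// (see the puller.go and table_actor.go diffs below).
	c := &cdcClient{filterLoop: true}
	fmt.Printf("%+v\n", c.requestRegionToStore(42))
}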
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
@@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
52 changes: 26 additions & 26 deletions cdc/kv/client_test.go
@@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "")
config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
require.NotNil(t, cli)
}

@@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
// Take care of the eventCh: it is used to output resolvedTs events or kv events.
// If it is not drained, it will block the normal routine.
eventCh := make(chan model.RegionFeedEvent, 50)
@@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var wg2 sync.WaitGroup
wg2.Add(1)
@@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

var wg2 sync.WaitGroup
@@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
@@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

wg.Add(1)
@@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2107,7 +2107,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var clientWg sync.WaitGroup
clientWg.Add(1)
@@ -2260,7 +2260,7 @@ func testEventAfterFeedStop(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2447,7 +2447,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2665,7 +2665,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2761,7 +2761,7 @@ func TestFailRegionReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2844,7 +2844,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2912,7 +2912,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2990,7 +2990,7 @@ func testKVClientForceReconnect(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3141,7 +3141,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
@@ -3258,7 +3258,7 @@ func TestEvTimeUpdate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3384,7 +3384,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3476,7 +3476,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
baseAllocatedID := currentRequestID()

21 changes: 21 additions & 0 deletions cdc/owner/changefeed.go
@@ -34,6 +34,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
@@ -501,6 +502,17 @@ LOOP:
cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

sourceID, err := pdutil.GetSourceID(ctx, c.upstream.PDClient)
Review comment (Contributor): it is not necessary to get the source ID if BDR is disabled.
Reply (asddongmen, author): yes, we always need it.

if err != nil {
return errors.Trace(err)
}
c.state.Info.Config.Sink.TiDBSourceID = sourceID
log.Info("set source id",
zap.Uint64("sourceID", sourceID),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
)

c.sink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.sink.run(cancelCtx)

@@ -900,6 +912,15 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
zap.String("changefeed", c.id.ID), zap.Any("event", ddlEvent))
return true, nil
}

// Check whether the changefeed is in BDR mode; if so, skip all DDL events.
if c.state.Info.Config.BDRMode {
log.Info("ignore the DDL event in BDR mode",
zap.String("changefeed", c.id.ID),
zap.Any("ddl", ddlEvent.Query))
return true, nil
}

done, err = c.sink.emitDDLEvent(ctx, ddlEvent)
if err != nil {
return false, err
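Aside (not part of the diff): in BDR mode the changefeed does not replicate DDLs at all; asyncExecDDLEvent logs the statement and reports it as done without emitting it to the sink, the expectation being that DDLs are applied to each cluster out of band. A stand-alone sketch of that early return, using simplified stand-in types rather than the owner package's:

package main

import "fmt"

type ddlEvent struct {
	Query string
}

// execDDL mirrors the early return added in asyncExecDDLEvent: in BDR mode the
// DDL is logged and treated as done, and never reaches the sink.
func execDDL(bdrMode bool, ev ddlEvent) (done bool, emitted bool) {
	if bdrMode {
		fmt.Printf("ignore the DDL event in BDR mode: %s\n", ev.Query)
		return true, false
	}
	// Outside BDR mode the event would be handed to the sink here.
	return true, true
}

func main() {
	fmt.Println(execDDL(true, ddlEvent{Query: "ALTER TABLE t ADD COLUMN c INT"}))
	fmt.Println(execDDL(false, ddlEvent{Query: "ALTER TABLE t ADD COLUMN c INT"}))
}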
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/puller.go
@@ -62,7 +62,7 @@ func (n *pullerNode) tableSpan() []regionspan.Span {

func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
up *upstream.Upstream, wg *errgroup.Group,
sorter *sorterNode,
sorter *sorterNode, filterLoop bool,
) error {
n.wg = wg
ctxC, cancel := context.WithCancel(ctx)
@@ -84,6 +84,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
n.changefeed,
n.tableID,
n.tableName,
filterLoop,
)
n.wg.Go(func() error {
ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
@@ -552,7 +552,7 @@ func (t *tableActor) RemainEvents() int64 {

// for ut
var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode)
return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode, t.replicaConfig.BDRMode)
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
7 changes: 7 additions & 0 deletions cdc/processor/processor.go
@@ -44,6 +44,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
@@ -854,6 +855,12 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
p.sendError(p.mg.Run(ctx))
}()

sourceID, err := pdutil.GetSourceID(ctx, p.upstream.PDClient)
if err != nil {
return errors.Trace(err)
}
p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID

start := time.Now()
conf := config.GetGlobalServerConfig()
p.pullBasedSinking = conf.Debug.EnablePullBasedSink
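Aside (not part of the diff): like the owner-side changefeed, the processor fetches the upstream cluster's source ID from PD during lazy initialization and stores it on the sink config before any table sinks start, presumably so downstream writes made by this changefeed can be attributed to their source cluster (which the opposite-direction changefeed's filterLoop relies on). A stand-alone sketch of that initialization order with stand-in types; only the call shown in the diff is taken from the real pdutil API:

package main

import (
	"context"
	"errors"
	"fmt"
)

type sinkConfig struct {
	TiDBSourceID uint64
}

// getSourceID stands in for pdutil.GetSourceID: the real code fetches the ID
// from PD, here a fixed value is returned for illustration.
func getSourceID(ctx context.Context) (uint64, error) {
	if ctx == nil {
		return 0, errors.New("nil context")
	}
	return 1, nil
}

// lazyInit mirrors the order added in lazyInitImpl: fetch the source ID and
// stamp it on the sink config before the sink machinery is started.
func lazyInit(ctx context.Context, sink *sinkConfig) error {
	sourceID, err := getSourceID(ctx)
	if err != nil {
		return err // the real code wraps this with errors.Trace
	}
	sink.TiDBSourceID = sourceID
	return nil
}

func main() {
	cfg := &sinkConfig{}
	if err := lazyInit(context.Background(), cfg); err != nil {
		panic(err)
	}
	fmt.Printf("sink config: %+v\n", cfg)
}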