Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed,kvClient, sink (ticdc): support bidirectional replication #7630

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6c9a849
add bidirectional function
asddongmen Nov 17, 2022
0cb6ebf
fix error
asddongmen Nov 18, 2022
50d2d9c
get source id from pd
asddongmen Nov 28, 2022
91fcec4
fix error
asddongmen Nov 28, 2022
8c17d8d
Merge branch 'master' into dongmen/bidirectional_replicating
asddongmen Nov 28, 2022
b4ae99f
remove useless log
asddongmen Nov 28, 2022
0304a03
add precheck and integration test
asddongmen Nov 28, 2022
5012be3
add integration test
asddongmen Nov 28, 2022
45de47d
fix unit test
asddongmen Nov 29, 2022
e65029a
fix unit test
asddongmen Nov 29, 2022
995a8d1
address comment
asddongmen Nov 29, 2022
d72049e
update kvproto
asddongmen Nov 29, 2022
5d19ec1
Merge branch 'master' into dongmen/bidirectional_replicating
asddongmen Nov 29, 2022
269f3fb
fix check
asddongmen Nov 29, 2022
7f3b885
Merge branch 'dongmen/bidirectional_replicating' of github.com:asddon…
asddongmen Nov 29, 2022
3994ea0
fix lint check and address comments
asddongmen Nov 30, 2022
055861e
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
acebf50
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
4c54851
fix lint check
asddongmen Nov 30, 2022
7b9dcf6
Merge branch 'dongmen/bidirectional_replicating' of github.com:asddon…
asddongmen Nov 30, 2022
5edc9ad
fix it
asddongmen Nov 30, 2022
ab7b148
fix it
asddongmen Nov 30, 2022
21fff7d
fmt
asddongmen Nov 30, 2022
67dde92
fix it
asddongmen Nov 30, 2022
fdef274
fix it
asddongmen Nov 30, 2022
6582b3f
fix it error
asddongmen Nov 30, 2022
251c56b
Merge branch 'master' into dongmen/bidirectional_replicating
ti-chi-bot Nov 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
BDRMode bool `json:"bdr-mode"`
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
Expand All @@ -112,6 +113,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention
res.BDRMode = c.BDRMode

if c.Filter != nil {
var mySQLReplicationRules *filter.MySQLReplicationRules
Expand Down Expand Up @@ -215,6 +217,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
BDRMode: cloned.BDRMode,
}

if cloned.Filter != nil {
Expand Down
4 changes: 4 additions & 0 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ type CDCClient struct {
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts
filterLoop bool
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
}

// NewCDCClient creates a CDCClient instance
Expand All @@ -247,6 +248,7 @@ func NewCDCClient(
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
filterLoop bool,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

Expand All @@ -268,6 +270,7 @@ func NewCDCClient(
}{
counts: list.New(),
},
filterLoop: filterLoop,
}
return
}
Expand Down Expand Up @@ -674,6 +677,7 @@ func (s *eventFeedSession) requestRegionToStore(
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
FilterLoop: s.client.filterLoop,
}

failpoint.Inject("kvClientPendingRegionDelay", nil)
Expand Down
9 changes: 9 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,15 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
zap.String("changefeed", c.id.ID), zap.Any("event", ddlEvent))
return true, nil
}

// check whether in bdr mode, if so, we need to skip all DDLs
if c.state.Info.Config.BDRMode {
log.Info("ignore the DDL event in BDR mode",
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
zap.String("changefeed", c.id.ID),
zap.Any("ddl", ddlEvent.Query))
return true, nil
}

done, err = c.sink.emitDDLEvent(ctx, ddlEvent)
if err != nil {
return false, err
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (n *pullerNode) tableSpan() []regionspan.Span {

func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
up *upstream.Upstream, wg *errgroup.Group,
sorter *sorterNode,
sorter *sorterNode, filterLoop bool,
) error {
n.wg = wg
ctxC, cancel := context.WithCancel(ctx)
Expand All @@ -84,6 +84,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
n.changefeed,
n.tableID,
n.tableName,
filterLoop,
)
n.wg.Go(func() error {
ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (t *tableActor) RemainEvents() int64 {

// for ut
var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode)
return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode, t.replicaConfig.BDRMode)
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/sourcemanager/puller/puller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (n *Wrapper) Start(
n.changefeed,
n.tableID,
n.tableName,
false,
)
n.wg.Add(1)
go func() {
Expand Down
8 changes: 7 additions & 1 deletion cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ const (
ddlPullerStuckWarnDuration = 30 * time.Second
// DDLPullerTableName is the fake table name for ddl puller
DDLPullerTableName = "DDL_PULLER"
// ddl puller should never filter any DDL jobs even if
// the changefeed is in BDR mode, because the DDL jobs should
// be filtered before they are sent to the sink
ddLPullerFilterLoop = false
)

// DDLJobPuller is used to pull ddl job from TiKV.
Expand Down Expand Up @@ -481,7 +485,9 @@ func NewDDLJobPuller(
regionspan.GetAllDDLSpan(),
cfg,
changefeed,
-1, DDLPullerTableName),
-1, DDLPullerTableName,
ddLPullerFilterLoop,
),
kvStorage: kvStorage,
outputCh: make(chan *model.DDLJobEntry, defaultPullerOutputChanSize),
metricDiscardedDDLCounter: discardedDDLCounter.
Expand Down
3 changes: 2 additions & 1 deletion cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func New(ctx context.Context,
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
filterLoop bool,
) Puller {
tikvStorage, ok := kvStorage.(tikv.Storage)
if !ok {
Expand All @@ -108,7 +109,7 @@ func New(ctx context.Context,
WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType)
tsTracker := frontier.NewFrontier(0, metricMissedRegionCollectCounter, comparableSpans...)
kvCli := kv.NewCDCKVClient(
ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName)
ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName, filterLoop)
p := &pullerImpl{
kvCli: kvCli,
kvStorage: tikvStorage,
Expand Down
49 changes: 49 additions & 0 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql/driver"
"fmt"
"net/url"
"strconv"
"time"

dmysql "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -88,6 +89,15 @@ func NewMySQLBackends(
if err != nil {
return nil, err
}

//Note(dongmen): Only for BDR mode use for now.
sysVariables := make(map[string]string)
// TODO(dongmen): set a correct value for this variable after the `source_id` design is done.
sysVariables["tidb_cdc_write_source"] = strconv.Itoa(1)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err := setTiDBSystemVariable(ctx, db, sysVariables); err != nil {
return nil, err
}

db.SetMaxIdleConns(cfg.WorkerCount)
db.SetMaxOpenConns(cfg.WorkerCount)

Expand Down Expand Up @@ -427,3 +437,42 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) {
func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) {
s.dmlMaxRetry = maxRetry
}

func setTiDBSystemVariable(ctx context.Context, db *sql.DB, vars map[string]string) error {
var tidbVer string
// check if downstream is TiDB
row := db.QueryRowContext(ctx, "select tidb_version()")
err := row.Scan(&tidbVer)
if err != nil {
log.Error("err", zap.Error(err))
// downstream is not TiDB, do nothing
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError);
// means downstream is not TiDB
ok && mysqlErr.Number == mysql.ErrNoDB ||
mysqlErr.Number == mysql.ErrSpDoesNotExist {
return nil
}
return err
}

// downstream is TiDB, set system variables.
// We should always try to set this variable, and ignore the error if
// downstream does not support this variable, it is by design.
for k, v := range vars {
query := fmt.Sprintf("SET SESSION %s = %s", k, v)
log.Info("set session variable", zap.String("query", query))
_, err := db.ExecContext(ctx, query)
if err != nil {
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
mysqlErr.Number == mysql.ErrUnknownSystemVariable {
log.Info("This version of TiDB does not "+
"support system variable: tidb_write_by_ticdc",
zap.String("version", tidbVer))
continue
}
return err
}
}

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,5 @@ replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1

// copy from TiDB
replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac

replace github.com/pingcap/kvproto => github.com/xiongjiwei/kvproto v0.0.0-20221116081748-24d900443289
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xiongjiwei/kvproto v0.0.0-20221116081748-24d900443289 h1:qsL3cNN2jOOp8B1hfIWH78NTBDMxhZlip1aJujogj/4=
github.com/xiongjiwei/kvproto v0.0.0-20221116081748-24d900443289/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx0K/GyB0o2bww=
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8=
github.com/xitongsys/parquet-go v1.6.0 h1:j6YrTVZdQx5yywJLIOklZcKVsCoSD1tqOVRXyTBFSjs=
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type replicaConfig struct {
ForceReplicate bool `toml:"force-replicate" json:"force-replicate"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"`
BDRMode bool `toml:"bdr-mode" json:"bdr-mode"`
SyncPointInterval time.Duration `toml:"sync-point-interval" json:"sync-point-interval"`
SyncPointRetention time.Duration `toml:"sync-point-retention" json:"sync-point-retention"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Expand Down