Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: new router compatible with regular expression #4358

Merged
merged 16 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrWorkerFailConnectMaster,[code=40077:class=dm-worker:scope=internal:level=high], "Message: cannot join with master endpoints: %v, error: %v, Workaround: Please check network connection of worker and check worker name is unique."
ErrWorkerRelayConfigChanging,[code=40079:class=dm-worker:scope=internal:level=low], "Message: relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s, Workaround: Please try again later"
ErrWorkerRouteTableDupMatch,[code=40080:class=dm-worker:scope=internal:level=high], "Message: table %s.%s matches more than one rule, Workaround: please check the route rules in the task config"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down
4 changes: 2 additions & 2 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
Expand All @@ -42,7 +43,6 @@ import (
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/dumpling/export"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -135,7 +135,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if err != nil {
return terror.ErrTaskCheckGenBAList.Delegate(err)
}
r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
r, err := router.NewRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
if err != nil {
return terror.ErrTaskCheckGenTableRouter.Delegate(err)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a question: why not pull a request to tidb-tools?
maybe i miss some offline talk infomation😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, nice suggestion. We're planning moving it to tidb-tools to keep the table router aligned across all the project.

"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/dumpling"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
)
Expand Down Expand Up @@ -426,7 +426,7 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil {
return terror.ErrConfigGenBAList.Delegate(err)
}
if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil {
if _, err := router.NewRouter(c.CaseSensitive, c.RouteRules); err != nil {
return terror.ErrConfigGenTableRouter.Delegate(err)
}
// NewMapping will fill arguments with the default values.
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,12 @@ description = ""
workaround = "Please try again later"
tags = ["internal", "low"]

[error.DM-dm-worker-40080]
message = "table %s.%s matches more than one rule"
description = ""
workaround = "please check the route rules in the task config"
tags = ["internal", "high"]

[error.DM-dm-tracer-42001]
message = "parse dm-tracer config flag set"
description = ""
Expand Down
4 changes: 2 additions & 2 deletions dm/loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
parserpkg "github.com/pingcap/tiflow/dm/pkg/parser"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"

"github.com/pingcap/errors"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/parser/ast"
)

Expand Down Expand Up @@ -234,7 +234,7 @@ func tableName(schema, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}

func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) {
func parseTable(ctx *tcontext.Context, r *router.RouteTable, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) {
statement, err := exportStatement(file)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions dm/loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ package loader

import (
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/router"

. "github.com/pingcap/check"
)
Expand Down Expand Up @@ -165,7 +165,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
insertHeadStmt: "INSERT INTO `t` VALUES",
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source-mysql-01")
Expand Down Expand Up @@ -193,7 +193,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES",
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source-mysql-01")
Expand Down Expand Up @@ -411,7 +411,7 @@ func (t *testConvertDataSuite) TestParseTableWithExtendColumn(c *C) {
extendVal: []string{"t2", "test1", "source1"},
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source1")
Expand Down Expand Up @@ -456,7 +456,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumnExtendColumn(c *
extendVal: []string{"t3", "test1", "source1"},
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source1")
Expand Down
12 changes: 6 additions & 6 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -430,7 +430,7 @@ type Loader struct {

fileJobQueue chan *fileJob

tableRouter *router.Table
tableRouter *router.RouteTable
baList *filter.Filter
columnMapping *cm.Mapping

Expand Down Expand Up @@ -876,7 +876,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
var (
err error
oldBaList *filter.Filter
oldTableRouter *router.Table
oldTableRouter *router.RouteTable
oldColumnMapping *cm.Mapping
)

Expand Down Expand Up @@ -904,7 +904,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {

// update route, for loader, this almost useless, because schemas often have been restored
oldTableRouter = l.tableRouter
l.tableRouter, err = router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules)
l.tableRouter, err = router.NewRouter(cfg.CaseSensitive, cfg.RouteRules)
if err != nil {
return terror.ErrLoadUnitGenTableRouter.Delegate(err)
}
Expand All @@ -924,7 +924,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
}

func (l *Loader) genRouter(rules []*router.TableRule) error {
l.tableRouter, _ = router.NewTableRouter(l.cfg.CaseSensitive, []*router.TableRule{})
l.tableRouter, _ = router.NewRouter(l.cfg.CaseSensitive, []*router.TableRule{})
for _, rule := range rules {
err := l.tableRouter.AddRule(rule)
if err != nil {
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func renameShardingSchema(query, srcSchema, dstSchema string, ansiquote bool) st
return SQLReplace(query, srcSchema, dstSchema, ansiquote)
}

func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, table string) (targetSchema string, targetTable string) {
func fetchMatchedLiteral(ctx *tcontext.Context, router *router.RouteTable, schema, table string) (targetSchema string, targetTable string) {
if schema == "" {
// nothing change
return schema, table
Expand Down
Loading