Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

feat: source level filter support #1370

Merged
merged 8 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ ErrConfigShardModeNotSupport,[code=20033:class=config:scope=internal:level=mediu
ErrConfigMoreThanOne,[code=20034:class=config:scope=internal:level=high], "Message: found %d %s for %s which should <= 1"
ErrConfigEtcdParse,[code=20035:class=config:scope=internal:level=high], "Message: incapable config of %s from etcd"
ErrConfigMissingForBound,[code=20036:class=config:scope=internal:level=high], "Message: source bound %s doesn't have related source config in etcd"
ErrConfigBinlogEventFilter,[code=20037:class=config:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down Expand Up @@ -276,7 +277,7 @@ ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=med
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)"
ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)"
ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Pleass check the `filters` config in source and task configuration files."
ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file."
ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`"
Expand Down
9 changes: 9 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
)

const (
Expand Down Expand Up @@ -71,6 +72,9 @@ type SourceConfig struct {

// deprecated tracer, to keep compatibility with older version
Tracer map[string]interface{} `yaml:"tracer" toml:"tracer" json:"-"`

CaseSensitive bool `yaml:"case-sensitive" toml:"case-sensitive" json:"case-sensitive"`
Filters []*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
}

// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
Expand Down Expand Up @@ -189,6 +193,11 @@ func (c *SourceConfig) Verify() error {

c.DecryptPassword()

_, err = bf.NewBinlogEvent(c.CaseSensitive, c.Filters)
if err != nil {
return terror.ErrConfigBinlogEventFilter.Delegate(err)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/siddontang/go-mysql/mysql"
)

Expand Down Expand Up @@ -75,6 +76,7 @@ func (t *testConfig) TestConfig(c *C) {
// fix empty map after marshal/unmarshal becomes nil
clone1.From.Session = map[string]string{}
clone1.Tracer = map[string]interface{}{}
clone1.Filters = []*bf.BinlogEventRule{}
clone2 := cfg.DecryptPassword()
c.Assert(clone2, DeepEquals, clone1)

Expand Down
7 changes: 6 additions & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.FilterRules[j] = c.Filters[name]
}

_, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules)
if err != nil {
return nil, terror.ErrConfigBinlogEventFilter.Delegate(err)
}

cfg.ColumnMappingRules = make([]*column.Rule, len(inst.ColumnMappingRules))
for j, name := range inst.ColumnMappingRules {
cfg.ColumnMappingRules[j] = c.ColumnMappings[name]
Expand All @@ -605,7 +610,7 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf

cfg.CleanDumpFile = c.CleanDumpFile

err := cfg.Adjust(true)
err = cfg.Adjust(true)
if err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
}
Expand Down
2 changes: 2 additions & 0 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/golang/mock/gomock"
. "github.com/pingcap/check"
filter "github.com/pingcap/tidb-tools/pkg/binlog-filter"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Expand Down Expand Up @@ -72,6 +73,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
// fix empty map after marshal/unmarshal becomes nil
cfg1.From.Session = map[string]string{}
cfg1.Tracer = map[string]interface{}{}
cfg1.Filters = []*filter.BinlogEventRule{}
c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil)
cfg1.From.Host = host
cfg1.From.Port = port
Expand Down
8 changes: 6 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
subTaskCfg.LogFile = s.cfg.LogFile
subTaskCfg.LogFormat = s.cfg.LogFormat
subTaskCfgClone := subTaskCfg
copyConfigFromSource(&subTaskCfgClone, cfg)
if err = copyConfigFromSource(&subTaskCfgClone, cfg); err != nil {
return err
}
subTaskCfgs = append(subTaskCfgs, &subTaskCfgClone)
}

Expand Down Expand Up @@ -605,7 +607,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
continue
}
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
w.StartSubTask(subTaskCfg, expectStage.Expect)
if err := w.StartSubTask(subTaskCfg, expectStage.Expect); err != nil {
return err
}
}

w.wg.Add(1)
Expand Down
45 changes: 36 additions & 9 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -190,36 +191,41 @@ func (w *Worker) Close() {
}

// StartSubTask creates a sub task an run it
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) {
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) error {
w.Lock()
defer w.Unlock()

// copy some config item from dm-worker's source config
copyConfigFromSource(cfg, w.cfg)
err := copyConfigFromSource(cfg, w.cfg)
if err != nil {
return err
}

// directly put cfg into subTaskHolder
// the unique of subtask should be assured by etcd
st := NewSubTask(cfg, w.etcdClient)
w.subTaskHolder.recordSubTask(st)
if w.closed.Get() == closedTrue {
st.fail(terror.ErrWorkerAlreadyClosed.Generate())
return
return nil
}

cfg2, err := cfg.DecryptPassword()
if err != nil {
st.fail(errors.Annotate(err, "start sub task"))
return
return nil
}
st.cfg = cfg2

if w.relayPurger != nil && w.relayPurger.Purging() {
// TODO: retry until purged finished
st.fail(terror.ErrWorkerRelayIsPurging.Generate(cfg.Name))
return
return nil
}

w.l.Info("subtask created", zap.Stringer("config", cfg2))
st.Run(expectStage)
return nil
}

// UpdateSubTask update config for a sub task
Expand Down Expand Up @@ -419,9 +425,8 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
// create the subtask for expected running and paused stage.
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
w.StartSubTask(&subTaskCfg, stage.Expect)
// error is nil, opErrTypeBeforeOp will be ignored
return opErrTypeBeforeOp, nil
err := w.StartSubTask(&subTaskCfg, stage.Expect)
return opErrTypeBeforeOp, err
}
if stage.Expect == pb.Stage_Running {
op = pb.TaskOp_Resume
Expand Down Expand Up @@ -623,7 +628,7 @@ func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
}

// copyConfigFromSource copies config items from source config to sub task
func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) {
func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) error {
cfg.From = sourceCfg.From

cfg.Flavor = sourceCfg.Flavor
Expand All @@ -634,6 +639,28 @@ func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceCon

// we can remove this from SubTaskConfig later, because syncer will always read from relay
cfg.AutoFixGTID = sourceCfg.AutoFixGTID

if cfg.CaseSensitive != sourceCfg.CaseSensitive {
log.L().Warn("different case-sensitive config between task config and source config, use `true` for it.")
}
cfg.CaseSensitive = cfg.CaseSensitive || sourceCfg.CaseSensitive
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
filter, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules)
if err != nil {
return err
}

for _, filterRule := range sourceCfg.Filters {
if err = filter.AddRule(filterRule); err != nil {
// task level config has higher priority
if errors.IsAlreadyExists(errors.Cause(err)) {
log.L().Warn("filter config already exist in source config, overwrite it", log.ShortError(err))
continue
}
return err
}
cfg.FilterRules = append(cfg.FilterRules, filterRule)
}
return nil
}

// getAllSubTaskStatus returns all subtask status of this worker, note the field
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (t *testServer) testWorker(c *C) {
c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0)
c.Assert(w.closed.Get(), Equals, closedTrue)

w.StartSubTask(&config.SubTaskConfig{
c.Assert(w.StartSubTask(&config.SubTaskConfig{
Name: "testStartTask",
}, pb.Stage_Running)
}, pb.Stage_Running), IsNil)
task := w.subTaskHolder.findSubTask("testStartTask")
c.Assert(task, NotNil)
c.Assert(task.Result().String(), Matches, ".*worker already closed.*")
Expand Down Expand Up @@ -160,7 +160,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
var subtaskCfg config.SubTaskConfig
c.Assert(subtaskCfg.DecodeFile("./subtask.toml", true), IsNil)
c.Assert(err, IsNil)
s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running)
c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil)

// check task in paused state
c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool {
Expand Down Expand Up @@ -269,7 +269,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
[]ha.Stage{ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)})
c.Assert(err, IsNil)
// step 2.1: start a subtask manually
w.StartSubTask(&subtaskCfg, pb.Stage_Running)
c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil)
// step 3: trigger etcd compaction and check whether we can receive it through watcher
_, err = etcdCli.Compact(ctx, rev)
c.Assert(err, IsNil)
Expand Down
8 changes: 7 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-config-20037]
message = "generate binlog event filter"
description = ""
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down Expand Up @@ -1669,7 +1675,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36030]
message = "generate binlog event filter"
description = ""
workaround = ""
workaround = "Pleass check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-sync-unit-36031]
Expand Down
4 changes: 3 additions & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ const (
codeConfigMoreThanOne
codeConfigEtcdParse
codeConfigMissingForBound
codeConfigBinlogEventFilter
)

// Binlog operation error code list
Expand Down Expand Up @@ -816,6 +817,7 @@ var (
ErrConfigMoreThanOne = New(codeConfigMoreThanOne, ClassConfig, ScopeInternal, LevelHigh, "found %d %s for %s which should <= 1", "")
ErrConfigEtcdParse = New(codeConfigEtcdParse, ClassConfig, ScopeInternal, LevelHigh, "incapable config of %s from etcd", "")
ErrConfigMissingForBound = New(codeConfigMissingForBound, ClassConfig, ScopeInternal, LevelHigh, "source bound %s doesn't have related source config in etcd", "")
ErrConfigBinlogEventFilter = New(codeConfigBinlogEventFilter, ClassConfig, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")

// Binlog operation error
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down Expand Up @@ -946,7 +948,7 @@ var (
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "")
ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "")
ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Pleass check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenTableRouter = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrSyncerUnitGenColumnMapping = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.")
ErrSyncerUnitDoColumnMapping = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "")
Expand Down
17 changes: 17 additions & 0 deletions tests/dmctl_basic/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
# sync t_1's alter event by task config(overwrite)
# ignore t_2's alter event by source config
filter-rules: ["user-filter-1"]

- source-id: "mysql-replica-02"
block-allow-list: "instance"
Expand All @@ -28,6 +31,8 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
# ignore t_2's alter event by task config
filter-rules: ["user-filter-2"]

block-allow-list:
instance:
Expand Down Expand Up @@ -80,3 +85,15 @@ syncers:
global:
worker-count: 16
batch: 100

filters:
user-filter-1:
schema-pattern: "dmctl"
table-pattern: "t_1"
events: ["all"]
action: Do
user-filter-2:
schema-pattern: "dmctl"
table-pattern: "t_2"
sql-pattern: ["alter table .* add column `aaa` int"]
action: Ignore
14 changes: 14 additions & 0 deletions tests/dmctl_basic/conf/get_source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,17 @@ checker:
backoff-factor: 2
server-id: 123456
tracer: {}
case-sensitive: false
filters:
- schema-pattern: dmctl
table-pattern: t_1
events: []
sql-pattern:
- alter table .* add column `aaa` int
action: Ignore
- schema-pattern: dmctl
table-pattern: t_2
events: []
sql-pattern:
- alter table .* add column `aaa` int
action: Ignore
2 changes: 2 additions & 0 deletions tests/dmctl_basic/conf/get_source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ checker:
backoff-factor: 2
server-id: 654321
tracer: {}
case-sensitive: false
filters: []
Loading