Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

feat: source level filter support #1370

Merged
merged 8 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ ErrConfigShardModeNotSupport,[code=20033:class=config:scope=internal:level=mediu
ErrConfigMoreThanOne,[code=20034:class=config:scope=internal:level=high], "Message: found %d %s for %s which should <= 1"
ErrConfigEtcdParse,[code=20035:class=config:scope=internal:level=high], "Message: incapable config of %s from etcd"
ErrConfigMissingForBound,[code=20036:class=config:scope=internal:level=high], "Message: source bound %s doesn't have related source config in etcd"
ErrConfigBinlogEventFilter,[code=20037:class=config:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down Expand Up @@ -276,7 +277,7 @@ ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=med
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)"
ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)"
ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file."
ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`"
Expand Down
9 changes: 9 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
)

const (
Expand Down Expand Up @@ -71,6 +72,9 @@ type SourceConfig struct {

// deprecated tracer, to keep compatibility with older version
Tracer map[string]interface{} `yaml:"tracer" toml:"tracer" json:"-"`

CaseSensitive bool `yaml:"case-sensitive" toml:"case-sensitive" json:"case-sensitive"`
Filters []*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
}

// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
Expand Down Expand Up @@ -189,6 +193,11 @@ func (c *SourceConfig) Verify() error {

c.DecryptPassword()

_, err = bf.NewBinlogEvent(c.CaseSensitive, c.Filters)
if err != nil {
return terror.ErrConfigBinlogEventFilter.Delegate(err)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/siddontang/go-mysql/mysql"
)

Expand Down Expand Up @@ -75,6 +76,7 @@ func (t *testConfig) TestConfig(c *C) {
// fix empty map after marshal/unmarshal becomes nil
clone1.From.Session = map[string]string{}
clone1.Tracer = map[string]interface{}{}
clone1.Filters = []*bf.BinlogEventRule{}
clone2 := cfg.DecryptPassword()
c.Assert(clone2, DeepEquals, clone1)

Expand Down
7 changes: 6 additions & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.FilterRules[j] = c.Filters[name]
}

_, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules)
if err != nil {
return nil, terror.ErrConfigBinlogEventFilter.Delegate(err)
}

cfg.ColumnMappingRules = make([]*column.Rule, len(inst.ColumnMappingRules))
for j, name := range inst.ColumnMappingRules {
cfg.ColumnMappingRules[j] = c.ColumnMappings[name]
Expand All @@ -605,7 +610,7 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf

cfg.CleanDumpFile = c.CleanDumpFile

err := cfg.Adjust(true)
err = cfg.Adjust(true)
if err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
}
Expand Down
2 changes: 2 additions & 0 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/golang/mock/gomock"
. "github.com/pingcap/check"
filter "github.com/pingcap/tidb-tools/pkg/binlog-filter"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Expand Down Expand Up @@ -72,6 +73,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
// fix empty map after marshal/unmarshal becomes nil
cfg1.From.Session = map[string]string{}
cfg1.Tracer = map[string]interface{}{}
cfg1.Filters = []*filter.BinlogEventRule{}
c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil)
cfg1.From.Host = host
cfg1.From.Port = port
Expand Down
8 changes: 6 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
subTaskCfg.LogFile = s.cfg.LogFile
subTaskCfg.LogFormat = s.cfg.LogFormat
subTaskCfgClone := subTaskCfg
copyConfigFromSource(&subTaskCfgClone, cfg)
if err = copyConfigFromSource(&subTaskCfgClone, cfg); err != nil {
return err
}
subTaskCfgs = append(subTaskCfgs, &subTaskCfgClone)
}

Expand Down Expand Up @@ -605,7 +607,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
continue
}
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
w.StartSubTask(subTaskCfg, expectStage.Expect)
if err := w.StartSubTask(subTaskCfg, expectStage.Expect); err != nil {
return err
}
}

w.wg.Add(1)
Expand Down
45 changes: 36 additions & 9 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -190,36 +191,41 @@ func (w *Worker) Close() {
}

// StartSubTask creates a sub task and runs it
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) {
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) error {
w.Lock()
defer w.Unlock()

// copy some config item from dm-worker's source config
copyConfigFromSource(cfg, w.cfg)
err := copyConfigFromSource(cfg, w.cfg)
if err != nil {
return err
}

// directly put cfg into subTaskHolder
// the unique of subtask should be assured by etcd
st := NewSubTask(cfg, w.etcdClient)
w.subTaskHolder.recordSubTask(st)
if w.closed.Get() == closedTrue {
st.fail(terror.ErrWorkerAlreadyClosed.Generate())
return
return nil
}

cfg2, err := cfg.DecryptPassword()
if err != nil {
st.fail(errors.Annotate(err, "start sub task"))
return
return nil
}
st.cfg = cfg2

if w.relayPurger != nil && w.relayPurger.Purging() {
// TODO: retry until purged finished
st.fail(terror.ErrWorkerRelayIsPurging.Generate(cfg.Name))
return
return nil
}

w.l.Info("subtask created", zap.Stringer("config", cfg2))
st.Run(expectStage)
return nil
}

// UpdateSubTask update config for a sub task
Expand Down Expand Up @@ -419,9 +425,8 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
// create the subtask for expected running and paused stage.
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
w.StartSubTask(&subTaskCfg, stage.Expect)
// error is nil, opErrTypeBeforeOp will be ignored
return opErrTypeBeforeOp, nil
err := w.StartSubTask(&subTaskCfg, stage.Expect)
return opErrTypeBeforeOp, err
}
if stage.Expect == pb.Stage_Running {
op = pb.TaskOp_Resume
Expand Down Expand Up @@ -623,7 +628,7 @@ func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
}

// copyConfigFromSource copies config items from source config to sub task
func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) {
func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) error {
cfg.From = sourceCfg.From

cfg.Flavor = sourceCfg.Flavor
Expand All @@ -634,6 +639,28 @@ func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceCon

// we can remove this from SubTaskConfig later, because syncer will always read from relay
cfg.AutoFixGTID = sourceCfg.AutoFixGTID

if cfg.CaseSensitive != sourceCfg.CaseSensitive {
log.L().Warn("different case-sensitive config between task config and source config, use `true` for it.")
}
cfg.CaseSensitive = cfg.CaseSensitive || sourceCfg.CaseSensitive
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
filter, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules)
if err != nil {
return err
}

for _, filterRule := range sourceCfg.Filters {
if err = filter.AddRule(filterRule); err != nil {
// task level config has higher priority
if errors.IsAlreadyExists(errors.Cause(err)) {
log.L().Warn("filter config already exist in source config, overwrite it", log.ShortError(err))
continue
}
return err
}
cfg.FilterRules = append(cfg.FilterRules, filterRule)
}
return nil
}

// getAllSubTaskStatus returns all subtask status of this worker, note the field
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (t *testServer) testWorker(c *C) {
c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0)
c.Assert(w.closed.Get(), Equals, closedTrue)

w.StartSubTask(&config.SubTaskConfig{
c.Assert(w.StartSubTask(&config.SubTaskConfig{
Name: "testStartTask",
}, pb.Stage_Running)
}, pb.Stage_Running), IsNil)
task := w.subTaskHolder.findSubTask("testStartTask")
c.Assert(task, NotNil)
c.Assert(task.Result().String(), Matches, ".*worker already closed.*")
Expand Down Expand Up @@ -160,7 +160,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) {
var subtaskCfg config.SubTaskConfig
c.Assert(subtaskCfg.DecodeFile("./subtask.toml", true), IsNil)
c.Assert(err, IsNil)
s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running)
c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil)

// check task in paused state
c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool {
Expand Down Expand Up @@ -269,7 +269,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
[]ha.Stage{ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)})
c.Assert(err, IsNil)
// step 2.1: start a subtask manually
w.StartSubTask(&subtaskCfg, pb.Stage_Running)
c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil)
// step 3: trigger etcd compaction and check whether we can receive it through watcher
_, err = etcdCli.Compact(ctx, rev)
c.Assert(err, IsNil)
Expand Down
8 changes: 7 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-config-20037]
message = "generate binlog event filter"
description = ""
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down Expand Up @@ -1669,7 +1675,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36030]
message = "generate binlog event filter"
description = ""
workaround = ""
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-sync-unit-36031]
Expand Down
4 changes: 3 additions & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ const (
codeConfigMoreThanOne
codeConfigEtcdParse
codeConfigMissingForBound
codeConfigBinlogEventFilter
)

// Binlog operation error code list
Expand Down Expand Up @@ -816,6 +817,7 @@ var (
ErrConfigMoreThanOne = New(codeConfigMoreThanOne, ClassConfig, ScopeInternal, LevelHigh, "found %d %s for %s which should <= 1", "")
ErrConfigEtcdParse = New(codeConfigEtcdParse, ClassConfig, ScopeInternal, LevelHigh, "incapable config of %s from etcd", "")
ErrConfigMissingForBound = New(codeConfigMissingForBound, ClassConfig, ScopeInternal, LevelHigh, "source bound %s doesn't have related source config in etcd", "")
ErrConfigBinlogEventFilter = New(codeConfigBinlogEventFilter, ClassConfig, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")

// Binlog operation error
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down Expand Up @@ -946,7 +948,7 @@ var (
ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "")
ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "")
ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "")
ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")
ErrSyncerUnitGenTableRouter = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
ErrSyncerUnitGenColumnMapping = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.")
ErrSyncerUnitDoColumnMapping = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "")
Expand Down
17 changes: 17 additions & 0 deletions tests/dmctl_basic/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
# sync t_1's alter event by task config(overwrite)
# ignore t_2's alter event by source config
filter-rules: ["user-filter-1"]

- source-id: "mysql-replica-02"
block-allow-list: "instance"
Expand All @@ -28,6 +31,8 @@ mysql-instances:
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
# ignore t_2's alter event by task config
filter-rules: ["user-filter-2"]

block-allow-list:
instance:
Expand Down Expand Up @@ -80,3 +85,15 @@ syncers:
global:
worker-count: 16
batch: 100

filters:
user-filter-1:
schema-pattern: "dmctl"
table-pattern: "t_1"
events: ["all"]
action: Do
user-filter-2:
schema-pattern: "dmctl"
table-pattern: "t_2"
sql-pattern: ["alter table .* add column `aaa` int"]
action: Ignore
14 changes: 14 additions & 0 deletions tests/dmctl_basic/conf/get_source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,17 @@ checker:
backoff-factor: 2
server-id: 123456
tracer: {}
case-sensitive: false
filters:
- schema-pattern: dmctl
table-pattern: t_1
events: []
sql-pattern:
- alter table .* add column `aaa` int
action: Ignore
- schema-pattern: dmctl
table-pattern: t_2
events: []
sql-pattern:
- alter table .* add column `aaa` int
action: Ignore
2 changes: 2 additions & 0 deletions tests/dmctl_basic/conf/get_source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ checker:
backoff-factor: 2
server-id: 654321
tracer: {}
case-sensitive: false
filters: []
Loading