diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 4462d72c7a..1650d0a98e 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -163,6 +163,7 @@ ErrConfigShardModeNotSupport,[code=20033:class=config:scope=internal:level=mediu ErrConfigMoreThanOne,[code=20034:class=config:scope=internal:level=high], "Message: found %d %s for %s which should <= 1" ErrConfigEtcdParse,[code=20035:class=config:scope=internal:level=high], "Message: incapable config of %s from etcd" ErrConfigMissingForBound,[code=20036:class=config:scope=internal:level=high], "Message: source bound %s doesn't have related source config in etcd" +ErrConfigBinlogEventFilter,[code=20037:class=config:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files." ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high] ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename" ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high] @@ -276,7 +277,7 @@ ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=med ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)" ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)" ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)" -ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter" 
+ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files." ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file." ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`" diff --git a/dm/config/source_config.go b/dm/config/source_config.go index 9d37de2fa8..7dda5c9b0c 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" ) const ( @@ -71,6 +72,9 @@ type SourceConfig struct { // deprecated tracer, to keep compatibility with older version Tracer map[string]interface{} `yaml:"tracer" toml:"tracer" json:"-"` + + CaseSensitive bool `yaml:"case-sensitive" toml:"case-sensitive" json:"case-sensitive"` + Filters []*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"` } // NewSourceConfig creates a new base config for upstream MySQL/MariaDB source. 
@@ -189,6 +193,11 @@ func (c *SourceConfig) Verify() error { c.DecryptPassword() + _, err = bf.NewBinlogEvent(c.CaseSensitive, c.Filters) + if err != nil { + return terror.ErrConfigBinlogEventFilter.Delegate(err) + } + return nil } diff --git a/dm/config/source_config_test.go b/dm/config/source_config_test.go index 54c2ddec4f..59e536cb50 100644 --- a/dm/config/source_config_test.go +++ b/dm/config/source_config_test.go @@ -24,6 +24,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" + bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/siddontang/go-mysql/mysql" ) @@ -75,6 +76,7 @@ func (t *testConfig) TestConfig(c *C) { // fix empty map after marshal/unmarshal becomes nil clone1.From.Session = map[string]string{} clone1.Tracer = map[string]interface{}{} + clone1.Filters = []*bf.BinlogEventRule{} clone2 := cfg.DecryptPassword() c.Assert(clone2, DeepEquals, clone1) diff --git a/dm/config/task.go b/dm/config/task.go index 9e4f3b97dc..9c6754badb 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -592,6 +592,11 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf cfg.FilterRules[j] = c.Filters[name] } + _, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules) + if err != nil { + return nil, terror.ErrConfigBinlogEventFilter.Delegate(err) + } + cfg.ColumnMappingRules = make([]*column.Rule, len(inst.ColumnMappingRules)) for j, name := range inst.ColumnMappingRules { cfg.ColumnMappingRules[j] = c.ColumnMappings[name] @@ -605,7 +610,7 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf cfg.CleanDumpFile = c.CleanDumpFile - err := cfg.Adjust(true) + err = cfg.Adjust(true) if err != nil { return nil, terror.Annotatef(err, "source %s", inst.SourceID) } diff --git a/dm/master/bootstrap_test.go b/dm/master/bootstrap_test.go index 76a8ce819f..1a01fdc26c 100644 --- a/dm/master/bootstrap_test.go +++ b/dm/master/bootstrap_test.go @@ -24,6 +24,7 @@ import ( 
"github.com/golang/mock/gomock" . "github.com/pingcap/check" + filter "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" @@ -72,6 +73,7 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) { // fix empty map after marshal/unmarshal becomes nil cfg1.From.Session = map[string]string{} cfg1.Tracer = map[string]interface{}{} + cfg1.Filters = []*filter.BinlogEventRule{} c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil) cfg1.From.Host = host cfg1.From.Port = port diff --git a/dm/worker/server.go b/dm/worker/server.go index f539ce1042..a317767d47 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -539,7 +539,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error { subTaskCfg.LogFile = s.cfg.LogFile subTaskCfg.LogFormat = s.cfg.LogFormat subTaskCfgClone := subTaskCfg - copyConfigFromSource(&subTaskCfgClone, cfg) + if err = copyConfigFromSource(&subTaskCfgClone, cfg); err != nil { + return err + } subTaskCfgs = append(subTaskCfgs, &subTaskCfgClone) } @@ -605,7 +607,9 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error { continue } log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - w.StartSubTask(subTaskCfg, expectStage.Expect) + if err := w.StartSubTask(subTaskCfg, expectStage.Expect); err != nil { + return err + } } w.wg.Add(1) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 4f0733c69b..ec40e46cb9 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -21,6 +21,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/errors" + bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/siddontang/go/sync2" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -190,36 +191,41 @@ func (w *Worker) Close() { } // StartSubTask creates a sub task an run it -func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) { +func (w *Worker) 
StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage) error { w.Lock() defer w.Unlock() // copy some config item from dm-worker's source config - copyConfigFromSource(cfg, w.cfg) + err := copyConfigFromSource(cfg, w.cfg) + if err != nil { + return err + } + // directly put cfg into subTaskHolder // the unique of subtask should be assured by etcd st := NewSubTask(cfg, w.etcdClient) w.subTaskHolder.recordSubTask(st) if w.closed.Get() == closedTrue { st.fail(terror.ErrWorkerAlreadyClosed.Generate()) - return + return nil } cfg2, err := cfg.DecryptPassword() if err != nil { st.fail(errors.Annotate(err, "start sub task")) - return + return nil } st.cfg = cfg2 if w.relayPurger != nil && w.relayPurger.Purging() { // TODO: retry until purged finished st.fail(terror.ErrWorkerRelayIsPurging.Generate(cfg.Name)) - return + return nil } w.l.Info("subtask created", zap.Stringer("config", cfg2)) st.Run(expectStage) + return nil } // UpdateSubTask update config for a sub task @@ -419,9 +425,8 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { // create the subtask for expected running and paused stage. 
log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - w.StartSubTask(&subTaskCfg, stage.Expect) - // error is nil, opErrTypeBeforeOp will be ignored - return opErrTypeBeforeOp, nil + err := w.StartSubTask(&subTaskCfg, stage.Expect) + return opErrTypeBeforeOp, err } if stage.Expect == pb.Stage_Running { op = pb.TaskOp_Resume @@ -623,7 +628,7 @@ func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } // copyConfigFromSource copies config items from source config to sub task -func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) { +func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceConfig) error { cfg.From = sourceCfg.From cfg.Flavor = sourceCfg.Flavor @@ -634,6 +639,28 @@ func copyConfigFromSource(cfg *config.SubTaskConfig, sourceCfg *config.SourceCon // we can remove this from SubTaskConfig later, because syncer will always read from relay cfg.AutoFixGTID = sourceCfg.AutoFixGTID + + if cfg.CaseSensitive != sourceCfg.CaseSensitive { + log.L().Warn("different case-sensitive config between task config and source config, use `true` for it.") + } + cfg.CaseSensitive = cfg.CaseSensitive || sourceCfg.CaseSensitive + filter, err := bf.NewBinlogEvent(cfg.CaseSensitive, cfg.FilterRules) + if err != nil { + return err + } + + for _, filterRule := range sourceCfg.Filters { + if err = filter.AddRule(filterRule); err != nil { + // task level config has higher priority + if errors.IsAlreadyExists(errors.Cause(err)) { + log.L().Warn("filter config already exist in source config, overwrite it", log.ShortError(err)) + continue + } + return err + } + cfg.FilterRules = append(cfg.FilterRules, filterRule) + } + return nil } // getAllSubTaskStatus returns all subtask status of this worker, note the field diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 9017819135..22c28d385b 100644 --- a/dm/worker/worker_test.go 
+++ b/dm/worker/worker_test.go @@ -68,9 +68,9 @@ func (t *testServer) testWorker(c *C) { c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) c.Assert(w.closed.Get(), Equals, closedTrue) - w.StartSubTask(&config.SubTaskConfig{ + c.Assert(w.StartSubTask(&config.SubTaskConfig{ Name: "testStartTask", - }, pb.Stage_Running) + }, pb.Stage_Running), IsNil) task := w.subTaskHolder.findSubTask("testStartTask") c.Assert(task, NotNil) c.Assert(task.Result().String(), Matches, ".*worker already closed.*") @@ -160,7 +160,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { var subtaskCfg config.SubTaskConfig c.Assert(subtaskCfg.DecodeFile("./subtask.toml", true), IsNil) c.Assert(err, IsNil) - s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running) + c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil) // check task in paused state c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool { @@ -269,7 +269,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { []ha.Stage{ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)}) c.Assert(err, IsNil) // step 2.1: start a subtask manually - w.StartSubTask(&subtaskCfg, pb.Stage_Running) + c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running), IsNil) // step 3: trigger etcd compaction and check whether we can receive it through watcher _, err = etcdCli.Compact(ctx, rev) c.Assert(err, IsNil) diff --git a/errors.toml b/errors.toml index c320c84869..1e635f7f5c 100644 --- a/errors.toml +++ b/errors.toml @@ -988,6 +988,12 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-config-20037] +message = "generate binlog event filter" +description = "" +workaround = "Please check the `filters` config in source and task configuration files." 
+tags = ["internal", "high"] + [error.DM-binlog-op-22001] message = "" description = "" @@ -1669,7 +1675,7 @@ tags = ["internal", "high"] [error.DM-sync-unit-36030] message = "generate binlog event filter" description = "" -workaround = "" +workaround = "Please check the `filters` config in source and task configuration files." tags = ["internal", "high"] [error.DM-sync-unit-36031] diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 27a71f7753..7612e28b41 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -222,6 +222,7 @@ const ( codeConfigMoreThanOne codeConfigEtcdParse codeConfigMissingForBound + codeConfigBinlogEventFilter ) // Binlog operation error code list @@ -816,6 +817,7 @@ var ( ErrConfigMoreThanOne = New(codeConfigMoreThanOne, ClassConfig, ScopeInternal, LevelHigh, "found %d %s for %s which should <= 1", "") ErrConfigEtcdParse = New(codeConfigEtcdParse, ClassConfig, ScopeInternal, LevelHigh, "incapable config of %s from etcd", "") ErrConfigMissingForBound = New(codeConfigMissingForBound, ClassConfig, ScopeInternal, LevelHigh, "source bound %s doesn't have related source config in etcd", "") + ErrConfigBinlogEventFilter = New(codeConfigBinlogEventFilter, ClassConfig, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.") // Binlog operation error ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "") @@ -946,7 +948,7 @@ var ( ErrSyncerUnitDMLColumnNotMatch = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "") ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "") ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, 
ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "") - ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "") + ErrSyncerUnitGenBinlogEventFilter = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.") ErrSyncerUnitGenTableRouter = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.") ErrSyncerUnitGenColumnMapping = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.") ErrSyncerUnitDoColumnMapping = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "") diff --git a/tests/dmctl_basic/conf/dm-task.yaml b/tests/dmctl_basic/conf/dm-task.yaml index 6e49e44841..7b000e9374 100644 --- a/tests/dmctl_basic/conf/dm-task.yaml +++ b/tests/dmctl_basic/conf/dm-task.yaml @@ -20,6 +20,9 @@ mysql-instances: mydumper-config-name: "global" loader-config-name: "global" syncer-config-name: "global" + # sync t_1's alter event by task config(overwrite) + # ignore t_2's alter event by source config + filter-rules: ["user-filter-1"] - source-id: "mysql-replica-02" block-allow-list: "instance" @@ -28,6 +31,8 @@ mysql-instances: mydumper-config-name: "global" loader-config-name: "global" syncer-config-name: "global" + # ignore t_2's alter event by task config + filter-rules: ["user-filter-2"] block-allow-list: instance: @@ -80,3 +85,15 @@ syncers: global: worker-count: 16 batch: 100 + +filters: + user-filter-1: + schema-pattern: "dmctl" + table-pattern: "t_1" + events: ["all"] + action: Do + user-filter-2: 
+ schema-pattern: "dmctl" + table-pattern: "t_2" + sql-pattern: ["alter table .* add column `aaa` int"] + action: Ignore diff --git a/tests/dmctl_basic/conf/get_source1.yaml b/tests/dmctl_basic/conf/get_source1.yaml index e059f6098c..6e921fcf73 100644 --- a/tests/dmctl_basic/conf/get_source1.yaml +++ b/tests/dmctl_basic/conf/get_source1.yaml @@ -30,3 +30,17 @@ checker: backoff-factor: 2 server-id: 123456 tracer: {} +case-sensitive: false +filters: +- schema-pattern: dmctl + table-pattern: t_1 + events: [] + sql-pattern: + - alter table .* add column `aaa` int + action: Ignore +- schema-pattern: dmctl + table-pattern: t_2 + events: [] + sql-pattern: + - alter table .* add column `aaa` int + action: Ignore diff --git a/tests/dmctl_basic/conf/get_source2.yaml b/tests/dmctl_basic/conf/get_source2.yaml index bac6ddfd99..2948f2f878 100644 --- a/tests/dmctl_basic/conf/get_source2.yaml +++ b/tests/dmctl_basic/conf/get_source2.yaml @@ -30,3 +30,5 @@ checker: backoff-factor: 2 server-id: 654321 tracer: {} +case-sensitive: false +filters: [] diff --git a/tests/dmctl_basic/conf/get_task.yaml b/tests/dmctl_basic/conf/get_task.yaml index 7095f083d4..3c1bd1e232 100644 --- a/tests/dmctl_basic/conf/get_task.yaml +++ b/tests/dmctl_basic/conf/get_task.yaml @@ -21,7 +21,8 @@ target-database: mysql-instances: - source-id: mysql-replica-01 meta: null - filter-rules: [] + filter-rules: + - filter-01 column-mapping-rules: - cm-01 route-rules: @@ -40,7 +41,8 @@ mysql-instances: syncer-thread: 0 - source-id: mysql-replica-02 meta: null - filter-rules: [] + filter-rules: + - filter-02 column-mapping-rules: - cm-02 route-rules: @@ -69,7 +71,21 @@ routes: table-pattern: "" target-schema: dmctl target-table: "" -filters: {} +filters: + filter-01: + schema-pattern: dmctl + table-pattern: t_1 + events: + - all + sql-pattern: [] + action: Do + filter-02: + schema-pattern: dmctl + table-pattern: t_2 + events: [] + sql-pattern: + - alter table .* add column `aaa` int + action: Ignore 
column-mappings: cm-01: schema-pattern: dmctl diff --git a/tests/dmctl_basic/conf/source1.yaml b/tests/dmctl_basic/conf/source1.yaml index 4bbcf0f019..c56142e9ad 100644 --- a/tests/dmctl_basic/conf/source1.yaml +++ b/tests/dmctl_basic/conf/source1.yaml @@ -9,3 +9,13 @@ from: user: root password: '123456' port: 3306 + +filters: + - schema-pattern: "dmctl" + table-pattern: "t_1" + sql-pattern: ["alter table .* add column `aaa` int"] + action: Ignore + - schema-pattern: "dmctl" + table-pattern: "t_2" + sql-pattern: ["alter table .* add column `aaa` int"] + action: Ignore diff --git a/tests/dmctl_basic/run.sh b/tests/dmctl_basic/run.sh index 91daa71890..5711b726c8 100755 --- a/tests/dmctl_basic/run.sh +++ b/tests/dmctl_basic/run.sh @@ -262,6 +262,19 @@ function run() { # purge_relay_success $max_binlog_name $SOURCE_ID1 new_relay_log_count=$(($(ls $WORK_DIR/worker1/relay_log/$server_uuid | wc -l) - 1)) [ "$new_relay_log_count" -eq 1 ] + + run_sql_source1 "alter table dmctl.t_1 add column aaa int" + run_sql_source1 "alter table dmctl.t_2 add column aaa int" + run_sql_source2 "alter table dmctl.t_1 add column aaa int" + run_sql_source2 "alter table dmctl.t_2 add column aaa int" + + # all t_1 synced, all t_2 unsynced + run_sql_source1 "alter table dmctl.t_2 add column bbb int" + run_sql_source2 "alter table dmctl.t_2 add column bbb int" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "detect inconsistent DDL sequence" 2 } cleanup_data dmctl