Skip to content

Commit

Permalink
Merge branch 'master' into fix-match-inject-ddl-repeatedly
Browse files Browse the repository at this point in the history
  • Loading branch information
WizardXiao authored Mar 25, 2022
2 parents a4544a8 + 1a952da commit 20f5d3a
Show file tree
Hide file tree
Showing 42 changed files with 1,701 additions and 693 deletions.
13 changes: 7 additions & 6 deletions cdc/processor/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,21 +496,22 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
suite.tableExecutor.On("GetCheckpoint").
Return(model.Ts(1000), model.Ts(1000))

require.Never(t, func() bool {
start := time.Now()
for time.Since(start) < 100*time.Millisecond {
err := agent.Tick(suite.cdcCtx)
require.NoError(t, err)

select {
case <-suite.ctx.Done():
return true
require.FailNow(t, "context is canceled")
case <-suite.dispatchResponseCh:
return true
require.FailNow(t, "Dispatch Response is received")
case <-suite.syncCh:
return true
require.FailNow(t, "Sync is received")
default:
return false
}
}, 100*time.Millisecond, 1*time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
suite.UnblockSync()

require.Eventually(t, func() bool {
Expand Down
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:le
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first."
ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when not enable relay and no running tasks for now"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source %s can only be updated when relay is disabled and no tasks are running for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source."
ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`."
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if err := c.LoaderConfig.adjust(); err != nil {
return err
}
if err := c.ValidatorCfg.adjust(); err != nil {
if err := c.ValidatorCfg.Adjust(); err != nil {
return err
}

Expand Down
21 changes: 16 additions & 5 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"sort"
"strings"
"time"

"github.com/coreos/go-semver/semver"
"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -61,7 +62,9 @@ const (
ValidationFast = "fast"
ValidationFull = "full"

DefaultValidatorWorkerCount = 4
DefaultValidatorWorkerCount = 4
DefaultValidatorRowErrorDelay = 30 * time.Minute
DefaultValidatorMetaFlushInterval = 1 * time.Minute
)

// default config item values.
Expand Down Expand Up @@ -340,11 +343,13 @@ func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
}

type ValidatorConfig struct {
Mode string `yaml:"mode" toml:"mode" json:"mode"`
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
Mode string `yaml:"mode" toml:"mode" json:"mode"`
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
RowErrorDelay Duration `yaml:"row-error-delay" toml:"row-error-delay" json:"row-error-delay"`
MetaFlushInterval Duration `yaml:"meta-flush-interval" toml:"meta-flush-interval" json:"meta-flush-interval"`
}

func (v *ValidatorConfig) adjust() error {
func (v *ValidatorConfig) Adjust() error {
if v.Mode == "" {
v.Mode = ValidationNone
}
Expand All @@ -354,6 +359,12 @@ func (v *ValidatorConfig) adjust() error {
if v.WorkerCount <= 0 {
v.WorkerCount = DefaultValidatorWorkerCount
}
if v.RowErrorDelay.Duration == 0 {
v.RowErrorDelay.Duration = DefaultValidatorRowErrorDelay
}
if v.MetaFlushInterval.Duration == 0 {
v.MetaFlushInterval.Duration = DefaultValidatorMetaFlushInterval
}
return nil
}

Expand Down Expand Up @@ -610,7 +621,7 @@ func (c *TaskConfig) adjust() error {
}

for _, validatorCfg := range c.Validators {
if err := validatorCfg.adjust(); err != nil {
if err := validatorCfg.Adjust(); err != nil {
return err
}
}
Expand Down
70 changes: 47 additions & 23 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.Name = task.Name
subTaskCfg.Mode = string(task.TaskMode)
// set task meta
subTaskCfg.MetaFile = *task.MetaSchema
subTaskCfg.MetaSchema = *task.MetaSchema
// add binlog meta
if sourceCfg.BinlogGtid != nil || sourceCfg.BinlogName != nil || sourceCfg.BinlogPos != nil {
meta := &Meta{}
Expand Down Expand Up @@ -187,7 +187,6 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.From = sourceCfgMap[sourceCfg.SourceName].From
// set target db config
subTaskCfg.To = *toDBCfg.Clone()
// TODO set meet error policy
// TODO ExprFilter
// set full unit config
subTaskCfg.MydumperConfig = DefaultMydumperConfig()
Expand Down Expand Up @@ -219,13 +218,11 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
subTaskCfg.ValidatorCfg = defaultValidatorConfig()
// set route,blockAllowList,filter config
doCnt := len(tableMigrateRuleMap[sourceCfg.SourceName])
doDBs := make([]string, doCnt)
doTables := make([]*filter.Table, doCnt)

doDBs := []string{}
doTables := []*filter.Table{}
routeRules := []*router.TableRule{}
filterRules := []*bf.BinlogEventRule{}
for j, rule := range tableMigrateRuleMap[sourceCfg.SourceName] {
for _, rule := range tableMigrateRuleMap[sourceCfg.SourceName] {
// route
if rule.Target != nil && (rule.Target.Schema != nil || rule.Target.Table != nil) {
tableRule := &router.TableRule{SchemaPattern: rule.Source.Schema, TablePattern: rule.Source.Table}
Expand All @@ -245,17 +242,31 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
return nil, terror.ErrOpenAPICommonError.Generatef("filter rule name %s not found.", name)
}
filterRule.SchemaPattern = rule.Source.Schema
filterRule.TablePattern = rule.Source.Table
if rule.Source.Table != "" {
filterRule.TablePattern = rule.Source.Table
}
filterRules = append(filterRules, &filterRule)
}
}
// BlockAllowList
doDBs[j] = rule.Source.Schema
doTables[j] = &filter.Table{Schema: rule.Source.Schema, Name: rule.Source.Table}
if rule.Source.Table != "" {
doTables = append(doTables, &filter.Table{Schema: rule.Source.Schema, Name: rule.Source.Table})
} else {
doDBs = append(doDBs, rule.Source.Schema)
}
}
subTaskCfg.RouteRules = routeRules
subTaskCfg.FilterRules = filterRules
subTaskCfg.BAList = &filter.Rules{DoDBs: removeDuplication(doDBs), DoTables: doTables}
if len(doDBs) > 0 || len(doTables) > 0 {
bAList := &filter.Rules{}
if len(doDBs) > 0 {
bAList.DoDBs = removeDuplication(doDBs)
}
if len(doTables) > 0 {
bAList.DoTables = doTables
}
subTaskCfg.BAList = bAList
}
// adjust sub task config
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
Expand Down Expand Up @@ -514,6 +525,8 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
}
// set table migrate rules
tableMigrateRuleList := []openapi.TaskTableMigrateRule{}
// used to remove repeated rules
ruleMap := map[string]struct{}{}
appendOneRule := func(sourceName, schemaPattern, tablePattern, targetSchema, targetTable string) {
tableMigrateRule := openapi.TaskTableMigrateRule{
Source: struct {
Expand All @@ -526,13 +539,15 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
Table: tablePattern,
},
}
if targetSchema != "" || targetTable != "" {
if targetSchema != "" {
tableMigrateRule.Target = &struct {
Schema *string `json:"schema,omitempty"`
Table *string `json:"table,omitempty"`
}{
Schema: &targetSchema,
Table: &targetTable,
}
if targetTable != "" {
tableMigrateRule.Target.Table = &targetTable
}
}
if filterRuleList, ok := filterMap[sourceName]; ok {
Expand All @@ -542,26 +557,35 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
}
tableMigrateRule.BinlogFilterRule = &ruleNameList
}
ruleKey := strings.Join([]string{sourceName, schemaPattern, tablePattern}, "-")
if _, ok := ruleMap[ruleKey]; ok {
return
}
ruleMap[ruleKey] = struct{}{}
tableMigrateRuleList = append(tableMigrateRuleList, tableMigrateRule)
}

// gen migrate rules by route
for sourceName, ruleList := range routeMap {
for _, rule := range ruleList {
appendOneRule(sourceName, rule.SchemaPattern, rule.TablePattern, rule.TargetSchema, rule.TargetTable)
}
}
// for user only set BlockAllowList without route rules, this means keep same with upstream db and table
if len(tableMigrateRuleList) == 0 {
for _, cfg := range subTaskConfigList {
if cfg.BAList != nil {
for idx := range cfg.BAList.DoTables {
schemaPattern := cfg.BAList.DoTables[idx].Schema
tablePattern := cfg.BAList.DoTables[idx].Name
appendOneRule(cfg.SourceID, schemaPattern, tablePattern, "", "")
}

// gen migrate rules by BAList
for _, cfg := range subTaskConfigList {
if cfg.BAList != nil {
for idx := range cfg.BAList.DoDBs {
schemaPattern := cfg.BAList.DoDBs[idx]
appendOneRule(cfg.SourceID, schemaPattern, "", "", "")
}
for idx := range cfg.BAList.DoTables {
schemaPattern := cfg.BAList.DoTables[idx].Schema
tablePattern := cfg.BAList.DoTables[idx].Name
appendOneRule(cfg.SourceID, schemaPattern, tablePattern, "", "")
}
}
}

// set basic global config
task := openapi.Task{
Name: oneSubtaskConfig.Name,
Expand Down
Loading

0 comments on commit 20f5d3a

Please sign in to comment.