Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into dynamic-keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Feb 2, 2021
2 parents 7766214 + a0af040 commit a6cb5bb
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 1 deletion.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ ErrConfigMoreThanOne,[code=20034:class=config:scope=internal:level=high], "Messa
ErrConfigEtcdParse,[code=20035:class=config:scope=internal:level=high], "Message: incapable config of %s from etcd"
ErrConfigMissingForBound,[code=20036:class=config:scope=internal:level=high], "Message: source bound %s doesn't have related source config in etcd"
ErrConfigBinlogEventFilter,[code=20037:class=config:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
ErrConfigGlobalConfigsUnused,[code=20038:class=config:scope=internal:level=high], "Message: The configurations as following %v are set in global configuration but instances don't use them, Workaround: Please check the configuration files."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
55 changes: 55 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -418,6 +419,8 @@ func (c *TaskConfig) adjust() error {
}

iids := make(map[string]int) // source-id -> instance-index
globalConfigReferCount := map[string]int{}
prefixs := []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer"}
duplicateErrorStrings := make([]string, 0)
for i, inst := range c.MySQLInstances {
if err := inst.VerifyAndAdjust(); err != nil {
Expand Down Expand Up @@ -447,16 +450,19 @@ func (c *TaskConfig) adjust() error {
if _, ok := c.Routes[name]; !ok {
return terror.ErrConfigRouteRuleNotFound.Generate(i, name)
}
globalConfigReferCount[prefixs[0]+name]++
}
for _, name := range inst.FilterRules {
if _, ok := c.Filters[name]; !ok {
return terror.ErrConfigFilterRuleNotFound.Generate(i, name)
}
globalConfigReferCount[prefixs[1]+name]++
}
for _, name := range inst.ColumnMappingRules {
if _, ok := c.ColumnMappings[name]; !ok {
return terror.ErrConfigColumnMappingNotFound.Generate(i, name)
}
globalConfigReferCount[prefixs[2]+name]++
}

// only when BAList is empty use BWList
Expand All @@ -472,10 +478,14 @@ func (c *TaskConfig) adjust() error {
if !ok {
return terror.ErrConfigMydumperCfgNotFound.Generate(i, inst.MydumperConfigName)
}
globalConfigReferCount[prefixs[3]+inst.MydumperConfigName]++
inst.Mydumper = new(MydumperConfig)
*inst.Mydumper = *rule // ref mydumper config
}
if inst.Mydumper == nil {
if len(c.Mydumpers) != 0 {
log.L().Warn("mysql instance don't refer mydumper's configuration with mydumper-config-name, the default configuration will be used", zap.String("mysql instance", inst.SourceID))
}
defaultCfg := defaultMydumperConfig()
inst.Mydumper = &defaultCfg
} else if inst.Mydumper.ChunkFilesize == "" {
Expand All @@ -496,10 +506,14 @@ func (c *TaskConfig) adjust() error {
if !ok {
return terror.ErrConfigLoaderCfgNotFound.Generate(i, inst.LoaderConfigName)
}
globalConfigReferCount[prefixs[4]+inst.LoaderConfigName]++
inst.Loader = new(LoaderConfig)
*inst.Loader = *rule // ref loader config
}
if inst.Loader == nil {
if len(c.Loaders) != 0 {
log.L().Warn("mysql instance don't refer loader's configuration with loader-config-name, the default configuration will be used", zap.String("mysql instance", inst.SourceID))
}
defaultCfg := defaultLoaderConfig()
inst.Loader = &defaultCfg
}
Expand All @@ -512,10 +526,14 @@ func (c *TaskConfig) adjust() error {
if !ok {
return terror.ErrConfigSyncerCfgNotFound.Generate(i, inst.SyncerConfigName)
}
globalConfigReferCount[prefixs[5]+inst.SyncerConfigName]++
inst.Syncer = new(SyncerConfig)
*inst.Syncer = *rule // ref syncer config
}
if inst.Syncer == nil {
if len(c.Syncers) != 0 {
log.L().Warn("mysql instance don't refer syncer's configuration with syncer-config-name, the default configuration will be used", zap.String("mysql instance", inst.SourceID))
}
defaultCfg := defaultSyncerConfig()
inst.Syncer = &defaultCfg
}
Expand All @@ -539,6 +557,43 @@ func (c *TaskConfig) adjust() error {
return terror.ErrConfigDuplicateCfgItem.Generate(strings.Join(duplicateErrorStrings, "\n"))
}

unusedConfigs := []string{}
for route := range c.Routes {
if globalConfigReferCount[prefixs[0]+route] == 0 {
unusedConfigs = append(unusedConfigs, route)
}
}
for filter := range c.Filters {
if globalConfigReferCount[prefixs[1]+filter] == 0 {
unusedConfigs = append(unusedConfigs, filter)
}
}
for columnMapping := range c.ColumnMappings {
if globalConfigReferCount[prefixs[2]+columnMapping] == 0 {
unusedConfigs = append(unusedConfigs, columnMapping)
}
}
for mydumper := range c.Mydumpers {
if globalConfigReferCount[prefixs[3]+mydumper] == 0 {
unusedConfigs = append(unusedConfigs, mydumper)
}
}
for loader := range c.Loaders {
if globalConfigReferCount[prefixs[4]+loader] == 0 {
unusedConfigs = append(unusedConfigs, loader)
}
}
for syncer := range c.Syncers {
if globalConfigReferCount[prefixs[5]+syncer] == 0 {
unusedConfigs = append(unusedConfigs, syncer)
}
}

if len(unusedConfigs) != 0 {
sort.Strings(unusedConfigs)
return terror.ErrConfigGlobalConfigsUnused.Generate(unusedConfigs)
}

if c.Timezone != "" {
_, err := time.LoadLocation(c.Timezone)
if err != nil {
Expand Down
211 changes: 210 additions & 1 deletion dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,215 @@ import (
"github.com/coreos/go-semver/semver"
)

func (t *testConfig) TestUnusedTaskConfig(c *C) {
var correctTaskConfig = `---
name: test
task-mode: all
shard-mode: "pessimistic"
meta-schema: "dm_meta"
timezone: "Asia/Shanghai"
case-sensitive: false
online-ddl-scheme: "gh-ost"
clean-dump-file: true
target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""
routes:
route-rule-1:
schema-pattern: "test_*"
target-schema: "test"
route-rule-2:
schema-pattern: "test_*"
target-schema: "test"
filters:
filter-rule-1:
schema-pattern: "test_*"
events: ["truncate table", "drop table"]
action: Ignore
filter-rule-2:
schema-pattern: "test_*"
events: ["all dml"]
action: Do
column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]
mydumpers:
global1:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--consistency none"
global2:
threads: 8
chunk-filesize: 128
skip-tz-utc: true
extra-args: "--consistency none"
loaders:
global1:
pool-size: 16
dir: "./dumped_data1"
global2:
pool-size: 8
dir: "./dumped_data2"
syncers:
global1:
worker-count: 16
batch: 100
enable-ansi-quotes: true
safe-mode: false
global2:
worker-count: 32
batch: 100
enable-ansi-quotes: true
safe-mode: false
mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-2"]
filter-rules: ["filter-rule-2"]
column-mapping-rules: ["column-mapping-rule-2"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"
- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
`
taskConfig := NewTaskConfig()
err := taskConfig.Decode(correctTaskConfig)
c.Assert(err, IsNil)
var errorTaskConfig = `---
name: test
task-mode: all
shard-mode: "pessimistic"
meta-schema: "dm_meta"
timezone: "Asia/Shanghai"
case-sensitive: false
online-ddl-scheme: "gh-ost"
clean-dump-file: true
target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""
routes:
route-rule-1:
schema-pattern: "test_*"
target-schema: "test"
route-rule-2:
schema-pattern: "test_*"
target-schema: "test"
filters:
filter-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
events: ["truncate table", "drop table"]
action: Ignore
filter-rule-2:
schema-pattern: "test_*"
events: ["all dml"]
action: Do
column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]
mydumpers:
global1:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--consistency none"
global2:
threads: 8
chunk-filesize: 128
skip-tz-utc: true
extra-args: "--consistency none"
loaders:
global1:
pool-size: 16
dir: "./dumped_data1"
global2:
pool-size: 8
dir: "./dumped_data2"
syncers:
global1:
worker-count: 16
batch: 100
enable-ansi-quotes: true
safe-mode: false
global2:
worker-count: 32
batch: 100
enable-ansi-quotes: true
safe-mode: false
mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"
- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
`
taskConfig = NewTaskConfig()
err = taskConfig.Decode(errorTaskConfig)
c.Check(err, NotNil)
c.Assert(err, ErrorMatches, `[\s\S]*The configurations as following \[column-mapping-rule-2 filter-rule-2 route-rule-2\] are set in global configuration[\s\S]*`)
}

func (t *testConfig) TestInvalidTaskConfig(c *C) {
var errorTaskConfig1 = `---
name: test
Expand Down Expand Up @@ -260,7 +469,7 @@ filters:
c.Assert(err, IsNil)
taskConfig = NewTaskConfig()
err = taskConfig.DecodeFile(filepath)
c.Assert(err, IsNil)
c.Assert(err, NotNil)
c.Assert(taskConfig.IsSharding, IsTrue)
c.Assert(taskConfig.ShardMode, Equals, ShardOptimistic)
taskConfig.MySQLInstances[0].RouteRules = []string{"route-rule-1", "route-rule-2", "route-rule-1", "route-rule-2"}
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,12 @@ description = ""
workaround = "Please check the `filters` config in source and task configuration files."
tags = ["internal", "high"]

[error.DM-config-20038]
message = "The configurations as following %v are set in global configuration but instances don't use them"
description = ""
workaround = "Please check the configuration files."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ const (
codeConfigEtcdParse
codeConfigMissingForBound
codeConfigBinlogEventFilter
codeConfigGlobalConfigsUnused
)

// Binlog operation error code list
Expand Down Expand Up @@ -823,6 +824,7 @@ var (
ErrConfigEtcdParse = New(codeConfigEtcdParse, ClassConfig, ScopeInternal, LevelHigh, "incapable config of %s from etcd", "")
ErrConfigMissingForBound = New(codeConfigMissingForBound, ClassConfig, ScopeInternal, LevelHigh, "source bound %s doesn't have related source config in etcd", "")
ErrConfigBinlogEventFilter = New(codeConfigBinlogEventFilter, ClassConfig, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")
ErrConfigGlobalConfigsUnused = New(codeConfigGlobalConfigsUnused, ClassConfig, ScopeInternal, LevelHigh, "The configurations as following %v are set in global configuration but instances don't use them", "Please check the configuration files.")

// Binlog operation error
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down

0 comments on commit a6cb5bb

Please sign in to comment.