Skip to content

Commit

Permalink
Merge pull request pingcap#2 from 3pointer/update_config
Browse files Browse the repository at this point in the history
refine config
  • Loading branch information
Leavrth authored Oct 27, 2021
2 parents d4797f9 + 3f6bee9 commit aac8098
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 121 deletions.
25 changes: 8 additions & 17 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,20 +237,14 @@ type Config struct {
*flag.FlagSet `json:"-"`

// log level
LogLevel string `toml:"log-level" json:"log-level"`
LogLevel string `toml:"-" json:"-"`
// how many goroutines are created to check data
CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"`
// set true if want to compare checksum only
CompareChecksumOnly bool `toml:"compare-checksum-only" json:"compare-checksum-only"`
// ignore check table's struct
IgnoreStructCheck bool `toml:"ignore-struct-check" json:"ignore-struct-check"`
// ignore tidb stats only use randomSpliter to split chunks
IgnoreStats bool `toml:"ignore-stats" json:"ignore-stats"`
// ignore check table's data
IgnoreDataCheck bool `toml:"ignore-data-check" json:"ignore-data-check"`
// set true will continue check from the latest checkpoint
UseCheckpoint bool `toml:"use-checkpoint" json:"use-checkpoint"`

// set true if want to compare rows
// set false won't compare rows.
ExportFixSQL bool `toml:"export-fix-sql" json:"export-fix-sql"`
// only check table struct without table data.
CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"`
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
Expand Down Expand Up @@ -279,14 +273,11 @@ func NewConfig() *Config {
fs.StringVar(&cfg.ConfigFile, "config", "", "Config file")
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data")
fs.BoolVar(&cfg.CompareChecksumOnly, "compare-checksum-only", true, "set true if want to compare checksum only")
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
fs.BoolVar(&cfg.PrintVersion, "V", false, "print version of sync_diff_inspector")
fs.StringVar(&cfg.DMAddr, "A", "", "the address of DM")
fs.StringVar(&cfg.DMTask, "T", "", "identifier of dm task")
fs.BoolVar(&cfg.IgnoreDataCheck, "ignore-data-check", false, "ignore check table's data")
fs.BoolVar(&cfg.IgnoreStructCheck, "ignore-struct-check", false, "ignore check table's struct")
fs.BoolVar(&cfg.IgnoreStats, "ignore-stats", false, "don't use tidb stats to split chunks")
fs.BoolVar(&cfg.UseCheckpoint, "use-checkpoint", true, "set true will continue check from the latest checkpoint")
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")

return cfg
}
Expand Down
20 changes: 7 additions & 13 deletions sync_diff_inspector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@

######################### Global config #########################

log-level = "info"

# how many goroutines are created to check data
check-thread-count = 4

# set true if just want compare data by checksum, will skip select data when checksum is not equal.
compare-checksum-only = false

# set true will continue check from the latest checkpoint
use-checkpoint = true
# set false if just want compare data by checksum, will skip select data when checksum is not equal.
# set true if want compare all different rows, will slow down the total compare time.
export-fix-sql= false

# ignore check table's data
ignore-data-check = false

# ignore check table's struct
ignore-struct-check = false
check-struct-only = true


######################### Databases config #########################
Expand All @@ -29,13 +22,14 @@ ignore-struct-check = false
password = ""
# mysql doesn't has snapshot config

[data-sources.tidb]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
# remove comment if use tidb's snapshot data
# snapshot = "2016-10-08 16:45:26"
# snapshot = "386902609362944000"

[table-configs]
[table-configs.config1]
Expand All @@ -57,7 +51,7 @@ collation = ""

source-instances = ["mysql1"]

target-instance = "tidb"
target-instance = "tidb0"

# tables need to check. *Include `schema` and `table`. Use `.` to split*
target-check-tables = ["schema*.table*", "!c.*", "test2.t2"]
Expand Down
15 changes: 4 additions & 11 deletions sync_diff_inspector/config/config_dm.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@

######################### Global config #########################

log-level = "info"

# how many goroutines are created to check data
check-thread-count = 4

# set true if just want compare data by checksum, will skip select data when checksum is not equal.
compare-checksum-only = false

# set true will continue check from the latest checkpoint
use-checkpoint = true
# set false if just want compare data by checksum, will skip select data when checksum is not equal.
# set true if want compare all different rows, will slow down the total compare time.
export-fix-sql = false

# ignore check table's data
ignore-data-check = false

# ignore check table's struct
ignore-struct-check = false
check-struct-only = true

# dm-master's address, the format should like "http://127.0.0.1:8261"
dm-addr = "http://127.0.0.1:8261"
Expand Down
17 changes: 6 additions & 11 deletions sync_diff_inspector/config/config_sharding.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,12 @@ log-level = "info"
# how many goroutines are created to check data
check-thread-count = 4

# set true if just want compare data by checksum, will skip select data when checksum is not equal.
compare-checksum-only = false

# set true will continue check from the latest checkpoint
use-checkpoint = true
# set false if just want compare data by checksum, will skip select data when checksum is not equal.
# set true if want compare all different rows, will slow down the total compare time.
export-fix-sql = false

# ignore check table's data
ignore-data-check = false

# ignore check table's struct
ignore-struct-check = false
check-struct-only = true


######################### Databases config #########################
Expand Down Expand Up @@ -51,7 +46,7 @@ ignore-struct-check = false
# remove comment if use tidb's snapshot data
# snapshot = "2016-10-08 16:45:26"

[data-sources.tidb]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
Expand Down Expand Up @@ -88,7 +83,7 @@ target-table = "t" # target table

source-instances = ["mysql1", "mysql2", "mysql3"]

target-instance = "tidb"
target-instance = "tidb0"

# tables need to check. *Include `schema` and `table`. Use `.` to split*
target-check-tables = ["schema*.table*", "!c.*", "test2.t2"]
Expand Down
96 changes: 41 additions & 55 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ type Diff struct {
// workSource is one of upstream/downstream by some policy in #pickSource.
workSource source.Source

sample int
checkThreadCount int
compareChecksumOnly bool
useCheckpoint bool
ignoreDataCheck bool
ignoreStructCheck bool
ignoreStats bool
sqlWg sync.WaitGroup
checkpointWg sync.WaitGroup
sample int
checkThreadCount int
exportFixSQL bool
useCheckpoint bool
ignoreDataCheck bool
sqlWg sync.WaitGroup
checkpointWg sync.WaitGroup

FixSQLDir string
CheckpointDir string
Expand All @@ -89,15 +87,12 @@ type Diff struct {
// NewDiff returns a Diff instance.
func NewDiff(ctx context.Context, cfg *config.Config) (diff *Diff, err error) {
diff = &Diff{
checkThreadCount: cfg.CheckThreadCount,
compareChecksumOnly: cfg.CompareChecksumOnly,
useCheckpoint: cfg.UseCheckpoint,
ignoreDataCheck: cfg.IgnoreDataCheck,
ignoreStructCheck: cfg.IgnoreStructCheck,
ignoreStats: cfg.IgnoreStats,
sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer),
cp: new(checkpoints.Checkpoint),
report: report.NewReport(&cfg.Task),
checkThreadCount: cfg.CheckThreadCount,
exportFixSQL: cfg.ExportFixSQL,
ignoreDataCheck: cfg.CheckStructOnly,
sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer),
cp: new(checkpoints.Checkpoint),
report: report.NewReport(&cfg.Task),
}
if err = diff.init(ctx, cfg); err != nil {
diff.Close()
Expand Down Expand Up @@ -165,46 +160,37 @@ func (df *Diff) initCheckpoint() error {
df.cp.Init()

finishTableNums := 0
if df.useCheckpoint {
path := filepath.Join(df.CheckpointDir, checkpointFile)
if ioutil2.FileExists(path) {
node, reportInfo, err := df.cp.LoadChunk(path)
if err != nil {
return errors.Annotate(err, "the checkpoint load process failed")
} else {
// this need not be synchronized, because at the moment, the is only one thread access the section
log.Info("load checkpoint",
zap.Any("chunk index", node.GetID()),
zap.Reflect("chunk", node),
zap.String("state", node.GetState()))
df.cp.SetCurrentSavedID(node)
}

if node != nil {
// remove the sql file that ID bigger than node.
// cause we will generate these sql again.
err = df.removeSQLFiles(node.GetID())
if err != nil {
return errors.Trace(err)
}
df.startRange = splitter.FromNode(node)
df.report.LoadReport(reportInfo)
finishTableNums = df.startRange.GetTableIndex()
if df.startRange.ChunkRange.Type == chunk.Empty {
// chunk_iter will skip this table directly
finishTableNums++
}
}
path := filepath.Join(df.CheckpointDir, checkpointFile)
if ioutil2.FileExists(path) {
node, reportInfo, err := df.cp.LoadChunk(path)
if err != nil {
return errors.Annotate(err, "the checkpoint load process failed")
} else {
log.Info("not found checkpoint file, start from beginning")
id := &chunk.ChunkID{TableIndex: -1, BucketIndexLeft: -1, BucketIndexRight: -1, ChunkIndex: -1, ChunkCnt: 0}
err := df.removeSQLFiles(id)
// this need not be synchronized, because at the moment, the is only one thread access the section
log.Info("load checkpoint",
zap.Any("chunk index", node.GetID()),
zap.Reflect("chunk", node),
zap.String("state", node.GetState()))
df.cp.SetCurrentSavedID(node)
}

if node != nil {
// remove the sql file that ID bigger than node.
// cause we will generate these sql again.
err = df.removeSQLFiles(node.GetID())
if err != nil {
return errors.Trace(err)
}
df.startRange = splitter.FromNode(node)
df.report.LoadReport(reportInfo)
finishTableNums = df.startRange.GetTableIndex()
if df.startRange.ChunkRange.Type == chunk.Empty {
// chunk_iter will skip this table directly
finishTableNums++
}
}
} else {
log.Info("skip load checkpoint file, start from beginning")
log.Info("not found checkpoint file, start from beginning")
id := &chunk.ChunkID{TableIndex: -1, BucketIndexLeft: -1, BucketIndexRight: -1, ChunkIndex: -1, ChunkCnt: 0}
err := df.removeSQLFiles(id)
if err != nil {
Expand Down Expand Up @@ -276,7 +262,7 @@ func (df *Diff) Equal(ctx context.Context) error {

defer func() {
pool.WaitFinished()
log.Debug("all comsume tasks finished")
log.Debug("all consume tasks finished")
// close the sql channel
close(df.sqlCh)
df.sqlWg.Wait()
Expand All @@ -293,7 +279,7 @@ func (df *Diff) Equal(ctx context.Context) error {
// finish read the tables
break
}
log.Info("chunk index", zap.Any("chunk index", c.ChunkRange.Index), zap.Any("chunk bound", c.ChunkRange.Bounds))
log.Info("global consume chunk info", zap.Any("chunk index", c.ChunkRange.Index), zap.Any("chunk bound", c.ChunkRange.Bounds))
pool.Apply(func() {
isEqual := df.consume(ctx, c)
if !isEqual {
Expand Down Expand Up @@ -431,7 +417,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
// If an error occurs during the checksum phase, skip the data compare phase.
state = checkpoints.FailedState
df.report.SetTableMeetError(schema, table, err)
} else if !isEqual && !df.compareChecksumOnly {
} else if !isEqual && df.exportFixSQL {
log.Debug("checksum failed", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("chunk size", count), zap.String("table", df.workSource.GetTables()[rangeInfo.GetTableIndex()].Table))
state = checkpoints.FailedState
// if the chunk's checksum differ, try to do binary check
Expand Down
12 changes: 5 additions & 7 deletions sync_diff_inspector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,11 @@ func checkSyncState(ctx context.Context, cfg *config.Config) bool {
}
defer d.Close()

if !d.ignoreStructCheck {
err = d.StructEqual(ctx)
if err != nil {
fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
log.Fatal("failed to check structure difference", zap.Error(err))
return false
}
err = d.StructEqual(ctx)
if err != nil {
fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
log.Fatal("failed to check structure difference", zap.Error(err))
return false
}
if !d.ignoreDataCheck {
err = d.Equal(ctx)
Expand Down
12 changes: 5 additions & 7 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,11 @@ func TestSource(t *testing.T) {

router, err := router.NewTableRouter(false, nil)
cfg := &config.Config{
LogLevel: "debug",
CheckThreadCount: 4,
CompareChecksumOnly: false,
IgnoreStructCheck: false,
IgnoreStats: false,
IgnoreDataCheck: false,
UseCheckpoint: true,
LogLevel: "debug",
CheckThreadCount: 4,
ExportFixSQL: true,
IgnoreStats: false,
CheckStructOnly: false,
DataSources: map[string]*config.DataSource{
"mysql1": {
Host: host,
Expand Down

0 comments on commit aac8098

Please sign in to comment.