diff --git a/sync_diff_inspector/config/config.go b/sync_diff_inspector/config/config.go index cf9247654..366ff8da4 100644 --- a/sync_diff_inspector/config/config.go +++ b/sync_diff_inspector/config/config.go @@ -237,20 +237,14 @@ type Config struct { *flag.FlagSet `json:"-"` // log level - LogLevel string `toml:"log-level" json:"log-level"` + LogLevel string `toml:"-" json:"-"` // how many goroutines are created to check data CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"` - // set true if want to compare checksum only - CompareChecksumOnly bool `toml:"compare-checksum-only" json:"compare-checksum-only"` - // ignore check table's struct - IgnoreStructCheck bool `toml:"ignore-struct-check" json:"ignore-struct-check"` - // ignore tidb stats only use randomSpliter to split chunks - IgnoreStats bool `toml:"ignore-stats" json:"ignore-stats"` - // ignore check table's data - IgnoreDataCheck bool `toml:"ignore-data-check" json:"ignore-data-check"` - // set true will continue check from the latest checkpoint - UseCheckpoint bool `toml:"use-checkpoint" json:"use-checkpoint"` - + // set true if want to compare rows + // set false won't compare rows. + ExportFixSQL bool `toml:"export-fix-sql" json:"export-fix-sql"` + // only check table struct without table data. + CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"` // DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261" DMAddr string `toml:"dm-addr" json:"dm-addr"` // DMTask string `toml:"dm-task" json:"dm-task"` @@ -279,14 +273,11 @@ func NewConfig() *Config { fs.StringVar(&cfg.ConfigFile, "config", "", "Config file") fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data") - fs.BoolVar(&cfg.CompareChecksumOnly, "compare-checksum-only", true, "set true if want to compare checksum only") + fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum") fs.BoolVar(&cfg.PrintVersion, "V", false, "print version of sync_diff_inspector") fs.StringVar(&cfg.DMAddr, "A", "", "the address of DM") fs.StringVar(&cfg.DMTask, "T", "", "identifier of dm task") - fs.BoolVar(&cfg.IgnoreDataCheck, "ignore-data-check", false, "ignore check table's data") - fs.BoolVar(&cfg.IgnoreStructCheck, "ignore-struct-check", false, "ignore check table's struct") - fs.BoolVar(&cfg.IgnoreStats, "ignore-stats", false, "don't use tidb stats to split chunks") - fs.BoolVar(&cfg.UseCheckpoint, "use-checkpoint", true, "set true will continue check from the latest checkpoint") + fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data") return cfg } diff --git a/sync_diff_inspector/config/config.toml b/sync_diff_inspector/config/config.toml index e5610a068..a9942e09e 100644 --- a/sync_diff_inspector/config/config.toml +++ b/sync_diff_inspector/config/config.toml @@ -2,22 +2,15 @@ ######################### Global config ######################### -log-level = "info" - # how many goroutines are created to check data check-thread-count = 4 -# set true if just want compare data by checksum, will skip select data when checksum is not equal. -compare-checksum-only = false - -# set true will continue check from the latest checkpoint -use-checkpoint = true +# set false if just want compare data by checksum, will skip select data when checksum is not equal. +# set true if want compare all different rows, will slow down the total compare time. +export-fix-sql= false # ignore check table's data -ignore-data-check = false - -# ignore check table's struct -ignore-struct-check = false +check-struct-only = true ######################### Databases config ######################### @@ -29,13 +22,14 @@ ignore-struct-check = false password = "" # mysql doesn't has snapshot config -[data-sources.tidb] +[data-sources.tidb0] host = "127.0.0.1" port = 4000 user = "root" password = "" # remove comment if use tidb's snapshot data # snapshot = "2016-10-08 16:45:26" + # snapshot = "386902609362944000" [table-configs] [table-configs.config1] @@ -57,7 +51,7 @@ collation = "" source-instances = ["mysql1"] - target-instance = "tidb" + target-instance = "tidb0" # tables need to check. *Include `schema` and `table`. Use `.` to split* target-check-tables = ["schema*.table*", "!c.*", "test2.t2"] diff --git a/sync_diff_inspector/config/config_dm.toml b/sync_diff_inspector/config/config_dm.toml index 158051086..15141dbb0 100644 --- a/sync_diff_inspector/config/config_dm.toml +++ b/sync_diff_inspector/config/config_dm.toml @@ -2,22 +2,15 @@ ######################### Global config ######################### -log-level = "info" - # how many goroutines are created to check data check-thread-count = 4 -# set true if just want compare data by checksum, will skip select data when checksum is not equal. -compare-checksum-only = false - -# set true will continue check from the latest checkpoint -use-checkpoint = true +# set false if just want compare data by checksum, will skip select data when checksum is not equal. +# set true if want compare all different rows, will slow down the total compare time. +export-fix-sql = false # ignore check table's data -ignore-data-check = false - -# ignore check table's struct -ignore-struct-check = false +check-struct-only = true # dm-master's address, the format should like "http://127.0.0.1:8261" dm-addr = "http://127.0.0.1:8261" diff --git a/sync_diff_inspector/config/config_sharding.toml b/sync_diff_inspector/config/config_sharding.toml index 4d59bb74a..8a34596b8 100644 --- a/sync_diff_inspector/config/config_sharding.toml +++ b/sync_diff_inspector/config/config_sharding.toml @@ -7,17 +7,12 @@ log-level = "info" # how many goroutines are created to check data check-thread-count = 4 -# set true if just want compare data by checksum, will skip select data when checksum is not equal. -compare-checksum-only = false - -# set true will continue check from the latest checkpoint -use-checkpoint = true +# set false if just want compare data by checksum, will skip select data when checksum is not equal. +# set true if want compare all different rows, will slow down the total compare time. +export-fix-sql = false # ignore check table's data -ignore-data-check = false - -# ignore check table's struct -ignore-struct-check = false +check-struct-only = true ######################### Databases config ######################### @@ -51,7 +46,7 @@ ignore-struct-check = false # remove comment if use tidb's snapshot data # snapshot = "2016-10-08 16:45:26" -[data-sources.tidb] +[data-sources.tidb0] host = "127.0.0.1" port = 4000 user = "root" @@ -88,7 +83,7 @@ target-table = "t" # target table source-instances = ["mysql1", "mysql2", "mysql3"] - target-instance = "tidb" + target-instance = "tidb0" # tables need to check. *Include `schema` and `table`. Use `.` to split* target-check-tables = ["schema*.table*", "!c.*", "test2.t2"] diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 9fdc10957..3c0910fbb 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -67,15 +67,13 @@ type Diff struct { // workSource is one of upstream/downstream by some policy in #pickSource. workSource source.Source - sample int - checkThreadCount int - compareChecksumOnly bool - useCheckpoint bool - ignoreDataCheck bool - ignoreStructCheck bool - ignoreStats bool - sqlWg sync.WaitGroup - checkpointWg sync.WaitGroup + sample int + checkThreadCount int + exportFixSQL bool + useCheckpoint bool + ignoreDataCheck bool + sqlWg sync.WaitGroup + checkpointWg sync.WaitGroup FixSQLDir string CheckpointDir string @@ -89,15 +87,12 @@ type Diff struct { // NewDiff returns a Diff instance. func NewDiff(ctx context.Context, cfg *config.Config) (diff *Diff, err error) { diff = &Diff{ - checkThreadCount: cfg.CheckThreadCount, - compareChecksumOnly: cfg.CompareChecksumOnly, - useCheckpoint: cfg.UseCheckpoint, - ignoreDataCheck: cfg.IgnoreDataCheck, - ignoreStructCheck: cfg.IgnoreStructCheck, - ignoreStats: cfg.IgnoreStats, - sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer), - cp: new(checkpoints.Checkpoint), - report: report.NewReport(&cfg.Task), + checkThreadCount: cfg.CheckThreadCount, + exportFixSQL: cfg.ExportFixSQL, + ignoreDataCheck: cfg.CheckStructOnly, + sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer), + cp: new(checkpoints.Checkpoint), + report: report.NewReport(&cfg.Task), } if err = diff.init(ctx, cfg); err != nil { diff.Close() @@ -165,46 +160,37 @@ func (df *Diff) initCheckpoint() error { df.cp.Init() finishTableNums := 0 - if df.useCheckpoint { - path := filepath.Join(df.CheckpointDir, checkpointFile) - if ioutil2.FileExists(path) { - node, reportInfo, err := df.cp.LoadChunk(path) - if err != nil { - return errors.Annotate(err, "the checkpoint load process failed") - } else { - // this need not be synchronized, because at the moment, the is only one thread access the section - log.Info("load checkpoint", - zap.Any("chunk index", node.GetID()), - zap.Reflect("chunk", node), - zap.String("state", node.GetState())) - df.cp.SetCurrentSavedID(node) - } - - if node != nil { - // remove the sql file that ID bigger than node. - // cause we will generate these sql again. - err = df.removeSQLFiles(node.GetID()) - if err != nil { - return errors.Trace(err) - } - df.startRange = splitter.FromNode(node) - df.report.LoadReport(reportInfo) - finishTableNums = df.startRange.GetTableIndex() - if df.startRange.ChunkRange.Type == chunk.Empty { - // chunk_iter will skip this table directly - finishTableNums++ - } - } + path := filepath.Join(df.CheckpointDir, checkpointFile) + if ioutil2.FileExists(path) { + node, reportInfo, err := df.cp.LoadChunk(path) + if err != nil { + return errors.Annotate(err, "the checkpoint load process failed") } else { - log.Info("not found checkpoint file, start from beginning") - id := &chunk.ChunkID{TableIndex: -1, BucketIndexLeft: -1, BucketIndexRight: -1, ChunkIndex: -1, ChunkCnt: 0} - err := df.removeSQLFiles(id) + // this need not be synchronized, because at the moment, the is only one thread access the section + log.Info("load checkpoint", + zap.Any("chunk index", node.GetID()), + zap.Reflect("chunk", node), + zap.String("state", node.GetState())) + df.cp.SetCurrentSavedID(node) + } + + if node != nil { + // remove the sql file that ID bigger than node. + // cause we will generate these sql again. + err = df.removeSQLFiles(node.GetID()) if err != nil { return errors.Trace(err) } + df.startRange = splitter.FromNode(node) + df.report.LoadReport(reportInfo) + finishTableNums = df.startRange.GetTableIndex() + if df.startRange.ChunkRange.Type == chunk.Empty { + // chunk_iter will skip this table directly + finishTableNums++ + } } } else { - log.Info("skip load checkpoint file, start from beginning") + log.Info("not found checkpoint file, start from beginning") id := &chunk.ChunkID{TableIndex: -1, BucketIndexLeft: -1, BucketIndexRight: -1, ChunkIndex: -1, ChunkCnt: 0} err := df.removeSQLFiles(id) if err != nil { @@ -276,7 +262,7 @@ func (df *Diff) Equal(ctx context.Context) error { defer func() { pool.WaitFinished() - log.Debug("all comsume tasks finished") + log.Debug("all consume tasks finished") // close the sql channel close(df.sqlCh) df.sqlWg.Wait() @@ -293,7 +279,7 @@ func (df *Diff) Equal(ctx context.Context) error { // finish read the tables break } - log.Info("chunk index", zap.Any("chunk index", c.ChunkRange.Index), zap.Any("chunk bound", c.ChunkRange.Bounds)) + log.Info("global consume chunk info", zap.Any("chunk index", c.ChunkRange.Index), zap.Any("chunk bound", c.ChunkRange.Bounds)) pool.Apply(func() { isEqual := df.consume(ctx, c) if !isEqual { @@ -431,7 +417,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool // If an error occurs during the checksum phase, skip the data compare phase. state = checkpoints.FailedState df.report.SetTableMeetError(schema, table, err) - } else if !isEqual && !df.compareChecksumOnly { + } else if !isEqual && df.exportFixSQL { log.Debug("checksum failed", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("chunk size", count), zap.String("table", df.workSource.GetTables()[rangeInfo.GetTableIndex()].Table)) state = checkpoints.FailedState // if the chunk's checksum differ, try to do binary check diff --git a/sync_diff_inspector/main.go b/sync_diff_inspector/main.go index 32e949fe9..e186bec66 100644 --- a/sync_diff_inspector/main.go +++ b/sync_diff_inspector/main.go @@ -96,13 +96,11 @@ func checkSyncState(ctx context.Context, cfg *config.Config) bool { } defer d.Close() - if !d.ignoreStructCheck { - err = d.StructEqual(ctx) - if err != nil { - fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName)) - log.Fatal("failed to check structure difference", zap.Error(err)) - return false - } + err = d.StructEqual(ctx) + if err != nil { + fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName)) + log.Fatal("failed to check structure difference", zap.Error(err)) + return false } if !d.ignoreDataCheck { err = d.Equal(ctx) diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index c27deb1cc..d426be418 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -590,13 +590,11 @@ func TestSource(t *testing.T) { router, err := router.NewTableRouter(false, nil) cfg := &config.Config{ - LogLevel: "debug", - CheckThreadCount: 4, - CompareChecksumOnly: false, - IgnoreStructCheck: false, - IgnoreStats: false, - IgnoreDataCheck: false, - UseCheckpoint: true, + LogLevel: "debug", + CheckThreadCount: 4, + ExportFixSQL: true, + IgnoreStats: false, + CheckStructOnly: false, DataSources: map[string]*config.DataSource{ "mysql1": { Host: host,