From 98bd239dd8a02cda907ddbcdb2f9e773ac320e5a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 8 Mar 2021 18:14:27 +0800 Subject: [PATCH 1/5] dbutil: support visual formatted time types in STATS_BUCKETS --- pkg/dbutil/common.go | 8 +++++++ pkg/dbutil/common_test.go | 48 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/pkg/dbutil/common.go b/pkg/dbutil/common.go index 31cdfdbd4..e00a817ed 100644 --- a/pkg/dbutil/common.go +++ b/pkg/dbutil/common.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" "go.uber.org/zap" ) @@ -498,6 +499,13 @@ func AnalyzeValuesFromBuckets(valueString string, cols []*model.ColumnInfo) ([]s for i, col := range cols { if IsTimeTypeAndNeedDecode(col.Tp) { + // check if values[i] is already a time string + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + _, err := types.ParseTime(sc, values[i], col.Tp, types.MinFsp) + if err == nil { + continue + } + value, err := DecodeTimeInBucket(values[i]) if err != nil { log.Error("analyze values from buckets", zap.String("column", col.Name.O), zap.String("value", values[i]), zap.Error(err)) diff --git a/pkg/dbutil/common_test.go b/pkg/dbutil/common_test.go index 848cfc224..083eae6b4 100644 --- a/pkg/dbutil/common_test.go +++ b/pkg/dbutil/common_test.go @@ -20,6 +20,9 @@ import ( "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + pmysql "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/types" "github.com/pingcap/tidb/infoschema" ) @@ -179,3 +182,48 @@ func (s *testDBSuite) TestGetParser(c *C) { } } } + +func (s *testDBSuite) TestAnalyzeValuesFromBuckets(c *C) { + cases := []struct { + value string + col *model.ColumnInfo + expect string + }{ + { + "2021-03-05 21:31:03", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeDatetime}}, + "2021-03-05 21:31:03", + }, + { + "2021-03-05 21:31:03", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeTimestamp}}, + "2021-03-05 21:31:03", + }, + { + "2021-03-05", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeDate}}, + "2021-03-05", + }, + { + "1847956477067657216", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeDatetime}}, + "2020-01-01 10:00:00", + }, + { + "1847955927311843328", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeTimestamp}}, + "2020-01-01 02:00:00", + }, + { + "1847955789872889856", + &model.ColumnInfo{FieldType: types.FieldType{Tp: pmysql.TypeDate}}, + "2020-01-01 00:00:00", + }, + } + for _, ca := range cases { + val, err := AnalyzeValuesFromBuckets(ca.value, []*model.ColumnInfo{ca.col}) + c.Assert(err, IsNil) + c.Assert(val, HasLen, 1) + c.Assert(val[0], Equals, ca.expect) + } +} From 463be6c42caa3d22fbb7dff5c6078044b6ce953b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 9 Mar 2021 17:26:23 +0800 Subject: [PATCH 2/5] diff: add a config for checkpointDB connection limit --- pkg/diff/conn.go | 2 +- sync_diff_inspector/config.go | 4 ++++ sync_diff_inspector/diff.go | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/diff/conn.go b/pkg/diff/conn.go index 777dffe26..0d8bcb77e 100644 --- a/pkg/diff/conn.go +++ b/pkg/diff/conn.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/dbutil" ) -// CreateDB creates sql.DB used for select data +// CreateDB creates sql.DB and set connection limit func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, num int) (db *sql.DB, err error) { db, err = dbutil.OpenDB(dbConfig) if err != nil { diff --git a/sync_diff_inspector/config.go b/sync_diff_inspector/config.go index ff03973a8..9c1a488ac 100644 --- a/sync_diff_inspector/config.go +++ b/sync_diff_inspector/config.go @@ -175,6 +175,9 @@ type Config struct { // how many goroutines are created to check data CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"` + // how many goroutines are created to save checkpoint + CheckpointThreadCount int `toml:"checkpoint-thread-count" json:"checkpoint-thread-count"` + // set false if want to comapre the data directly UseChecksum bool `toml:"use-checksum" json:"use-checksum"` @@ -228,6 +231,7 @@ func NewConfig() *Config { fs.IntVar(&cfg.ChunkSize, "chunk-size", 1000, "diff check chunk size") fs.IntVar(&cfg.Sample, "sample", 100, "the percent of sampling check") fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data") + fs.IntVar(&cfg.CheckpointThreadCount, "checkpoint-thread-count", 64, "how many goroutines are created to save checkpoint") fs.BoolVar(&cfg.UseChecksum, "use-checksum", true, "set false if want to comapre the data directly") fs.StringVar(&cfg.FixSQLFile, "fix-sql-file", "fix.sql", "the name of the file which saves sqls used to fix different data") fs.BoolVar(&cfg.PrintVersion, "V", false, "print version of sync_diff_inspector") diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index ba2cbcd2f..34dca9051 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -137,7 +137,7 @@ func (df *Diff) CreateDBConn(cfg *Config) (err error) { } df.targetDB = cfg.TargetDBCfg - df.cpDB, err = diff.CreateDBForCP(df.ctx, cfg.TargetDBCfg.DBConfig) + df.cpDB, err = diff.CreateDB(df.ctx, cfg.TargetDBCfg.DBConfig, cfg.CheckpointThreadCount) if err != nil { return errors.Errorf("create checkpoint db %s error %v", cfg.TargetDBCfg.DBConfig.String(), err) } From 8a6b5fbbbafb32eecdb8f1ba59a2c984b5a67fc2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 9 Mar 2021 19:09:51 +0800 Subject: [PATCH 3/5] another impl --- pkg/diff/conn.go | 2 -- pkg/diff/diff.go | 13 +++++++++++-- sync_diff_inspector/diff.go | 5 +++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/diff/conn.go b/pkg/diff/conn.go index 0d8bcb77e..7bc046fbf 100644 --- a/pkg/diff/conn.go +++ b/pkg/diff/conn.go @@ -44,8 +44,6 @@ func CreateDBForCP(ctx context.Context, dbConfig dbutil.DBConfig) (cpDB *sql.DB, if err != nil { return nil, errors.Errorf("create db connections %+v error %v", dbConfig, err) } - cpDB.SetMaxOpenConns(1) - cpDB.SetMaxIdleConns(1) return cpDB, nil } diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index e61688cf8..fabbdea95 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -408,6 +408,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, }) update := func() { + checksumLimiter <- struct{}{} ctx1, cancel1 := context.WithTimeout(ctx, dbutil.DefaultTimeout) defer cancel1() @@ -415,6 +416,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, if err1 != nil { log.Warn("update chunk info", zap.Error(err1)) } + <-checksumLimiter } defer func() { @@ -434,7 +436,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, } } } - update() + go update() }() if filterByRand { @@ -447,7 +449,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, } chunk.State = checkingState - update() + go update() if t.UseChecksum { // first check the checksum is equal or not @@ -475,6 +477,13 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, return equal, nil } +var checksumLimiter chan struct{} + +// SetChecksumLimit limits the number of checkpoint DB connections simultaneously +func SetChecksumLimit(num int) { + checksumLimiter = make(chan struct{}, num) +} + // checksumInfo save some information about checksum type checksumInfo struct { checksum int64 diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 34dca9051..02c296467 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -107,6 +107,7 @@ func (df *Diff) init(cfg *Config) (err error) { if err = df.CreateDBConn(cfg); err != nil { return errors.Trace(err) } + diff.SetChecksumLimit(cfg.CheckpointThreadCount) if err = df.AdjustTableConfig(cfg); err != nil { return errors.Trace(err) @@ -120,7 +121,7 @@ func (df *Diff) init(cfg *Config) (err error) { return nil } -// CreateDBConn creates db connections for source and target. +// CreateDBConn creates db connections for source and target func (df *Diff) CreateDBConn(cfg *Config) (err error) { for _, source := range cfg.SourceDBCfg { source.Conn, err = diff.CreateDB(df.ctx, source.DBConfig, cfg.CheckThreadCount) @@ -137,7 +138,7 @@ func (df *Diff) CreateDBConn(cfg *Config) (err error) { } df.targetDB = cfg.TargetDBCfg - df.cpDB, err = diff.CreateDB(df.ctx, cfg.TargetDBCfg.DBConfig, cfg.CheckpointThreadCount) + df.cpDB, err = diff.CreateDBForCP(df.ctx, cfg.TargetDBCfg.DBConfig) if err != nil { return errors.Errorf("create checkpoint db %s error %v", cfg.TargetDBCfg.DBConfig.String(), err) } From e413006495d3f0275bc55a6503668a564b011d9b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 13:34:36 +0800 Subject: [PATCH 4/5] force index --- pkg/dbutil/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dbutil/common.go b/pkg/dbutil/common.go index e00a817ed..43285b5b3 100644 --- a/pkg/dbutil/common.go +++ b/pkg/dbutil/common.go @@ -387,7 +387,7 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", ColumnName(col.Name.O))) } - query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;", + query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;", strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange) log.Debug("checksum", zap.String("sql", query), zap.Reflect("args", args)) From 3309ffca735a23371a9a12de8e61dc5a4efb468c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 10 Mar 2021 14:01:08 +0800 Subject: [PATCH 5/5] try concurrently split chunk --- pkg/diff/chunk.go | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/pkg/diff/chunk.go b/pkg/diff/chunk.go index 06c2c926f..ee7866788 100644 --- a/pkg/diff/chunk.go +++ b/pkg/diff/chunk.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/pingcap/errors" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/utils" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -322,6 +324,9 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { var ( lowerValues, upperValues []string latestCount int64 + wg sync.WaitGroup + chunkCh = make(chan []*ChunkRange, len(buckets)+1) + splitErr atomic.Error ) indexColumns := getColumnsFromIndex(index, s.table.info) @@ -363,11 +368,16 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { if count == 0 { continue } else if count >= 2 { - splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation) - if err != nil { - return nil, errors.Trace(err) - } - chunks = append(chunks, splitChunks...) + wg.Add(1) + go func() { + defer wg.Done() + splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation) + if err != nil && splitErr.Load() == nil { + splitErr.Store(errors.Trace(err)) + } + chunkCh <- splitChunks + }() + } else { chunks = append(chunks, chunk) } @@ -376,6 +386,22 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { lowerValues = upperValues } + // early fail + err = splitErr.Load() + if err != nil { + return nil, err + } + wg.Wait() + err = splitErr.Load() + if err != nil { + return nil, err + } + + for len(chunkCh) > 0 { + splitChunks := <-chunkCh + chunks = append(chunks, splitChunks...) + } + if len(chunks) != 0 { break }