diff: add a config for checkpointDB connection limit #421

Open
wants to merge 7 commits into master
2 changes: 1 addition & 1 deletion pkg/dbutil/common.go
@@ -387,7 +387,7 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", ColumnName(col.Name.O)))
}

query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
log.Debug("checksum", zap.String("sql", query), zap.Reflect("args", args))

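For context, this is roughly the statement the fmt.Sprintf above produces after the change, sketched with a hypothetical two-column table `test`.`t` whose primary key is `id`; the column list, table name, and WHERE range are illustrative and not taken from this PR:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Illustrative inputs; in GetCRC32Checksum these come from the table info.
    columnNames := []string{"`id`", "`name`"}
    columnIsNull := []string{"ISNULL(`id`)", "ISNULL(`name`)"}
    limitRange := "`id` >= ? AND `id` < ?"

    query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;",
        strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), "`test`.`t`", limitRange)
    fmt.Println(query)
    // Prints (wrapped here for readability):
    //   SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`))))AS UNSIGNED)) AS checksum
    //   FROM `test`.`t` FORCE INDEX(PRIMARY) WHERE `id` >= ? AND `id` < ?;
    // The only change in this PR is the added FORCE INDEX(PRIMARY) hint, which
    // pins the checksum scan to the primary index.
}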
36 changes: 31 additions & 5 deletions pkg/diff/chunk.go
@@ -19,13 +19,15 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -322,6 +324,9 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
var (
lowerValues, upperValues []string
latestCount int64
wg sync.WaitGroup
chunkCh = make(chan []*ChunkRange, len(buckets)+1)
splitErr atomic.Error
)

indexColumns := getColumnsFromIndex(index, s.table.info)
@@ -363,11 +368,16 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
if count == 0 {
continue
} else if count >= 2 {
splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation)
if err != nil {
return nil, errors.Trace(err)
}
chunks = append(chunks, splitChunks...)
wg.Add(1)
go func() {
defer wg.Done()
splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation)
if err != nil && splitErr.Load() == nil {
splitErr.Store(errors.Trace(err))
}
chunkCh <- splitChunks
}()

} else {
chunks = append(chunks, chunk)
}
@@ -376,6 +386,22 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
lowerValues = upperValues
}

// early fail
err = splitErr.Load()
if err != nil {
return nil, err
}
wg.Wait()
err = splitErr.Load()
if err != nil {
return nil, err
}

for len(chunkCh) > 0 {
splitChunks := <-chunkCh
chunks = append(chunks, splitChunks...)
}

if len(chunks) != 0 {
break
}
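A minimal, self-contained sketch of the fan-out pattern the new code in getChunksByBuckets follows; splitOne is a hypothetical stand-in for splitRangeByRandom and the numbers are made up. Each split runs in its own goroutine, results land in a channel buffered large enough that no sender blocks, only the first error is recorded in an atomic.Error, and the channel is drained after wg.Wait():

package main

import (
    "fmt"
    "sync"

    "go.uber.org/atomic"
)

// splitOne is a hypothetical stand-in for splitRangeByRandom.
func splitOne(i int) ([]int, error) {
    return []int{2 * i, 2*i + 1}, nil
}

func main() {
    var (
        wg       sync.WaitGroup
        chunkCh  = make(chan []int, 8) // buffered: senders never block
        splitErr atomic.Error
    )

    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            chunks, err := splitOne(i)
            if err != nil && splitErr.Load() == nil {
                splitErr.Store(err) // keep only the first error
            }
            chunkCh <- chunks
        }(i)
    }

    wg.Wait()
    if err := splitErr.Load(); err != nil {
        fmt.Println("split failed:", err)
        return
    }

    var all []int
    for len(chunkCh) > 0 { // drain what the workers produced
        chunks := <-chunkCh
        all = append(all, chunks...)
    }
    fmt.Println(all)
}

The PR additionally checks splitErr once before wg.Wait() as an early exit; the sketch keeps only the post-wait check for brevity.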
4 changes: 1 addition & 3 deletions pkg/diff/conn.go
@@ -21,7 +21,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/dbutil"
)

// CreateDB creates sql.DB used for select data
// CreateDB creates sql.DB and sets the connection limit
func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, num int) (db *sql.DB, err error) {
db, err = dbutil.OpenDB(dbConfig)
if err != nil {
@@ -44,8 +44,6 @@ func CreateDBForCP(ctx context.Context, dbConfig dbutil.DBConfig) (cpDB *sql.DB,
if err != nil {
return nil, errors.Errorf("create db connections %+v error %v", dbConfig, err)
}
cpDB.SetMaxOpenConns(1)
cpDB.SetMaxIdleConns(1)

return cpDB, nil
}
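With the two deleted lines gone, the checkpoint pool is no longer pinned to one connection; unless dbutil.OpenDB sets its own limits, it presumably falls back to database/sql's defaults (no cap on open connections, two idle), and concurrency is instead bounded by the checksumLimiter semaphore added in pkg/diff/diff.go. A standalone sketch of the two pool knobs involved, with a hypothetical DSN and an illustrative cap of 64:

package main

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql" // driver registration; the DSN below is hypothetical
)

func main() {
    db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/test")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // These are the knobs the deleted lines used to force to 1 for the
    // checkpoint DB; database/sql's defaults are 0 (unlimited) open and 2 idle.
    db.SetMaxOpenConns(64)
    db.SetMaxIdleConns(64)
}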
13 changes: 11 additions & 2 deletions pkg/diff/diff.go
@@ -408,13 +408,15 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool,
})

update := func() {
checksumLimiter <- struct{}{}
ctx1, cancel1 := context.WithTimeout(ctx, dbutil.DefaultTimeout)
defer cancel1()

err1 := saveChunk(ctx1, t.CpDB, chunk.ID, t.TargetTable.InstanceID, t.TargetTable.Schema, t.TargetTable.Table, "", chunk)
if err1 != nil {
log.Warn("update chunk info", zap.Error(err1))
}
<-checksumLimiter
}

defer func() {
@@ -434,7 +436,7 @@ }
}
}
}
update()
go update()
}()

if filterByRand {
@@ -447,7 +449,7 @@ }
}

chunk.State = checkingState
update()
go update()

if t.UseChecksum {
// first check the checksum is equal or not
@@ -475,6 +477,13 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool,
return equal, nil
}

var checksumLimiter chan struct{}

// SetChecksumLimit limits the number of simultaneous checkpoint DB connections
func SetChecksumLimit(num int) {
checksumLimiter = make(chan struct{}, num)
}

// checksumInfo save some information about checksum
type checksumInfo struct {
checksum int64
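checksumLimiter is a buffered-channel semaphore: the capacity set by SetChecksumLimit is the concurrency cap, a send acquires a slot and a receive releases it, so at most num checkpoint writes run at once no matter how many update() goroutines checkChunkDataEqual spawns. A minimal sketch of that pattern, where saveCheckpoint is a hypothetical stand-in for saveChunk:

package main

import (
    "fmt"
    "sync"
    "time"
)

var limiter chan struct{}

// setLimit mirrors SetChecksumLimit: the channel capacity is the concurrency cap.
func setLimit(num int) {
    limiter = make(chan struct{}, num)
}

// saveCheckpoint is a hypothetical stand-in for saveChunk.
func saveCheckpoint(id int) {
    time.Sleep(10 * time.Millisecond)
    fmt.Println("saved chunk", id)
}

func main() {
    setLimit(4) // at most 4 concurrent checkpoint writes

    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            limiter <- struct{}{}        // acquire a slot
            defer func() { <-limiter }() // release it when done
            saveCheckpoint(id)
        }(i)
    }
    wg.Wait()
}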
4 changes: 4 additions & 0 deletions sync_diff_inspector/config.go
@@ -175,6 +175,9 @@ type Config struct {
// how many goroutines are created to check data
CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"`

// how many goroutines are created to save checkpoint
CheckpointThreadCount int `toml:"checkpoint-thread-count" json:"checkpoint-thread-count"`

// set false if you want to compare the data directly
UseChecksum bool `toml:"use-checksum" json:"use-checksum"`

@@ -228,6 +231,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.ChunkSize, "chunk-size", 1000, "diff check chunk size")
fs.IntVar(&cfg.Sample, "sample", 100, "the percent of sampling check")
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data")
fs.IntVar(&cfg.CheckpointThreadCount, "checkpoint-thread-count", 64, "how many goroutines are created to save checkpoint")
fs.BoolVar(&cfg.UseChecksum, "use-checksum", true, "set false if you want to compare the data directly")
fs.StringVar(&cfg.FixSQLFile, "fix-sql-file", "fix.sql", "the name of the file which saves sqls used to fix different data")
fs.BoolVar(&cfg.PrintVersion, "V", false, "print version of sync_diff_inspector")
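For reference, a hedged sketch of how the new knob would sit next to the existing ones in a sync_diff_inspector config file; the keys come from the toml tags above, the values are only illustrative, and required sections such as the source and target DB configs are omitted:

# illustrative fragment of a config.toml
check-thread-count = 4        # goroutines that check data
checkpoint-thread-count = 64  # goroutines that save checkpoints (new in this PR)
use-checksum = true           # set false to compare the data directly

The same value can also be passed on the command line through the new -checkpoint-thread-count flag registered in NewConfig.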
3 changes: 2 additions & 1 deletion sync_diff_inspector/diff.go
@@ -107,6 +107,7 @@ func (df *Diff) init(cfg *Config) (err error) {
if err = df.CreateDBConn(cfg); err != nil {
return errors.Trace(err)
}
diff.SetChecksumLimit(cfg.CheckpointThreadCount)

if err = df.AdjustTableConfig(cfg); err != nil {
return errors.Trace(err)
@@ -120,7 +121,7 @@ return nil
return nil
}

// CreateDBConn creates db connections for source and target.
// CreateDBConn creates db connections for source and target
func (df *Diff) CreateDBConn(cfg *Config) (err error) {
for _, source := range cfg.SourceDBCfg {
source.Conn, err = diff.CreateDB(df.ctx, source.DBConfig, cfg.CheckThreadCount)
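Finally, a rough, self-contained sketch of the init-time wiring after this PR; the types below are pared-down stand-ins for the real Diff and Config, not the project's API. The source and target pools are sized from check-thread-count first, then the checkpoint cap is installed from checkpoint-thread-count:

package main

import "fmt"

type config struct {
    CheckThreadCount      int
    CheckpointThreadCount int
}

var checkpointLimiter chan struct{} // mirrors the channel behind diff.SetChecksumLimit

func setChecksumLimit(num int) {
    checkpointLimiter = make(chan struct{}, num)
}

func createDBConn(cfg *config) error {
    // Stand-in for Diff.CreateDBConn: source/target pools capped at CheckThreadCount.
    fmt.Println("open source/target pools capped at", cfg.CheckThreadCount)
    return nil
}

func initDiff(cfg *config) error {
    if err := createDBConn(cfg); err != nil {
        return err
    }
    setChecksumLimit(cfg.CheckpointThreadCount) // the one line this file adds
    return nil
}

func main() {
    _ = initDiff(&config{CheckThreadCount: 4, CheckpointThreadCount: 64})
}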