Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker(dm): add a worker pool to work concurrently #7796

Merged
merged 12 commits into from
Dec 8, 2022
79 changes: 50 additions & 29 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,35 +169,6 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
return nil, egErr
}

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok &&
c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical &&
c.stCfgs[0].Mode != config.ModeIncrement {
// TODO: concurrently read it intra-source later
for idx := range c.instances {
i := idx
eg.Go(func() error {
for _, sourceTables := range tableMapPerUpstream[i] {
for _, sourceTable := range sourceTables {
size, err2 := conn.FetchTableEstimatedBytes(
ctx,
c.instances[i].sourceDB,
sourceTable.Schema,
sourceTable.Name,
)
if err2 != nil {
return err2
}
info.totalDataSize.Add(size)
}
}
return nil
})
}
}
if egErr := eg.Wait(); egErr != nil {
return nil, egErr
}

info.targetTable2ExtendedColumns = extendedColumnPerTable
info.targetTable2SourceTablesMap = make(map[filter.Table]map[string][]filter.Table)
info.targetTableShardNum = make(map[filter.Table]int)
Expand Down Expand Up @@ -226,6 +197,8 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
info.sourceID2SourceTables = make(map[string][]filter.Table, len(c.instances))
info.sourceID2InterestedDB = make([]map[string]struct{}, len(c.instances))
info.sourceID2TableMap = make(map[string]map[filter.Table][]filter.Table, len(c.instances))
sourceIDs := make([]string, 0, len(c.instances))
dbs := make(map[string]*conn.BaseDB, len(c.instances))
for i, inst := range c.instances {
sourceID := inst.cfg.SourceID
info.sourceID2InterestedDB[i] = make(map[string]struct{})
Expand All @@ -237,7 +210,55 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
info.sourceID2InterestedDB[i][table.Schema] = struct{}{}
}
}
sourceIDs = append(sourceIDs, sourceID)
dbs[sourceID] = inst.sourceDB
}

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok &&
c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical &&
c.stCfgs[0].Mode != config.ModeIncrement {
concurrency, err := checker.GetConcurrency(ctx, sourceIDs, dbs, c.stCfgs[0].MydumperConfig.Threads)
if err != nil {
return nil, err
}

type job struct {
db *conn.BaseDB
schema string
table string
}

pool := checker.NewWorkerPoolWithContext[job, int64](ctx, func(result int64) {
info.totalDataSize.Add(result)
})
for i := 0; i < concurrency; i++ {
pool.Go(func(ctx context.Context, job job) (int64, error) {
return conn.FetchTableEstimatedBytes(
ctx,
job.db,
job.schema,
job.table,
)
})
}

for idx := range c.instances {
for _, sourceTables := range tableMapPerUpstream[idx] {
for _, sourceTable := range sourceTables {
pool.PutJob(job{
db: c.instances[idx].sourceDB,
schema: sourceTable.Schema,
table: sourceTable.Name,
})
}
}
}
err2 := pool.Wait()
if err2 != nil {
return nil, err2
}
}

return info, nil
}

Expand Down
Loading