Skip to content

Commit

Permalink
This is an automated cherry-pick of #48264
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed Nov 3, 2023
1 parent 9e55944 commit 6b16bdd
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 3 deletions.
103 changes: 102 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type AnalyzeExec struct {
wg util.WaitGroupWrapper
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
<<<<<<< HEAD:executor/analyze.go
=======
gp *gp.Pool
// errExitCh is used to notice the worker that the whole analyze task is finished when to meet error.
errExitCh chan struct{}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}

var (
Expand Down Expand Up @@ -164,6 +170,17 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
for i := 0; i < concurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
<<<<<<< HEAD:executor/analyze.go
=======
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
})
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.ctx, task.job)
Expand All @@ -176,6 +193,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
taskCh <- task
}
close(taskCh)
<<<<<<< HEAD:executor/analyze.go
e.wg.Wait()
close(resultsCh)
pruneMode := variable.PartitionPruneMode(e.ctx.GetSessionVars().PartitionPruneMode.Load())
Expand All @@ -186,11 +204,21 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
=======
defer func() {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}
}()

err = e.waitFinish(ctx, g, resultsCh)
if err != nil {
return err
}

failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
Expand All @@ -205,8 +233,49 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
<<<<<<< HEAD:executor/analyze.go
if e.ctx.GetSessionVars().InRestrictedSQL {
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema))
=======
return statsHandle.Update(infoSchema)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
checkwg, _ := errgroup.WithContext(ctx)
checkwg.Go(func() error {
// It is to wait for the completion of the result handler. if the result handler meets error, we should cancel
// the analyze process by closing the errExitCh.
err := g.Wait()
if err != nil {
close(e.errExitCh)
return err
}
return nil
})
checkwg.Go(func() error {
// Wait all workers done and close the results channel.
e.wg.Wait()
close(resultsCh)
return nil
})
return checkwg.Wait()
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
var (
filteredTasks []*analyzeTask
skippedTables []string
needAnalyzeTableCnt uint
// tidMap is used to deduplicate table IDs.
// In stats v1, analyze for each index is a single task, and they have the same table id.
tidAndPidsMap = make(map[int64]struct{}, len(tasks))
)

lockedTableAndPartitionIDs, err := getLockedTableAndPartitionIDs(statsHandle, tasks)
if err != nil {
return nil, 0, nil, err
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema), handle.WithTableStatsByQuery())
}
Expand Down Expand Up @@ -406,6 +475,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
StartAnalyzeJob(e.ctx, task.job)
switch task.taskType {
case colTask:
<<<<<<< HEAD:executor/analyze.go
resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
Expand All @@ -415,6 +485,19 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
resultsCh <- analyzePKIncremental(task.colIncrementalExec)
case idxIncrementalTask:
resultsCh <- analyzeIndexIncremental(task.idxIncrementalExec)
=======
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec):
}
case idxTask:
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeIndexPushdown(task.idxExec):
}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}
}
Expand Down Expand Up @@ -569,15 +652,33 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
logutil.BgLogger().Warn(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason),
zap.Error(analyzeErr))
} else {
state = statistics.AnalyzeFinished
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason))
}
<<<<<<< HEAD:executor/analyze.go
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
=======
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}

Expand Down
4 changes: 2 additions & 2 deletions executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func isAnalyzeWorkerPanic(err error) bool {
func getAnalyzePanicErr(r interface{}) error {
if msg, ok := r.(string); ok {
if msg == globalPanicAnalyzeMemoryExceed {
return errAnalyzeOOM
return errors.Trace(errAnalyzeOOM)
}
if strings.Contains(msg, memory.PanicMemoryExceedWarnMsg) {
return errors.Errorf(msg, errAnalyzeOOM)
Expand All @@ -61,7 +61,7 @@ func getAnalyzePanicErr(r interface{}) error {
}
return err
}
return errAnalyzeWorkerPanic
return errors.Trace(errAnalyzeWorkerPanic)
}

// analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup
Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2936,6 +2936,12 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
opts: v.Opts,
OptionsMap: v.OptionsMap,
<<<<<<< HEAD:executor/builder.go
=======
wg: util.NewWaitGroupPool(gp),
gp: gp,
errExitCh: make(chan struct{}),
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/builder.go
}
enableFastAnalyze := b.ctx.GetSessionVars().EnableFastAnalyze
autoAnalyze := ""
Expand Down
23 changes: 23 additions & 0 deletions pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "memorycontrol_test",
timeout = "short",
srcs = [
"main_test.go",
"memory_control_test.go",
],
flaky = True,
shard_count = 4,
deps = [
"//pkg/config",
"//pkg/executor",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze",
"//pkg/testkit",
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
Loading

0 comments on commit 6b16bdd

Please sign in to comment.