Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264) #48288

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type AnalyzeExec struct {
wg util.WaitGroupWrapper
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
<<<<<<< HEAD:executor/analyze.go
=======
gp *gp.Pool
// errExitCh is used to notice the worker that the whole analyze task is finished when to meet error.
errExitCh chan struct{}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}

var (
Expand Down Expand Up @@ -164,6 +170,17 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
for i := 0; i < concurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
<<<<<<< HEAD:executor/analyze.go
=======
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
g, _ := errgroup.WithContext(ctx)
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
})
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.ctx, task.job)
Expand All @@ -176,6 +193,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
taskCh <- task
}
close(taskCh)
<<<<<<< HEAD:executor/analyze.go
e.wg.Wait()
close(resultsCh)
pruneMode := variable.PartitionPruneMode(e.ctx.GetSessionVars().PartitionPruneMode.Load())
Expand All @@ -186,11 +204,21 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
=======
defer func() {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}
}()

err = e.waitFinish(ctx, g, resultsCh)
if err != nil {
return err
}

failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
Expand All @@ -205,8 +233,49 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
<<<<<<< HEAD:executor/analyze.go
if e.ctx.GetSessionVars().InRestrictedSQL {
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema))
=======
return statsHandle.Update(infoSchema)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
checkwg, _ := errgroup.WithContext(ctx)
checkwg.Go(func() error {
// It is to wait for the completion of the result handler. if the result handler meets error, we should cancel
// the analyze process by closing the errExitCh.
err := g.Wait()
if err != nil {
close(e.errExitCh)
return err
}
return nil
})
checkwg.Go(func() error {
// Wait all workers done and close the results channel.
e.wg.Wait()
close(resultsCh)
return nil
})
return checkwg.Wait()
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
var (
filteredTasks []*analyzeTask
skippedTables []string
needAnalyzeTableCnt uint
// tidMap is used to deduplicate table IDs.
// In stats v1, analyze for each index is a single task, and they have the same table id.
tidAndPidsMap = make(map[int64]struct{}, len(tasks))
)

lockedTableAndPartitionIDs, err := getLockedTableAndPartitionIDs(statsHandle, tasks)
if err != nil {
return nil, 0, nil, err
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema), handle.WithTableStatsByQuery())
}
Expand Down Expand Up @@ -406,6 +475,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
StartAnalyzeJob(e.ctx, task.job)
switch task.taskType {
case colTask:
<<<<<<< HEAD:executor/analyze.go
resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
Expand All @@ -415,6 +485,19 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
resultsCh <- analyzePKIncremental(task.colIncrementalExec)
case idxIncrementalTask:
resultsCh <- analyzeIndexIncremental(task.idxIncrementalExec)
=======
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec):
}
case idxTask:
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeIndexPushdown(task.idxExec):
}
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}
}
Expand Down Expand Up @@ -569,15 +652,33 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
logutil.BgLogger().Warn(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason),
zap.Error(analyzeErr))
} else {
state = statistics.AnalyzeFinished
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason))
}
<<<<<<< HEAD:executor/analyze.go
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
=======
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/analyze.go
}
}

Expand Down
4 changes: 2 additions & 2 deletions executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func isAnalyzeWorkerPanic(err error) bool {
func getAnalyzePanicErr(r interface{}) error {
if msg, ok := r.(string); ok {
if msg == globalPanicAnalyzeMemoryExceed {
return errAnalyzeOOM
return errors.Trace(errAnalyzeOOM)
}
if strings.Contains(msg, memory.PanicMemoryExceedWarnMsg) {
return errors.Errorf(msg, errAnalyzeOOM)
Expand All @@ -61,7 +61,7 @@ func getAnalyzePanicErr(r interface{}) error {
}
return err
}
return errAnalyzeWorkerPanic
return errors.Trace(errAnalyzeWorkerPanic)
}

// analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup
Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2936,6 +2936,12 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
opts: v.Opts,
OptionsMap: v.OptionsMap,
<<<<<<< HEAD:executor/builder.go
=======
wg: util.NewWaitGroupPool(gp),
gp: gp,
errExitCh: make(chan struct{}),
>>>>>>> 4f00ece106b (executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264)):pkg/executor/builder.go
}
enableFastAnalyze := b.ctx.GetSessionVars().EnableFastAnalyze
autoAnalyze := ""
Expand Down
23 changes: 23 additions & 0 deletions pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "memorycontrol_test",
timeout = "short",
srcs = [
"main_test.go",
"memory_control_test.go",
],
flaky = True,
shard_count = 4,
deps = [
"//pkg/config",
"//pkg/executor",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze",
"//pkg/testkit",
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
Loading