executor: fix hang for analyze table when exceed tidb_mem_quota_analyze (#48264) (#48287)

close #48171
ti-chi-bot authored Nov 6, 2023
1 parent af47b70 commit 98becfa
Showing 5 changed files with 80 additions and 22 deletions.
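
The hang being fixed is a producer/consumer deadlock: analyze workers push their results into an unbuffered channel, and when the result handler aborts early (for example because the statement exceeds tidb_mem_quota_analyze), the remaining workers block on that send forever, so the ANALYZE statement never returns. The commit adds an errExitCh that is closed once the handler fails, letting any blocked worker give up via select. The following is a minimal, self-contained Go sketch of that pattern, not the TiDB code; worker, tasks, resultsCh, handlerErr, and errExitCh are illustrative stand-ins for the executor's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker mimics analyzeWorker: it drains a task channel and sends one result per
// task. The select on errExitCh is the essence of the fix: once that channel is
// closed, a worker blocked on the results send can give up instead of hanging.
func worker(tasks <-chan int, resultsCh chan<- string, errExitCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range tasks {
		select {
		case <-errExitCh: // the result handler already failed; stop instead of blocking forever
			return
		case resultsCh <- fmt.Sprintf("result of task %d", t):
		}
	}
}

func main() {
	const numTasks = 4
	tasks := make(chan int, numTasks)
	for i := 0; i < numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	resultsCh := make(chan string) // unbuffered, like the analyze results channel
	errExitCh := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go worker(tasks, resultsCh, errExitCh, &wg)
	}

	// A result handler that fails on the first result, standing in for the
	// handler that aborts once the analyze memory quota is exceeded.
	handlerErr := make(chan error, 1)
	go func() {
		<-resultsCh
		handlerErr <- errors.New("analyze memory quota exceeded")
	}()

	if err := <-handlerErr; err != nil {
		// Without this close, the workers would stay parked on `resultsCh <- ...`
		// and wg.Wait() below would never return; that is the reported hang.
		close(errExitCh)
	}
	wg.Wait()
	fmt.Println("all workers exited cleanly")
}
```

Commenting out the close(errExitCh) call reproduces the hang in this sketch, which is essentially what the new TestMemQuotaAnalyze at the bottom of the diff guards against at the SQL level.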
76 changes: 57 additions & 19 deletions pkg/executor/analyze.go
@@ -60,6 +60,8 @@ type AnalyzeExec struct {
 	wg util.WaitGroupWrapper
 	opts map[ast.AnalyzeOptionType]uint64
 	OptionsMap map[int64]core.V2AnalyzeOptions
+	// errExitCh is used to notify the workers that the whole analyze task is finished when it meets an error.
+	errExitCh chan struct{}
 }
 
 var (
@@ -122,7 +124,6 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	g.Go(func() error {
 		return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
 	})
-
 	for _, task := range tasks {
 		prepareV2AnalyzeJobInfo(task.colExec, false)
 		AddNewAnalyzeJob(e.Ctx(), task.job)
@@ -136,19 +137,19 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 		taskCh <- task
 	}
 	close(taskCh)
 
-	// Wait all workers done and close the results channel.
-	e.wg.Wait()
-	close(resultsCh)
-	err = g.Wait()
-	for _, task := range tasks {
-		if task.colExec != nil && task.colExec.memTracker != nil {
-			task.colExec.memTracker.Detach()
-		}
-	}
+	defer func() {
+		for _, task := range tasks {
+			if task.colExec != nil && task.colExec.memTracker != nil {
+				task.colExec.memTracker.Detach()
+			}
+		}
+	}()
+
+	err = e.waitFinish(ctx, g, resultsCh)
 	if err != nil {
 		return err
 	}
 
 	failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
 		dom := domain.GetDomain(e.Ctx())
 		dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
@@ -169,6 +170,27 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	return statsHandle.Update(infoSchema)
 }
 
+func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
+	checkwg, _ := errgroup.WithContext(ctx)
+	checkwg.Go(func() error {
+		// Wait for the completion of the result handler. If the result handler meets an error,
+		// cancel the analyze process by closing the errExitCh.
+		err := g.Wait()
+		if err != nil {
+			close(e.errExitCh)
+			return err
+		}
+		return nil
+	})
+	checkwg.Go(func() error {
+		// Wait until all workers are done, then close the results channel.
+		e.wg.Wait()
+		close(resultsCh)
+		return nil
+	})
+	return checkwg.Wait()
+}
+
 // filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
 func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
 	var (
@@ -495,9 +517,17 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
 		StartAnalyzeJob(e.Ctx(), task.job)
 		switch task.taskType {
 		case colTask:
-			resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
+			select {
+			case <-e.errExitCh:
+				return
+			case resultsCh <- analyzeColumnsPushDownEntry(task.colExec):
+			}
 		case idxTask:
-			resultsCh <- analyzeIndexPushdown(task.idxExec)
+			select {
+			case <-e.errExitCh:
+				return
+			case resultsCh <- analyzeIndexPushdown(task.idxExec):
+			}
 		}
 	}
 }
@@ -684,16 +714,24 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
 		var state string
 		if analyzeErr != nil {
 			state = statistics.AnalyzeFailed
+			logutil.BgLogger().Warn(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
+				zap.String("partition", job.PartitionName),
+				zap.String("job info", job.JobInfo),
+				zap.Time("start time", job.StartTime),
+				zap.Time("end time", job.EndTime),
+				zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
+				zap.String("sample rate reason", job.SampleRateReason),
+				zap.Error(analyzeErr))
 		} else {
 			state = statistics.AnalyzeFinished
+			logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
+				zap.String("partition", job.PartitionName),
+				zap.String("job info", job.JobInfo),
+				zap.Time("start time", job.StartTime),
+				zap.Time("end time", job.EndTime),
+				zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
+				zap.String("sample rate reason", job.SampleRateReason))
 		}
-		logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
-			zap.String("partition", job.PartitionName),
-			zap.String("job info", job.JobInfo),
-			zap.Time("start time", job.StartTime),
-			zap.Time("end time", job.EndTime),
-			zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
-			zap.String("sample rate reason", job.SampleRateReason))
 	}
 }
 
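On the coordinator side, the old sequence in Next() waited for the workers first (e.wg.Wait()), so a worker stuck on the results send kept the whole statement from returning even though the result handler had already failed. The new waitFinish splits the two waits into separate goroutines so neither can starve the other. A rough sketch of the same shape with simplified types (the string results channel, the package name, and the parameter names are placeholders, not TiDB's):

```go
package analyzesketch

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// waitFinish-style coordination: one goroutine watches the result handler
// (running under g) and closes errExitCh on failure so blocked workers can
// bail out; the other waits for the workers and then closes resultsCh so the
// handler's receive loop can finish on the success path.
func waitFinish(ctx context.Context, g *errgroup.Group, workers *sync.WaitGroup,
	resultsCh chan string, errExitCh chan struct{}) error {
	checkwg, _ := errgroup.WithContext(ctx)
	checkwg.Go(func() error {
		if err := g.Wait(); err != nil {
			close(errExitCh) // unblock workers stuck on resultsCh <- ...
			return err
		}
		return nil
	})
	checkwg.Go(func() error {
		workers.Wait()
		close(resultsCh) // every result has been sent; let the handler drain and return
		return nil
	})
	return checkwg.Wait()
}
```

Running the two waits concurrently is what makes the error path safe: close(errExitCh) releases the workers, the worker WaitGroup then drains, resultsCh gets closed, and checkwg.Wait() returns the handler's error to Next().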
4 changes: 2 additions & 2 deletions pkg/executor/analyze_utils.go
@@ -57,7 +57,7 @@ func isAnalyzeWorkerPanic(err error) bool {
 func getAnalyzePanicErr(r interface{}) error {
 	if msg, ok := r.(string); ok {
 		if msg == globalPanicAnalyzeMemoryExceed {
-			return errAnalyzeOOM
+			return errors.Trace(errAnalyzeOOM)
 		}
 		if strings.Contains(msg, memory.PanicMemoryExceedWarnMsg) {
 			return errors.Errorf("%s, %s", msg, errAnalyzeOOM)
@@ -69,7 +69,7 @@ func getAnalyzePanicErr(r interface{}) error {
 		}
 		return err
 	}
-	return errAnalyzeWorkerPanic
+	return errors.Trace(errAnalyzeWorkerPanic)
 }
 
 // analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup
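The analyze_utils.go change only wraps the two sentinel errors with errors.Trace from github.com/pingcap/errors, which attaches the call stack at the point where the worker panic is converted into an error while keeping the sentinel itself as the cause. A tiny sketch of that behavior, with errOOM as an illustrative stand-in for errAnalyzeOOM:

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// errOOM is an illustrative sentinel standing in for errAnalyzeOOM.
var errOOM = errors.New("analyze panic due to memory quota exceeded")

func main() {
	// Trace records the current call stack but keeps the sentinel as the cause,
	// so callers can still recognize the original error.
	err := errors.Trace(errOOM)
	fmt.Println(errors.Cause(err) == errOOM) // true
	fmt.Printf("%+v\n", err)                 // message followed by the recorded stack
}
```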
1 change: 1 addition & 0 deletions pkg/executor/builder.go
@@ -2895,6 +2895,7 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
 		tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
 		opts: v.Opts,
 		OptionsMap: v.OptionsMap,
+		errExitCh: make(chan struct{}),
 	}
 	autoAnalyze := ""
 	if b.ctx.GetSessionVars().InRestrictedSQL {
2 changes: 1 addition & 1 deletion pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel
@@ -8,7 +8,7 @@ go_test(
         "memory_control_test.go",
     ],
     flaky = True,
-    shard_count = 3,
+    shard_count = 4,
     deps = [
         "//pkg/config",
         "//pkg/executor",
19 changes: 19 additions & 0 deletions pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go
@@ -183,3 +183,22 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
 	childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
 	require.Len(t, childTrackers, 0)
 }
+
+func TestMemQuotaAnalyze(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table tbl_2 ( col_20 decimal default 84232 , col_21 tinyint not null , col_22 int default 80814394 , col_23 mediumint default -8036687 not null , col_24 smallint default 9185 not null , col_25 tinyint unsigned default 65 , col_26 char(115) default 'ZyfroRODMbNDRZnPNRW' not null , col_27 bigint not null , col_28 tinyint not null , col_29 char(130) default 'UMApsVgzHblmY' , primary key idx_14 ( col_28,col_22 ) , unique key idx_15 ( col_24,col_22 ) , key idx_16 ( col_21,col_20,col_24,col_25,col_27,col_28,col_26,col_29 ) , key idx_17 ( col_24,col_25 ) , unique key idx_18 ( col_25,col_23,col_29,col_27,col_26,col_22 ) , key idx_19 ( col_25,col_22,col_26,col_23 ) , unique key idx_20 ( col_22,col_24,col_28,col_29,col_26,col_20 ) , key idx_21 ( col_25,col_24,col_26,col_29,col_27,col_22,col_28 ) ) partition by range ( col_22 ) ( partition p0 values less than (-1938341588), partition p1 values less than (-1727506184), partition p2 values less than (-1700184882), partition p3 values less than (-1596142809), partition p4 values less than (445165686) );")
+	tk.MustExec("insert ignore into tbl_2 values ( 942,33,-1915007317,3408149,-3699,193,'Trywdis',1876334369465184864,115,null );")
+	tk.MustExec("insert ignore into tbl_2 values ( 7,-39,-1382727205,-2544981,-28075,88,'FDhOsTRKRLCwEk',-1239168882463214388,17,'WskQzCK' );")
+	tk.MustExec("insert ignore into tbl_2 values ( null,55,-388460319,-2292918,10130,162,'UqjDlYvdcNY',4872802276956896607,-51,'ORBQjnumcXP' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 42,-19,-9677826,-1168338,16904,79,'TzOqH',8173610791128879419,65,'lNLcvOZDcRzWvDO' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 2,26,369867543,-6773303,-24953,41,'BvbdrKTNtvBgsjjnxt',5996954963897924308,-95,'wRJYPBahkIGDfz' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 6896,3,444460824,-2070971,-13095,167,'MvWNKbaOcnVuIrtbT',6968339995987739471,-5,'zWipNBxGeVmso' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 58761,112,-1535034546,-5837390,-14204,157,'',-8319786912755096816,15,'WBjsozfBfrPPHmKv' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 84923,113,-973946646,406140,25040,51,'THQdwkQvppWZnULm',5469507709881346105,94,'oGNmoxLLgHkdyDCT' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 0,-104,-488745187,-1941015,-2646,39,'jyKxfs',-5307175470406648836,46,'KZpfjFounVgFeRPa' );")
+	tk.MustExec("insert ignore into tbl_2 values ( 4,97,2105289255,1034363,28385,192,'',4429378142102752351,8,'jOk' );")
+	tk.MustExec("set global tidb_mem_quota_analyze=128;")
+	tk.MustExecToErr("analyze table tbl_2;")
+}
