Skip to content

Commit

Permalink
[Improvement](statistics)Improve show analyze performance. (#22484) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li authored Aug 11, 2023
1 parent 8e99116 commit 971b35c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
Expand Down Expand Up @@ -77,9 +78,14 @@ public enum ScheduleType {
@SerializedName("jobId")
public final long jobId;

// When this AnalysisInfo represents a task, this is its task id.
@SerializedName("taskId")
public final long taskId;

// When this AnalysisInfo represents a job, this is the list of ids of the tasks belonging to this job.
@SerializedName("taskIds")
public final List<Long> taskIds;

@SerializedName("catalogName")
public final String catalogName;

Expand Down Expand Up @@ -153,14 +159,19 @@ public enum ScheduleType {
@SerializedName("samplingPartition")
public boolean samplingPartition;

public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
// For serialization
@SerializedName("cronExpr")
public String cronExprStr;

public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
Expand Down Expand Up @@ -231,6 +242,10 @@ public boolean isJob() {
return taskId == -1;
}

/**
 * Records the id of a task that belongs to this job.
 *
 * <p>{@code taskIds} is whatever list was handed to the constructor, so it is {@code null}
 * when the builder was never given one. In that case the id is dropped instead of failing
 * with a NullPointerException; the progress lookup already treats a job without a task-id
 * list as having nothing to report.
 *
 * @param taskId id of the task to associate with this job
 */
public void addTaskId(long taskId) {
    if (taskIds == null) {
        // No list to record into: skip rather than abort task creation.
        return;
    }
    taskIds.add(taskId);
}

// TODO: use thrift
public static AnalysisInfo fromResultRow(ResultRow resultRow) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class AnalysisInfoBuilder {
private long jobId;
private long taskId;
private List<Long> taskIds;
private String catalogName;
private String dbName;
private String tblName;
Expand Down Expand Up @@ -59,6 +61,7 @@ public AnalysisInfoBuilder() {
public AnalysisInfoBuilder(AnalysisInfo info) {
jobId = info.jobId;
taskId = info.taskId;
taskIds = info.taskIds;
catalogName = info.catalogName;
dbName = info.dbName;
tblName = info.tblName;
Expand Down Expand Up @@ -94,6 +97,11 @@ public AnalysisInfoBuilder setTaskId(long taskId) {
return this;
}

/**
 * Sets the task-id list that {@code build()} passes to the {@code AnalysisInfo} constructor.
 *
 * <p>The list is stored by reference, not copied, so later changes to it are visible
 * through this builder.
 *
 * @param taskIds ids of the tasks belonging to the job being described
 * @return this builder, for call chaining
 */
public AnalysisInfoBuilder setTaskIds(List<Long> taskIds) {
this.taskIds = taskIds;
return this;
}

public AnalysisInfoBuilder setCatalogName(String catalogName) {
this.catalogName = catalogName;
return this;
Expand Down Expand Up @@ -210,7 +218,7 @@ public AnalysisInfoBuilder setSamplingPartition(boolean samplingPartition) {
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames,
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition);
Expand All @@ -220,6 +228,7 @@ public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogName(catalogName)
.setDbName(dbName)
.setTblName(tblName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -439,6 +440,7 @@ private AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExcepti
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
partitionNames, analysisType, analysisMode);
taskInfoBuilder.setColToPartitions(colToPartitions);
taskInfoBuilder.setTaskIds(Lists.newArrayList());

return taskInfoBuilder.build();
}
Expand Down Expand Up @@ -511,6 +513,7 @@ private void createTaskForMVIdx(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask
AnalysisInfoBuilder indexTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
AnalysisInfo analysisInfo = indexTaskInfoBuilder.setIndexId(indexId)
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
jobInfo.addTaskId(taskId);
if (isSync) {
return;
}
Expand All @@ -537,6 +540,7 @@ private void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalys
AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId)
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
analysisTasks.put(taskId, createTask(analysisInfo));
jobInfo.addTaskId(taskId);
if (isSync) {
continue;
}
Expand Down Expand Up @@ -580,6 +584,7 @@ private void createTaskForExternalTable(AnalysisInfo jobInfo,
AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L).setLastExecTimeInMs(System.currentTimeMillis())
.setTaskId(taskId).setColName("TableRowCount").setExternalTableLevelTask(true).build();
analysisTasks.put(taskId, createTask(analysisInfo));
jobInfo.addTaskId(taskId);
if (isSync) {
// For sync job, don't need to persist, return here and execute it immediately.
return;
Expand Down Expand Up @@ -708,7 +713,10 @@ public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
}

public String getJobProgress(long jobId) {
List<AnalysisInfo> tasks = findTasks(jobId);
List<AnalysisInfo> tasks = findTasksByTaskIds(jobId);
if (tasks == null) {
return "N/A";
}
int finished = 0;
int failed = 0;
int inProgress = 0;
Expand Down Expand Up @@ -921,6 +929,14 @@ public List<AnalysisInfo> findTasks(long jobId) {
}
}

/**
 * Looks up the tasks of a job through the task ids recorded on the job itself.
 *
 * @param jobId id of the job whose tasks are wanted
 * @return the tasks still present in {@code analysisTaskInfoMap}, in the order their ids
 *         were recorded on the job; {@code null} when the job is unknown or carries no
 *         task-id list
 */
public List<AnalysisInfo> findTasksByTaskIds(long jobId) {
    AnalysisInfo jobInfo = analysisJobInfoMap.get(jobId);
    if (jobInfo == null || jobInfo.taskIds == null) {
        // getJobProgress reports "N/A" on null, so keep null distinct from an empty list.
        return null;
    }
    return jobInfo.taskIds.stream()
            .map(id -> analysisTaskInfoMap.get(id))
            // A recorded id can outlive its task (removeAll deletes entries from
            // analysisTaskInfoMap); drop those lookups so callers never see null elements.
            .filter(task -> task != null)
            .collect(Collectors.toList());
}

public void removeAll(List<AnalysisInfo> analysisInfos) {
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisTaskInfoMap.remove(analysisInfo.taskId);
Expand Down

0 comments on commit 971b35c

Please sign in to comment.