Skip to content

Commit

Permalink
add load type
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 21, 2024
1 parent 058489e commit c81bd9f
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
Expand Down Expand Up @@ -172,7 +174,7 @@ private void executeOnce() throws Exception {
}

try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator, QueryType.BROKER_LOAD);
actualExecute(curCoordinator, timeoutS);
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
Expand Down Expand Up @@ -202,7 +204,7 @@ public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator, QueryType.INSERT);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface QeProcessor {

TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr);

void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException;
void registerQuery(TUniqueId queryId, Coordinator coord, QeProcessorImpl.QueryInfo.QueryType queryType)
throws UserException;

void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException;

Expand Down
21 changes: 16 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public List<Coordinator> getAllCoordinators() {
}

@Override
public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException {
registerQuery(queryId, new QueryInfo(coord));
public void registerQuery(TUniqueId queryId, Coordinator coord, QueryInfo.QueryType queryType) throws UserException {
registerQuery(queryId, new QueryInfo(coord, queryType));
}

@Override
Expand Down Expand Up @@ -262,22 +262,33 @@ public Map<String, QueryInfo> getQueryInfoMap() {
}

public static final class QueryInfo {

public enum QueryType {
QUERY,
INSERT,
BROKER_LOAD,
STREAM_LOAD,
UNKNOWN
}

private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
private final long startExecTime;
private QueryType queryType = QueryType.UNKNOWN;

// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
this(null, null, coord);
public QueryInfo(Coordinator coord, QueryType queryType) {
this(null, null, coord, queryType);
}

// from query
public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord) {
public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord, QueryType queryType) {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
this.startExecTime = System.currentTimeMillis();
this.queryType = queryType;
}

public ConnectContext getConnectContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
Expand Down Expand Up @@ -1572,7 +1574,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord, QueryType.QUERY));
profile.setExecutionProfile(coord.getExecutionProfile());
coordBase = coord;
}
Expand Down Expand Up @@ -2052,7 +2054,7 @@ private void handleInsertStmt() throws Exception {
coord.setQueryType(TQueryType.LOAD);
profile.setExecutionProfile(coord.getExecutionProfile());

QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord, QueryType.INSERT);

Table table = insertStmt.getTargetTable();
if (table instanceof OlapTable) {
Expand Down Expand Up @@ -2855,7 +2857,7 @@ public List<ResultRow> executeInternalQuery() {
profile.setExecutionProfile(coord.getExecutionProfile());
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord, QueryType.QUERY));
} catch (UserException e) {
throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
}
Expand Down

0 comments on commit c81bd9f

Please sign in to comment.