From c81bd9f5acf3ed6ac3661980808c1a6335f2a6eb Mon Sep 17 00:00:00 2001 From: wangbo <506340561@qq.com> Date: Wed, 21 Feb 2024 17:40:30 +0800 Subject: [PATCH] add load type --- .../doris/load/loadv2/LoadLoadingTask.java | 4 +++- .../trees/plans/commands/InsertExecutor.java | 4 +++- .../java/org/apache/doris/qe/QeProcessor.java | 3 ++- .../org/apache/doris/qe/QeProcessorImpl.java | 21 ++++++++++++++----- .../org/apache/doris/qe/StmtExecutor.java | 8 ++++--- 5 files changed, 29 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 8fbabea86291c09..e8cf1a8207fc62e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -33,6 +33,8 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryType; @@ -172,7 +174,7 @@ private void executeOnce() throws Exception { } try { - QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); + QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator, QueryType.BROKER_LOAD); actualExecute(curCoordinator, timeoutS); } finally { QeProcessorImpl.INSTANCE.unregisterQuery(loadId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index abee26775275da6..78407a12e5a178f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -66,6 +66,8 @@ import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; @@ -202,7 +204,7 @@ public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile()); - QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator); + QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator, QueryType.INSERT); coordinator.exec(); int execTimeout = ctx.getExecTimeout(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index 44999ecef64caf5..51737a832d94c90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -31,7 +31,8 @@ public interface QeProcessor { TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr); - void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException; + void registerQuery(TUniqueId queryId, Coordinator coord, QeProcessorImpl.QueryInfo.QueryType queryType) + throws UserException; void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 55536639bfd1868..30f3cc7fa51a824 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -90,8 +90,8 @@ public List getAllCoordinators() { } @Override - public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException { - registerQuery(queryId, new QueryInfo(coord)); + public void registerQuery(TUniqueId queryId, Coordinator coord, QueryInfo.QueryType queryType) throws UserException { + registerQuery(queryId, new QueryInfo(coord, queryType)); } @Override @@ -262,22 +262,33 @@ public Map getQueryInfoMap() { } public static final class QueryInfo { + + public enum QueryType { + QUERY, + INSERT, + BROKER_LOAD, + STREAM_LOAD, + UNKNOWN + } + private final ConnectContext connectContext; private final Coordinator coord; private final String sql; private final long startExecTime; + private QueryType queryType = QueryType.UNKNOWN; // from Export, Pull load, Insert - public QueryInfo(Coordinator coord) { - this(null, null, coord); + public QueryInfo(Coordinator coord, QueryType queryType) { + this(null, null, coord, queryType); } // from query - public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord) { + public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord, QueryType queryType) { this.connectContext = connectContext; this.coord = coord; this.sql = sql; this.startExecTime = System.currentTimeMillis(); + this.queryType = queryType; } public ConnectContext getConnectContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 7879f5ed86f95e3..5504df7dd286cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -146,6 +146,8 @@ import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; @@ -1572,7 +1574,7 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable } else { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord, QueryType.QUERY)); profile.setExecutionProfile(coord.getExecutionProfile()); coordBase = coord; } @@ -2052,7 +2054,7 @@ private void handleInsertStmt() throws Exception { coord.setQueryType(TQueryType.LOAD); profile.setExecutionProfile(coord.getExecutionProfile()); - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord, QueryType.INSERT); Table table = insertStmt.getTargetTable(); if (table instanceof OlapTable) { @@ -2855,7 +2857,7 @@ public List executeInternalQuery() { profile.setExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord, QueryType.QUERY)); } catch (UserException e) { throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); }