From fc0222a64cb58415f05d913c724929b04fab476c Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 7 Aug 2024 11:01:46 +0800 Subject: [PATCH] [opt](info) processlist schema table support show all fe (#38701) (#38953) pick #38701 --- be/src/exec/schema_scanner.h | 1 + .../schema_processlist_scanner.cpp | 11 +++++++--- be/src/pipeline/exec/schema_scan_operator.cpp | 11 ++++++++++ .../org/apache/doris/catalog/SchemaTable.java | 17 +++++++++++++++- .../apache/doris/planner/SchemaScanNode.java | 20 +++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 1 + 6 files changed, 57 insertions(+), 4 deletions(-) diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index 4666657af21b5f..da61d58b943fc4 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -69,6 +69,7 @@ struct SchemaScannerCommonParam { int32_t port; // frontend thrift port int64_t thread_id; const std::string* catalog = nullptr; + std::set fe_addr_list; }; // scanner parameter from frontend diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index f1071359d0ad16..c65e1d14c2c5ad 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -55,9 +55,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) { TShowProcessListRequest request; request.__set_show_full_sql(true); - RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip), - _param->common_param->port, request, - &_process_list_result)); + for (const auto& fe_addr : _param->common_param->fe_addr_list) { + TShowProcessListResult tmp_ret; + RETURN_IF_ERROR( + SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret)); + _process_list_result.process_list.insert(_process_list_result.process_list.end(), + tmp_ret.process_list.begin(), + tmp_ret.process_list.end()); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 8ff05cc41b7714..d5353655ab070a 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -133,6 +133,17 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { _common_scanner_param->catalog = state->obj_pool()->add(new std::string(tnode.schema_scan_node.catalog)); } + + if (tnode.schema_scan_node.__isset.fe_addr_list) { + for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) { + _common_scanner_param->fe_addr_list.insert(fe_addr); + } + } else if (tnode.schema_scan_node.__isset.ip && tnode.schema_scan_node.__isset.port) { + TNetworkAddress fe_addr; + fe_addr.hostname = tnode.schema_scan_node.ip; + fe_addr.port = tnode.schema_scan_node.port; + _common_scanner_param->fe_addr_list.insert(fe_addr); + } return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 0995884dc6fb05..53b00b0880a582 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -504,7 +504,7 @@ public class SchemaTable extends Table { .column("QUERY_ID", ScalarType.createVarchar(256)) .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) .column("FE", - ScalarType.createVarchar(64)).build())) + ScalarType.createVarchar(64)).build(), true)) .put("workload_policy", new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA, builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT)) @@ -518,10 +518,17 @@ public class SchemaTable extends Table { .build())) .build(); + private boolean fetchAllFe = false; + protected SchemaTable(long id, String name, TableType type, List baseSchema) { super(id, name, type, baseSchema); } + protected SchemaTable(long id, String name, TableType type, List baseSchema, boolean fetchAllFe) { + this(id, name, type, baseSchema); + this.fetchAllFe = fetchAllFe; + } + @Override public void write(DataOutput out) throws IOException { throw new UnsupportedOperationException("Do not allow to write SchemaTable to image."); @@ -535,6 +542,14 @@ public static Builder builder() { return new Builder(); } + public static boolean isShouldFetchAllFe(String schemaTableName) { + Table table = TABLE_MAP.get(schemaTableName); + if (table != null && table instanceof SchemaTable) { + return ((SchemaTable) table).fetchAllFe; + } + return false; + } + /** * For TABLE_MAP. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index 5a52c79e9534a4..9418f4f6cf3593 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; @@ -27,6 +28,8 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.system.Frontend; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRangeLocations; @@ -38,6 +41,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; /** @@ -84,6 +88,21 @@ public void finalizeForNereids() throws UserException { frontendPort = Config.rpc_port; } + private void setFeAddrList(TPlanNode msg) { + if (SchemaTable.isShouldFetchAllFe(tableName)) { + List feAddrList = new ArrayList(); + if (ConnectContext.get().getSessionVariable().showAllFeConnection) { + List feList = Env.getCurrentEnv().getFrontends(null); + for (Frontend fe : feList) { + feAddrList.add(new TNetworkAddress(fe.getHost(), fe.getRpcPort())); + } + } else { + feAddrList.add(new TNetworkAddress(frontendIP, frontendPort)); + } + msg.schema_scan_node.setFeAddrList(feAddrList); + } + } + @Override protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE; @@ -116,6 +135,7 @@ protected void toThrift(TPlanNode msg) { TUserIdentity tCurrentUser = ConnectContext.get().getCurrentUserIdentity().toThrift(); msg.schema_scan_node.setCurrentUserIdent(tCurrentUser); + setFeAddrList(msg); } @Override diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f8a08dd708f202..fc1a6e6baf57a2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -672,6 +672,7 @@ struct TSchemaScanNode { 12: optional bool show_hidden_cloumns = false // 13: optional list table_structure // deprecated 14: optional string catalog + 15: optional list fe_addr_list } struct TMetaScanNode {