From 7c7676e8920470dee244ffb4b1c5bda6b19102c3 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Mon, 8 Jul 2024 23:35:27 +0800 Subject: [PATCH] [Optimize] Add session variable `max_fetch_remote_schema_tablet_count` to limit tablets size for remote schema fetch (#37217) Describing tables with many partitions and tablets can cause high CPU usage. To mitigate this, we estimate and pick sample tablets for schema fetch, reducing the overall cost. --- .../org/apache/doris/catalog/OlapTable.java | 50 +++++++++++++++++++ .../common/proc/RemoteIndexSchemaProcDir.java | 4 +- .../proc/RemoteIndexSchemaProcNode.java | 9 ++++ .../util/FetchRemoteTabletSchemaUtil.java | 2 + .../org/apache/doris/qe/SessionVariable.java | 5 ++ regression-test/data/variant_p0/desc.out | 8 +++ regression-test/suites/variant_p0/desc.groovy | 20 ++++++++ 7 files changed, 97 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0164c311974206..abd8a3b6baa961 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -92,6 +92,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -2650,6 +2651,55 @@ public List getAllTablets() throws AnalysisException { return tablets; } + // Get sample tablets for remote desc schema + // 1. Estimate the number of tablets to take per partition, at least 1 + // 2. Pick partitions sorted by id in descending order (greater id means the newest partition) + // 3. 
Truncate to sampleSize + public List getSampleTablets(int sampleSize) { + List sampleTablets = new ArrayList<>(); + // Filter out partitions with empty data + Collection partitions = getPartitions() + .stream() + .filter(partition -> partition.getVisibleVersion() > Partition.PARTITION_INIT_VERSION) + .collect(Collectors.toList()); + if (partitions.isEmpty()) { + return sampleTablets; + } + // 1. Estimate the number of tablets to take per partition, at least 1 + int estimatePartitionTablets = Math.max(sampleSize / partitions.size(), 1); + + // 2. Sort the partitions by id in descending order (greater id means the newest partition) + List sortedPartitions = partitions.stream().sorted(new Comparator() { + @Override + public int compare(Partition p1, Partition p2) { + // compare in descending order + return Long.compare(p2.getId(), p1.getId()); + } + }).collect(Collectors.toList()); + + // 3. Collect tablets from partitions + for (Partition partition : sortedPartitions) { + List targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets()); + Collections.shuffle(targetTablets); + if (!targetTablets.isEmpty()) { + // Ensure we do not exceed the available number of tablets + int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets); + sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch)); + } + + if (sampleTablets.size() >= sampleSize) { + break; + } + } + + // 4. 
Truncate to sample size if needed + if (sampleTablets.size() > sampleSize) { + sampleTablets = sampleTablets.subList(0, sampleSize); + } + + return sampleTablets; + } + // During `getNextVersion` and `updateVisibleVersionAndTime` period, // the write lock on the table should be held continuously public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java index 1af9ebab0f299e..c21f64c5d38e8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -63,7 +64,8 @@ public ProcResult fetchResult() throws AnalysisException { table.readLock(); try { OlapTable olapTable = (OlapTable) table; - tablets = olapTable.getAllTablets(); + // Get sample tablets for remote desc schema + tablets = olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount); } finally { table.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java index 8176b09bbf778d..cdb1bbc133e356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java @@ -23,11 +23,13 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; 
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -62,6 +64,13 @@ public ProcResult fetchResult() throws AnalysisException { tablets.add(tablet); } } + // Get the maximum number of Remote Tablets that can be fetched + int maxFetchCount = ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount; + // If the number of tablets is greater than the maximum fetch count, randomly select maxFetchCount tablets + if (tablets.size() > maxFetchCount) { + Collections.shuffle(tablets); + tablets = tablets.subList(0, maxFetchCount); + } List remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); this.schema.addAll(remoteSchema); return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java index db9700f744870b..0e96dc8c5930d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java @@ -92,6 +92,8 @@ public List fetch() { Long backendId = entry.getKey(); Set tabletIds = entry.getValue(); Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + LOG.debug("fetch schema from coord backend {}, sample tablets count {}", + backend.getId(), tabletIds.size()); // only need alive be if (!backend.isAlive()) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fe288ea12c957c..72f17bacbcb62d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -588,6 +588,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS = "fetch_remote_schema_timeout_seconds"; + public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count"; + // CLOUD_VARIABLES_BEGIN public static final String CLOUD_CLUSTER = "cloud_cluster"; public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune"; @@ -1839,6 +1841,9 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { // fetch remote schema rpc timeout @VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true) public long fetchRemoteSchemaTimeoutSeconds = 120; + // max tablet count for fetch remote schema + @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true) + public int maxFetchRemoteTabletCount = 512; @VariableMgr.VarAttr( name = ENABLE_JOIN_SPILL, diff --git a/regression-test/data/variant_p0/desc.out b/regression-test/data/variant_p0/desc.out index b46b5f9b4b08d8..b3ebce2b887835 100644 --- a/regression-test/data/variant_p0/desc.out +++ b/regression-test/data/variant_p0/desc.out @@ -198,3 +198,11 @@ v.金额 SMALLINT Yes false \N NONE k BIGINT Yes true \N v VARIANT Yes false \N NONE +-- !sql15 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a TINYINT Yes false \N NONE +v.b TINYINT Yes false \N NONE +v.c TINYINT Yes false \N NONE +v.d TINYINT Yes false \N NONE + diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy index 78c9f078c9f2e5..ee2607756b80d2 100644 --- a/regression-test/suites/variant_p0/desc.groovy +++ b/regression-test/suites/variant_p0/desc.groovy @@ -235,6 +235,26 @@ suite("regression_test_variant_desc", "nonConcurrent"){ sql """ insert into ${table_name} values (0, '100')""" sql 
"""set describe_extend_variant_column = true""" qt_sql_12 """desc ${table_name}""" + + + // desc with large tablets + table_name = "large_tablets" + create_table_partition.call(table_name, "200") + sql """insert into large_tablets values (1, '{"a" : 10}')""" + sql """insert into large_tablets values (3001, '{"b" : 10}')""" + sql """insert into large_tablets values (50001, '{"c" : 10}')""" + sql """insert into large_tablets values (99999, '{"d" : 10}')""" + sql """set max_fetch_remote_schema_tablet_count = 2""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 128""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 512""" + sql "desc large_tablets" + sql """set max_fetch_remote_schema_tablet_count = 2048""" + qt_sql15 "desc large_tablets" + + sql "truncate table large_tablets" + sql "desc large_tablets" } finally { // reset flags set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")