Skip to content

Commit

Permalink
[Optimize] Add session variable `max_fetch_remote_schema_tablet_count…
Browse files Browse the repository at this point in the history
…` to limit tablets size for remote schema fetch (apache#37217)

Describing tables with many partitions and tablets can cause high CPU
usage. To mitigate this, we estimate and pick sample tablets for schema
fetch, reducing the overall cost.
  • Loading branch information
eldenmoon committed Jul 9, 2024
1 parent 1e3ab0f commit 7c7676e
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 1 deletion.
50 changes: 50 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -2650,6 +2651,55 @@ public List<Tablet> getAllTablets() throws AnalysisException {
return tablets;
}

// Get sample tablets for remote desc schema.
// Strategy:
// 1. Drop partitions with no visible data
// 2. Estimate a per-partition quota: sampleSize / partitionCount, at least 1
// 3. Visit partitions newest-first (a greater id means a newer partition) and
//    randomly pick up to the quota from each partition's base index
// 4. Truncate the collected tablets to sampleSize
//
// @param sampleSize maximum number of tablets to return; non-positive yields an empty list
// @return up to sampleSize tablets sampled across the newest non-empty partitions
public List<Tablet> getSampleTablets(int sampleSize) {
    List<Tablet> sampleTablets = new ArrayList<>();
    // Guard: a non-positive budget means nothing to sample; avoid shuffling/copying for nothing.
    if (sampleSize <= 0) {
        return sampleTablets;
    }
    // Filter out partitions with empty data and sort by id in descending order
    // (greater id == newest partition) in a single pipeline.
    List<Partition> sortedPartitions = getPartitions()
            .stream()
            .filter(partition -> partition.getVisibleVersion() > Partition.PARTITION_INIT_VERSION)
            .sorted(Comparator.comparingLong(Partition::getId).reversed())
            .collect(Collectors.toList());
    if (sortedPartitions.isEmpty()) {
        return sampleTablets;
    }
    // Estimate tablets per partition, 1 at least, so every visited partition contributes.
    int estimatePartitionTablets = Math.max(sampleSize / sortedPartitions.size(), 1);

    // Collect tablets from partitions, newest first, until the budget is met.
    for (Partition partition : sortedPartitions) {
        List<Tablet> targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets());
        if (!targetTablets.isEmpty()) {
            // Shuffle only when there is something to pick; random sampling avoids
            // always fetching schema from the same tablets.
            Collections.shuffle(targetTablets);
            // Ensure we do not exceed the available number of tablets.
            int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets);
            sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch));
        }

        if (sampleTablets.size() >= sampleSize) {
            break;
        }
    }

    // Truncate to sample size if the per-partition quotas overshot the budget.
    if (sampleTablets.size() > sampleSize) {
        sampleTablets = sampleTablets.subList(0, sampleSize);
    }

    return sampleTablets;
}

// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -63,7 +64,8 @@ public ProcResult fetchResult() throws AnalysisException {
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
tablets = olapTable.getAllTablets();
// Get sample tablets for remote desc schema
tablets = olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount);
} finally {
table.readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -62,6 +64,13 @@ public ProcResult fetchResult() throws AnalysisException {
tablets.add(tablet);
}
}
// Get the maximum number of Remote Tablets that can be fetched
int maxFetchCount = ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount;
// If the number of tablets is greater than the maximum fetch count, randomly select maxFetchCount tablets
if (tablets.size() > maxFetchCount) {
Collections.shuffle(tablets);
tablets = tablets.subList(0, maxFetchCount);
}
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public List<Column> fetch() {
Long backendId = entry.getKey();
Set<Long> tabletIds = entry.getValue();
Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
LOG.debug("fetch schema from coord backend {}, sample tablets count {}",
backend.getId(), tabletIds.size());
// only need alive be
if (!backend.isAlive()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS = "fetch_remote_schema_timeout_seconds";

public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count";

// CLOUD_VARIABLES_BEGIN
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
Expand Down Expand Up @@ -1839,6 +1841,9 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
// fetch remote schema rpc timeout
@VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true)
public long fetchRemoteSchemaTimeoutSeconds = 120;
// max tablet count for fetch remote schema
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;

@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,
Expand Down
8 changes: 8 additions & 0 deletions regression-test/data/variant_p0/desc.out
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,11 @@ v.金额 SMALLINT Yes false \N NONE
k BIGINT Yes true \N
v VARIANT Yes false \N NONE

-- !sql15 --
k BIGINT Yes true \N
v VARIANT Yes false \N NONE
v.a TINYINT Yes false \N NONE
v.b TINYINT Yes false \N NONE
v.c TINYINT Yes false \N NONE
v.d TINYINT Yes false \N NONE

20 changes: 20 additions & 0 deletions regression-test/suites/variant_p0/desc.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,26 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """ insert into ${table_name} values (0, '100')"""
sql """set describe_extend_variant_column = true"""
qt_sql_12 """desc ${table_name}"""


// desc with large tablets
table_name = "large_tablets"
create_table_partition.call(table_name, "200")
sql """insert into large_tablets values (1, '{"a" : 10}')"""
sql """insert into large_tablets values (3001, '{"b" : 10}')"""
sql """insert into large_tablets values (50001, '{"c" : 10}')"""
sql """insert into large_tablets values (99999, '{"d" : 10}')"""
sql """set max_fetch_remote_schema_tablet_count = 2"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 128"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 512"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 2048"""
qt_sql15 "desc large_tablets"

sql "truncate table large_tablets"
sql "desc large_tablets"
} finally {
// reset flags
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
Expand Down

0 comments on commit 7c7676e

Please sign in to comment.