Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] Add session variable max_fetch_remote_schema_tablet_count to limit tablets size for remote schema fetch #37217

Merged
merged 4 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -2761,6 +2762,55 @@ public List<Tablet> getAllTablets() throws AnalysisException {
return tablets;
}

// Get sample tablets for remote desc schema
// 1. Estimate tablets for a partition, 1 at least
// 2. Pick the partition sorted with id in desc order, greater id with the newest partition
// 3. Truncate to sampleSize
public List<Tablet> getSampleTablets(int sampleSize) {
List<Tablet> sampleTablets = new ArrayList<>();
// Filter partition with empty data
Collection<Partition> partitions = getPartitions()
.stream()
.filter(partition -> partition.getVisibleVersion() > Partition.PARTITION_INIT_VERSION)
.collect(Collectors.toList());
if (partitions.isEmpty()) {
return sampleTablets;
}
// 1. Estimate tablets for a partition, 1 at least
int estimatePartitionTablets = Math.max(sampleSize / partitions.size(), 1);

// 2. Sort the partitions by id in descending order (greater id means the newest partition)
List<Partition> sortedPartitions = partitions.stream().sorted(new Comparator<Partition>() {
@Override
public int compare(Partition p1, Partition p2) {
// compare with desc order
return Long.compare(p2.getId(), p1.getId());
}
}).collect(Collectors.toList());

// 3. Collect tablets from partitions
for (Partition partition : sortedPartitions) {
List<Tablet> targetTablets = new ArrayList<>(partition.getBaseIndex().getTablets());
Collections.shuffle(targetTablets);
if (!targetTablets.isEmpty()) {
// Ensure we do not exceed the available number of tablets
int tabletsToFetch = Math.min(targetTablets.size(), estimatePartitionTablets);
sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch));
}

if (sampleTablets.size() >= sampleSize) {
break;
}
}

// 4. Truncate to sample size if needed
if (sampleTablets.size() > sampleSize) {
sampleTablets = sampleTablets.subList(0, sampleSize);
}

return sampleTablets;
}

// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -62,7 +63,8 @@ public ProcResult fetchResult() throws AnalysisException {
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
tablets = olapTable.getAllTablets();
// Get sample tablets for remote desc schema
tablets = olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount);
} finally {
table.readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -62,6 +64,13 @@ public ProcResult fetchResult() throws AnalysisException {
tablets.add(tablet);
}
}
// Get the maximum number of Remote Tablets that can be fetched
int maxFetchCount = ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount;
// If the number of tablets is greater than the maximum fetch count, randomly select maxFetchCount tablets
if (tablets.size() > maxFetchCount) {
Collections.shuffle(tablets);
tablets = tablets.subList(0, maxFetchCount);
}
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public List<Column> fetch() {
Long backendId = entry.getKey();
Set<Long> tabletIds = entry.getValue();
Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
LOG.debug("fetch schema from coord backend {}, sample tablets count {}",
backend.getId(), tabletIds.size());
// only need alive be
if (!backend.isAlive()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS = "fetch_remote_schema_timeout_seconds";

public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count";

// CLOUD_VARIABLES_BEGIN
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
Expand Down Expand Up @@ -1911,6 +1913,9 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
// fetch remote schema rpc timeout
@VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true)
public long fetchRemoteSchemaTimeoutSeconds = 120;
// max tablet count for fetch remote schema
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;

@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,
Expand Down
8 changes: 8 additions & 0 deletions regression-test/data/variant_p0/desc.out
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,11 @@ v.金额 SMALLINT Yes false \N NONE
k BIGINT Yes true \N
v VARIANT Yes false \N NONE

-- !sql15 --
k BIGINT Yes true \N
v VARIANT Yes false \N NONE
v.a TINYINT Yes false \N NONE
v.b TINYINT Yes false \N NONE
v.c TINYINT Yes false \N NONE
v.d TINYINT Yes false \N NONE

20 changes: 20 additions & 0 deletions regression-test/suites/variant_p0/desc.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """ insert into ${table_name} values (0, '100')"""
sql """set describe_extend_variant_column = true"""
qt_sql_12 """desc ${table_name}"""


// desc with large tablets
table_name = "large_tablets"
create_table_partition.call(table_name, "200")
sql """insert into large_tablets values (1, '{"a" : 10}')"""
sql """insert into large_tablets values (3001, '{"b" : 10}')"""
sql """insert into large_tablets values (50001, '{"c" : 10}')"""
sql """insert into large_tablets values (99999, '{"d" : 10}')"""
sql """set max_fetch_remote_schema_tablet_count = 2"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 128"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 512"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 2048"""
qt_sql15 "desc large_tablets"

sql "truncate table large_tablets"
sql "desc large_tablets"
} finally {
// reset flags
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
Expand Down
Loading