Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](split) optimize the split manner of hive and hudi #3

Merged
merged 2 commits on Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,11 @@ public void createScanRangeLocations() throws UserException {
locationType = getLocationType(fileSplit.getPath().toString());
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
SplitSource splitSource = new SplitSource(backend, splitAssignment);
SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@
public class SplitSource {
private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
private static final long WAIT_TIME_OUT = 100; // 100ms
private static final long MAX_WAIT_TIME_OUT = 500; // 500ms

private final long uniqueId;
private final Backend backend;
private final SplitAssignment splitAssignment;
private final AtomicBoolean isLastBatch;
private final long maxWaitTime;

public SplitSource(Backend backend, SplitAssignment splitAssignment) {
/**
 * Creates a split source bound to a single backend.
 *
 * @param backend         the BE this source will feed scan ranges to
 * @param splitAssignment shared assignment that queues splits per backend
 * @param maxWaitTime     max time in ms that getNextBatch keeps waiting for
 *                        more splits once it already has at least one to return
 */
public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) {
    this.backend = backend;
    this.splitAssignment = splitAssignment;
    this.maxWaitTime = maxWaitTime;
    this.isLastBatch = new AtomicBoolean(false);
    this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
    // Register after the id is assigned so the assignment can track this source.
    splitAssignment.registerSource(uniqueId);
}
Expand All @@ -71,7 +72,7 @@ public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserExcep
return Collections.emptyList();
}
List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
long maxTimeOut = 0;
long startTime = System.currentTimeMillis();
while (scanRanges.size() < maxBatchSize) {
BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
if (splits == null) {
Expand All @@ -81,18 +82,19 @@ public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserExcep
while (scanRanges.size() < maxBatchSize) {
try {
Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
if (splitCollection != null) {
scanRanges.addAll(splitCollection);
}
if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) {
return scanRanges;
}
if (splitCollection == null) {
maxTimeOut += WAIT_TIME_OUT;
break;
}
scanRanges.addAll(splitCollection);
} catch (InterruptedException e) {
throw new UserException("Failed to get next batch of splits", e);
}
}
if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
break;
}
}
return scanRanges;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@

@Data
public class TablePartitionValues {
/**
 * How pruned partitions are ordered before being handed out as splits.
 * Controlled by the session variable "partition_ordering".
 */
public enum PartitionOrdering {
    NATURAL,
    REVERSE,
    SHUFFLE;

    /**
     * Resolves a name to an enum constant, ignoring case.
     *
     * @param ordering the name to resolve; may be null
     * @return the matching constant, or null when the name is unknown or null
     */
    public static PartitionOrdering parse(String ordering) {
        PartitionOrdering matched = null;
        for (PartitionOrdering candidate : values()) {
            // equalsIgnoreCase is null-safe: a null argument never matches.
            if (candidate.name().equalsIgnoreCase(ordering)) {
                matched = candidate;
                break;
            }
        }
        return matched;
    }
}

public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";

private final ReadWriteLock readWriteLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.TablePartitionValues.PartitionOrdering;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
Expand All @@ -57,6 +58,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand Down Expand Up @@ -137,6 +139,17 @@ protected void doInitialize() throws UserException {
}
}

/**
 * Reorders the pruned partitions according to the session variable
 * "partition_ordering" before they are turned into splits.
 *
 * @param partitions the pruned partitions, in their natural (cache) order
 * @return a sorted copy for REVERSE/SHUFFLE; the input list unchanged for
 *         NATURAL, an unknown ordering name, or when no session is attached
 */
protected List<HivePartition> orderingPartitions(List<HivePartition> partitions) {
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null) {
        // No session context (e.g. background/internal invocation): keep natural order
        // instead of throwing NPE.
        return partitions;
    }
    PartitionOrdering ordering = PartitionOrdering.parse(ctx.getSessionVariable().getPartitionOrdering());
    if (ordering == PartitionOrdering.REVERSE) {
        return Ordering.natural().onResultOf(HivePartition::getPath).reverse().sortedCopy(partitions);
    } else if (ordering == PartitionOrdering.SHUFFLE) {
        // NOTE(review): Ordering.arbitrary() sorts by identity hash code, which is only
        // pseudo-random and stable within a JVM run — not a true random shuffle.
        return Ordering.arbitrary().onResultOf(HivePartition::getPath).sortedCopy(partitions);
    }
    // NATURAL, unrecognized, or unset: leave the input ordering as-is.
    return partitions;
}

protected List<HivePartition> getPartitions() throws AnalysisException {
List<HivePartition> resPartitions = Lists.newArrayList();
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -211,7 +224,7 @@ public List<Split> getSplits() throws UserException {
long start = System.currentTimeMillis();
try {
if (!partitionInit) {
prunedPartitions = getPartitions();
prunedPartitions = orderingPartitions(getPartitions());
partitionInit = true;
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
Expand Down Expand Up @@ -289,7 +302,7 @@ public void startSplit() {
public boolean isBatchMode() {
if (!partitionInit) {
try {
prunedPartitions = getPartitions();
prunedPartitions = orderingPartitions(getPartitions());
} catch (Exception e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ public List<Split> getSplits() throws UserException {
return getIncrementalSplits();
}
if (!partitionInit) {
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient, snapshotTimestamp)));
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -448,9 +448,9 @@ public boolean isBatchMode() {
}
if (!partitionInit) {
// Non partition table will get one dummy partition
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient, snapshotTimestamp)));
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
Expand Down
34 changes: 34 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode";

public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time";

public static final String PARTITION_ORDERING = "partition_ordering";

/**
* use insert stmt as the unified backend for all loads
*/
Expand Down Expand Up @@ -1461,6 +1465,20 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
needForward = true)
public int numPartitionsInBatchMode = 1024;

// Upper bound (ms) on how long a SplitSource.getNextBatch call keeps waiting for
// more splits once it already has at least one to return. Default: 4000ms.
@VariableMgr.VarAttr(
name = FETCH_SPLITS_MAX_WAIT_TIME,
description = {"batch方式中BE获取splits的最大等待时间",
"The max wait time of getting splits in batch mode."},
needForward = true)
public long fetchSplitsMaxWaitTime = 4000;

// Ordering applied to pruned partitions before splitting; parsed via
// TablePartitionValues.PartitionOrdering (natural / reverse / shuffle).
// Unrecognized values fall back to natural order.
@VariableMgr.VarAttr(
name = PARTITION_ORDERING,
description = {"list partition的排序方式",
"Ordering style of list partition."},
needForward = true)
public String partitionOrdering = "natural";

@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
Expand Down Expand Up @@ -2713,6 +2731,22 @@ public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) {
this.numPartitionsInBatchMode = numPartitionsInBatchMode;
}

// Max wait time (ms) for a BE fetching its next batch of splits in batch mode.
public long getFetchSplitsMaxWaitTime() {
return fetchSplitsMaxWaitTime;
}

// Sets the max wait time (ms) for fetching splits in batch mode; no validation here.
public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) {
this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime;
}

// Raw ordering name from the session variable; parsed later by PartitionOrdering.parse.
public String getPartitionOrdering() {
return partitionOrdering;
}

// Sets the partition ordering name; unrecognized values are treated as natural order downstream.
public void setPartitionOrdering(String partitionOrdering) {
this.partitionOrdering = partitionOrdering;
}

public boolean isEnableParquetLazyMat() {
return enableParquetLazyMat;
}
Expand Down
Loading