Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](nereids) New distribute planner #36531

Merged
merged 37 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ec5bee0
new distribute planner
924060929 May 6, 2024
787a0ed
NereidsCoordinator
924060929 Jun 20, 2024
fbedeed
NereidsCoordinator
924060929 Jun 20, 2024
69f436e
NereidsCoordinator
924060929 Jun 20, 2024
9b44f52
NereidsCoordinator
924060929 Jun 20, 2024
8c7e60b
NereidsCoordinator
924060929 Jun 20, 2024
6078055
NereidsCoordinator
924060929 Jun 20, 2024
1fd64db
fix
924060929 Jun 20, 2024
fe45958
fix
924060929 Jun 20, 2024
1e72024
fix
924060929 Jun 20, 2024
a83d486
fix
924060929 Jun 20, 2024
1d8bc62
fix prune bucket with colocate join
924060929 Jun 21, 2024
d175e25
fix local shuffle with bucket shuffle join
924060929 Jun 21, 2024
666d860
add local shuffle test
924060929 Jun 21, 2024
9810bc6
add local shuffle test
924060929 Jun 21, 2024
01169e1
add query_constant test
924060929 Jun 21, 2024
42b1d18
add shuffle test
924060929 Jun 21, 2024
705f455
fix local shuffle with bucket shuffle join
924060929 Jun 21, 2024
e5587b0
fix local shuffle with bucket shuffle join
924060929 Jun 21, 2024
f6941c5
fix local shuffle with bucket shuffle join
924060929 Jun 21, 2024
065e206
add shuffle test
924060929 Jun 22, 2024
b7eb2fd
add shuffle test
924060929 Jun 22, 2024
b3f5ccc
add shuffle test
924060929 Jun 24, 2024
e8410fc
fix unstable ut
924060929 Jun 24, 2024
2fdc16a
fix unstable ut
924060929 Jun 25, 2024
5675455
add profile info and instanceId to AssignedJob
924060929 Jun 25, 2024
8c29e26
fix bucket shuffle join
924060929 Jun 25, 2024
a9961fa
fix
924060929 Jun 25, 2024
4dffa46
fix
924060929 Jun 25, 2024
506105b
fix
924060929 Jun 25, 2024
f4e8e92
fix
924060929 Jun 27, 2024
b0b36fc
fix by comments
924060929 Jun 27, 2024
a411888
fix checkstyle
924060929 Jun 27, 2024
13183ce
fix checkstyle
924060929 Jun 27, 2024
bae5321
fix checkstyle
924060929 Jun 27, 2024
7f9210a
fix checkstyle
924060929 Jun 27, 2024
76f77f4
fix
924060929 Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ planType
| OPTIMIZED | PHYSICAL // same type
| SHAPE
| MEMO
| DISTRIBUTED
| ALL // default type
;

Expand Down
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Id.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Integer ids that cannot accidentally be compared with ints.
*/
public class Id<IdType extends Id<IdType>> {
public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
protected final int id;

public Id(int id) {
Expand Down Expand Up @@ -62,4 +62,9 @@ public ArrayList<IdType> asList() {
public String toString() {
return Integer.toString(id);
}

@Override
public int compareTo(Id<IdType> idTypeId) {
return id - idTypeId.id;
}
}
25 changes: 25 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,29 @@ public void foreach(ThrowingConsumer<TreeNode<NodeType>> func) throws AnalysisEx
child.foreach(func);
}
}

/** anyMatch */
public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
if (func.apply(this)) {
return true;
}

for (NodeType child : children) {
if (child.anyMatch(func)) {
return true;
}
}
return false;
}

/** foreachDown */
public void foreachDown(Predicate<TreeNode<NodeType>> visitor) {
if (!visitor.test(this)) {
return;
}

for (TreeNode<NodeType> child : getChildren()) {
child.foreachDown(visitor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.Planner;

Expand Down Expand Up @@ -108,6 +110,14 @@ public synchronized void updateSummary(long startTime, Map<String, String> summa
}
summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
builder.toString().replace("\n", "\n "));

924060929 marked this conversation as resolved.
Show resolved Hide resolved
FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
.replace("\n", "\n ")
);
}
}
summaryProfile.update(summaryInfo);
for (ExecutionProfile executionProfile : executionProfiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SummaryProfile {
public static final String TRACE_ID = "Trace ID";
public static final String WORKLOAD_GROUP = "Workload Group";
public static final String PHYSICAL_PLAN = "Physical Plan";
public static final String DISTRIBUTED_PLAN = "Distributed Plan";
// Execution Summary
public static final String EXECUTION_SUMMARY_PROFILE_NAME = "Execution Summary";
public static final String ANALYSIS_TIME = "Analysis Time";
Expand Down Expand Up @@ -86,6 +87,7 @@ public class SummaryProfile {
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
public static final String NEREIDS_DISTRIBUTE_TIME = "Nereids Distribute Time";

public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
Expand All @@ -109,6 +111,7 @@ public class SummaryProfile {
public static final ImmutableList<String> SUMMARY_KEYS = new ImmutableList.Builder<String>()
.addAll(SUMMARY_CAPTIONS)
.add(PHYSICAL_PLAN)
.add(DISTRIBUTED_PLAN)
.build();

// The display order of execution summary items.
Expand Down Expand Up @@ -199,6 +202,7 @@ public class SummaryProfile {
private long nereidsRewriteFinishTime = -1;
private long nereidsOptimizeFinishTime = -1;
private long nereidsTranslateFinishTime = -1;
private long nereidsDistributeFinishTime = -1;
// timestamp of query begin
private long queryBeginTime = -1;
// Analysis end time
Expand Down Expand Up @@ -315,6 +319,7 @@ private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(NEREIDS_DISTRIBUTE_TIME, getPrettyNereidsDistributeTime());
executionSummaryProfile.addInfoString(ANALYSIS_TIME,
getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(PLAN_TIME,
Expand Down Expand Up @@ -419,6 +424,10 @@ public void setNereidsTranslateTime() {
this.nereidsTranslateFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsDistributeTime() {
this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
}

public void setQueryBeginTime() {
this.queryBeginTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -654,6 +663,10 @@ public String getPrettyNereidsTranslateTime() {
return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsDistributeTime() {
return getPrettyTime(nereidsDistributeFinishTime, nereidsTranslateFinishTime, TUnit.TIME_MS);
}

private String getPrettyGetPartitionVersionTime() {
if (getPartitionVersionTime == 0) {
return "N/A";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
Expand All @@ -70,6 +73,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -102,6 +106,7 @@ public class NereidsPlanner extends Planner {
private Plan rewrittenPlan;
private Plan optimizedPlan;
private PhysicalPlan physicalPlan;
private FragmentIdMapping<DistributedPlan> distributedPlans;
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
Expand Down Expand Up @@ -130,17 +135,18 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
NereidsTracer.logImportantTime("EndParsePlan");
setParsedPlan(parsedPlan);

PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().start();
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
statementContext.getStopwatch().stop();
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;

if (resultPlan instanceof PhysicalPlan) {
924060929 marked this conversation as resolved.
Show resolved Hide resolved
physicalPlan = (PhysicalPlan) resultPlan;
distribute(physicalPlan, explainLevel);
}
physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
}

@VisibleForTesting
Expand Down Expand Up @@ -315,7 +321,7 @@ private void optimize() {
}
}

private void translate(PhysicalPlan resultPlan) throws UserException {
private void splitFragments(PhysicalPlan resultPlan) throws UserException {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -360,6 +366,27 @@ private void translate(PhysicalPlan resultPlan) throws UserException {
ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) throws UserException {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
} else if ((canUseNereidsDistributePlanner && explainLevel.isPlanLevel
&& (explainLevel != ExplainLevel.ALL_PLAN && explainLevel != ExplainLevel.DISTRIBUTED_PLAN))) {
return;
}

splitFragments(physicalPlan);

if (!canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}
Expand Down Expand Up @@ -498,6 +525,17 @@ public String getExplainString(ExplainOptions explainOptions) {
+ "\n\n========== MATERIALIZATIONS ==========\n"
+ materializationStringBuilder;
break;
case DISTRIBUTED_PLAN:
StringBuilder distributedPlanStringBuilder = new StringBuilder();

distributedPlanStringBuilder.append("========== DISTRIBUTED PLAN ==========\n");
if (distributedPlans == null || distributedPlans.isEmpty()) {
plan = "Distributed plan not generated, please set enable_nereids_distribute_planner "
+ "and enable_pipeline_x_engine to true";
} else {
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
case ALL_PLAN:
plan = "========== PARSED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyParseSqlTime) + " ==========\n"
Expand All @@ -510,7 +548,13 @@ public String getExplainString(ExplainOptions explainOptions) {
+ rewrittenPlan.treeString() + "\n\n"
+ "========== OPTIMIZED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsOptimizeTime) + " ==========\n"
+ optimizedPlan.treeString();
+ optimizedPlan.treeString() + "\n\n";

if (distributedPlans != null && !distributedPlans.isEmpty()) {
plan += "========== DISTRIBUTED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsDistributeTime) + " ==========\n";
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
default:
plan = super.getExplainString(explainOptions)
Expand Down Expand Up @@ -681,6 +725,10 @@ public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public FragmentIdMapping<DistributedPlan> getDistributedPlans() {
return distributedPlans;
}

public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3451,6 +3451,9 @@ private ExplainLevel parseExplainPlanType(PlanTypeContext planTypeContext) {
if (planTypeContext.MEMO() != null) {
return ExplainLevel.MEMO_PLAN;
}
if (planTypeContext.DISTRIBUTED() != null) {
return ExplainLevel.DISTRIBUTED_PLAN;
}
return ExplainLevel.ALL_PLAN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -213,12 +214,12 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
|| joinType == JoinType.FULL_OUTER_JOIN);
boolean isSpecInScope = (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
|| rightHashSpec.getShuffleType() == ShuffleType.NATURAL);
return isJoinTypeInScope && isSpecInScope;
return isJoinTypeInScope && isSpecInScope && !SessionVariable.canUseNereidsDistributePlanner();
}

@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
public Boolean visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
Preconditions.checkArgument(childrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
Expand Down Expand Up @@ -303,13 +304,24 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
// TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
// after we fix coordinator problem, we could do right to left bucket shuffle
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
if (SessionVariable.canUseNereidsDistributePlanner()) {
// nereids coordinator can exchange left side to right side to do bucket shuffle join
// TODO: maybe we should check if left child is PhysicalDistribute.
// If so add storage bucketed shuffle on left side. Other wise,
// add execution bucketed shuffle on right side.
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
924060929 marked this conversation as resolved.
Show resolved Hide resolved
} else {
// legacy coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
// so we can only shuffle right to left side to do normal shuffle join
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
}
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
Expand Down Expand Up @@ -537,20 +549,20 @@ private List<ExprId> calAnotherSideRequiredShuffleIds(DistributionSpecHash notSh
* calAnotherSideRequiredShuffleIds's comment.
*
* @param shuffleType real output shuffle type
* @param notShuffleSideOutput not shuffle side real output used hash spec
* @param shuffleSideOutput shuffle side real output used hash spec
* @param notShuffleSideRequired not shuffle side required used hash spec
* @param shuffleSideRequired shuffle side required hash spec
* @param notNeedShuffleSideOutput not shuffle side real output used hash spec
* @param needShuffleSideOutput shuffle side real output used hash spec
* @param notNeedShuffleSideRequired not shuffle side required used hash spec
* @param needShuffleSideRequired shuffle side required hash spec
* @return shuffle side new required hash spec
*/
private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType,
DistributionSpecHash notShuffleSideOutput, DistributionSpecHash shuffleSideOutput,
DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
List<ExprId> shuffleSideIds = calAnotherSideRequiredShuffleIds(notShuffleSideOutput,
notShuffleSideRequired, shuffleSideRequired);
DistributionSpecHash notNeedShuffleSideOutput, DistributionSpecHash needShuffleSideOutput,
DistributionSpecHash notNeedShuffleSideRequired, DistributionSpecHash needShuffleSideRequired) {
List<ExprId> shuffleSideIds = calAnotherSideRequiredShuffleIds(notNeedShuffleSideOutput,
notNeedShuffleSideRequired, needShuffleSideRequired);
return new PhysicalProperties(new DistributionSpecHash(shuffleSideIds, shuffleType,
shuffleSideOutput.getTableId(), shuffleSideOutput.getSelectedIndexId(),
shuffleSideOutput.getPartitionIds()));
needShuffleSideOutput.getTableId(), needShuffleSideOutput.getSelectedIndexId(),
needShuffleSideOutput.getPartitionIds()));
}

private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
Expand Down Expand Up @@ -113,6 +114,9 @@ public boolean nullable() {
}

public PhysicalProperties getPhysicalProperties() {
if (SessionVariable.canUseNereidsDistributePlanner()) {
return PhysicalProperties.ANY;
}
return PhysicalProperties.STORAGE_ANY;
}

Expand Down
Loading
Loading