Skip to content

Commit

Permalink
new distribute planner
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 20, 2024
1 parent a0201ac commit 310c64c
Show file tree
Hide file tree
Showing 65 changed files with 3,889 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ planType
| OPTIMIZED | PHYSICAL // same type
| SHAPE
| MEMO
| DISTRIBUTED
| ALL // default type
;

Expand Down
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Id.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Integer ids that cannot accidentally be compared with ints.
*/
public class Id<IdType extends Id<IdType>> {
public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
protected final int id;

public Id(int id) {
Expand Down Expand Up @@ -62,4 +62,9 @@ public ArrayList<IdType> asList() {
public String toString() {
return Integer.toString(id);
}

@Override
public int compareTo(Id<IdType> idTypeId) {
return id - idTypeId.id;
}
}
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,28 @@ public <C extends NodeType> C findFirstOf(Class<C> cl) {
return null;
}

/** anyMatch */
public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
if (func.apply(this)) {
return true;
}

for (NodeType child : children) {
if (child.anyMatch(func)) {
return true;
}
}
return false;
}

/** foreachDown */
public void foreachDown(Predicate<TreeNode<NodeType>> visitor) {
if (!visitor.test(this)) {
return;
}

for (TreeNode<NodeType> child : getChildren()) {
child.foreachDown(visitor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class SummaryProfile {
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
public static final String NEREIDS_DISTRIBUTE_TIME = "Nereids Distribute Time";

public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
Expand Down Expand Up @@ -199,6 +200,7 @@ public class SummaryProfile {
private long nereidsRewriteFinishTime = -1;
private long nereidsOptimizeFinishTime = -1;
private long nereidsTranslateFinishTime = -1;
private long nereidsDistributeFinishTime = -1;
// timestamp of query begin
private long queryBeginTime = -1;
// Analysis end time
Expand Down Expand Up @@ -315,6 +317,7 @@ private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(NEREIDS_DISTRIBUTE_TIME, getPrettyNereidsDistributeTime());
executionSummaryProfile.addInfoString(ANALYSIS_TIME,
getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(PLAN_TIME,
Expand Down Expand Up @@ -419,6 +422,10 @@ public void setNereidsTranslateTime() {
this.nereidsTranslateFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsDistributeTime() {
this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
}

public void setQueryBeginTime() {
this.queryBeginTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -654,6 +661,10 @@ public String getPrettyNereidsTranslateTime() {
return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsDistributeTime() {
return getPrettyTime(nereidsDistributeFinishTime, nereidsTranslateFinishTime, TUnit.TIME_MS);
}

private String getPrettyGetPartitionVersionTime() {
if (getPartitionVersionTime == 0) {
return "N/A";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
Expand All @@ -70,6 +73,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -102,6 +106,7 @@ public class NereidsPlanner extends Planner {
private Plan rewrittenPlan;
private Plan optimizedPlan;
private PhysicalPlan physicalPlan;
private FragmentIdMapping<DistributedPlan> distributedPlans;
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
Expand Down Expand Up @@ -130,17 +135,18 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
NereidsTracer.logImportantTime("EndParsePlan");
setParsedPlan(parsedPlan);

PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().start();
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
statementContext.getStopwatch().stop();
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;

if (resultPlan instanceof PhysicalPlan) {
physicalPlan = (PhysicalPlan) resultPlan;
distribute(physicalPlan, explainLevel);
}
physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
}

@VisibleForTesting
Expand Down Expand Up @@ -315,7 +321,7 @@ private void optimize() {
}
}

private void translate(PhysicalPlan resultPlan) throws UserException {
private void splitFragments(PhysicalPlan resultPlan) throws UserException {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -360,6 +366,27 @@ private void translate(PhysicalPlan resultPlan) throws UserException {
ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) throws UserException {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
} else if ((canUseNereidsDistributePlanner && explainLevel.isPlanLevel
&& (explainLevel != ExplainLevel.ALL_PLAN && explainLevel != ExplainLevel.DISTRIBUTED_PLAN))) {
return;
}

splitFragments(physicalPlan);

if (!canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}
Expand Down Expand Up @@ -498,6 +525,17 @@ public String getExplainString(ExplainOptions explainOptions) {
+ "\n\n========== MATERIALIZATIONS ==========\n"
+ materializationStringBuilder;
break;
case DISTRIBUTED_PLAN:
StringBuilder distributedPlanStringBuilder = new StringBuilder();

distributedPlanStringBuilder.append("========== DISTRIBUTED PLAN ==========\n");
if (distributedPlans == null || distributedPlans.isEmpty()) {
plan = "Distributed plan not generated, please set enable_nereids_distribute_planner "
+ "and enable_pipeline_x_engine to true";
} else {
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
case ALL_PLAN:
plan = "========== PARSED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyParseSqlTime) + " ==========\n"
Expand All @@ -510,7 +548,13 @@ public String getExplainString(ExplainOptions explainOptions) {
+ rewrittenPlan.treeString() + "\n\n"
+ "========== OPTIMIZED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsOptimizeTime) + " ==========\n"
+ optimizedPlan.treeString();
+ optimizedPlan.treeString() + "\n\n";

if (distributedPlans != null && !distributedPlans.isEmpty()) {
plan += "========== DISTRIBUTED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsDistributeTime) + " ==========\n";
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
default:
plan = super.getExplainString(explainOptions)
Expand Down Expand Up @@ -681,6 +725,10 @@ public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public FragmentIdMapping<DistributedPlan> getDistributedPlans() {
return distributedPlans;
}

public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3417,6 +3417,9 @@ private ExplainLevel parseExplainPlanType(PlanTypeContext planTypeContext) {
if (planTypeContext.MEMO() != null) {
return ExplainLevel.MEMO_PLAN;
}
if (planTypeContext.DISTRIBUTED() != null) {
return ExplainLevel.DISTRIBUTED_PLAN;
}
return ExplainLevel.ALL_PLAN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -217,8 +218,8 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
}

@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
public Boolean visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
Preconditions.checkArgument(childrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
Expand Down Expand Up @@ -248,7 +249,8 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
// check colocate join with scan
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) {
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)
&& !SessionVariable.canUseNereidsDistributePlanner()) {
// right anti, right outer, full outer join could not do bucket shuffle join
// TODO remove this after we refactor coordinator
updatedForLeft = Optional.of(calAnotherSideRequired(
Expand Down Expand Up @@ -302,7 +304,8 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL
&& !SessionVariable.canUseNereidsDistributePlanner()) {
// TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
// after we fix coordinator problem, we could do right to left bucket shuffle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.NereidsException;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -93,6 +95,14 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitNumbers(this, context);
}

@Override
public PhysicalProperties getPhysicalProperties() {
if (SessionVariable.canUseNereidsDistributePlanner()) {
return PhysicalProperties.ANY;
}
return super.getPhysicalProperties();
}

@Override
public Numbers withChildren(List<Expression> children) {
Preconditions.checkArgument(children().size() == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* placeholder id for prepared statement parameters
*/
public class PlaceholderId extends Id<PlaceholderId> implements Comparable<PlaceholderId> {
public class PlaceholderId extends Id<PlaceholderId> {

public PlaceholderId(int id) {
super(id);
Expand Down Expand Up @@ -55,9 +55,4 @@ public boolean equals(Object obj) {
public int hashCode() {
return super.hashCode();
}

@Override
public int compareTo(PlaceholderId o) {
return this.id - o.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum ExplainLevel {
OPTIMIZED_PLAN(true),
SHAPE_PLAN(true),
MEMO_PLAN(true),
DISTRIBUTED_PLAN(true),
ALL_PLAN(true)
;

Expand Down
Loading

0 comments on commit 310c64c

Please sign in to comment.