[refactor](Coordinator) refactor coordinator (#41730)
Use NereidsCoordinator instead of Coordinator, because the code of the old Coordinator is too hard to maintain.

The main design approach is as follows:

1. Divide the original flat Coordinator into multiple modules, with each module maintaining high cohesion.
- `DistributePlanner`: The logic for calculating parallelism has been extracted in #36531, and in the future, we will dynamically calculate parallelism based on cost.
- `CoordinatorContext`: Encapsulates the global parameters and state related to the Coordinator.
- `PipelineExecutionTask`: Encapsulates the entire scheduling task, including the mapping from each Backend to its pipeline tasks. It contains two layers of tasks, each with its own responsibilities, and each layer maintains its state internally rather than centralizing it in the Coordinator (see the sketch after this list).
  - `MultiFragmentsPipelineTask`: Bundles the multiple fragment tasks generated for a single Backend and sends them concurrently to that Backend.
  - `SingleFragmentPipelineTask`: A single fragment task for a Backend.
- `JobProcessor`: Describes the two types of jobs: query jobs and load jobs.
  - `QueryProcessor`: Represents query tasks and provides a ResultReceiver to obtain query results.
  - `LoadProcessor`: Represents INSERT INTO and Broker Load tasks, and provides a blocking function to wait for load completion.
- `ThriftPlansBuilder`: Builds Thrift parameters from the DistributedPlan structure, keeping intermediate temporary variables inside functions rather than in the Coordinator.
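
A minimal sketch of the two-layer task model described above. The class shapes, fields, and method names here are illustrative placeholders, not the actual definitions introduced by this patch:

```java
import java.util.List;
import java.util.Map;

// One SingleFragmentPipelineTask per fragment on a Backend; it keeps its own
// execution state instead of reporting into a central Coordinator structure.
final class SingleFragmentPipelineTask {
    private final int fragmentId;
    private volatile boolean done;

    SingleFragmentPipelineTask(int fragmentId) {
        this.fragmentId = fragmentId;
    }

    void markDone() {
        done = true;
    }

    boolean isDone() {
        return done;
    }
}

// All fragment tasks that target the same Backend are bundled and sent together.
final class MultiFragmentsPipelineTask {
    private final long backendId;
    private final List<SingleFragmentPipelineTask> fragmentTasks;

    MultiFragmentsPipelineTask(long backendId, List<SingleFragmentPipelineTask> fragmentTasks) {
        this.backendId = backendId;
        this.fragmentTasks = fragmentTasks;
    }

    boolean allFragmentsDone() {
        return fragmentTasks.stream().allMatch(SingleFragmentPipelineTask::isDone);
    }
}

// The whole scheduling task: a mapping from each Backend to its bundle of fragment tasks.
final class PipelineExecutionTask {
    private final Map<Long, MultiFragmentsPipelineTask> backendToTasks;

    PipelineExecutionTask(Map<Long, MultiFragmentsPipelineTask> backendToTasks) {
        this.backendToTasks = backendToTasks;
    }

    boolean isFinished() {
        return backendToTasks.values().stream().allMatch(MultiFragmentsPipelineTask::allFragmentsDone);
    }
}
```

The point of the layering is that completion and cancellation state lives in the per-Backend and per-fragment tasks, so the Coordinator only needs to ask the top-level task for overall progress.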

2. The overall Coordinator logic is organized more clearly. NereidsCoordinator consists of only a few functions, so the main flow can be understood quickly when reading the code (a sketch of the flow follows this list):
  - Construct CoordinatorContext.
  - Enqueue the tasks.
  - Handle different sinks accordingly.
  - Register the Coordinator with `QeProcessorImpl` for cancellation and progress tracking.
  - Construct thrift parameters.
  - Build PipelineTask.
  - Initiate RPC calls to each Backend.
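
A rough, hypothetical outline of that flow in code form; the method and type names below are placeholders standing in for the real NereidsCoordinator API:

```java
import java.util.List;

// Placeholder types standing in for CoordinatorContext, the Thrift fragment parameters,
// and the two-layer PipelineExecutionTask described above.
final class CoordinatorFlowSketch {
    static final class Context { }
    static final class ThriftFragmentParams { }
    static final class ExecutionTask {
        void sendRpcToEachBackend() { /* one concurrent RPC bundle per Backend */ }
    }

    void exec() {
        Context ctx = buildCoordinatorContext();                     // 1. global parameters and state
        enqueue(ctx);                                                // 2. admission / queueing of the task
        prepareSink(ctx);                                            // 3. result sink vs. load sink (QueryProcessor / LoadProcessor)
        registerToQeProcessor(ctx);                                  // 4. enables cancellation and progress tracking
        List<ThriftFragmentParams> params = buildThriftParams(ctx);  // 5. Thrift parameters from the DistributedPlans
        ExecutionTask task = buildPipelineTask(ctx, params);         // 6. Backend -> fragment-task mapping
        task.sendRpcToEachBackend();                                 // 7. kick off execution on every Backend
    }

    private Context buildCoordinatorContext() { return new Context(); }
    private void enqueue(Context ctx) { }
    private void prepareSink(Context ctx) { }
    private void registerToQeProcessor(Context ctx) { }
    private List<ThriftFragmentParams> buildThriftParams(Context ctx) { return List.of(); }
    private ExecutionTask buildPipelineTask(Context ctx, List<ThriftFragmentParams> params) { return new ExecutionTask(); }
}
```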


TODO:
1. delete old `Coordinator`
2. support cloud mode
924060929 authored Nov 7, 2024
1 parent 50fd997 commit 46e5294
Showing 168 changed files with 4,676 additions and 462 deletions.
29 changes: 27 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -32,14 +32,21 @@
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.NereidsCoordinator;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -134,15 +141,33 @@ public BrokerLoadJob createBrokerLoadJob() {

public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner,
StatsErrorEstimator statsErrorEstimator) {
if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner()) {
return new NereidsCoordinator(context, analyzer, (NereidsPlanner) planner, statsErrorEstimator);
}
return new Coordinator(context, analyzer, planner, statsErrorEstimator);
}

// Used for broker load task/export task/update coordinator
public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<ScanNode> scanNodes,
String timezone, boolean loadZeroTolerance, boolean enableProfile) {
return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance,
enableProfile);
if (SessionVariable.canUseNereidsDistributePlanner()) {
ConnectContext connectContext = new ConnectContext();
connectContext.setQueryId(queryId);
StatementContext statementContext = new StatementContext(
connectContext, new OriginStatement("", 0)
);
DistributePlanner distributePlanner = new DistributePlanner(statementContext, fragments);
FragmentIdMapping<DistributedPlan> distributedPlans = distributePlanner.plan();

return new NereidsCoordinator(
jobId, queryId, descTable, fragments, distributedPlans.valueList(),
scanNodes, timezone, loadZeroTolerance, enableProfile
);
}
return new Coordinator(
jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile
);
}

public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
@@ -140,6 +140,7 @@ public class SummaryProfile {
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
NEREIDS_TRANSLATE_TIME,
NEREIDS_DISTRIBUTE_TIME,
WORKLOAD_GROUP,
ANALYSIS_TIME,
PLAN_TIME,
@@ -228,7 +228,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
}
}

private Plan planWithoutLock(
protected Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
@@ -311,7 +311,7 @@ private Plan planWithoutLock(
return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
protected LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

@@ -322,7 +322,7 @@ private void initCascadesContext(LogicalPlan plan, PhysicalProperties requirePro
}
}

private void analyze(boolean showPlanProcess) {
protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
}
@@ -337,7 +337,7 @@ private void analyze(boolean showPlanProcess) {
/**
* Logical plan rewrite based on a series of heuristic rules.
*/
private void rewrite(boolean showPlanProcess) {
protected void rewrite(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start rewrite plan");
}
@@ -349,7 +349,7 @@ private void rewrite(boolean showPlanProcess) {
}

// DependsRules: EnsureProjectOnTopJoin.class
private void optimize() {
protected void optimize() {
if (LOG.isDebugEnabled()) {
LOG.debug("Start optimize plan");
}
@@ -360,7 +360,7 @@ private void optimize() {
}
}

private void splitFragments(PhysicalPlan resultPlan) {
protected void splitFragments(PhysicalPlan resultPlan) {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
@@ -455,7 +455,7 @@ private void splitFragments(PhysicalPlan resultPlan) {
}
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
protected void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
@@ -465,18 +465,21 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
}

splitFragments(physicalPlan);
doDistribute(canUseNereidsDistributePlanner);
}

protected void doDistribute(boolean canUseNereidsDistributePlanner) {
if (!canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(fragments).plan();
distributedPlans = new DistributePlanner(statementContext, fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}

@@ -735,6 +738,10 @@ public CascadesContext getCascadesContext() {
return cascadesContext;
}

public ConnectContext getConnectContext() {
return cascadesContext.getConnectContext();
}

public static PhysicalProperties buildInitRequireProperties() {
return PhysicalProperties.GATHER;
}
@@ -47,18 +47,19 @@
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.system.Backend;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.sparkproject.guava.base.Throwables;

import java.io.Closeable;
import java.util.ArrayList;
@@ -171,6 +172,8 @@ public class StatementContext implements Closeable {

private String disableJoinReorderReason;

private Backend groupCommitMergeBackend;

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@@ -568,4 +571,13 @@ public Optional<String> getDisableJoinReorderReason() {
public void setDisableJoinReorderReason(String disableJoinReorderReason) {
this.disableJoinReorderReason = disableJoinReorderReason;
}

public Backend getGroupCommitMergeBackend() {
return groupCommitMergeBackend;
}

public void setGroupCommitMergeBackend(
Backend groupCommitMergeBackend) {
this.groupCommitMergeBackend = groupCommitMergeBackend;
}
}
@@ -116,7 +116,6 @@ private void ensureChildrenRewritten() {

// some rule return new plan tree, which the number of new plan node > 1,
// we should transform this new plan nodes too.
// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(plan)) {
pushChildrenJobs(plan);
}
@@ -60,8 +60,6 @@ protected final RewriteResult rewrite(Plan plan, List<Rule> rules, RewriteJobCon
}
Plan newPlan = newPlans.get(0);
if (!newPlan.deepEquals(plan)) {
// don't remove this comment, it can help us to trace some bug when developing.

NereidsTracer.logRewriteEvent(rule.toString(), pattern, plan, newPlan);
String traceBefore = null;
if (showPlanProcess) {
@@ -58,7 +58,6 @@ public void execute() {
RewriteJobContext newRewriteJobContext = rewriteJobContext.withChildrenVisited(true);
pushJob(new PlanTreeRewriteTopDownJob(newRewriteJobContext, context, isTraverseChildren, rules));

// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(rewriteJobContext.plan)) {
pushChildrenJobs(newRewriteJobContext);
}
@@ -505,12 +505,15 @@ private PhysicalProperties computeShuffleJoinOutputProperties(

switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case CROSS_JOIN:
if (shuffleSide == ShuffleSide.LEFT) {
return new PhysicalProperties(DistributionSpecHash.merge(
rightHashSpec, leftHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType)
);
} else {
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
);
}
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
@@ -526,12 +529,13 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
if (shuffleSide == ShuffleSide.RIGHT) {
return new PhysicalProperties(
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
);
} else {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use left most node to schedule fragment
// forbid colocate join, since right table already shuffle
return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
leftHashSpec.getShuffleType()));
}
case FULL_OUTER_JOIN:
return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec);
@@ -563,6 +567,9 @@ private ShuffleSide computeShuffleSide(DistributionSpecHash leftHashSpec, Distri
case STORAGE_BUCKETED:
// use storage hash to shuffle right to left to do bucket shuffle join
return ShuffleSide.RIGHT;
case EXECUTION_BUCKETED:
// compatible old ut
return ShuffleSide.RIGHT;
default:
}
break;
