Skip to content

Commit

Permalink
[feature](nereids) judge if the join is at bottom of join cluster (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly authored and pull[bot] committed Feb 26, 2024
1 parent abf5aac commit eb04f55
Show file tree
Hide file tree
Showing 27 changed files with 499 additions and 481 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.statistics.StatisticsBuilder;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -619,7 +620,7 @@ public void updateConsumerStats(CTEId cteId, Statistics statistics) {
List<Pair<Map<Slot, Slot>, Group>> consumerGroups = this.statementContext.getCteIdToConsumerGroup().get(cteId);
for (Pair<Map<Slot, Slot>, Group> p : consumerGroups) {
Map<Slot, Slot> producerSlotToConsumerSlot = p.first;
Statistics updatedConsumerStats = new Statistics(statistics);
Statistics updatedConsumerStats = new StatisticsBuilder(statistics).build();
for (Entry<Expression, ColumnStatistic> entry : statistics.columnStatistics().entrySet()) {
updatedConsumerStats.addColumnStats(producerSlotToConsumerSlot.get(entry.getKey()), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,22 @@ public Map<NamedExpression, Pair<PhysicalRelation, Slot>> getAliasTransferMap()
return aliasTransferMap;
}

public Pair<PhysicalRelation, Slot> aliasTransferMapRemove(NamedExpression slot) {
return aliasTransferMap.remove(slot);
}

public Pair<PhysicalRelation, Slot> getAliasTransferPair(NamedExpression slot) {
return aliasTransferMap.get(slot);
}

public Pair<PhysicalRelation, Slot> aliasTransferMapPut(NamedExpression slot, Pair<PhysicalRelation, Slot> pair) {
return aliasTransferMap.put(slot, pair);
}

public boolean aliasTransferMapContains(NamedExpression slot) {
return aliasTransferMap.containsKey(slot);
}

public Map<Slot, ScanNode> getScanNodeOfLegacyRuntimeFilterTarget() {
return scanNodeOfLegacyRuntimeFilterTarget;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
join.left().accept(this, context);
if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
join.right().getOutput().forEach(slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
context.getRuntimeFilterContext().aliasTransferMapRemove(slot));
}
collectPushDownCTEInfos(join, context);
if (!getPushDownCTECandidates(ctx).isEmpty()) {
Expand All @@ -136,7 +136,7 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
@Override
public PhysicalCTEConsumer visitPhysicalCTEConsumer(PhysicalCTEConsumer scan, CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan, slot)));
scan.getOutput().forEach(slot -> ctx.aliasTransferMapPut(slot, Pair.of(scan, slot)));
return scan;
}

Expand All @@ -158,7 +158,6 @@ private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<? extends
if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) {
return;
}
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
List<Slot> leftSlots = join.left().getOutput();
List<Slot> rightSlots = join.right().getOutput();
List<Expression> bitmapRuntimeFilterConditions = JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
Expand All @@ -183,15 +182,15 @@ private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<? extends
if (!checkPushDownPreconditionsForJoin(join, ctx, targetSlot)) {
continue;
}
Slot scanSlot = aliasTransferMap.get(targetSlot).second;
PhysicalRelation scan = aliasTransferMap.get(targetSlot).first;
Slot scanSlot = ctx.getAliasTransferPair(targetSlot).second;
PhysicalRelation scan = ctx.getAliasTransferPair(targetSlot).first;
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
bitmapContains.child(0), ImmutableList.of(scanSlot),
ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
scan.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(join, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first,
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair(targetSlot).first,
scanSlot);
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
Expand Down Expand Up @@ -246,7 +245,6 @@ private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate compare) {
*/
private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
int hashCondionSize = join.getHashJoinConjuncts().size();
for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) {
int exprOrder = idx + hashCondionSize;
Expand All @@ -257,7 +255,7 @@ private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends Plan, ?
if (unwrappedSlot == null) {
continue;
}
Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(unwrappedSlot);
Pair<PhysicalRelation, Slot> pair = ctx.getAliasTransferPair(unwrappedSlot);
if (pair == null) {
continue;
}
Expand Down Expand Up @@ -286,7 +284,7 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends

if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
join.right().getOutput().forEach(slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
context.getRuntimeFilterContext().aliasTransferMapRemove(slot));
return join;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Expand All @@ -310,19 +308,19 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) {
project.child().accept(this, context);
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap
= context.getRuntimeFilterContext().getAliasTransferMap();
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
// change key when encounter alias.
// TODO: same action will be taken for set operation
for (Expression expression : project.getProjects()) {
if (expression.children().isEmpty()) {
continue;
}
Expression expr = ExpressionUtils.getExpressionCoveredByCast(expression.child(0));
if (expr instanceof NamedExpression && aliasTransferMap.containsKey((NamedExpression) expr)) {
if (expr instanceof NamedExpression
&& ctx.aliasTransferMapContains((NamedExpression) expr)) {
if (expression instanceof Alias) {
Alias alias = ((Alias) expression);
aliasTransferMap.put(alias.toSlot(), aliasTransferMap.get(expr));
ctx.aliasTransferMapPut(alias.toSlot(), ctx.getAliasTransferPair((NamedExpression) expr));
}
}
}
Expand All @@ -340,7 +338,7 @@ public Plan visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, C
public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, CascadesContext context) {
// add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
relation.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(relation, slot)));
relation.getOutput().forEach(slot -> ctx.aliasTransferMapPut(slot, Pair.of(relation, slot)));
return relation;
}

Expand Down Expand Up @@ -579,16 +577,15 @@ private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {

private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, PhysicalCTEProducer cteProducer) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!checkPushDownPreconditionsForJoin(join, ctx, unwrappedSlot)) {
return;
}
Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second;
PhysicalRelation cteNode = aliasTransferMap.get(unwrappedSlot).first;
Slot cteSlot = ctx.getAliasTransferPair(unwrappedSlot).second;
PhysicalRelation cteNode = ctx.getAliasTransferPair(unwrappedSlot).first;
long buildSideNdv = getBuildSideNdv(join, equalTo);
if (cteNode instanceof PhysicalCTEConsumer && inputPlanNode instanceof PhysicalProject) {
PhysicalProject project = (PhysicalProject) inputPlanNode;
Expand All @@ -608,7 +605,7 @@ private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan,
return;
} else {
Map<Slot, PhysicalRelation> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
(SlotReference) targetExpr, aliasTransferMap);
(SlotReference) targetExpr, ctx);
if (!pushDownBasicTableInfos.isEmpty()) {
List<Slot> targetList = new ArrayList<>();
List<PhysicalRelation> targetNodes = new ArrayList<>();
Expand Down Expand Up @@ -642,7 +639,7 @@ public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Cascade
topN.child().accept(this, context);
PhysicalPlan child = (PhysicalPlan) topN.child();
for (Slot slot : child.getOutput()) {
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot);
context.getRuntimeFilterContext().aliasTransferMapRemove(slot);
}
return topN;
}
Expand All @@ -652,7 +649,7 @@ public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan> window, C
window.child().accept(this, context);
Set<SlotReference> commonPartitionKeys = window.getCommonPartitionKeyFromWindowExpressions();
window.child().getOutput().stream().filter(slot -> !commonPartitionKeys.contains(slot)).forEach(
slot -> context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)
slot -> context.getRuntimeFilterContext().aliasTransferMapRemove(slot)
);
return window;
}
Expand All @@ -662,8 +659,7 @@ public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan> window, C
*/
public static boolean checkPushDownPreconditionsForJoin(AbstractPhysicalJoin physicalJoin,
RuntimeFilterContext ctx, Slot slot) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
if (slot == null || !aliasTransferMap.containsKey(slot)) {
if (slot == null || !ctx.aliasTransferMapContains(slot)) {
return false;
} else if (DENIED_JOIN_TYPES.contains(physicalJoin.getJoinType()) || physicalJoin.isMarkJoin()) {
return false;
Expand Down Expand Up @@ -695,12 +691,12 @@ private boolean checkCanPushDownIntoBasicTable(PhysicalPlan root) {
}

private Map<Slot, PhysicalRelation> getPushDownBasicTablesInfos(PhysicalPlan root, SlotReference slot,
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap) {
RuntimeFilterContext ctx) {
Map<Slot, PhysicalRelation> basicTableInfos = new HashMap<>();
Set<PhysicalHashJoin> joins = new HashSet<>();
ExprId exprId = slot.getExprId();
if (aliasTransferMap.get(slot) != null) {
basicTableInfos.put(slot, aliasTransferMap.get(slot).first);
if (ctx.getAliasTransferPair(slot) != null) {
basicTableInfos.put(slot, ctx.getAliasTransferPair(slot).first);
}
// try to find propagation condition from join
getAllJoinInfo(root, joins);
Expand All @@ -710,13 +706,13 @@ private Map<Slot, PhysicalRelation> getPushDownBasicTablesInfos(PhysicalPlan roo
if (equalTo instanceof EqualTo) {
SlotReference leftSlot = (SlotReference) ((EqualTo) equalTo).left();
SlotReference rightSlot = (SlotReference) ((EqualTo) equalTo).right();
if (leftSlot.getExprId() == exprId && aliasTransferMap.get(rightSlot) != null) {
PhysicalRelation rightTable = aliasTransferMap.get(rightSlot).first;
if (leftSlot.getExprId() == exprId && ctx.getAliasTransferPair(rightSlot) != null) {
PhysicalRelation rightTable = ctx.getAliasTransferPair(rightSlot).first;
if (rightTable != null) {
basicTableInfos.put(rightSlot, rightTable);
}
} else if (rightSlot.getExprId() == exprId && aliasTransferMap.get(leftSlot) != null) {
PhysicalRelation leftTable = aliasTransferMap.get(leftSlot).first;
} else if (rightSlot.getExprId() == exprId && ctx.getAliasTransferPair(leftSlot) != null) {
PhysicalRelation leftTable = ctx.getAliasTransferPair(leftSlot).first;
if (leftTable != null) {
basicTableInfos.put(leftSlot, leftTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,13 @@ public static boolean isZigZagJoin(LogicalJoin<GroupPlan, GroupPlan> join) {
}

private static boolean containJoin(GroupPlan groupPlan) {
// TODO: tmp way to judge containJoin
List<Slot> output = groupPlan.getOutput();
return !output.stream().map(Slot::getQualifier).allMatch(output.get(0).getQualifier()::equals);
if (groupPlan.getGroup().getStatistics() != null) {
return groupPlan.getGroup().getStatistics().getWidthInJoinCluster() > 1;
} else {
// tmp way to judge containJoin, just used for test case where stats is null
List<Slot> output = groupPlan.getOutput();
return !output.stream().map(Slot::getQualifier).allMatch(output.get(0).getQualifier()::equals);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ A not in (1, 2, 3, 100):
}
}
compareExprStatsBuilder.setNumNulls(0);
Statistics estimated = new Statistics(context.statistics);
Statistics estimated = new StatisticsBuilder(context.statistics).build();
ColumnStatistic stats = compareExprStatsBuilder.build();
selectivity = getNotNullSelectivity(stats, selectivity);
estimated = estimated.withSel(selectivity);
Expand Down
Loading

0 comments on commit eb04f55

Please sign in to comment.