Skip to content

Commit

Permalink
[fix](mtmv) Fix compensate union all wrongly when query rewrite by ma…
Browse files Browse the repository at this point in the history
…terialized view apache#40803 (apache#42019)

## Proposed changes

pr: apache#40803
commitId: d7e5d46
  • Loading branch information
seawinde authored Oct 18, 2024
1 parent cec0458 commit 28066a0
Show file tree
Hide file tree
Showing 5 changed files with 628 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.rules.analysis.NormalizeRepeat;
Expand All @@ -37,6 +40,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
Expand All @@ -60,6 +64,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -321,6 +326,51 @@ protected Expression tryRewriteExpression(StructInfo queryStructInfo, Expression
return rewrittenExpression;
}

/**
 * Decides whether a query that was rewritten by the materialized view may additionally be
 * compensated with union all when part of the mv partitions is invalid.
 *
 * <p>A successful rewrite alone is not sufficient. Example: the mv is defined as
 * {@code select a, b, count(*) from t1 group by a, b} with partition column {@code a},
 * and the query is {@code select b, count(*) from t1 group by b}. If part of the mv partitions
 * is invalid, compensating with union all would produce a wrong result, so it must be refused.
 *
 * <p>Decision taken by this implementation:
 * <ul>
 *   <li>no {@code LogicalAggregate} found in the query plan: {@code true};</li>
 *   <li>the first aggregate found has no group by expressions (scalar aggregate): {@code false};</li>
 *   <li>otherwise {@code true} only if at least one group by expression, after being shuttled with
 *       lineage, contains a table-backed slot whose column name equals the mv partition related
 *       column and whose table equals the mv partition related table.</li>
 * </ul>
 *
 * @param queryPlan the query plan to inspect
 * @param mtmv the materialized view whose partition info supplies the related column and table
 * @param cascadesContext not read by this implementation
 * @return whether union all compensation is allowed for the query
 */
@Override
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
    Optional<LogicalAggregate<Plan>> firstAggregate =
            queryPlan.collectFirst(node -> node instanceof LogicalAggregate);
    if (!firstAggregate.isPresent()) {
        // Nothing aggregate-specific to verify
        return true;
    }
    List<Expression> groupingKeys = firstAggregate.get().getGroupByExpressions();
    if (groupingKeys.isEmpty()) {
        // Scalar aggregate can not compensate union all
        return false;
    }
    final String partitionColumnName = mtmv.getMvPartitionInfo().getRelatedCol();
    final BaseTableInfo partitionBaseTable = mtmv.getMvPartitionInfo().getRelatedTableInfo();
    // Shuttle the group by expressions with lineage before looking for the partition column
    List<? extends Expression> lineageKeys =
            ExpressionUtils.shuttleExpressionWithLineage(groupingKeys, queryPlan, new BitSet());
    for (Expression lineageKey : lineageKeys) {
        boolean referencesPartitionColumn = !lineageKey.collectToSet(node -> {
            if (!(node instanceof SlotReference)) {
                return false;
            }
            SlotReference slot = (SlotReference) node;
            if (!slot.isColumnFromTable()) {
                return false;
            }
            if (!Objects.equals(slot.getColumn().map(Column::getName).orElse(null),
                    partitionColumnName)) {
                return false;
            }
            return Objects.equals(slot.getTable().map(BaseTableInfo::new).orElse(null),
                    partitionBaseTable);
        }).isEmpty();
        if (referencesPartitionColumn) {
            // One group by key on the partition column is enough
            return true;
        }
    }
    return false;
}

/**
* Check query and view aggregate compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
boolean canUnionRewrite = canUnionRewrite(queryPlan,
((AsyncMaterializationContext) materializationContext).getMtmv(),
cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because the query structInfo",
() -> String.format("mv partition info is %s, and the query plan is %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo(), queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
Expand Down Expand Up @@ -372,6 +383,20 @@ protected boolean needUnionRewrite(
&& (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty());
}

/**
* Decides whether a query that was rewritten successfully may be compensated with union all.
* This default implementation ignores all of its arguments and always returns true.
*
* Not every successfully rewritten query can be compensated with union all.
* For example, the mv def sql is as following and the partition column is a:
* select a, b, count(*) from t1 group by a, b
* The query is as following:
* select b, count(*) from t1 group by b
* The query can be rewritten by the materialized view successfully, but if part of the mv
* partitions is invalid, union all can not be compensated, because the result would be wrong
* after compensating union all.
*
* @param queryPlan the query plan being rewritten
* @param mtmv the materialized view used for the rewrite
* @param cascadesContext the cascades context of the current rewrite
* @return true if union all compensation is allowed, this implementation always returns true
*/
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
return true;
}

// Normalize expression such as nullable property and output slot id
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
28

-- !query1_0_after --
28

-- !query1_1_before --
32

-- !query1_1_after --
32

-- !query2_0_before --
a 4
b 28

-- !query2_0_after --
a 2
b 26

-- !query3_0_before --
a 4
b 28

-- !query3_0_after --
a 4
b 28

-- !query4_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query4_0_after --
2024-09-12 4
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_after --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query6_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query6_0_after --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_after --
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,13 @@ class Suite implements GroovyInterceptable {
}
}

// Runs query_sql inside an explain block and requires, via notContains, that the text
// "<mv_name>(<mv_name>)" is absent.
// NOTE(review): presumably that text is how the explain output marks a materialized view that
// was chosen by the rewrite, so its absence means the rewrite failed. Confirm against the
// explain/notContains helpers, which are defined outside this snippet.
def mv_rewrite_fail = { query_sql, mv_name ->
explain {
sql("${query_sql}")
notContains("${mv_name}(${mv_name})")
}
}

def check_mv_rewrite_fail = { db, mv_sql, query_sql, mv_name ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
Expand Down
Loading

0 comments on commit 28066a0

Please sign in to comment.