Skip to content

Commit

Permalink
[fix](mtmv) Fix compensate union all wrongly when query rewrite by ma…
Browse files Browse the repository at this point in the history
…terialized view (apache#40803)

## Proposed changes

This is brought by apache#36056

Not every query that is successfully rewritten can be compensated with union all.
For example:
the mv definition SQL is as follows, and the partition column is a
```sql
select a, b, count(*) from t1 group by a, b
```
The query is as follows:
```sql
select count(*) from t1
```
the result is
+----------+
| count(*) |
+----------+
|       24 |
+----------+

After the query is successfully rewritten by the materialized view,
if some partitions of the mv are invalid, union all can not be compensated, because
the result is wrong after compensating with union all:

+----------+
| count(*) |
+----------+
|       24 |
|        3 |
+----------+

This PR fixes this.
  • Loading branch information
seawinde authored Sep 20, 2024
1 parent 252aeeb commit d7e5d46
Show file tree
Hide file tree
Showing 6 changed files with 631 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.properties.DataTrait;
Expand All @@ -38,6 +41,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
Expand All @@ -63,6 +67,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -324,6 +329,51 @@ protected Expression tryRewriteExpression(StructInfo queryStructInfo, Expression
return rewrittenExpression;
}

/**
 * Decide whether a query that was rewritten by a partitioned materialized view may be
 * compensated with union all when some partitions of the materialized view are invalid.
 *
 * <p>Not every successfully rewritten query can be compensated. For example, with the mv
 * definition {@code select a, b, count(*) from t1 group by a, b} partitioned by {@code a},
 * the query {@code select b, count(*) from t1 group by b} would produce a wrong result
 * after union all compensation, because it does not group by the partition column.
 *
 * <p>The decision is made as follows:
 * <ul>
 *   <li>no aggregate node is found in the query plan: returns {@code true};</li>
 *   <li>the first aggregate node found has no group by expression (scalar aggregate):
 *       returns {@code false};</li>
 *   <li>otherwise returns {@code true} only when at least one group by expression,
 *       after being shuttled with lineage, references the partition related column
 *       of the partition related table of the materialized view.</li>
 * </ul>
 *
 * @param queryPlan the query plan to inspect
 * @param mtmv the materialized view whose partition info supplies the related column and table
 * @param cascadesContext not read by this implementation
 * @return whether union all compensation is allowed for the query plan
 */
@Override
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
    Optional<LogicalAggregate<Plan>> firstAggregate =
            queryPlan.collectFirst(node -> node instanceof LogicalAggregate);
    if (!firstAggregate.isPresent()) {
        // No aggregate found in the plan, nothing to verify here
        return true;
    }
    List<Expression> groupingKeys = firstAggregate.get().getGroupByExpressions();
    if (groupingKeys.isEmpty()) {
        // Scalar aggregate can not compensate union all
        return false;
    }
    final String partitionColName = mtmv.getMvPartitionInfo().getRelatedCol();
    final BaseTableInfo partitionTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
    // Shuttle the group by expressions with lineage before comparing them with the
    // partition column (presumably this expresses them on base table slots, TODO confirm)
    List<? extends Expression> lineageGroupingKeys =
            ExpressionUtils.shuttleExpressionWithLineage(groupingKeys, queryPlan, new BitSet());
    for (Expression groupingKey : lineageGroupingKeys) {
        boolean referencesPartitionCol = !groupingKey.collectToSet(node -> {
            if (!(node instanceof SlotReference)) {
                return false;
            }
            SlotReference slot = (SlotReference) node;
            if (!slot.isColumnFromTable()) {
                return false;
            }
            if (!Objects.equals(slot.getColumn().map(Column::getName).orElse(null), partitionColName)) {
                return false;
            }
            return Objects.equals(slot.getTable().map(BaseTableInfo::new).orElse(null), partitionTableInfo);
        }).isEmpty();
        if (referencesPartitionCol) {
            // One group by expression on the partition column is enough
            return true;
        }
    }
    return false;
}

/**
* Check query and view aggregate compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
boolean canUnionRewrite = canUnionRewrite(queryPlan,
((AsyncMaterializationContext) materializationContext).getMtmv(),
cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because the query structInfo",
() -> String.format("mv partition info is %s, and the query plan is %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo(), queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
Expand Down Expand Up @@ -377,6 +388,20 @@ protected boolean needUnionRewrite(
&& (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty());
}

/**
* Hook that tells whether a successfully rewritten query may be compensated with union all
* when part of the materialized view partitions is invalid.
*
* Not every rewritten query can be compensated. For example:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b
* After the query is rewritten by the materialized view successfully, if part of the mv
* partitions is invalid, union all can not be compensated, because the result would be wrong.
*
* This default implementation always returns true; the method is protected so that
* a rule with stricter requirements can override it.
*
* @param queryPlan the query plan being rewritten, not read by this default implementation
* @param mtmv the materialized view used for rewriting, not read by this default implementation
* @param cascadesContext the cascades context, not read by this default implementation
* @return true, union all compensation is always allowed by default
*/
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
return true;
}

// Normalize expression such as nullable property and output slot id
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
28

-- !query1_0_after --
28

-- !query1_1_before --
32

-- !query1_1_after --
32

-- !query2_0_before --
a 4
b 28

-- !query2_0_after --
a 2
b 26

-- !query3_0_before --
a 4
b 28

-- !query3_0_after --
a 4
b 28

-- !query4_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query4_0_after --
2024-09-12 4
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_after --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query6_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query6_0_after --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_after --
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,9 @@ class Suite implements GroovyInterceptable {
// Explains the memo plan of query_sql and inspects the explain output for mv_name,
// without requiring that the materialized view is the one finally chosen.
// NOTE(review): this rendering of the diff shows both a contains("... not chose") assertion
// and a check closure that accepts either "chose" or "not chose"; the markers that tell
// removed lines from added lines are not visible here, confirm which one is in the final file.
def mv_rewrite_success_without_check_chosen = { query_sql, mv_name ->
explain {
sql(" memo plan ${query_sql}")
contains("${mv_name} not chose")
check { result ->
result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
}
}
}

Expand Down Expand Up @@ -1721,7 +1723,9 @@ class Suite implements GroovyInterceptable {

explain {
sql(" memo plan ${query_sql}")
notContains("${mv_name} fail")
check { result ->
result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
}
}
}

Expand All @@ -1744,8 +1748,7 @@ class Suite implements GroovyInterceptable {

explain {
sql(" memo plan ${query_sql}")
notContains("${mv_name} chose")
notContains("${mv_name} not chose")
contains("${mv_name} fail")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ suite("materialized_view_switch") {
sql """ DROP MATERIALIZED VIEW IF EXISTS mv_name_1"""

sql "SET enable_materialized_view_rewrite=false"
async_mv_rewrite_fail(db, mv_name, query, "mv_name_2")
create_async_mv(db, "mv_name_2", mv_name)
mv_not_part_in(query, "mv_name_2")
sql """ DROP MATERIALIZED VIEW IF EXISTS mv_name_2"""

sql "SET enable_materialized_view_rewrite=true"
async_mv_rewrite_success(db, mv_name, query, "mv_name_3")
sql """ DROP MATERIALIZED VIEW IF EXISTS mv_name_3"""
Expand Down
Loading

0 comments on commit d7e5d46

Please sign in to comment.