[opt](mtmv) Set query rewrite by materialized view default enable (#35897)

Set query rewrite by async materialized view to be enabled by default.
Previously, to use query rewrite by async materialized view, you had to run:
SET enable_materialized_view_rewrite=true
After this change, that is no longer needed.
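For illustration, the behavior change from a client session (the variable name comes from this commit; the SET GLOBAL form is ordinary session-variable syntax):

-- Before this commit, transparent MV rewrite was opt-in per session:
SET enable_materialized_view_rewrite=true;
-- After this commit the default is already true; opt out explicitly if needed,
-- e.g. for tests that assert exact plan shapes:
SET enable_materialized_view_rewrite=false;
-- or for all new sessions:
SET GLOBAL enable_materialized_view_rewrite=false;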
seawinde authored and dataroaring committed Jun 13, 2024
1 parent 13760aa commit 2c54f57
Showing 39 changed files with 55 additions and 286 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1585,6 +1585,11 @@ private void transferToMaster() {
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
if (journalVersion <= FeMetaVersion.VERSION_133) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.ENABLE_MATERIALIZED_VIEW_REWRITE,
"true");
}
}

getPolicyMgr().createDefaultStoragePolicy();
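The hunk above refreshes the stored default once when replaying metadata from a version at or below FeMetaVersion.VERSION_133 (the 2.0 to 2.1 upgrade path), so upgraded clusters pick up the new default. As a quick sanity check after an upgrade (SHOW VARIABLES is standard syntax; the expected value is an assumption based on this commit's new default):

SHOW VARIABLES LIKE 'enable_materialized_view_rewrite';
-- expected value: true (the refreshed default from this commit)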
@@ -279,14 +279,6 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion && !sessionVariable.isEnableMaterializedViewUnionRewrite()) {
// if use invalid partition but not enable union rewrite
materializationContext.recordFailReason(queryStructInfo,
"Partition query used is invalid",
() -> String.format("the partition used by query is invalid by materialized view,"
+ "invalid partition info query used is %s", finalInvalidPartitions));
continue;
}
if (partitionNeedUnion) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
@@ -20,7 +20,6 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.nereids.CascadesContext;
@@ -62,12 +61,18 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
if (!cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
return;
}
Plan rewritePlan = cascadesContext.getRewritePlan();
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), true);
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()), e);
return;
}
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
if (collectedTables.isEmpty()) {
return;
@@ -77,30 +82,32 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
Set<MTMV> availableMTMVs = Env.getCurrentEnv().getMtmvService().getRelationManager()
.getAvailableMTMVs(usedBaseTables, cascadesContext.getConnectContext());
if (availableMTMVs.isEmpty()) {
LOG.warn(String.format("enable materialized view rewrite but availableMTMVs is empty, current queryId "
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
LOG.debug(String.format("Enable materialized view rewrite but availableMTMVs is empty, current queryId "
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
return;
}
for (MTMV materializedView : availableMTMVs) {
MTMVCache mtmvCache = null;
try {
mtmvCache = materializedView.getOrGenerateCache(cascadesContext.getConnectContext());
} catch (AnalysisException e) {
LOG.warn("MaterializationContext init mv cache generate fail", e);
}
if (mtmvCache == null) {
continue;
if (mtmvCache == null) {
continue;
}
// For async materialization context, the cascades context when construct the struct info maybe
// different from the current cascadesContext
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(),
ImmutableList.of(), cascadesContext,
mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init mv cache generate fail, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()), e);
}
// For async materialization context, the cascades context when construct the struct info maybe
// different from the current cascadesContext
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(), ImmutableList.of(),
cascadesContext, mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
}
}
}
@@ -1716,7 +1716,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE, needForward = true,
description = {"是否开启基于结构信息的物化视图透明改写",
"Whether to enable materialized view rewriting based on struct info"})
public boolean enableMaterializedViewRewrite = false;
public boolean enableMaterializedViewRewrite = true;

@VariableMgr.VarAttr(name = MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE, needForward = true,
description = {"基于结构信息的透明改写,是否使用包含外表的物化视图",
@@ -1738,7 +1738,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询",
"When the materialized view is not enough to provide all the data for the query, "
+ "whether to allow the union of the base table and the materialized view to "
+ "respond to the query"})
+ "respond to the query"}, varType = VariableAnnotation.REMOVED)
public boolean enableMaterializedViewUnionRewrite = true;

@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_NEST_REWRITE, needForward = true,
@@ -3886,6 +3886,10 @@ public boolean isEnableMaterializedViewRewrite() {
return enableMaterializedViewRewrite;
}

public void setEnableMaterializedViewRewrite(boolean enableMaterializedViewRewrite) {
this.enableMaterializedViewRewrite = enableMaterializedViewRewrite;
}

public boolean isMaterializedViewRewriteEnableContainExternalTable() {
return materializedViewRewriteEnableContainExternalTable;
}
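Tagging enable_materialized_view_union_rewrite with varType = VariableAnnotation.REMOVED pairs with deleting the guard in doRewrite above: when a materialized view covers only some of the partitions a query touches, the rewrite now always unions fresh base-table data with the MV instead of failing the rewrite. Roughly, with hypothetical names (orders as the base table, mv_daily_sale as a partitioned MV that is stale for 2023-10-21), the rewritten query looks like:

-- up-to-date partitions are answered from the materialized view ...
SELECT o_orderdate, sum_total
FROM mv_daily_sale
WHERE o_orderdate < '2023-10-21'
UNION ALL
-- ... while the stale partition is recomputed from the base table
SELECT o_orderdate, sum(o_totalprice) AS sum_total
FROM orders
WHERE o_orderdate = '2023-10-21'
GROUP BY o_orderdate;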
@@ -68,6 +68,7 @@ void testStarGraphWithInnerJoin() {
@Test
void testRandomQuery() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
Plan p1 = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN))
.randomBuildPlanWith(3, 3);
PlanChecker planChecker = PlanChecker.from(connectContext, p1)
@@ -32,6 +32,7 @@
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
@@ -60,12 +61,14 @@ void test() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.getGroupByExpressions().size() == 1)
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 1))
@@ -87,12 +90,14 @@ void testSqrt() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.equals(agg))
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 2))
@@ -114,12 +119,14 @@ void testAbs() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.equals(agg))
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 2))
64 changes: 0 additions & 64 deletions regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out
@@ -1,22 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query_1_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_1_0_after --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_2_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_2_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_3_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
@@ -35,26 +17,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_5_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_5_0_after --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_6_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_6_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_7_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
@@ -75,24 +37,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_9_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_9_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_10_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_10_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_11_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
@@ -111,14 +55,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_14_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_14_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_16_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
@@ -18,11 +18,8 @@
suite("aggregate_with_roll_up") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders
@@ -18,12 +18,8 @@
suite("aggregate_without_roll_up") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"
sql "SET enable_nereids_timeout = false"
sql "SET enable_agg_state = true"

sql """
@@ -23,10 +23,7 @@ suite("grace_period") {
// if update will not be used to query rewrite
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_partition
@@ -19,11 +19,8 @@ suite("materialized_view_switch") {

String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders
@@ -21,9 +21,6 @@ This suite is a one dimensional test case file.
suite("partition_mv_rewrite_dimension_1") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_1
@@ -22,9 +22,6 @@ It mainly tests the agg function, etc
suite("partition_mv_rewrite_dimension_2_3") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_3
@@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_4") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_4
@@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_5") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_5
@@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_6") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_6