Skip to content

Commit

Permalink
[opt](mtmv) Set query rewrite by materialized view default enable (ap…
Browse files Browse the repository at this point in the history
…ache#35897)

Set query rewrite by materialized view default enable
As before, if want to use query reweirte by async materialized view,
must run: SET enable_materialized_view_rewrite=true
After this, not need
  • Loading branch information
seawinde committed Jun 20, 2024
1 parent 82f4d1e commit 89d2687
Show file tree
Hide file tree
Showing 39 changed files with 55 additions and 286 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,11 @@ private void transferToMaster() {
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
if (journalVersion <= FeMetaVersion.VERSION_133) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.ENABLE_MATERIALIZED_VIEW_REWRITE,
"true");
}
}

getPolicyMgr().createDefaultStoragePolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,6 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion && !sessionVariable.isEnableMaterializedViewUnionRewrite()) {
// if use invalid partition but not enable union rewrite
materializationContext.recordFailReason(queryStructInfo,
"Partition query used is invalid",
() -> String.format("the partition used by query is invalid by materialized view,"
+ "invalid partition info query used is %s", finalInvalidPartitions));
continue;
}
if (partitionNeedUnion) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.nereids.CascadesContext;
Expand Down Expand Up @@ -62,12 +61,18 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
if (!cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewRewrite()) {
return;
}
Plan rewritePlan = cascadesContext.getRewritePlan();
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), true);
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()), e);
return;
}
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
if (collectedTables.isEmpty()) {
return;
Expand All @@ -77,30 +82,32 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
Set<MTMV> availableMTMVs = Env.getCurrentEnv().getMtmvService().getRelationManager()
.getAvailableMTMVs(usedBaseTables, cascadesContext.getConnectContext());
if (availableMTMVs.isEmpty()) {
LOG.warn(String.format("enable materialized view rewrite but availableMTMVs is empty, current queryId "
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
LOG.debug(String.format("Enable materialized view rewrite but availableMTMVs is empty, current queryId "
+ "is %s", cascadesContext.getConnectContext().getQueryIdentifier()));
return;
}
for (MTMV materializedView : availableMTMVs) {
MTMVCache mtmvCache = null;
try {
mtmvCache = materializedView.getOrGenerateCache(cascadesContext.getConnectContext());
} catch (AnalysisException e) {
LOG.warn("MaterializationContext init mv cache generate fail", e);
}
if (mtmvCache == null) {
continue;
if (mtmvCache == null) {
continue;
}
// For async materialization context, the cascades context when construct the struct info maybe
// different from the current cascadesContext
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(),
ImmutableList.of(), cascadesContext,
mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init mv cache generate fail, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()), e);
}
// For async materialization context, the cascades context when construct the struct info maybe
// different from the current cascadesContext
// so regenerate the struct info table bitset
StructInfo mvStructInfo = mtmvCache.getStructInfo();
BitSet tableBitSetInCurrentCascadesContext = new BitSet();
mvStructInfo.getRelations().forEach(relation -> tableBitSetInCurrentCascadesContext.set(
cascadesContext.getStatementContext().getTableId(relation.getTable()).asInt()));
cascadesContext.addMaterializationContext(new AsyncMaterializationContext(materializedView,
mtmvCache.getLogicalPlan(), mtmvCache.getOriginalPlan(), ImmutableList.of(), ImmutableList.of(),
cascadesContext, mtmvCache.getStructInfo().withTableBitSet(tableBitSetInCurrentCascadesContext)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE, needForward = true,
description = {"是否开启基于结构信息的物化视图透明改写",
"Whether to enable materialized view rewriting based on struct info"})
public boolean enableMaterializedViewRewrite = false;
public boolean enableMaterializedViewRewrite = true;

@VariableMgr.VarAttr(name = MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE, needForward = true,
description = {"基于结构信息的透明改写,是否使用包含外表的物化视图",
Expand All @@ -1689,7 +1689,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询",
"When the materialized view is not enough to provide all the data for the query, "
+ "whether to allow the union of the base table and the materialized view to "
+ "respond to the query"})
+ "respond to the query"}, varType = VariableAnnotation.REMOVED)
public boolean enableMaterializedViewUnionRewrite = true;

@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_NEST_REWRITE, needForward = true,
Expand Down Expand Up @@ -3759,6 +3759,10 @@ public boolean isEnableMaterializedViewRewrite() {
return enableMaterializedViewRewrite;
}

public void setEnableMaterializedViewRewrite(boolean enableMaterializedViewRewrite) {
this.enableMaterializedViewRewrite = enableMaterializedViewRewrite;
}

public boolean isMaterializedViewRewriteEnableContainExternalTable() {
return materializedViewRewriteEnableContainExternalTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void testStarGraphWithInnerJoin() {
@Test
void testRandomQuery() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
Plan p1 = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN))
.randomBuildPlanWith(3, 3);
PlanChecker planChecker = PlanChecker.from(connectContext, p1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -60,12 +61,14 @@ void test() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.getGroupByExpressions().size() == 1)
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 1))
Expand All @@ -87,12 +90,14 @@ void testSqrt() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.equals(agg))
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 2))
Expand All @@ -114,12 +119,14 @@ void testAbs() {
LogicalPlan agg = new LogicalPlanBuilder(scan1)
.agg(groupBy, output)
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
ConnectContext connectContext = MemoTestUtils.createConnectContext();
connectContext.getSessionVariable().setEnableMaterializedViewRewrite(false);
PlanChecker.from(connectContext, agg)
.applyTopDown(new SimplifyAggGroupBy())
.matchesFromRoot(
logicalAggregate().when(a -> a.equals(agg))
);
PlanChecker.from(MemoTestUtils.createConnectContext(), agg)
PlanChecker.from(connectContext, agg)
.analyze()
.matchesFromRoot(
logicalProject(logicalAggregate().when(a -> a.getGroupByExpressions().size() == 2))
Expand Down
64 changes: 0 additions & 64 deletions regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out
Original file line number Diff line number Diff line change
@@ -1,22 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query_1_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_1_0_after --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_2_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_2_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_3_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
Expand All @@ -35,26 +17,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_5_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_5_0_after --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_6_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_6_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_7_0_before --
2023-10-17 2023-10-17 2 3 199.00
2023-10-18 2023-10-18 2 3 109.20
Expand All @@ -75,24 +37,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_9_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_9_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
2023-10-21 \N 2 3 \N

-- !query_10_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_10_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_11_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
Expand All @@ -111,14 +55,6 @@
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_14_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_14_0_after --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50

-- !query_16_0_before --
2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2 3 99.50
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
suite("aggregate_with_roll_up") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
suite("aggregate_without_roll_up") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"
sql "SET enable_nereids_timeout = false"
sql "SET enable_agg_state = true"

sql """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ suite("grace_period") {
// if update will not be used to query rewrite
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ suite("materialized_view_switch") {

String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ This suite is a one dimensional test case file.
suite("partition_mv_rewrite_dimension_1") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ It mainly tests the agg function, etc
suite("partition_mv_rewrite_dimension_2_3") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_4") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_5") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ It mainly tests the query partial, view partial, union rewriting, predicate comp
suite("partition_mv_rewrite_dimension_2_6") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"

sql """
drop table if exists orders_2_6
Expand Down
Loading

0 comments on commit 89d2687

Please sign in to comment.