Skip to content

Commit

Permalink
[feature](mtmv) Support querying rewrite by materialized view when in…
Browse files Browse the repository at this point in the history
…sert and insert overwrite dml (#38115)

Support query rewrite by materialized view in DML statements such as
insert, insert overwrite, into outfile, etc.
This is enabled by default; you can disable it with `set
enable_dml_materialized_view_rewrite = false`.
The `enable_materialized_view_rewrite` variable now only controls DQL.

By default, DML is not rewritten by a materialized view when the query SQL
uses an external table.
If you want rewrite by materialized view when an external table is used in DML,
you should `set
enable_dml_materialized_view_rewrite_when_base_table_unawareness =
true;`
This variable is false by default.
  • Loading branch information
seawinde committed Aug 6, 2024
1 parent ff6fa33 commit 2b29288
Show file tree
Hide file tree
Showing 33 changed files with 1,008 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiPredicate;

/**
* when do some operation, do something about cache
Expand Down Expand Up @@ -76,13 +77,17 @@ public Set<BaseTableInfo> getMtmvsByBaseTableOneLevel(BaseTableInfo table) {
* @param ctx
* @return
*/
public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContext ctx) {
public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContext ctx,
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
Set<MTMV> res = Sets.newLinkedHashSet();
Set<BaseTableInfo> mvInfos = getMTMVInfos(tableInfos);
for (BaseTableInfo tableInfo : mvInfos) {
try {
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
if (isMVPartitionValid(mtmv, ctx)) {
if (predicate.test(ctx, mtmv)) {
continue;
}
if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
res.add(mtmv);
}
} catch (AnalysisException e) {
Expand All @@ -94,9 +99,10 @@ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContex
}

@VisibleForTesting
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx) {
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent) {
long currentTimeMillis = System.currentTimeMillis();
return !CollectionUtils
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, System.currentTimeMillis()));
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis, forceConsistent));
}

private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,9 @@ public class MTMVRewriteUtil {
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
long currentTimeMills) {
long currentTimeMills, boolean forceConsistent) {
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
// check session variable if enable rewrite
if (!ctx.getSessionVariable().isEnableMaterializedViewRewrite()) {
return res;
}
if (MTMVUtil.mtmvContainsExternalTable(mtmv) && !ctx.getSessionVariable()
.isMaterializedViewRewriteEnableContainExternalTable()) {
return res;
}

MTMVRelation mtmvRelation = mtmv.getRelation();
if (mtmvRelation == null) {
return res;
Expand All @@ -71,7 +62,7 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
long gracePeriodMills = mtmv.getGracePeriod();
for (Partition partition : allPartitions) {
if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime()
+ gracePeriodMills)) {
+ gracePeriodMills) && !forceConsistent) {
res.add(partition);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class CascadesContext implements ScheduleContext {
private final Optional<CTEId> currentTree;
private final Optional<CascadesContext> parent;

private final List<MaterializationContext> materializationContexts;
private final Set<MaterializationContext> materializationContexts;
private boolean isLeadingJoin = false;

private boolean isLeadingDisableJoinReorder = false;
Expand Down Expand Up @@ -160,7 +160,7 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
this.currentJobContext = new JobContext(this, requireProperties, Double.MAX_VALUE);
this.subqueryExprIsAnalyzed = new HashMap<>();
this.runtimeFilterContext = new RuntimeFilterContext(getConnectContext().getSessionVariable());
this.materializationContexts = new ArrayList<>();
this.materializationContexts = new HashSet<>();
if (statementContext.getConnectContext() != null) {
ConnectContext connectContext = statementContext.getConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public class NereidsPlanner extends Planner {
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
private List<PlannerHook> hooks = new ArrayList<>();

public NereidsPlanner(StatementContext statementContext) {
this.statementContext = statementContext;
Expand Down Expand Up @@ -274,7 +273,7 @@ private void analyze(boolean showPlanProcess) {
LOG.debug("Start analyze plan");
}
keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newAnalyzer().analyze());
getHooks().forEach(hook -> hook.afterAnalyze(this));
this.statementContext.getPlannerHooks().forEach(hook -> hook.afterAnalyze(this));
NereidsTracer.logImportantTime("EndAnalyzePlan");
if (LOG.isDebugEnabled()) {
LOG.debug("End analyze plan");
Expand Down Expand Up @@ -640,14 +639,6 @@ public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}

public List<PlannerHook> getHooks() {
return hooks;
}

public void addHook(PlannerHook hook) {
this.hooks.add(hook);
}

private String getTimeMetricString(Function<SummaryProfile, String> profileSupplier) {
return getProfile(summaryProfile -> {
String metricString = profileSupplier.apply(summaryProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class StatementContext implements Closeable {

private FormatOptions formatOptions = FormatOptions.getDefault();

private List<PlannerHook> plannerHooks = new ArrayList<>();

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -488,6 +490,14 @@ public FormatOptions getFormatOptions() {
return formatOptions;
}

public List<PlannerHook> getPlannerHooks() {
return plannerHooks;
}

public void addPlannerHook(PlannerHook plannerHook) {
this.plannerHooks.add(plannerHook);
}

private static class CloseableResource implements Closeable {
public final String resourceName;
public final String threadName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -102,10 +101,6 @@ private List<Rule> getJoinRules() {
}

private List<Rule> getMvRules() {
ConnectContext connectContext = context.getCascadesContext().getConnectContext();
if (connectContext.getSessionVariable().isEnableMaterializedViewRewrite()) {
return getRuleSet().getMaterializedViewRules();
}
return ImmutableList.of();
return getRuleSet().getMaterializedViewRules();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
import org.apache.doris.nereids.rules.analysis.AddInitMaterializationHook;
import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet;
import org.apache.doris.nereids.rules.analysis.AnalyzeCTE;
import org.apache.doris.nereids.rules.analysis.BindExpression;
Expand Down Expand Up @@ -123,6 +124,7 @@ private static List<RewriteJob> buildAnalyzerJobs(Optional<CustomTableResolver>
bottomUp(new BindExpression()),
topDown(new BindSink()),
bottomUp(new CheckAfterBind()),
bottomUp(new AddInitMaterializationHook()),
bottomUp(
new ProjectToGlobalAggregate(),
// this rule check's the logicalProject node's isDistinct property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public enum RuleType {
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
BINDING_RELATION(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.analysis;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.exploration.mv.InitConsistentMaterializationContextHook;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;

import com.google.common.collect.ImmutableList;

import java.util.List;

/**
 * Analysis rule factory that registers {@link InitConsistentMaterializationContextHook}
 * as a planner hook on the statement context when the plan contains a file sink or a table sink.
 *
 * <p>The hook is only registered when the session variable check
 * {@code isEnableDmlMaterializedViewRewrite()} passes. Neither rule changes the plan:
 * both return the matched root as is.
 */
public class AddInitMaterializationHook implements AnalysisRuleFactory {

    @Override
    public List<Rule> buildRules() {
        // Rule matched through logicalFileSink(): register the hook, leave the plan untouched.
        Rule fileSinkRule = RuleType.INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK.build(
                logicalFileSink().thenApply(matchCtx -> {
                    boolean dmlRewriteEnabled =
                            matchCtx.connectContext.getSessionVariable().isEnableDmlMaterializedViewRewrite();
                    if (dmlRewriteEnabled) {
                        matchCtx.statementContext.addPlannerHook(InitConsistentMaterializationContextHook.INSTANCE);
                    }
                    return matchCtx.root;
                }));

        // Rule for any plan node that is a LogicalTableSink instance: same registration logic.
        Rule tableSinkRule = RuleType.INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK.build(
                any().when(LogicalTableSink.class::isInstance).thenApply(matchCtx -> {
                    boolean dmlRewriteEnabled =
                            matchCtx.connectContext.getSessionVariable().isEnableDmlMaterializedViewRewrite();
                    if (dmlRewriteEnabled) {
                        matchCtx.statementContext.addPlannerHook(InitConsistentMaterializationContextHook.INSTANCE);
                    }
                    return matchCtx.root;
                }));

        return ImmutableList.of(fileSinkRule, tableSinkRule);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
.collect(Collectors.toSet());

Collection<Partition> mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
cascadesContext.getConnectContext(), System.currentTimeMillis());
cascadesContext.getConnectContext(), System.currentTimeMillis(), false);
Set<String> mvValidPartitionNameSet = new HashSet<>();
Set<String> mvValidBaseTablePartitionNameSet = new HashSet<>();
Set<String> mvValidHasDataRelatedBaseTableNameSet = new HashSet<>();
Expand Down Expand Up @@ -732,7 +732,7 @@ protected boolean checkIfRewritten(Plan plan, MaterializationContext context) {

// check mv plan is valid or not, this can use cache for performance
private boolean isMaterializationValid(CascadesContext cascadesContext, MaterializationContext context) {
long materializationId = context.getMaterializationQualifier().hashCode();
long materializationId = context.generateMaterializationIdentifier().hashCode();
Boolean cachedCheckResult = cascadesContext.getMemo().materializationHasChecked(this.getClass(),
materializationId);
if (cachedCheckResult == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class AsyncMaterializationContext extends MaterializationContext {

private static final Logger LOG = LogManager.getLogger(AsyncMaterializationContext.class);
private final MTMV mtmv;
private List<String> materializationQualifier;

/**
* MaterializationContext, this contains necessary info for query rewriting by mv
Expand All @@ -72,11 +71,11 @@ Plan doGenerateScanPlan(CascadesContext cascadesContext) {
}

@Override
List<String> getMaterializationQualifier() {
if (this.materializationQualifier == null) {
this.materializationQualifier = this.mtmv.getFullQualifiers();
List<String> generateMaterializationIdentifier() {
if (super.identifier == null) {
super.identifier = MaterializationContext.generateMaterializationIdentifier(mtmv, null);
}
return this.materializationQualifier;
return super.identifier;
}

@Override
Expand All @@ -92,7 +91,7 @@ String getStringInfo() {
}
}
failReasonBuilder.append("\n").append("]");
return Utils.toSqlString("MaterializationContext[" + getMaterializationQualifier() + "]",
return Utils.toSqlString("MaterializationContext[" + generateMaterializationIdentifier() + "]",
"rewriteSuccess", this.success,
"failReason", failReasonBuilder.toString());
}
Expand All @@ -104,7 +103,7 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
mtmvCache = mtmv.getOrGenerateCache(cascadesContext.getConnectContext());
} catch (AnalysisException e) {
LOG.warn(String.format("get mv plan statistics fail, materialization qualifier is %s",
getMaterializationQualifier()), e);
generateMaterializationIdentifier()), e);
return Optional.empty();
}
RelationId relationId = null;
Expand All @@ -120,7 +119,12 @@ boolean isFinalChosen(Relation relation) {
if (!(relation instanceof PhysicalCatalogRelation)) {
return false;
}
return ((PhysicalCatalogRelation) relation).getTable() instanceof MTMV;
if (!(((PhysicalCatalogRelation) relation).getTable() instanceof MTMV)) {
return false;
}
return ((PhysicalCatalogRelation) relation).getTable().getFullQualifiers().equals(
this.generateMaterializationIdentifier()
);
}

public Plan getScanPlan() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.PlannerHook;

import com.google.common.annotations.VisibleForTesting;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Planner hook used when query rewrite by materialized view is enabled for DML.
 * It initializes the materialization context after analyze, asking the relation manager
 * for materialized views with the {@code forceConsistent} flag set to {@code true}.
 */
public class InitConsistentMaterializationContextHook extends InitMaterializationContextHook implements PlannerHook {

    /** Shared instance of this hook. */
    public static final InitConsistentMaterializationContextHook INSTANCE =
            new InitConsistentMaterializationContextHook();

    /**
     * Initializes the materialization context only when the session variable check
     * {@code isEnableDmlMaterializedViewRewrite()} passes; otherwise does nothing.
     *
     * @param cascadesContext the context whose connect context supplies the session variable
     */
    @VisibleForTesting
    @Override
    public void initMaterializationContext(CascadesContext cascadesContext) {
        boolean dmlRewriteEnabled = cascadesContext.getConnectContext()
                .getSessionVariable()
                .isEnableDmlMaterializedViewRewrite();
        if (dmlRewriteEnabled) {
            super.doInitMaterializationContext(cascadesContext);
        }
    }

    /**
     * Looks up the materialized views available for the given tables through the relation manager.
     *
     * @param usedTables tables used by the statement; each one is wrapped in a {@link BaseTableInfo}
     * @param cascadesContext supplies the connect context passed to the relation manager
     * @return the materialized views returned by the relation manager
     */
    protected Set<MTMV> getAvailableMTMVs(Set<TableIf> usedTables, CascadesContext cascadesContext) {
        List<BaseTableInfo> baseTableInfos = usedTables.stream()
                .map(table -> new BaseTableInfo(table))
                .collect(Collectors.toList());
        // The predicate is true for a materialized view that contains an external table while
        // isEnableDmlMaterializedViewRewriteWhenBaseTableUnawareness() is false.
        // NOTE(review): presumably a true result excludes the view from the result; confirm in getAvailableMTMVs.
        return Env.getCurrentEnv().getMtmvService().getRelationManager().getAvailableMTMVs(
                baseTableInfos,
                cascadesContext.getConnectContext(),
                true,
                (connectCtx, candidate) -> MTMVUtil.mtmvContainsExternalTable(candidate)
                        && !connectCtx.getSessionVariable()
                                .isEnableDmlMaterializedViewRewriteWhenBaseTableUnawareness());
    }
}
Loading

0 comments on commit 2b29288

Please sign in to comment.