Skip to content

Commit

Permalink
Merge branch 'master' into create_view
Browse files Browse the repository at this point in the history
  • Loading branch information
feiniaofeiafei authored Apr 1, 2024
2 parents 32ba0a7 + e53806b commit 730f4fb
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 112 deletions.
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false));
if (!_runtime_filters[i]->is_broadcast_join()) {
return Status::InternalError(
"runtime filter({}) on NestedLoopJoin should be set to is_broadcast_join,",
_runtime_filters[i]->get_name());
}
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
List<Long> tabletIds = unboundRelation.getTabletIds();
if (!CollectionUtils.isEmpty(partIds)) {
scan = new LogicalOlapScan(unboundRelation.getRelationId(),
(OlapTable) table, ImmutableList.of(tableQualifier.get(1)), partIds,
(OlapTable) table, tableQualifier, partIds,
tabletIds, unboundRelation.getHints(), unboundRelation.getTableSample());
} else {
Optional<String> indexName = unboundRelation.getIndexName();
Expand All @@ -210,11 +210,11 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
: PreAggStatus.off("For direct index scan.");

scan = new LogicalOlapScan(unboundRelation.getRelationId(),
(OlapTable) table, ImmutableList.of(tableQualifier.get(1)), tabletIds, indexId,
(OlapTable) table, tableQualifier, tabletIds, indexId,
preAggStatus, unboundRelation.getHints(), unboundRelation.getTableSample());
} else {
scan = new LogicalOlapScan(unboundRelation.getRelationId(),
(OlapTable) table, ImmutableList.of(tableQualifier.get(1)), tabletIds, unboundRelation.getHints(),
(OlapTable) table, tableQualifier, tabletIds, unboundRelation.getHints(),
unboundRelation.getTableSample());
}
}
Expand All @@ -241,11 +241,14 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
}

private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelation,
List<String> tableQualifier, CascadesContext cascadesContext) {
List<String> tableQualifier, CascadesContext cascadesContext) {
List<String> qualifierWithoutTableName = Lists.newArrayList();
qualifierWithoutTableName.addAll(tableQualifier.subList(0, tableQualifier.size() - 1));

switch (table.getType()) {
case OLAP:
case MATERIALIZED_VIEW:
return makeOlapScan(table, unboundRelation, tableQualifier);
return makeOlapScan(table, unboundRelation, qualifierWithoutTableName);
case VIEW:
View view = (View) table;
String inlineViewDef = view.getInlineViewDef();
Expand All @@ -261,25 +264,26 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
}
hmsTable.setScanParams(unboundRelation.getScanParams());
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier,
unboundRelation.getTableSample());
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample());
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, tableQualifier,
unboundRelation.getTableSample());
return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample());
case SCHEMA:
return new LogicalSchemaScan(unboundRelation.getRelationId(), table, tableQualifier);
return new LogicalSchemaScan(unboundRelation.getRelationId(), table, qualifierWithoutTableName);
case JDBC_EXTERNAL_TABLE:
case JDBC:
return new LogicalJdbcScan(unboundRelation.getRelationId(), table, tableQualifier);
return new LogicalJdbcScan(unboundRelation.getRelationId(), table, qualifierWithoutTableName);
case ODBC:
return new LogicalOdbcScan(unboundRelation.getRelationId(), table, tableQualifier);
return new LogicalOdbcScan(unboundRelation.getRelationId(), table, qualifierWithoutTableName);
case ES_EXTERNAL_TABLE:
return new LogicalEsScan(unboundRelation.getRelationId(), (EsExternalTable) table, tableQualifier);
return new LogicalEsScan(unboundRelation.getRelationId(), (EsExternalTable) table,
qualifierWithoutTableName);
case TEST_EXTERNAL_TABLE:
return new LogicalTestScan(unboundRelation.getRelationId(), table, tableQualifier);
return new LogicalTestScan(unboundRelation.getRelationId(), table, qualifierWithoutTableName);
default:
throw new AnalysisException("Unsupported tableType " + table.getType());
}
Expand Down Expand Up @@ -332,30 +336,6 @@ private List<Long> getPartitionIds(TableIf t, UnboundRelation unboundRelation) {
}).collect(ImmutableList.toImmutableList());
}

private LogicalPlan addProjectForView(View view, LogicalView<Plan> logicalView) {
List<Slot> viewOutput = logicalView.getOutput();
List<Column> fullSchema = view.getFullSchema();
boolean needProject = false;
for (int i = 0; i < viewOutput.size(); i++) {
if (!viewOutput.get(i).getName().equals(fullSchema.get(i).getName())) {
needProject = true;
break;
}
}
if (!needProject) {
return logicalView;
}
List<NamedExpression> projectList = Lists.newArrayList();
for (int i = 0; i < viewOutput.size(); i++) {
if (viewOutput.get(i).getName().equals(fullSchema.get(i).getName())) {
projectList.add(viewOutput.get(i));
} else {
projectList.add(new Alias(viewOutput.get(i), fullSchema.get(i).getName()));
}
}
return new LogicalProject<Plan>(projectList, logicalView);
}

/** CustomTableResolver */
public interface CustomTableResolver extends Function<List<String>, TableIf> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.rules.rewrite.mv;

import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
Expand All @@ -33,12 +34,14 @@
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.WhenClause;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
Expand All @@ -62,6 +65,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -72,6 +76,7 @@
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Base class for selecting materialized index rules.
Expand Down Expand Up @@ -109,6 +114,45 @@ protected boolean shouldSelectIndexWithoutAgg(LogicalOlapScan scan) {
}
}

// get the predicates that can be ignored when all aggregate functions are sum
protected static List<Expression> getPrunedPredicatesWithAllSumAgg(List<Expression> aggExpressions,
Set<Expression> predicateExpr) {
List<Expression> prunedExpr = new ArrayList<>();

Set<String> sumSlots = aggExpressions.stream().map(e -> e.child(0).toSql())
.collect(Collectors.toCollection(() -> new TreeSet<String>(String.CASE_INSENSITIVE_ORDER)));
for (Expression expr : predicateExpr) {
if (expr instanceof Not && expr.child(0) instanceof IsNull) {
Expression slot = expr.child(0).child(0);
String countColumn = normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.SUM,
CreateMaterializedViewStmt.mvColumnBuilder(slotToCaseWhen(slot).toSql())));
if (sumSlots.contains(countColumn)) {
prunedExpr.add(expr);
}
}
}
return prunedExpr;
}

// we can prune some predicates when there is no group-by column
protected static List<Expression> getPrunedPredicates(List<Expression> aggExpressions,
Set<Expression> predicateExpr) {
List<Expression> prunedExpr = new ArrayList<>();

boolean isAllSumAgg = true;
for (Expression expr : aggExpressions) {
if (!(expr instanceof Sum)) {
isAllSumAgg = false;
break;
}
}
if (isAllSumAgg) {
prunedExpr.addAll(getPrunedPredicatesWithAllSumAgg(aggExpressions, predicateExpr));
}

return prunedExpr;
}

protected static boolean containAllRequiredColumns(MaterializedIndex index, LogicalOlapScan scan,
Set<Slot> requiredScanOutput, Set<? extends Expression> requiredExpr, Set<Expression> predicateExpr) {
OlapTable table = scan.getTable();
Expand All @@ -121,12 +165,14 @@ protected static boolean containAllRequiredColumns(MaterializedIndex index, Logi
.map(e -> {
e.setDisableTableName(true);
return e;
})
.map(e -> new NereidsParser().parseExpression(e.toSql()).toSql()).collect(Collectors.toSet());
Set<String> commonConjuncts = indexConjuncts.stream().filter(predicateExprSql::contains)
.collect(Collectors.toSet());
if (commonConjuncts.size() != indexConjuncts.size()) {
return false;
}).map(e -> new NereidsParser().parseExpression(e.toSql()).toSql()).collect(Collectors.toSet());

for (String indexConjunct : indexConjuncts) {
if (predicateExprSql.contains(indexConjunct)) {
predicateExprSql.remove(indexConjunct);
} else {
return false;
}
}

Set<String> requiredMvColumnNames = requiredScanOutput.stream()
Expand All @@ -138,10 +184,24 @@ protected static boolean containAllRequiredColumns(MaterializedIndex index, Logi
.collect(Collectors.toCollection(() -> new TreeSet<String>(String.CASE_INSENSITIVE_ORDER)));
mvColNames.addAll(indexConjuncts);

return mvColNames.containsAll(requiredMvColumnNames)
&& (indexConjuncts.isEmpty() || commonConjuncts.size() == predicateExprSql.size())
|| requiredExpr.stream().filter(e -> !containsAllColumn(e, mvColNames)).collect(Collectors.toSet())
.isEmpty();
if (mvColNames.containsAll(requiredMvColumnNames) && predicateExprSql.isEmpty()) {
return true;
}

Set<Expression> remained = requiredExpr.stream().filter(e -> !containsAllColumn(e, mvColNames))
.collect(Collectors.toSet());
if (remained.isEmpty()) {
return true;
}

if (!scan.getGroupExpression().isPresent()) {
Set<Expression> prunedExpr = getPrunedPredicates(
requiredExpr.stream().filter(e -> e instanceof AggregateFunction).collect(Collectors.toList()),
predicateExpr).stream().collect(Collectors.toSet());
remained = remained.stream().filter(e -> !prunedExpr.contains(e)).collect(Collectors.toSet());
}

return remained.isEmpty();
}

public static String parseMvColumnToSql(String mvName) {
Expand Down Expand Up @@ -433,6 +493,21 @@ protected SlotContext generateBaseScanExprToMvExpr(LogicalOlapScan mvPlan) {
.collect(Collectors.toSet()));
}

// Call this generateBaseScanExprToMvExpr only when we have both agg and filter
protected SlotContext generateBaseScanExprToMvExpr(LogicalOlapScan mvPlan, Set<Expression> requiredExpr,
Set<Expression> predicateExpr) {
SlotContext context = generateBaseScanExprToMvExpr(mvPlan);
if (mvPlan.getGroupExpression().isPresent()) {
return context;
}
Set<Expression> pruned = getPrunedPredicates(
requiredExpr.stream().filter(e -> e instanceof AggregateFunction).collect(Collectors.toList()),
predicateExpr).stream().collect(Collectors.toSet());

return new SlotContext(context.baseSlotToMvSlot, context.mvNameToMvSlot,
Stream.concat(pruned.stream(), context.trueExprs.stream()).collect(Collectors.toSet()));
}

/** SlotContext */
protected static class SlotContext {
public static final SlotContext EMPTY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());

return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
Expand Down Expand Up @@ -250,7 +252,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());
if (result.indexId == scan.getTable().getBaseIndexId()) {
LogicalOlapScan mvPlanWithoutAgg = SelectMaterializedIndexWithoutAggregate.select(scan,
project::getInputSlots, filter::getConjuncts,
Expand Down Expand Up @@ -311,7 +315,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());

List<NamedExpression> newProjectList = replaceProjectList(project,
result.exprRewriteMap.projectExprMap);
Expand Down Expand Up @@ -390,7 +396,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());

return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
Expand Down Expand Up @@ -481,7 +489,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());

List<NamedExpression> newProjectList = replaceProjectList(project,
result.exprRewriteMap.projectExprMap);
Expand Down Expand Up @@ -531,7 +541,9 @@ public List<Rule> buildRules() {
);

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan, requiredExpr.stream()
.map(e -> result.exprRewriteMap.replaceAgg(e)).collect(Collectors.toSet()),
filter.getConjuncts());

List<NamedExpression> newProjectList = replaceProjectList(project,
result.exprRewriteMap.projectExprMap);
Expand Down
Loading

0 comments on commit 730f4fb

Please sign in to comment.