diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 395cdc16665d1c..bf87c338921323 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -101,7 +101,7 @@ statementBase (REFRESH refreshMethod? refreshTrigger?)? ((DUPLICATE)? KEY keys=identifierList)? (COMMENT STRING_LITERAL)? - (PARTITION BY LEFT_PAREN partitionKey = identifier RIGHT_PAREN)? + (PARTITION BY LEFT_PAREN mvPartition RIGHT_PAREN)? (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS (INTEGER_VALUE | AUTO))?)? propertyClause? AS query #createMTMV @@ -225,6 +225,11 @@ refreshMethod : COMPLETE | AUTO ; +mvPartition + : partitionKey = identifier + | partitionExpr = functionCallExpression + ; + identifierOrStringLiteral : identifier | STRING_LITERAL diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index c76f1a253f2fe2..a3050bc3ac6b70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -30,6 +30,7 @@ import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; @@ -38,7 +39,6 @@ import org.apache.doris.mtmv.MTMVRefreshSnapshot; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; -import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; @@ -51,7 +51,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; @@ -313,35 +312,6 @@ public Map generateMvPartitionDescs() { return result; } - /** - * generateRelatedPartitionDescs - *

- * Different partitions may generate the same PartitionKeyDesc through logical calculations - * (such as selecting only one column, or rolling up partitions), so it is a one to many relationship - * - * @return related PartitionKeyDesc ==> relatedPartitionIds - * @throws AnalysisException - */ - public Map> generateRelatedPartitionDescs() throws AnalysisException { - if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { - return Maps.newHashMap(); - } - Map> res = new HashMap<>(); - int relatedColPos = mvPartitionInfo.getRelatedColPos(); - Map relatedPartitionItems = mvPartitionInfo.getRelatedTable() - .getPartitionItemsByTimeFilter(relatedColPos, - MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties)); - for (Entry entry : relatedPartitionItems.entrySet()) { - PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); - if (res.containsKey(partitionKeyDesc)) { - res.get(partitionKeyDesc).add(entry.getKey()); - } else { - res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey())); - } - } - return res; - } - /** * Calculate the partition and associated partition mapping relationship of the MTMV * It is the result of real-time comparison calculation, so there may be some costs, @@ -354,13 +324,19 @@ public Map> calculatePartitionMappings() throws AnalysisExceptio if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { return Maps.newHashMap(); } + long start = System.currentTimeMillis(); Map> res = Maps.newHashMap(); - Map> relatedPartitionDescs = generateRelatedPartitionDescs(); + Map> relatedPartitionDescs = MTMVPartitionUtil + .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); Map mvPartitionItems = getAndCopyPartitionItems(); for (Entry entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); } + if (LOG.isDebugEnabled()) { + LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]", + System.currentTimeMillis() - start, name); + } return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 9e872cf034c107..eeb94a30be7c2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -169,7 +169,7 @@ public void run() throws JobException { // Now, the MTMV first ensures consistency with the data in the cache. // To be completely consistent with hive, you need to manually refresh the cache // refreshHmsTable(); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVPartitionUtil.alignMvPartition(mtmv); } Map> partitionMappings = mtmv.calculatePartitionMappings(); @@ -217,7 +217,7 @@ private void exec(ConnectContext ctx, Set refreshPartitionIds, lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand - .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE + .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setExecutor(executor); @@ -380,7 +380,7 @@ private void after() { private Map getIncrementalTableMap() throws AnalysisException { Map tableWithPartKey = Maps.newHashMap(); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { tableWithPartKey .put(mtmv.getMvPartitionInfo().getRelatedTable(), mtmv.getMvPartitionInfo().getRelatedCol()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java new file mode 100644 index 00000000000000..e1991ab2921815 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.PartitionExprUtil; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; +import org.apache.doris.nereids.trees.expressions.literal.DateTimeV2Literal; +import org.apache.doris.nereids.trees.expressions.literal.DateV2Literal; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +public class MTMVPartitionExprDateTrunc implements MTMVPartitionExprService { + private static Set timeUnits = ImmutableSet.of("year", "month", "day"); + private String timeUnit; + + public MTMVPartitionExprDateTrunc(FunctionCallExpr functionCallExpr) throws AnalysisException { + List paramsExprs = functionCallExpr.getParams().exprs(); + if (paramsExprs.size() != 2) { + throw new AnalysisException("date_trunc params exprs size should be 2."); + } + Expr param = paramsExprs.get(1); + if (!(param instanceof StringLiteral)) { + throw new AnalysisException("date_trunc param of time unit is not string literal."); + } + this.timeUnit = param.getStringValue().toLowerCase(); + } + + @Override + public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException { + if (!timeUnits.contains(this.timeUnit)) { + throw new AnalysisException( + String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits)); + } + MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); + PartitionType partitionType = relatedTable.getPartitionType(); + if (partitionType == PartitionType.RANGE) { + Type partitionColumnType = MTMVPartitionUtil + .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); + if (!partitionColumnType.isDateType()) { + throw new AnalysisException( + "partitionColumnType should be date/datetime " + + "when PartitionType is range and expr is date_trunc"); + } + } + } + + @Override + public String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map mvProperties) + throws AnalysisException { + String res = null; + Optional dateFormat = getDateFormat(mvProperties); + List> inValues = partitionKeyDesc.getInValues(); + for (int i = 0; i < inValues.size(); i++) { + // mtmv only support one partition column + PartitionValue partitionValue = inValues.get(i).get(0); + if (partitionValue.isNullPartition()) { + throw new AnalysisException("date trunc not support null partition value"); + } + String identity = dateTrunc(partitionValue.getStringValue(), dateFormat, false).toString(); + if (i == 0) { + res = identity; + } else { + if (!Objects.equals(res, identity)) { + throw new AnalysisException( + String.format("partition values not equal, res: %s, identity: %s", res, + identity)); + } + } + } + return res; + } + + private Optional getDateFormat(Map mvProperties) { + Optional dateFormat = + StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)) + ? Optional.empty() + : Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)); + return dateFormat; + } + + @Override + public PartitionKeyDesc generateRollUpPartitionKeyDesc(PartitionKeyDesc partitionKeyDesc, + MTMVPartitionInfo mvPartitionInfo) throws AnalysisException { + Type partitionColumnType = MTMVPartitionUtil + .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); + // mtmv only support one partition column + Preconditions.checkState(partitionKeyDesc.getLowerValues().size() == 1, + "only support one partition column"); + DateTimeV2Literal beginTime = dateTrunc( + partitionKeyDesc.getLowerValues().get(0).getStringValue(), + Optional.empty(), false); + + PartitionValue lowerValue = new PartitionValue(dateTimeToStr(beginTime, partitionColumnType)); + PartitionValue upperValue = getUpperValue(partitionKeyDesc.getUpperValues().get(0), beginTime, + partitionColumnType); + return PartitionKeyDesc.createFixed( + Collections.singletonList(lowerValue), + Collections.singletonList(upperValue)); + } + + private PartitionValue getUpperValue(PartitionValue upperValue, DateTimeV2Literal beginTruncTime, + Type partitionColumnType) throws AnalysisException { + if (upperValue.isMax()) { + throw new AnalysisException("date trunc not support MAXVALUE partition"); + } + // begin time and end time dateTrunc should has same result + DateTimeV2Literal endTruncTime = dateTrunc(upperValue.getStringValue(), Optional.empty(), true); + if (!Objects.equals(beginTruncTime, endTruncTime)) { + throw new AnalysisException( + String.format("partition values not equal, beginTruncTime: %s, endTruncTime: %s", beginTruncTime, + endTruncTime)); + } + DateTimeV2Literal endTime = dateIncrement(beginTruncTime); + return new PartitionValue(dateTimeToStr(endTime, partitionColumnType)); + } + + private DateTimeV2Literal dateTrunc(String value, + Optional dateFormat, boolean isUpper) throws AnalysisException { + DateTimeV2Literal dateTimeLiteral = strToDate(value, dateFormat); + // for (2020-01-31,2020-02-01),if not -1, lower value and upper value will not same after rollup + if (isUpper) { + dateTimeLiteral = (DateTimeV2Literal) DateTimeArithmetic.secondsSub(dateTimeLiteral, new IntegerLiteral(1)); + } + Expression expression = DateTimeExtractAndTransform.dateTrunc(dateTimeLiteral, new VarcharLiteral(timeUnit)); + if (!(expression instanceof DateTimeV2Literal)) { + throw new AnalysisException("dateTrunc() should return DateLiteral, expression: " + expression); + } + return (DateTimeV2Literal) expression; + } + + private DateTimeV2Literal strToDate(String value, + Optional dateFormat) throws AnalysisException { + try { + return new DateTimeV2Literal(value); + } catch (Exception e) { + if (!dateFormat.isPresent()) { + throw e; + } + Expression strToDate = DateTimeExtractAndTransform + .strToDate(new VarcharLiteral(value), + new VarcharLiteral(dateFormat.get())); + if (strToDate instanceof DateV2Literal) { + DateV2Literal dateV2Literal = (DateV2Literal) strToDate; + return new DateTimeV2Literal(dateV2Literal.getYear(), dateV2Literal.getMonth(), dateV2Literal.getDay(), + 0, 0, 0); + } else if (strToDate instanceof DateTimeV2Literal) { + return (DateTimeV2Literal) strToDate; + } else { + throw new AnalysisException( + String.format("strToDate failed, stringValue: %s, dateFormat: %s", value, + dateFormat)); + } + } + } + + private DateTimeV2Literal dateIncrement(DateTimeV2Literal value) throws AnalysisException { + Expression result; + switch (timeUnit) { + case "year": + result = value.plusYears(1L); + break; + case "month": + result = value.plusMonths(1L); + break; + case "day": + result = value.plusDays(1L); + break; + default: + throw new AnalysisException("MTMV partition roll up not support timeUnit: " + timeUnit); + } + if (!(result instanceof DateTimeV2Literal)) { + throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result); + } + return (DateTimeV2Literal) result; + } + + private String dateTimeToStr(DateTimeV2Literal literal, + Type partitionColumnType) throws AnalysisException { + if (partitionColumnType.isDate() || partitionColumnType.isDateV2()) { + return String.format(PartitionExprUtil.DATE_FORMATTER, literal.getYear(), literal.getMonth(), + literal.getDay()); + } else if (partitionColumnType.isDatetime() || partitionColumnType.isDatetimeV2()) { + return String.format(PartitionExprUtil.DATETIME_FORMATTER, + literal.getYear(), literal.getMonth(), literal.getDay(), + literal.getHour(), literal.getMinute(), literal.getSecond()); + } else { + throw new AnalysisException( + "MTMV not support partition with column type : " + partitionColumnType); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprFactory.java new file mode 100644 index 00000000000000..0fc9a067fad88e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprFactory.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.common.AnalysisException; + +/** + * MTMV Partition Expr Factory + */ +public class MTMVPartitionExprFactory { + public static MTMVPartitionExprService getExprService(Expr expr) throws AnalysisException { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("now mtmv partition only support FunctionCallExpr"); + } + FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr; + String fnName = functionCallExpr.getFnName().getFunction().toLowerCase(); + if ("date_trunc".equals(fnName)) { + return new MTMVPartitionExprDateTrunc(functionCallExpr); + } + throw new AnalysisException("MTMV partition not support function name: " + fnName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprService.java new file mode 100644 index 00000000000000..e6974343ef2444 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprService.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.common.AnalysisException; + +import java.util.Map; + +/** + * Interface for materialized view partitioning function + */ +public interface MTMVPartitionExprService { + + /** + * for list partition, get identity by expr + * + * @param partitionKeyDesc + * @param mvProperties + * @return + * @throws AnalysisException + */ + String getRollUpIdentity(PartitionKeyDesc partitionKeyDesc, Map mvProperties) + throws AnalysisException; + + /** + * for range partition, get roll up PartitionKeyDesc by expr + * + * @param partitionKeyDesc + * @param mvPartitionInfo + * @return + * @throws AnalysisException + */ + PartitionKeyDesc generateRollUpPartitionKeyDesc( + PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo) + throws AnalysisException; + + /** + * Check if user input is legal + * + * @param mtmvPartitionInfo + * @throws AnalysisException + */ + void analyze(MTMVPartitionInfo mtmvPartitionInfo) throws AnalysisException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index 348f6e4de99671..7ca1b7e3e63748 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -17,10 +17,16 @@ package org.apache.doris.mtmv; +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.Column; import org.apache.doris.common.AnalysisException; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; import com.google.gson.annotations.SerializedName; +import java.util.List; + /** * MTMVPartitionInfo */ @@ -28,17 +34,24 @@ public class MTMVPartitionInfo { public enum MTMVPartitionType { FOLLOW_BASE_TABLE, + EXPR, SELF_MANAGE } + public static final ImmutableSet MTMV_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder( + String.CASE_INSENSITIVE_ORDER).add("date_trunc") + .build(); + @SerializedName("pt") - MTMVPartitionType partitionType; + private MTMVPartitionType partitionType; @SerializedName("rt") - BaseTableInfo relatedTable; + private BaseTableInfo relatedTable; @SerializedName("rc") - String relatedCol; + private String relatedCol; @SerializedName("pc") - String partitionCol; + private String partitionCol; + @SerializedName("expr") + private Expr expr; public MTMVPartitionInfo() { } @@ -89,6 +102,14 @@ public void setPartitionCol(String partitionCol) { this.partitionCol = partitionCol; } + public Expr getExpr() { + return expr; + } + + public void setExpr(Expr expr) { + this.expr = expr; + } + /** * Get the position of relatedCol in the relatedTable partition column * @@ -99,7 +120,15 @@ public int getRelatedColPos() throws AnalysisException { if (partitionType == MTMVPartitionType.SELF_MANAGE) { throw new AnalysisException("partitionType is: " + partitionType); } - return MTMVPartitionUtil.getPos(getRelatedTable(), relatedCol); + List partitionColumns = getRelatedTable().getPartitionColumns(); + for (int i = 0; i < partitionColumns.size(); i++) { + if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { + return i; + } + } + throw new AnalysisException( + String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol, + partitionColumns)); } // toString() is not easy to find where to call the method diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index b80c5fc283f448..cd0312c419e814 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -27,12 +27,13 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -56,6 +57,18 @@ public class MTMVPartitionUtil { private static final Pattern PARTITION_NAME_PATTERN = Pattern.compile("[^a-zA-Z0-9,]"); private static final String PARTITION_NAME_PREFIX = "p_"; + private static final List partitionDescGenerators = ImmutableList + .of( + // It is necessary to maintain this order, + // because some impl deal `PartitionItem`, and some impl deal `PartitionDesc` + // for example: if `MTMVRelatedPartitionDescOnePartitionColGenerator` not generate `PartitionDesc`, + // `MTMVRelatedPartitionDescRollUpGenerator` will not have parameter + new MTMVRelatedPartitionDescInitGenerator(), + new MTMVRelatedPartitionDescSyncLimitGenerator(), + new MTMVRelatedPartitionDescOnePartitionColGenerator(), + new MTMVRelatedPartitionDescRollUpGenerator() + ); + /** * Determine whether the partition is sync with retated partition and other baseTables * @@ -71,7 +84,7 @@ public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set Set tables, Set excludedTriggerTables) throws AnalysisException { boolean isSyncWithPartition = true; - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); // if follow base table, not need compare with related table, only should compare with related partition excludedTriggerTables.add(relatedTable.getName()); @@ -96,7 +109,8 @@ public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set public static void alignMvPartition(MTMV mtmv) throws DdlException, AnalysisException { Map mtmvPartitionDescs = mtmv.generateMvPartitionDescs(); - Set relatedPartitionDescs = mtmv.generateRelatedPartitionDescs().keySet(); + Set relatedPartitionDescs = generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(), + mtmv.getMvProperties()).keySet(); // drop partition of mtmv for (Entry entry : mtmvPartitionDescs.entrySet()) { if (!relatedPartitionDescs.contains(entry.getValue())) { @@ -115,19 +129,18 @@ public static void alignMvPartition(MTMV mtmv) /** * getPartitionDescsByRelatedTable when create MTMV * - * @param relatedTable * @param tableProperties - * @param relatedCol + * @param mvPartitionInfo * @return * @throws AnalysisException */ - public static List getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable, - Map tableProperties, String relatedCol, Map mvProperties) + public static List getPartitionDescsByRelatedTable( + Map tableProperties, MTMVPartitionInfo mvPartitionInfo, Map mvProperties) throws AnalysisException { - HashMap partitionProperties = Maps.newHashMap(); List res = Lists.newArrayList(); - Set relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol, - MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties)); + HashMap partitionProperties = Maps.newHashMap(); + Set relatedPartitionDescs = generateRelatedPartitionDescs(mvPartitionInfo, mvProperties) + .keySet(); for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) { SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true, generatePartitionName(partitionKeyDesc), @@ -139,28 +152,18 @@ public static List getPartitionDescsByRelatedTable(MTMVRelated return res; } - private static Set getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol, - MTMVPartitionSyncConfig config) - throws AnalysisException { - int pos = getPos(relatedTable, relatedCol); - Set res = Sets.newHashSet(); - for (Entry entry : relatedTable.getPartitionItemsByTimeFilter(pos, config).entrySet()) { - PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos); - res.add(partitionKeyDesc); + public static Map> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo, + Map mvProperties) throws AnalysisException { + long start = System.currentTimeMillis(); + RelatedPartitionDescResult result = new RelatedPartitionDescResult(); + for (MTMVRelatedPartitionDescGeneratorService service : partitionDescGenerators) { + service.apply(mvPartitionInfo, mvProperties, result); } - return res; - } - - public static int getPos(MTMVRelatedTableIf relatedTable, String relatedCol) throws AnalysisException { - List partitionColumns = relatedTable.getPartitionColumns(); - for (int i = 0; i < partitionColumns.size(); i++) { - if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { - return i; - } + if (LOG.isDebugEnabled()) { + LOG.debug("generateRelatedPartitionDescs use [{}] mills, mvPartitionInfo is [{}]", + System.currentTimeMillis() - start, mvPartitionInfo); } - throw new AnalysisException( - String.format("getRelatedColPos error, relatedCol: %s, partitionColumns: %s", relatedCol, - partitionColumns)); + return result.getDescs(); } public static List getPartitionNamesByIds(MTMV mtmv, Collection ids) throws AnalysisException { @@ -258,7 +261,7 @@ private static List getPartitionUnSyncTables(MTMV mtmv, Long partitionId if (!mtmvRelatedTableIf.needAutoRefresh()) { continue; } - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { if (CollectionUtils.isEmpty(relatedPartitionIds)) { throw new AnalysisException("can not found related partition"); @@ -469,7 +472,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, Set baseTables, Set relatedPartitionIds) throws AnalysisException { MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (Long relatedPartitionId : relatedPartitionIds) { MTMVSnapshotIf partitionSnapshot = relatedTable @@ -479,7 +482,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, } } for (BaseTableInfo baseTableInfo : baseTables) { - if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { continue; } @@ -491,4 +494,14 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, } return refreshPartitionSnapshot; } + + public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException { + List partitionColumns = relatedTable.getPartitionColumns(); + for (Column column : partitionColumns) { + if (column.getName().equals(col)) { + return column.getType(); + } + } + throw new AnalysisException("can not getPartitionColumnType by:" + col); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescGeneratorService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescGeneratorService.java new file mode 100644 index 00000000000000..09d85576b5cba4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescGeneratorService.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.common.AnalysisException; + +import java.util.Map; + +/** + * Interface for a series of processes to generate PartitionDesc + */ +public interface MTMVRelatedPartitionDescGeneratorService { + /** + * generate related table PartitionDesc + * + * @param mvPartitionInfo PartitionInfo of MTMV + * @param mvProperties properties of MTMV + * @param lastResult the processing result of the previous process + * @throws AnalysisException + */ + void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, + RelatedPartitionDescResult lastResult) throws AnalysisException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java new file mode 100644 index 00000000000000..13b58239376116 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.common.AnalysisException; + +import java.util.Map; + +/** + * get all related partition descs + */ +public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartitionDescGeneratorService { + + @Override + public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, + RelatedPartitionDescResult lastResult) throws AnalysisException { + lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java new file mode 100644 index 00000000000000..ab14f302e756b8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * For example, the related table is partitioned by `date` and `region`, with the following 6 partitions + *

+ * 20200101 beijing + * 20200101 shanghai + * 20200102 beijing + * 20200102 shanghai + * 20200103 beijing + * 20200103 shanghai + *

+ * If the MTMV is partitioned by `date`, then the MTMV will have three partitions: 20200101, 202000102, 20200103 + *

+ * If the MTMV is partitioned by `region`, then the MTMV will have two partitions: beijing, shanghai + */ +public class MTMVRelatedPartitionDescOnePartitionColGenerator implements MTMVRelatedPartitionDescGeneratorService { + + @Override + public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, + RelatedPartitionDescResult lastResult) throws AnalysisException { + if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { + return; + } + Map> res = Maps.newHashMap(); + Map relatedPartitionItems = lastResult.getItems(); + int relatedColPos = mvPartitionInfo.getRelatedColPos(); + for (Entry entry : relatedPartitionItems.entrySet()) { + PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); + if (res.containsKey(partitionKeyDesc)) { + res.get(partitionKeyDesc).add(entry.getKey()); + } else { + res.put(partitionKeyDesc, Sets.newHashSet(entry.getKey())); + } + } + lastResult.setDescs(res); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java new file mode 100644 index 00000000000000..e9b4b1fe6a5262 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * Roll up some partitions into one partition + */ +public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedPartitionDescGeneratorService { + + @Override + public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, + RelatedPartitionDescResult lastResult) throws AnalysisException { + if (mvPartitionInfo.getPartitionType() != MTMVPartitionType.EXPR) { + return; + } + MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); + PartitionType partitionType = relatedTable.getPartitionType(); + if (partitionType == PartitionType.RANGE) { + lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo)); + } else if (partitionType == PartitionType.LIST) { + lastResult.setDescs(rollUpList(lastResult.getDescs(), mvPartitionInfo, mvProperties)); + } else { + throw new AnalysisException("only RANGE/LIST partition support roll up"); + } + } + + /** + * when related table has 3 partitions:(20200101),(20200102),(20200201) + *

+ * if expr is `date_trunc(month)` + * then,MTMV will have 2 partitions (20200101,20200102),(20200201) + *

+ * if expr is `date_trunc(year)` + * then,MTMV will have 1 partitions (20200101,20200102,20200201) + * + * @param relatedPartitionDescs + * @param mvPartitionInfo + * @return + * @throws AnalysisException + */ + public Map> rollUpList(Map> relatedPartitionDescs, + MTMVPartitionInfo mvPartitionInfo, Map mvProperties) throws AnalysisException { + Map> identityToValues = Maps.newHashMap(); + Map> identityToPartitionIds = Maps.newHashMap(); + MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr()); + + for (Entry> entry : relatedPartitionDescs.entrySet()) { + String rollUpIdentity = exprSerice.getRollUpIdentity(entry.getKey(), mvProperties); + Preconditions.checkNotNull(rollUpIdentity); + if (identityToValues.containsKey(rollUpIdentity)) { + identityToValues.get(rollUpIdentity).addAll(getStringValues(entry.getKey())); + identityToPartitionIds.get(rollUpIdentity).addAll(entry.getValue()); + } else { + identityToValues.put(rollUpIdentity, getStringValues(entry.getKey())); + identityToPartitionIds.put(rollUpIdentity, entry.getValue()); + } + } + Map> result = Maps.newHashMap(); + for (Entry> entry : identityToValues.entrySet()) { + result.put(PartitionKeyDesc.createIn(getPartitionValues(entry.getValue())), + identityToPartitionIds.get(entry.getKey())); + } + return result; + } + + private List> getPartitionValues(Set strings) { + List> inValues = Lists.newArrayList(); + for (String value : strings) { + inValues.add(Lists.newArrayList(new PartitionValue(value))); + } + return inValues; + } + + private Set getStringValues(PartitionKeyDesc partitionKeyDesc) { + List> inValues = partitionKeyDesc.getInValues(); + Set res = Sets.newHashSet(); + for (List list : inValues) { + res.add(list.get(0).getStringValue()); + } + return res; + } + + /** + * when related table has 3 partitions:(20200101-20200102),(20200102-20200103),(20200201-20200202) + *

+ * if expr is `date_trunc(month)` + * then,MTMV will have 2 partitions (20200101-20200201),(20200101-20200301) + *

+ * if expr is `date_trunc(year)` + * then,MTMV will have 1 partitions (20200101-20210101) + * + * @param relatedPartitionDescs + * @param mvPartitionInfo + * @return + * @throws AnalysisException + */ + public Map> rollUpRange(Map> relatedPartitionDescs, + MTMVPartitionInfo mvPartitionInfo) throws AnalysisException { + Map> result = Maps.newHashMap(); + MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr()); + for (Entry> entry : relatedPartitionDescs.entrySet()) { + PartitionKeyDesc rollUpDesc = exprSerice.generateRollUpPartitionKeyDesc(entry.getKey(), mvPartitionInfo); + if (result.containsKey(rollUpDesc)) { + result.get(rollUpDesc).addAll(entry.getValue()); + } else { + result.put(rollUpDesc, entry.getValue()); + } + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java new file mode 100644 index 00000000000000..e031071192e308 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; +import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; + +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; + +/** + * Only focus on partial partitions of related tables + */ +public class MTMVRelatedPartitionDescSyncLimitGenerator implements MTMVRelatedPartitionDescGeneratorService { + + @Override + public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, + RelatedPartitionDescResult lastResult) throws AnalysisException { + Map partitionItems = lastResult.getItems(); + MTMVPartitionSyncConfig config = generateMTMVPartitionSyncConfigByProperties(mvProperties); + if (config.getSyncLimit() <= 0) { + return; + } + long nowTruncSubSec = getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit()); + Optional dateFormat = config.getDateFormat(); + Map res = Maps.newHashMap(); + int relatedColPos = mvPartitionInfo.getRelatedColPos(); + for (Entry entry : partitionItems.entrySet()) { + if (entry.getValue().isGreaterThanSpecifiedTime(relatedColPos, dateFormat, nowTruncSubSec)) { + res.put(entry.getKey(), entry.getValue()); + } + } + lastResult.setItems(res); + } + + /** + * Generate MTMVPartitionSyncConfig based on mvProperties + * + * @param mvProperties + * @return + */ + public MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties( + Map mvProperties) { + int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1 + : Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)); + MTMVPartitionSyncTimeUnit timeUnit = + StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT)) + ? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit + .valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase()); + Optional dateFormat = + StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)) + ? Optional.empty() + : Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)); + return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat); + } + + /** + * Obtain the minimum second from `syncLimit` `timeUnit` ago + * + * @param timeUnit + * @param syncLimit + * @return + * @throws AnalysisException + */ + public long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit) + throws AnalysisException { + if (syncLimit < 1) { + throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit); + } + // get current time + Expression now = DateTimeAcquire.now(); + if (!(now instanceof DateTimeLiteral)) { + throw new AnalysisException("now() should return DateTimeLiteral, now: " + now); + } + DateTimeLiteral nowLiteral = (DateTimeLiteral) now; + // date trunc + now = DateTimeExtractAndTransform + .dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name())); + if (!(now instanceof DateTimeLiteral)) { + throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now); + } + nowLiteral = (DateTimeLiteral) now; + // date sub + if (syncLimit > 1) { + nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1); + } + return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue(); + } + + + private DateTimeLiteral dateSub( + org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit, + int num) + throws AnalysisException { + IntegerLiteral integerLiteral = new IntegerLiteral(num); + Expression result; + switch (timeUnit) { + case DAY: + result = DateTimeArithmetic.dateSub(date, integerLiteral); + break; + case YEAR: + result = DateTimeArithmetic.yearsSub(date, integerLiteral); + break; + case MONTH: + result = DateTimeArithmetic.monthsSub(date, integerLiteral); + break; + default: + throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name()); + } + if (!(result instanceof DateTimeLiteral)) { + throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result); + } + return (DateTimeLiteral) result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 1adfe315a8baa0..ec99a04d73f914 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -24,12 +24,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import com.google.common.collect.Maps; - import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; /** @@ -44,31 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf { */ Map getAndCopyPartitionItems(); - /** - * Obtain a list of partitions filtered by time - * - * @param pos The position of the partition column to be checked in all partition columns - * @param config - * @return - * @throws AnalysisException - */ - default Map getPartitionItemsByTimeFilter(int pos, MTMVPartitionSyncConfig config) - throws AnalysisException { - Map partitionItems = getAndCopyPartitionItems(); - if (config.getSyncLimit() <= 0) { - return partitionItems; - } - long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit()); - Optional dateFormat = config.getDateFormat(); - Map res = Maps.newHashMap(); - for (Entry entry : partitionItems.entrySet()) { - if (entry.getValue().isGreaterThanSpecifiedTime(pos, dateFormat, nowTruncSubSec)) { - res.put(entry.getKey(), entry.getValue()); - } - } - return res; - } - /** * getPartitionType LIST/RANGE/UNPARTITIONED * diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 01033a9615a45d..ddbe763fdfb845 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -25,22 +25,15 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; -import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic; import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; -import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; import org.apache.doris.nereids.trees.expressions.literal.DateTimeV2Literal; import org.apache.doris.nereids.trees.expressions.literal.DateV2Literal; import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -61,6 +54,20 @@ public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisExcep return table; } + public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) { + TableIf relatedTable = null; + try { + relatedTable = MTMVUtil.getTable(baseTableInfo); + } catch (org.apache.doris.common.AnalysisException e) { + throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e); + } + if (!(relatedTable instanceof MTMVRelatedTableIf)) { + throw new org.apache.doris.nereids.exceptions.AnalysisException( + "base table for partitioning only can be OlapTable or HMSTable"); + } + return (MTMVRelatedTableIf) relatedTable; + } + public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); @@ -82,64 +89,6 @@ public static boolean mtmvContainsExternalTable(MTMV mtmv) { return false; } - /** - * Obtain the minimum second from `syncLimit` `timeUnit` ago - * - * @param timeUnit - * @param syncLimit - * @return - * @throws AnalysisException - */ - public static long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit) - throws AnalysisException { - if (syncLimit < 1) { - throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit); - } - // get current time - Expression now = DateTimeAcquire.now(); - if (!(now instanceof DateTimeLiteral)) { - throw new AnalysisException("now() should return DateTimeLiteral, now: " + now); - } - DateTimeLiteral nowLiteral = (DateTimeLiteral) now; - // date trunc - now = DateTimeExtractAndTransform - .dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name())); - if (!(now instanceof DateTimeLiteral)) { - throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now); - } - nowLiteral = (DateTimeLiteral) now; - // date sub - if (syncLimit > 1) { - nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1); - } - return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue(); - } - - private static DateTimeLiteral dateSub( - org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit, - int num) - throws AnalysisException { - IntegerLiteral integerLiteral = new IntegerLiteral(num); - Expression result; - switch (timeUnit) { - case DAY: - result = DateTimeArithmetic.dateSub(date, integerLiteral); - break; - case YEAR: - result = DateTimeArithmetic.yearsSub(date, integerLiteral); - break; - case MONTH: - result = DateTimeArithmetic.monthsSub(date, integerLiteral); - break; - default: - throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name()); - } - if (!(result instanceof DateTimeLiteral)) { - throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result); - } - return (DateTimeLiteral) result; - } - /** * Convert LiteralExpr to second * @@ -177,25 +126,4 @@ public static long getExprTimeSec(org.apache.doris.analysis.LiteralExpr expr, Op expr.getStringValue(), dateFormat)); } } - - /** - * Generate MTMVPartitionSyncConfig based on mvProperties - * - * @param mvProperties - * @return - */ - public static MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties( - Map mvProperties) { - int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1 - : Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)); - MTMVPartitionSyncTimeUnit timeUnit = - StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT)) - ? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit - .valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase()); - Optional dateFormat = - StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)) - ? Optional.empty() - : Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT)); - return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java new file mode 100644 index 00000000000000..068cf1522a71d1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.Set; + +public class RelatedPartitionDescResult { + // PartitionKeyDesc to relatedTable partition ids(Different partitions may have the same PartitionKeyDesc) + private Map> descs; + private Map items; + + public RelatedPartitionDescResult() { + this.descs = Maps.newHashMap(); + this.items = Maps.newHashMap(); + } + + public Map> getDescs() { + return descs; + } + + public void setDescs(Map> descs) { + this.descs = descs; + } + + public Map getItems() { + return items; + } + + public void setItems(Map items) { + this.items = items; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index ec295724f4f4fe..4f24f766c6103c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -33,7 +33,6 @@ import org.apache.doris.common.Pair; import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.load.loadv2.LoadTask; -import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; @@ -127,6 +126,7 @@ import org.apache.doris.nereids.DorisParser.MapLiteralContext; import org.apache.doris.nereids.DorisParser.MultiStatementsContext; import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext; +import org.apache.doris.nereids.DorisParser.MvPartitionContext; import org.apache.doris.nereids.DorisParser.NamedExpressionContext; import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext; import org.apache.doris.nereids.DorisParser.NullLiteralContext; @@ -407,6 +407,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.InPartition; import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition; import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition; +import org.apache.doris.nereids.trees.plans.commands.info.MTMVPartitionDefinition; import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition; import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; @@ -638,22 +639,29 @@ public CreateMTMVCommand visitCreateMTMV(CreateMTMVContext ctx) { desc, properties, logicalPlan, querySql, new MTMVRefreshInfo(buildMode, refreshMethod, refreshTriggerInfo), ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols), - visitMTMVPartitionInfo(ctx.partitionKey) + visitMTMVPartitionInfo(ctx.mvPartition()) )); } /** - * get MTMVPartitionInfo + * get MTMVPartitionDefinition * - * @param ctx IdentifierContext - * @return MTMVPartitionInfo + * @param ctx MvPartitionContext + * @return MTMVPartitionDefinition */ - public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) { + public MTMVPartitionDefinition visitMTMVPartitionInfo(MvPartitionContext ctx) { + MTMVPartitionDefinition mtmvPartitionDefinition = new MTMVPartitionDefinition(); if (ctx == null) { - return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE); + mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.SELF_MANAGE); + } else if (ctx.partitionKey != null) { + mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.FOLLOW_BASE_TABLE); + mtmvPartitionDefinition.setPartitionCol(ctx.partitionKey.getText()); } else { - return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE, ctx.getText()); + mtmvPartitionDefinition.setPartitionType(MTMVPartitionType.EXPR); + Expression functionCallExpression = visitFunctionCallExpression(ctx.partitionExpr); + mtmvPartitionDefinition.setFunctionCallExpression(functionCallExpression); } + return mtmvPartitionDefinition; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index b56265b999a198..7d86ff9a8b1f9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -30,10 +30,8 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; -import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeNameFormat; -import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mtmv.EnvInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; @@ -45,14 +43,12 @@ import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils; -import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo; import org.apache.doris.nereids.trees.TreeNode; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -78,7 +74,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -103,9 +98,10 @@ public class CreateMTMVInfo { private final List columns = Lists.newArrayList(); private final List simpleColumnDefinitions; private final EnvInfo envInfo; - private final MTMVPartitionInfo mvPartitionInfo; + private final MTMVPartitionDefinition mvPartitionDefinition; private PartitionDesc partitionDesc; private MTMVRelation relation; + private MTMVPartitionInfo mvPartitionInfo; /** * constructor for create MTMV @@ -116,7 +112,7 @@ public CreateMTMVInfo(boolean ifNotExists, TableNameInfo mvName, LogicalPlan logicalQuery, String querySql, MTMVRefreshInfo refreshInfo, List simpleColumnDefinitions, - MTMVPartitionInfo mvPartitionInfo) { + MTMVPartitionDefinition mvPartitionDefinition) { this.ifNotExists = Objects.requireNonNull(ifNotExists, "require ifNotExists object"); this.mvName = Objects.requireNonNull(mvName, "require mvName object"); this.keys = Utils.copyRequiredList(keys); @@ -130,8 +126,8 @@ public CreateMTMVInfo(boolean ifNotExists, TableNameInfo mvName, .requireNonNull(simpleColumnDefinitions, "require simpleColumnDefinitions object"); this.envInfo = new EnvInfo(ConnectContext.get().getCurrentCatalog().getId(), ConnectContext.get().getCurrentDbId()); - this.mvPartitionInfo = Objects - .requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo object"); + this.mvPartitionDefinition = Objects + .requireNonNull(mvPartitionDefinition, "require mtmvPartitionInfo object"); } /** @@ -212,7 +208,9 @@ public void analyzeQuery(ConnectContext ctx) { } getRelation(planner); getColumns(plan); - analyzePartition(planner, ctx); + this.mvPartitionInfo = mvPartitionDefinition + .analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery); + this.partitionDesc = generatePartitionDesc(ctx); } private void getRelation(NereidsPlanner planner) { @@ -235,67 +233,17 @@ private void getRelation(NereidsPlanner planner) { this.relation = MTMVPlanUtil.generateMTMVRelation(plan); } - private void analyzePartition(NereidsPlanner planner, ConnectContext ctx) { - if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { - - CascadesContext cascadesContext = planner.getCascadesContext(); - SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable(); - Set tempDisableRules = sessionVariable.getDisableNereidsRuleNames(); - // Should not make table without data to empty relation when analyze the related table, - // so add disable rules - sessionVariable.setDisableNereidsRules(MTMV_PLANER_DISABLE_RULES); - cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES); - try { - Plan mvRewrittenPlan = - planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN); - Optional relatedTableInfo = MaterializedViewUtils - .getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan); - if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) { - throw new AnalysisException("Unable to find a suitable base table for partitioning"); - } - TableIf relatedTable = null; - try { - relatedTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo()); - } catch (org.apache.doris.common.AnalysisException e) { - throw new AnalysisException(e.getMessage(), e); - } - if (!(relatedTable instanceof MTMVRelatedTableIf)) { - throw new AnalysisException("base table for partitioning only can be OlapTable or HMSTable"); - } - MTMVRelatedTableIf mtmvBaseRealtedTable = (MTMVRelatedTableIf) relatedTable; - Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - try { - partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); - } catch (DdlException e) { - throw new AnalysisException(e.getMessage(), e); - } - - if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { - throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); - } - if (!(mtmvBaseRealtedTable instanceof HMSExternalTable) - && partitionColumnNames.size() != 1) { - throw new AnalysisException("only hms table support multi column partition."); - } - mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo()); - mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn()); - partitionDesc = generatePartitionDesc(mtmvBaseRealtedTable, ctx); - } finally { - // after operate, roll back the disable rules - sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules)); - cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES); - } + private PartitionDesc generatePartitionDesc(ConnectContext ctx) { + if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { + return null; } - } - - private PartitionDesc generatePartitionDesc(MTMVRelatedTableIf relatedTable, ConnectContext ctx) { + MTMVRelatedTableIf relatedTable = MTMVUtil.getRelatedTable(mvPartitionInfo.getRelatedTableInfo()); List allPartitionDescs = null; try { allPartitionDescs = MTMVPartitionUtil - .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol(), - mvProperties); + .getPartitionDescsByRelatedTable(properties, mvPartitionInfo, mvProperties); } catch (org.apache.doris.common.AnalysisException e) { - throw new AnalysisException("getPartitionDescsByRelatedTable failed", e); + throw new AnalysisException(e.getMessage(), e); } if (allPartitionDescs.size() > ctx.getSessionVariable().getCreateTablePartitionMaxNum()) { throw new AnalysisException(String.format( @@ -316,7 +264,7 @@ private PartitionDesc generatePartitionDesc(MTMVRelatedTableIf relatedTable, Con return null; } } catch (org.apache.doris.common.AnalysisException e) { - throw new AnalysisException("can not generate partitionDesc", e); + throw new AnalysisException(e.getMessage(), e); } } @@ -333,7 +281,7 @@ private void analyzeBaseTables(Plan plan) { throw new AnalysisException("can not contain VIEW"); } } catch (org.apache.doris.common.AnalysisException e) { - LOG.warn("can not get table, ", e); + LOG.warn(e.getMessage(), e); } } } @@ -362,7 +310,7 @@ private void getColumns(Plan plan) { try { FeNameFormat.checkColumnName(colName); } catch (org.apache.doris.common.AnalysisException e) { - throw new AnalysisException(e.getMessage()); + throw new AnalysisException(e.getMessage(), e); } if (colNames.contains(colName)) { throw new AnalysisException("repeat cols:" + colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java new file mode 100644 index 00000000000000..09b30063b9de06 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Expr.java +// and modified by Doris + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionParams; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.mtmv.MTMVPartitionExprFactory; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.analyzer.UnboundFunction; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * MTMVPartitionDefinition + */ +public class MTMVPartitionDefinition { + private MTMVPartitionType partitionType; + private String partitionCol; + private Expression functionCallExpression; + + /** + * analyzeAndTransferToMTMVPartitionInfo + * + * @param planner planner + * @param ctx ctx + * @param logicalQuery logicalQuery + * @return MTMVPartitionInfo + */ + public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx, + LogicalPlan logicalQuery) { + MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType); + if (this.partitionType == MTMVPartitionType.SELF_MANAGE) { + return mtmvPartitionInfo; + } + String partitionColName; + if (this.partitionType == MTMVPartitionType.EXPR) { + Expr expr; + if (functionCallExpression instanceof UnboundFunction) { + UnboundFunction function = (UnboundFunction) functionCallExpression; + expr = new FunctionCallExpr(function.getName(), + new FunctionParams(convertToLegacyArguments(function.children()))); + } else { + throw new AnalysisException( + "unsupported auto partition expr " + functionCallExpression.toString()); + } + partitionColName = getColNameFromExpr(expr); + mtmvPartitionInfo.setExpr(expr); + } else { + partitionColName = this.partitionCol; + } + mtmvPartitionInfo.setPartitionCol(partitionColName); + RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName); + mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn()); + mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo()); + if (this.partitionType == MTMVPartitionType.EXPR) { + try { + MTMVPartitionExprFactory.getExprService(mtmvPartitionInfo.getExpr()).analyze(mtmvPartitionInfo); + } catch (org.apache.doris.common.AnalysisException e) { + throw new AnalysisException(e.getMessage(), e); + } + } + return mtmvPartitionInfo; + } + + /** + * getColNameFromExpr + * + * @param expr expr + * @return String + */ + public static String getColNameFromExpr(Expr expr) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException( + "auto create partition only support function call expr is: " + + MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS); + } + FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr; + List paramsExpr = functionCallExpr.getParams().exprs(); + String name = functionCallExpr.getFnName().getFunction(); + if (MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS.contains(name)) { + for (Expr param : paramsExpr) { + if (param instanceof SlotRef) { + return ((SlotRef) param).getColumnName(); + } + } + throw new AnalysisException("can not find colName"); + } else { + throw new AnalysisException( + "auto create partition only support function call expr is: " + + MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS); + } + } + + private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan + logicalQuery, + String partitionColName) { + CascadesContext cascadesContext = planner.getCascadesContext(); + SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable(); + Set tempDisableRules = sessionVariable.getDisableNereidsRuleNames(); + // Should not make table without data to empty relation when analyze the related table, + // so add disable rules + sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES); + cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES); + try { + Plan mvRewrittenPlan = + planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN); + Optional relatedTableInfo = MaterializedViewUtils + .getRelatedTableInfo(partitionColName, mvRewrittenPlan); + if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) { + throw new AnalysisException("Unable to find a suitable base table for partitioning"); + } + MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.get().getTableInfo()); + Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + try { + partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); + } catch (DdlException e) { + throw new AnalysisException(e.getMessage(), e); + } + + if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { + throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); + } + if (!(mtmvBaseRealtedTable instanceof HMSExternalTable) + && partitionColumnNames.size() != 1) { + throw new AnalysisException("only hms table support multi column partition."); + } + return relatedTableInfo.get(); + } finally { + // after operate, roll back the disable rules + sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules)); + cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES); + } + } + + private static List convertToLegacyArguments(List children) { + return children.stream().map(child -> { + if (child instanceof UnboundSlot) { + return new SlotRef(null, ((UnboundSlot) child).getName()); + } else if (child instanceof Literal) { + return new StringLiteral(((Literal) child).getStringValue()); + } else { + throw new AnalysisException("unsupported argument " + child.toString()); + } + }).collect(Collectors.toList()); + } + + public MTMVPartitionType getPartitionType() { + return partitionType; + } + + public void setPartitionType(MTMVPartitionType partitionType) { + this.partitionType = partitionType; + } + + public String getPartitionCol() { + return partitionCol; + } + + public void setPartitionCol(String partitionCol) { + this.partitionCol = partitionCol; + } + + public Expression getFunctionCallExpression() { + return functionCallExpression; + } + + public void setFunctionCallExpression(Expression functionCallExpression) { + this.functionCallExpression = functionCallExpression; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java new file mode 100644 index 00000000000000..b866100b63d4e7 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class MTMVRelatedPartitionDescRollUpGeneratorTest { + @Mocked + private MTMVPartitionUtil mtmvPartitionUtil; + @Mocked + private MTMVPartitionInfo mtmvPartitionInfo; + + @Test + public void testRollUpRange() throws AnalysisException { + FunctionCallExpr expr = new FunctionCallExpr("date_trunc", + Lists.newArrayList(new SlotRef(null, null), new StringLiteral("month"))); + new Expectations() { + { + mtmvPartitionUtil.getPartitionColumnType((MTMVRelatedTableIf) any, (String) any); + minTimes = 0; + result = Type.DATE; + + mtmvPartitionInfo.getRelatedTable(); + minTimes = 0; + result = null; + + mtmvPartitionInfo.getExpr(); + minTimes = 0; + result = null; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.EXPR; + + mtmvPartitionInfo.getExpr(); + minTimes = 0; + result = expr; + } + }; + MTMVRelatedPartitionDescRollUpGenerator generator = new MTMVRelatedPartitionDescRollUpGenerator(); + Map> relatedPartitionDescs = Maps.newHashMap(); + PartitionKeyDesc desc20200101 = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue("2020-01-01")), + Lists.newArrayList(new PartitionValue("2020-01-02"))); + PartitionKeyDesc desc20200102 = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue("2020-01-02")), + Lists.newArrayList(new PartitionValue("2020-01-03"))); + PartitionKeyDesc desc20200201 = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue("2020-02-01")), + Lists.newArrayList(new PartitionValue("2020-02-02"))); + relatedPartitionDescs.put(desc20200101, Sets.newHashSet(1L)); + relatedPartitionDescs.put(desc20200102, Sets.newHashSet(2L)); + relatedPartitionDescs.put(desc20200201, Sets.newHashSet(3L)); + Map> res = generator.rollUpRange(relatedPartitionDescs, + mtmvPartitionInfo); + + PartitionKeyDesc expectDesc202001 = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue("2020-01-01")), + Lists.newArrayList(new PartitionValue("2020-02-01"))); + PartitionKeyDesc expectDesc202002 = PartitionKeyDesc.createFixed( + Lists.newArrayList(new PartitionValue("2020-02-01")), + Lists.newArrayList(new PartitionValue("2020-03-01"))); + Assert.assertEquals(2, res.size()); + Assert.assertEquals(Sets.newHashSet(1L, 2L), res.get(expectDesc202001)); + Assert.assertEquals(Sets.newHashSet(3L), res.get(expectDesc202002)); + } + + @Test + public void testRollUpList() throws AnalysisException { + FunctionCallExpr expr = new FunctionCallExpr("date_trunc", + Lists.newArrayList(new SlotRef(null, null), new StringLiteral("month"))); + new Expectations() { + { + mtmvPartitionUtil.getPartitionColumnType((MTMVRelatedTableIf) any, (String) any); + minTimes = 0; + result = Type.DATE; + + mtmvPartitionInfo.getRelatedTable(); + minTimes = 0; + result = null; + + mtmvPartitionInfo.getExpr(); + minTimes = 0; + result = null; + + mtmvPartitionInfo.getPartitionType(); + minTimes = 0; + result = MTMVPartitionType.EXPR; + + mtmvPartitionInfo.getExpr(); + minTimes = 0; + result = expr; + } + }; + MTMVRelatedPartitionDescRollUpGenerator generator = new MTMVRelatedPartitionDescRollUpGenerator(); + Map> relatedPartitionDescs = Maps.newHashMap(); + relatedPartitionDescs.put(generateInDesc("2020-01-01"), Sets.newHashSet(1L)); + relatedPartitionDescs.put(generateInDesc("2020-01-02"), Sets.newHashSet(2L)); + relatedPartitionDescs.put(generateInDesc("2020-02-01"), Sets.newHashSet(3L)); + Map> res = generator.rollUpList(relatedPartitionDescs, + mtmvPartitionInfo, Maps.newHashMap()); + + PartitionKeyDesc expectDesc202001 = generateInDesc("2020-01-01", "2020-01-02"); + PartitionKeyDesc expectDesc202002 = generateInDesc("2020-02-01"); + Assert.assertEquals(2, res.size()); + Assert.assertEquals(Sets.newHashSet(1L, 2L), res.get(expectDesc202001)); + Assert.assertEquals(Sets.newHashSet(3L), res.get(expectDesc202002)); + } + + private PartitionKeyDesc generateInDesc(String... values) { + List> partitionValues = Lists.newArrayList(); + for (String value : values) { + List partitionValue = Lists.newArrayList(new PartitionValue(value)); + partitionValues.add(partitionValue); + } + return PartitionKeyDesc.createIn(partitionValues); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGeneratorTest.java new file mode 100644 index 00000000000000..666a7947bcb691 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGeneratorTest.java @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; +import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; + +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + + +public class MTMVRelatedPartitionDescSyncLimitGeneratorTest { + @Mocked + private DateTimeAcquire dateTimeAcquire; + + @Test + public void testGenerateMTMVPartitionSyncConfigByProperties() throws AnalysisException { + MTMVRelatedPartitionDescSyncLimitGenerator generator = new MTMVRelatedPartitionDescSyncLimitGenerator(); + Map mvProperties = Maps.newHashMap(); + MTMVPartitionSyncConfig config = generator + .generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(-1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT, "1"); + config = generator.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT, "month"); + config = generator.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertFalse(config.getDateFormat().isPresent()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); + + mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT, "%Y%m%d"); + config = generator.generateMTMVPartitionSyncConfigByProperties(mvProperties); + Assert.assertEquals(1, config.getSyncLimit()); + Assert.assertEquals("%Y%m%d", config.getDateFormat().get()); + Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); + } + + @Test + public void testGetNowTruncSubSec() throws AnalysisException { + MTMVRelatedPartitionDescSyncLimitGenerator generator = new MTMVRelatedPartitionDescSyncLimitGenerator(); + DateTimeLiteral dateTimeLiteral = new DateTimeLiteral("2020-02-03 20:10:10"); + new Expectations() { + { + dateTimeAcquire.now(); + minTimes = 0; + result = dateTimeLiteral; + } + }; + long nowTruncSubSec = generator.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 1); + // 2020-02-03 + Assert.assertEquals(1580659200L, nowTruncSubSec); + nowTruncSubSec = generator.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 1); + // 2020-02-01 + Assert.assertEquals(1580486400L, nowTruncSubSec); + nowTruncSubSec = generator.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.YEAR, 1); + // 2020-01-01 + Assert.assertEquals(1577808000L, nowTruncSubSec); + nowTruncSubSec = generator.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 3); + // 2019-12-01 + Assert.assertEquals(1575129600L, nowTruncSubSec); + nowTruncSubSec = generator.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 4); + // 2020-01-31 + Assert.assertEquals(1580400000L, nowTruncSubSec); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java index 2e6df56cd1a4b5..2a1b2d0c1401e7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java @@ -23,51 +23,13 @@ import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire; -import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; -import com.google.common.collect.Maps; -import mockit.Expectations; -import mockit.Mocked; import org.junit.Assert; import org.junit.Test; -import java.util.Map; import java.util.Optional; public class MTMVUtilTest { - @Mocked - private DateTimeAcquire dateTimeAcquire; - - @Test - public void testGenerateMTMVPartitionSyncConfigByProperties() throws AnalysisException { - Map mvProperties = Maps.newHashMap(); - MTMVPartitionSyncConfig config = MTMVUtil - .generateMTMVPartitionSyncConfigByProperties(mvProperties); - Assert.assertEquals(-1, config.getSyncLimit()); - Assert.assertFalse(config.getDateFormat().isPresent()); - Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); - - mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT, "1"); - config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); - Assert.assertEquals(1, config.getSyncLimit()); - Assert.assertFalse(config.getDateFormat().isPresent()); - Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit()); - - mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT, "month"); - config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); - Assert.assertEquals(1, config.getSyncLimit()); - Assert.assertFalse(config.getDateFormat().isPresent()); - Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); - - mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT, "%Y%m%d"); - config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties); - Assert.assertEquals(1, config.getSyncLimit()); - Assert.assertEquals("%Y%m%d", config.getDateFormat().get()); - Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit()); - } - @Test public void testGetExprTimeSec() throws AnalysisException { LiteralExpr expr = new DateLiteral("2020-01-01"); @@ -83,31 +45,4 @@ public void testGetExprTimeSec() throws AnalysisException { exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty()); Assert.assertEquals(253402185600L, exprTimeSec); } - - @Test - public void testGetNowTruncSubSec() throws AnalysisException { - DateTimeLiteral dateTimeLiteral = new DateTimeLiteral("2020-02-03 20:10:10"); - new Expectations() { - { - dateTimeAcquire.now(); - minTimes = 0; - result = dateTimeLiteral; - } - }; - long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 1); - // 2020-02-03 - Assert.assertEquals(1580659200L, nowTruncSubSec); - nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 1); - // 2020-02-01 - Assert.assertEquals(1580486400L, nowTruncSubSec); - nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.YEAR, 1); - // 2020-01-01 - Assert.assertEquals(1577808000L, nowTruncSubSec); - nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 3); - // 2019-12-01 - Assert.assertEquals(1575129600L, nowTruncSubSec); - nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 4); - // 2020-01-31 - Assert.assertEquals(1580400000L, nowTruncSubSec); - } } diff --git a/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out index 5b63ad2f7b6880..1a8731d6a1c446 100644 --- a/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out +++ b/regression-test/data/mtmv_p0/test_hive_limit_partition_mtmv.out @@ -4,17 +4,31 @@ 2 sh 20380101 3 bj 20200101 4 sh 20200101 +5 bj 20380102 -- !mtmv_complete -- 1 20380101 bj 2 20380101 sh +5 20380102 bj + +-- !mtmv_datetrunc -- +1 20380101 bj +2 20380101 sh +5 20380102 bj -- !select_base_table -- 1 bj 20380101 2 sh 20380101 3 bj 20200101 4 sh 20200101 +5 bj 20380102 -- !mtmv_complete -- 1 20380101 bj 2 20380101 sh +5 20380102 bj + +-- !mtmv_datetrunc -- +1 20380101 bj +2 20380101 sh +5 20380102 bj diff --git a/regression-test/data/mtmv_p0/test_rollup_partition_mtmv.out b/regression-test/data/mtmv_p0/test_rollup_partition_mtmv.out new file mode 100644 index 00000000000000..5552dac72c3005 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_rollup_partition_mtmv.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !date_list_month -- +1 2020-01-01 +2 2020-01-02 +3 2020-02-01 + +-- !string_list_month -- +1 2020==01==01 +2 2020==01==02 +3 2020==02==01 + +-- !date_range_month -- +1 2020-01-01 +2 2020-01-02 +3 2020-02-01 + diff --git a/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy index 06390437a79e16..1e6c49bb50af0e 100644 --- a/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_hive_limit_partition_mtmv.groovy @@ -45,11 +45,13 @@ suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,extern partition(region="sh",day="20380101") partition(region="bj",day="20200101") partition(region="sh",day="20200101") + partition(region="bj",day="20380102") """ def insert_str1 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20380101") values(1)""" def insert_str2 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20380101") values(2)""" def insert_str3 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20200101") values(3)""" def insert_str4 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20200101") values(4)""" + def insert_str5 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20380102") values(5)""" logger.info("hive sql: " + drop_table_str) hive_docker """ ${drop_table_str} """ @@ -69,12 +71,15 @@ suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,extern hive_docker """ ${insert_str3} """ logger.info("hive sql: " + insert_str4) hive_docker """ ${insert_str4} """ + logger.info("hive sql: " + insert_str5) + hive_docker """ ${insert_str5} """ // prepare catalog String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_${hivePrefix}_limit_partition_mtmv_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """drop catalog if exists ${catalog_name}""" sql """create catalog if not exists ${catalog_name} properties ( "type"="hms", @@ -83,7 +88,6 @@ suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,extern order_qt_select_base_table "SELECT * FROM ${catalog_name}.${hive_database}.${hive_table}" - // string type def mvName = "test_hive_limit_partition_mtmv" def dbName = "regression_test_mtmv_p0" @@ -105,62 +109,85 @@ suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,extern """ def showPartitionsResult = sql """show partitions from ${mvName}""" logger.info("showPartitionsResult: " + showPartitionsResult.toString()) - assertEquals(1, showPartitionsResult.size()) + assertEquals(2, showPartitionsResult.size()) assertTrue(showPartitionsResult.toString().contains("p_20380101")) + assertTrue(showPartitionsResult.toString().contains("p_20380102")) - // refresh complete - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete + // date trunc + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`day`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='MONTH', + 'partition_date_format'='%Y%m%d' + ) + AS + SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table}; """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv_complete "SELECT * FROM ${mvName} order by k1,day,region" + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("_20380101")) + assertTrue(showPartitionsResult.toString().contains("_20380102")) + // refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + jobName = getJobName(dbName, mvName); + waitingMTMVTaskFinished(jobName) + order_qt_mtmv_datetrunc "SELECT * FROM ${mvName} order by k1,day,region" - // date type - sql """drop materialized view if exists ${mvName};""" - create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} ( - `k1` int) - PARTITIONED BY ( - `region` string, - `day` date - ) - STORED AS ORC; - """ - add_partition_str = """ - alter table ${hive_database}.${hive_table} add if not exists - partition(region="bj",day="2038-01-01") - partition(region="sh",day="2038-01-01") - partition(region="bj",day="2020-01-01") - partition(region="sh",day="2020-01-01") - """ - logger.info("hive sql: " + drop_table_str) - hive_docker """ ${drop_table_str} """ - logger.info("hive sql: " + create_table_str) - hive_docker """ ${create_table_str} """ - logger.info("hive sql: " + add_partition_str) - hive_docker """ ${add_partition_str} """ - sql """REFRESH catalog ${catalog_name}""" - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - partition by(`day`) - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ( - 'replication_num' = '1', - 'partition_sync_limit'='2', - 'partition_sync_time_unit'='YEAR' - ) - AS - SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table}; - """ - showPartitionsResult = sql """show partitions from ${mvName}""" - logger.info("showPartitionsResult: " + showPartitionsResult.toString()) - assertEquals(1, showPartitionsResult.size()) - assertTrue(showPartitionsResult.toString().contains("p_20380101")) - sql """drop materialized view if exists ${mvName};""" - sql """drop catalog if exists ${catalog_name}""" + // date type + sql """drop materialized view if exists ${mvName};""" + create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} ( + `k1` int) + PARTITIONED BY ( + `region` string, + `day` date + ) + STORED AS ORC; + """ + add_partition_str = """ + alter table ${hive_database}.${hive_table} add if not exists + partition(region="bj",day="2038-01-01") + partition(region="sh",day="2038-01-01") + partition(region="bj",day="2020-01-01") + partition(region="sh",day="2020-01-01") + """ + logger.info("hive sql: " + drop_table_str) + hive_docker """ ${drop_table_str} """ + logger.info("hive sql: " + create_table_str) + hive_docker """ ${create_table_str} """ + logger.info("hive sql: " + add_partition_str) + hive_docker """ ${add_partition_str} """ + + sql """REFRESH catalog ${catalog_name}""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`day`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit'='2', + 'partition_sync_time_unit'='YEAR' + ) + AS + SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + assertTrue(showPartitionsResult.toString().contains("p_20380101")) + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalog_name}""" } } diff --git a/regression-test/suites/mtmv_p0/test_rollup_partition_mtmv.groovy b/regression-test/suites/mtmv_p0/test_rollup_partition_mtmv.groovy new file mode 100644 index 00000000000000..3a1bfe9f4fdd5a --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_rollup_partition_mtmv.groovy @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_rollup_partition_mtmv") { + def tableName = "t_test_rollup_partition_mtmv_user" + def mvName = "multi_mv_test_rollup_partition_mtmv" + def dbName = "regression_test_mtmv_p0" + + // list partition date type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY list(`k2`) + ( + PARTITION p_20200101 VALUES IN ("2020-01-01"), + PARTITION p_20200102 VALUES IN ("2020-01-02"), + PARTITION p_20200201 VALUES IN ("2020-02-01") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2020-01-01"),(2,"2020-01-02"),(3,"2020-02-01"); + """ + + // list date month + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(2, showPartitionsResult.size()) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} AUTO + """ + def jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_date_list_month "SELECT * FROM ${mvName} order by k1,k2" + + sql """drop materialized view if exists ${mvName};""" + // list date year + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'year')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(1, showPartitionsResult.size()) + + // list string month + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` varchar(200) NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY list(`k2`) + ( + PARTITION p_20200101 VALUES IN ("2020==01==01"), + PARTITION p_20200102 VALUES IN ("2020==01==02"), + PARTITION p_20200201 VALUES IN ("2020==02==01") + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2020==01==01"),(2,"2020==01==02"),(3,"2020==02==01"); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_date_format'='%Y==%m==%d' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(2, showPartitionsResult.size()) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} AUTO + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_string_list_month "SELECT * FROM ${mvName} order by k1,k2" + + + // range date month + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY range(`k2`) + ( + PARTITION p_20200101 VALUES [("2020-01-01"),("2020-01-02")), + PARTITION p_20200102 VALUES [("2020-01-02"),("2020-01-03")), + PARTITION p_20200201 VALUES [("2020-02-01"),("2020-02-02")) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2020-01-01"),(2,"2020-01-02"),(3,"2020-02-01"); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertEquals(2, showPartitionsResult.size()) + + sql """ + REFRESH MATERIALIZED VIEW ${mvName} AUTO + """ + jobName = getJobName(dbName, mvName); + log.info(jobName) + waitingMTMVTaskFinished(jobName) + order_qt_date_range_month "SELECT * FROM ${mvName} order by k1,k2" + + // not support MAXVALUE + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY range(`k2`) + ( + PARTITION p_20200101 VALUES [("2020-01-01"),("2020-01-02")), + PARTITION p_20200102 VALUES [("2020-01-02"),("2020-01-03")), + PARTITION p_20200201 VALUES [("2020-02-01"),(MAXVALUE)) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + try { + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + Assert.fail(); + } catch (Exception e) { + log.info(e.getMessage()) + } + + + // range not support other data type + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` int NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY range(`k2`) + ( + PARTITION p_1 VALUES [(1),(2)) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + try { + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'month')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + Assert.fail(); + } catch (Exception e) { + log.info(e.getMessage()) + } + + // not support trunc hour + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE TABLE `${tableName}` ( + `k1` LARGEINT NOT NULL COMMENT '\"用户id\"', + `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"' + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY range(`k2`) + ( + PARTITION p_20200101 VALUES [("2020-01-01"),("2020-01-02")), + PARTITION p_20200102 VALUES [("2020-01-02"),("2020-01-03")), + PARTITION p_20200201 VALUES [("2020-02-01"),("2020-02-02")) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + try { + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by (date_trunc(`k2`,'hour')) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * FROM ${tableName}; + """ + Assert.fail(); + } catch (Exception e) { + log.info(e.getMessage()) + } +}