From 0deac1583750e7f7ee463e7f6cbaba7129f73008 Mon Sep 17 00:00:00 2001 From: slothever <18522955+wsjz@users.noreply.github.com> Date: Mon, 18 Mar 2024 14:18:50 +0800 Subject: [PATCH] [feature](insert)implement hive table sink plan (#31765) Issue Number: #31442 --- .../java/org/apache/doris/common/Config.java | 17 +- .../apache/doris/datasource/CatalogIf.java | 3 + .../doris/datasource/ExternalCatalog.java | 11 + .../doris/datasource/InternalCatalog.java | 2 +- .../datasource/hive/HMSCachedClient.java | 2 + .../datasource/hive/HiveMetadataOps.java | 8 +- .../datasource/hive/HiveTableMetadata.java | 30 +- .../hive/PostgreSQLJdbcHMSCachedClient.java | 33 +- .../hive/ThriftHMSCachedClient.java | 48 +- .../analyzer/UnboundHiveTableSink.java | 149 +++++ .../nereids/analyzer/UnboundTableSink.java | 49 +- .../analyzer/UnboundTableSinkCreator.java | 74 +++ .../translator/PhysicalPlanTranslator.java | 30 + .../nereids/parser/LogicalPlanBuilder.java | 10 +- ...tributionSpecTableSinkHashPartitioned.java | 50 ++ ...ibutionSpecTableSinkRandomPartitioned.java | 36 ++ .../properties/PhysicalProperties.java | 3 + .../properties/RequestPropertyDeriver.java | 11 + .../apache/doris/nereids/rules/RuleType.java | 2 +- .../nereids/rules/analysis/BindSink.java | 569 ++++++++++-------- ...lHiveTableSinkToPhysicalHiveTableSink.java | 4 +- .../doris/nereids/trees/plans/PlanType.java | 2 + .../plans/commands/CreateTableCommand.java | 6 +- .../commands/DeleteFromUsingCommand.java | 4 +- .../trees/plans/commands/LoadCommand.java | 4 +- .../trees/plans/commands/UpdateCommand.java | 6 +- .../commands/UpdateMvByPartitionCommand.java | 11 +- .../insert/HiveInsertCommandContext.java | 33 + .../commands/insert/HiveInsertExecutor.java | 45 +- .../insert/InsertIntoTableCommand.java | 6 +- .../insert/InsertOverwriteTableCommand.java | 56 +- .../plans/commands/insert/InsertUtils.java | 76 +-- .../plans/logical/LogicalHiveTableSink.java | 41 +- .../plans/logical/UnboundLogicalSink.java | 62 ++ .../plans/physical/PhysicalHiveTableSink.java | 78 ++- .../trees/plans/physical/PhysicalSink.java | 10 +- .../plans/physical/PhysicalTableSink.java | 2 + .../trees/plans/visitor/SinkVisitor.java | 5 + .../doris/nereids/util/RelationUtil.java | 2 +- .../apache/doris/planner/DataPartition.java | 2 + .../apache/doris/planner/HiveTableSink.java | 172 +++++- .../doris/datasource/hive/HmsCommitTest.java | 8 +- 42 files changed, 1268 insertions(+), 504 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f31f46dd049e83..daa9c8b1d35926 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2216,21 +2216,10 @@ public class Config extends ConfigBase { "Enable 
external table DDL"}) public static boolean enable_external_ddl = false; - - @ConfField(mutable = true, masterOnly = true, description = { - "Hive创建外部表默认指定的input format", - "Default hive input format for creating table."}) - public static String hive_default_input_format = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - - @ConfField(mutable = true, masterOnly = true, description = { - "Hive创建外部表默认指定的output format", - "Default hive output format for creating table."}) - public static String hive_default_output_format = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; - @ConfField(mutable = true, masterOnly = true, description = { - "Hive创建外部表默认指定的SerDe类", - "Default hive serde class for creating table."}) - public static String hive_default_serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + "Hive创建外部表默认指定的文件格式", + "Default hive file format for creating table."}) + public static String hive_default_file_format = "orc"; @ConfField public static int statistics_sql_parallel_exec_instance_num = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index ffae9420edeb48..44dba04157ecb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; @@ -176,6 +177,8 @@ default CatalogLog constructEditLog() { return log; } + TableName getTableNameByTableId(Long tableId); + // Return a copy of all db collection. 
Collection> getAllDbs(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index a60a21a619f956..e82734ec29705a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -396,6 +397,16 @@ public List getDbNamesOrEmpty() { } } + public TableName getTableNameByTableId(Long tableId) { + for (DatabaseIf db : idToDb.values()) { + TableIf table = db.getTableNullable(tableId); + if (table != null) { + return new TableName(getName(), db.getFullName(), table.getName()); + } + } + return null; + } + @Override public String getResource() { return catalogProperty.getResource(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index cd7a3f91e521d0..42a350163f6e32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -310,7 +310,7 @@ public TableName getTableNameByTableId(Long tableId) { for (Database db : fullNameToDb.values()) { Table table = db.getTableNullable(tableId); if (table != null) { - return new TableName("", db.getFullName(), table.getName()); + return new TableName(INTERNAL_CATALOG_NAME, db.getFullName(), table.getName()); } } return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index b55ed7bdf0eded..d8daeb155c59e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -51,6 +51,8 @@ public interface HMSCachedClient { List listPartitionNames(String dbName, String tblName); + List listPartitions(String dbName, String tblName); + List listPartitionNames(String dbName, String tblName, long maxListPartitionNum); Partition getPartition(String dbName, String tblName, List partitionValues); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 886d6d76fa8ad9..091cd51b232cb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -125,17 +125,13 @@ public void createTable(CreateTableStmt stmt) throws UserException { } try { Map props = stmt.getExtProperties(); - String inputFormat = props.getOrDefault("input_format", Config.hive_default_input_format); - String outputFormat = props.getOrDefault("output_format", Config.hive_default_output_format); - String serDe = props.getOrDefault("serde", Config.hive_default_serde); + String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format); HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName, tblName, stmt.getColumns(), parsePartitionKeys(props), props, - 
inputFormat, - outputFormat, - serDe); + fileFormat); client.createTable(catalogTable, stmt.isSetIfNotExists()); db.setUnInitialized(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java index 8edd3033187a6f..fde0a2d4d04b81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java @@ -30,9 +30,7 @@ public class HiveTableMetadata implements TableMetadata { private String tableName; private List columns; private List partitionKeys; - private String inputFormat; - private String outputFormat; - private String serDe; + private String fileFormat; private Map properties; // private String viewSql; @@ -41,16 +39,12 @@ public HiveTableMetadata(String dbName, List columns, List partitionKeys, Map props, - String inputFormat, - String outputFormat, - String serDe) { + String fileFormat) { this.dbName = dbName; this.tableName = tblName; this.columns = columns; this.partitionKeys = partitionKeys; - this.inputFormat = inputFormat; - this.outputFormat = outputFormat; - this.serDe = serDe; + this.fileFormat = fileFormat; this.properties = props; } @@ -77,16 +71,8 @@ public List getPartitionKeys() { return partitionKeys; } - public String getInputFormat() { - return inputFormat; - } - - public String getOutputFormat() { - return outputFormat; - } - - public String getSerDe() { - return serDe; + public String getFileFormat() { + return fileFormat; } public static HiveTableMetadata of(String dbName, @@ -94,9 +80,7 @@ public static HiveTableMetadata of(String dbName, List columns, List partitionKeys, Map props, - String inputFormat, - String outputFormat, String serDe) { - return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, - inputFormat, outputFormat, serDe); + String fileFormat) { + return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index c18fa30189c611..0259784b7b9bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -123,6 +123,10 @@ public List listPartitionNames(String dbName, String tblName) { return listPartitionNames(dbName, tblName, (long) -1); } + public List listPartitions(String dbName, String tblName) { + return getPartitionsByNames(dbName, tblName, ImmutableList.of()); + } + @Override public List listPartitionNames(String dbName, String tblName, long maxListPartitionNum) { String sql = String.format("SELECT \"PART_NAME\" from \"PARTITIONS\" WHERE \"TBL_ID\" = (" @@ -173,15 +177,26 @@ public List getPartitions(String dbName, String tblName, List private List getPartitionsByNames(String dbName, String tblName, List partitionNames) { List partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'") .collect(Collectors.toList()); - String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote); - String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\"," - + " \"PARTITIONS\".\"LAST_ACCESS_TIME\"," - + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM 
\"PARTITIONS\"" - + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" - + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" - + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'" - + " AND \"PART_NAME\" in (%s);", - dbName, tblName, partitionNamesString); + String sql; + if (partitionNamesWithQuote.isEmpty()) { + sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\"," + + " \"PARTITIONS\".\"LAST_ACCESS_TIME\"," + + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\"" + + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" + + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" + + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s';", + dbName, tblName); + } else { + String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote); + sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\"," + + " \"PARTITIONS\".\"LAST_ACCESS_TIME\"," + + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\"" + + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" + + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" + + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'" + + " AND \"PART_NAME\" in (%s);", + dbName, tblName, partitionNamesString); + } if (LOG.isDebugEnabled()) { LOG.debug("getPartitionsByNames exec sql: {}", sql); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index cb5328395ce1d1..d4f63c5a8fb28b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -201,9 +201,7 @@ private static Table toHiveTable(HiveTableMetadata hiveTable) { // table.setRetention(0); String location = hiveTable.getProperties().get("external_location"); table.setSd(toHiveStorageDesc(hiveTable.getColumns(), - hiveTable.getInputFormat(), - hiveTable.getOutputFormat(), - hiveTable.getSerDe(), + hiveTable.getFileFormat(), location)); table.setPartitionKeys(hiveTable.getPartitionKeys()); // table.setViewOriginalText(hiveTable.getViewSql()); @@ -213,15 +211,10 @@ private static Table toHiveTable(HiveTableMetadata hiveTable) { return table; } - private static StorageDescriptor toHiveStorageDesc(List columns, String inputFormat, String outputFormat, - String serDe, String location) { + private static StorageDescriptor toHiveStorageDesc(List columns, String fileFormat, String location) { StorageDescriptor sd = new StorageDescriptor(); sd.setCols(toHiveColumns(columns)); - SerDeInfo serDeInfo = new SerDeInfo(); - serDeInfo.setSerializationLib(serDe); - sd.setSerdeInfo(serDeInfo); - sd.setInputFormat(inputFormat); - sd.setOutputFormat(outputFormat); + setFileFormat(fileFormat, sd); if (StringUtils.isNotEmpty(location)) { sd.setLocation(location); } @@ -231,6 +224,28 @@ private static StorageDescriptor toHiveStorageDesc(List columns, String return sd; } + private static void setFileFormat(String fileFormat, StorageDescriptor sd) { + String inputFormat; + String outputFormat; + String serDe; + if (fileFormat.equalsIgnoreCase("orc")) { + inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; + serDe = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + } else if (fileFormat.equalsIgnoreCase("parquet")) { + inputFormat = 
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + outputFormat = "'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; + serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; + } else { + throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat); + } + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setSerializationLib(serDe); + sd.setSerdeInfo(serDeInfo); + sd.setInputFormat(inputFormat); + sd.setOutputFormat(outputFormat); + } + private static List toHiveColumns(List columns) { List result = new ArrayList<>(); for (Column column : columns) { @@ -297,6 +312,19 @@ public List listPartitionNames(String dbName, String tblName) { return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); } + public List listPartitions(String dbName, String tblName) { + try (ThriftHMSClient client = getClient()) { + try { + return ugiDoAs(() -> client.client.listPartitions(dbName, tblName, MAX_LIST_PARTITION_NUM)); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName); + } + } + @Override public List listPartitionNames(String dbName, String tblName, long maxListPartitionNum) { // list all parts when the limit is greater than the short maximum diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java new file mode 100644 index 00000000000000..0b56c2b681d087 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.nereids.exceptions.UnboundException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.UnboundLogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Represent a Hive table sink plan node that has not been bound.
+ */
+public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
+        implements Unbound, Sink, BlockFuncDepsPropagation {
+    private final List<String> hints;
+    private final List<String> partitions;
+
+    public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
+            List<String> partitions, CHILD_TYPE child) {
+        this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
+                Optional.empty(), Optional.empty(), child);
+    }
+
+    /**
+     * constructor
+     */
+    public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
+            List<String> partitions,
+            DMLCommandType dmlCommandType,
+            Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE child) {
+        super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression,
+                logicalProperties, colNames, dmlCommandType, child);
+        this.hints = Utils.copyRequiredList(hints);
+        this.partitions = Utils.copyRequiredList(partitions);
+    }
+
+    public List<String> getColNames() {
+        return colNames;
+    }
+
+    public List<String> getPartitions() {
+        return partitions;
+    }
+
+    public List<String> getHints() {
+        return hints;
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "UnboundHiveTableSink only accepts one child");
+        return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
+                dmlCommandType, groupExpression, Optional.empty(), children.get(0));
+    }
+
+    @Override
+    public UnboundHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+        throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink");
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitUnboundHiveTableSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        throw new UnsupportedOperationException(this.getClass().getSimpleName() + " doesn't support getExpressions()");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        UnboundHiveTableSink<?> that = (UnboundHiveTableSink<?>) o;
+        return Objects.equals(nameParts, that.nameParts)
+                && Objects.equals(colNames, that.colNames)
+                && Objects.equals(hints, that.hints)
+                && Objects.equals(partitions, that.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nameParts,
colNames, hints, partitions); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } + + @Override + public LogicalProperties computeLogicalProperties() { + return UnboundLogicalProperties.INSTANCE; + } + + @Override + public List computeOutput() { + throw new UnboundException("output"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index 89d67dc376661b..2a4416686cc9c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -29,7 +29,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; -import org.apache.doris.nereids.trees.plans.logical.LogicalSink; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; @@ -43,11 +43,8 @@ /** * Represent an olap table sink plan node that has not been bound. */ -public class UnboundTableSink extends LogicalSink +public class UnboundTableSink extends UnboundLogicalSink implements Unbound, Sink, BlockFuncDepsPropagation { - - private final List nameParts; - private final List colNames; private final List hints; private final boolean temporaryPartition; private final List partitions; @@ -60,31 +57,6 @@ public UnboundTableSink(List nameParts, List colNames, List nameParts, List colNames, List hints, - boolean temporaryPartition, List partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, temporaryPartition, partitions, - false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); - } - - public UnboundTableSink(List nameParts, List colNames, List hints, - List partitions, boolean isPartialUpdate, CHILD_TYPE child) { - this(nameParts, colNames, hints, false, partitions, isPartialUpdate, DMLCommandType.NONE, - Optional.empty(), Optional.empty(), child); - } - - public UnboundTableSink(List nameParts, List colNames, List hints, - boolean temporaryPartition, List partitions, boolean isPartialUpdate, CHILD_TYPE child) { - this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, DMLCommandType.NONE, - Optional.empty(), Optional.empty(), child); - } - - public UnboundTableSink(List nameParts, List colNames, List hints, - boolean temporaryPartition, List partitions, - boolean isPartialUpdate, DMLCommandType dmlCommandType, CHILD_TYPE child) { - this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, dmlCommandType, - Optional.empty(), Optional.empty(), child); - } - /** * constructor */ @@ -93,9 +65,8 @@ public UnboundTableSink(List nameParts, List colNames, List groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), 
groupExpression, logicalProperties, child); - this.nameParts = Utils.copyRequiredList(nameParts); - this.colNames = Utils.copyRequiredList(colNames); + super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child); this.hints = Utils.copyRequiredList(hints); this.temporaryPartition = temporaryPartition; this.partitions = Utils.copyRequiredList(partitions); @@ -103,14 +74,6 @@ public UnboundTableSink(List nameParts, List colNames, List getColNames() { - return colNames; - } - - public List getNameParts() { - return nameParts; - } - public boolean isTemporaryPartition() { return temporaryPartition; } @@ -127,10 +90,6 @@ public boolean isPartialUpdate() { return isPartialUpdate; } - public DMLCommandType getDMLCommandType() { - return dmlCommandType; - } - @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java new file mode 100644 index 00000000000000..335d2f58035d62 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalSink; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.qe.ConnectContext; + +import java.util.List; +import java.util.Optional; + +/** + * Create unbound table sink + */ +public class UnboundTableSinkCreator { + + /** + * create unbound sink without DML command + */ + public static LogicalSink createUnboundTableSink(List nameParts, + List colNames, List hints, List partitions, Plan query) + throws UserException { + String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); + CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (curCatalog instanceof InternalCatalog) { + return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query); + } else if (curCatalog instanceof HMSExternalCatalog) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query); + } + throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); + } + + /** + * create unbound sink for DML plan + */ + public static LogicalSink createUnboundTableSink(List nameParts, + List colNames, List hints, boolean temporaryPartition, List partitions, + boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) { + String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0); + CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (curCatalog instanceof InternalCatalog) { + return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, + isPartialUpdate, dmlCommandType, Optional.empty(), + Optional.empty(), plan); + } else if (curCatalog instanceof HMSExternalCatalog) { + return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); + } + throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6b02d3e7d3406b..4151f7d957a69d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -73,6 +73,8 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.properties.DistributionSpecStorageGather; +import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned; +import org.apache.doris.nereids.properties.DistributionSpecTableSinkRandomPartitioned; import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle; import org.apache.doris.nereids.properties.OrderKey; import 
org.apache.doris.nereids.properties.PhysicalProperties;
@@ -166,6 +168,7 @@
 import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.HiveTableSink;
 import org.apache.doris.planner.IntersectNode;
 import org.apache.doris.planner.JoinNodeBase;
 import org.apache.doris.planner.MultiCastDataSink;
@@ -443,6 +446,20 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends
     @Override
     public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
             PlanTranslatorContext context) {
         PlanFragment rootFragment = hiveTableSink.child().accept(this, context);
+        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+
+        TupleDescriptor hiveTuple = context.generateTupleDesc();
+        List<Column> targetTableColumns = hiveTableSink.getTargetTable().getFullSchema();
+        for (Column column : targetTableColumns) {
+            SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple);
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setType(column.getType());
+            slotDesc.setColumn(column);
+            slotDesc.setIsNullable(column.isAllowNull());
+            slotDesc.setAutoInc(column.isAutoInc());
+        }
+        HiveTableSink sink = new HiveTableSink(hiveTableSink.getTargetTable());
+        rootFragment.setSink(sink);
         return rootFragment;
     }
@@ -2581,6 +2598,19 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
             return new DataPartition(partitionType, partitionExprs);
         } else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) {
             return DataPartition.TABLET_ID;
+        } else if (distributionSpec instanceof DistributionSpecTableSinkHashPartitioned) {
+            DistributionSpecTableSinkHashPartitioned partitionSpecHash =
+                    (DistributionSpecTableSinkHashPartitioned) distributionSpec;
+            List<Expr> partitionExprs = Lists.newArrayList();
+            List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
+            for (ExprId partitionExprId : partitionExprIds) {
+                if (childOutputIds.contains(partitionExprId)) {
+                    partitionExprs.add(context.findSlotRef(partitionExprId));
+                }
+            }
+            return new DataPartition(TPartitionType.TABLE_SINK_HASH_PARTITIONED, partitionExprs);
+        } else if (distributionSpec instanceof DistributionSpecTableSinkRandomPartitioned) {
+            return new DataPartition(TPartitionType.TABLE_SINK_RANDOM_PARTITIONED);
         } else {
             throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d89172f12d0824..bb7f316defb936 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -206,7 +206,7 @@
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundStar;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
-import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
 import org.apache.doris.nereids.analyzer.UnboundVariable;
 import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -422,6 +422,7 @@
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
+import
org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSort; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; @@ -512,10 +513,11 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { boolean isOverwrite = ctx.INTO() == null; ImmutableList.Builder tableName = ImmutableList.builder(); if (null != ctx.tableName) { - tableName.addAll(visitMultipartIdentifier(ctx.tableName)); + List nameParts = visitMultipartIdentifier(ctx.tableName); + tableName.addAll(nameParts); } else if (null != ctx.tableId) { // process group commit insert table command send by be - TableName name = Env.getCurrentEnv().getInternalCatalog() + TableName name = Env.getCurrentEnv().getCurrentCatalog() .getTableNameByTableId(Long.valueOf(ctx.tableId.getText())); tableName.add(name.getDb()); tableName.add(name.getTbl()); @@ -527,7 +529,7 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { // TODO visit partitionSpecCtx Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); LogicalPlan plan = visitQuery(ctx.query()); - UnboundTableSink sink = new UnboundTableSink<>( + LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSink( tableName.build(), colNames, ImmutableList.of(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java new file mode 100644 index 00000000000000..4333bd956eec93 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.properties; + +import org.apache.doris.nereids.trees.expressions.ExprId; + +import java.util.List; + +/** + * use for shuffle data by partition keys before sink. 
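+ * e.g. an INSERT into a Hive table partitioned by (dt, region) may shuffle rows by
+ * hash(dt, region) first, so that each sink instance writes a disjoint set of
+ * partitions (illustrative scenario; the partition columns are an assumption).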
+ */
+public class DistributionSpecTableSinkHashPartitioned extends DistributionSpec {
+
+    public static final DistributionSpecTableSinkHashPartitioned INSTANCE =
+            new DistributionSpecTableSinkHashPartitioned();
+
+    private List<ExprId> outputColExprIds;
+
+    public DistributionSpecTableSinkHashPartitioned() {
+        super();
+    }
+
+    public List<ExprId> getOutputColExprIds() {
+        return outputColExprIds;
+    }
+
+    public void setOutputColExprIds(List<ExprId> outputColExprIds) {
+        this.outputColExprIds = outputColExprIds;
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecTableSinkHashPartitioned;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java
new file mode 100644
index 00000000000000..88f791dabea144
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * Used for round-robin distribution of data by the sink.
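+ * e.g. an INSERT into an unpartitioned Hive table can spread its rows round-robin
+ * across the sink instances to balance the write load (illustrative scenario).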
+ */ +public class DistributionSpecTableSinkRandomPartitioned extends DistributionSpec { + + public static final DistributionSpecTableSinkRandomPartitioned INSTANCE = + new DistributionSpecTableSinkRandomPartitioned(); + + private DistributionSpecTableSinkRandomPartitioned() { + super(); + } + + @Override + public boolean satisfy(DistributionSpec other) { + return other instanceof DistributionSpecTableSinkRandomPartitioned; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index cc5d7db6a08ca0..81e7190e163ecb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -49,6 +49,9 @@ public class PhysicalProperties { public static PhysicalProperties TABLET_ID_SHUFFLE = new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE); + public static PhysicalProperties SINK_RANDOM_PARTITIONED + = new PhysicalProperties(DistributionSpecTableSinkRandomPartitioned.INSTANCE); + private final OrderSpec orderSpec; private final DistributionSpec distributionSpec; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 72df678b120855..16086f7e295003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -129,6 +130,16 @@ public Void visitPhysicalOlapTableSink(PhysicalOlapTableSink ola return null; } + @Override + public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink hiveTableSink, PlanContext context) { + if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) { + addRequestPropertyToChildren(PhysicalProperties.ANY); + } else { + addRequestPropertyToChildren(hiveTableSink.getRequirePhysicalProperties()); + } + return null; + } + @Override public Void visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanContext context) { addRequestPropertyToChildren(PhysicalProperties.GATHER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 9650a9147b87cc..672f78163cef82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -30,7 +30,7 @@ public enum RuleType { // **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. 
****
     BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
-    BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
+    BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
     BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 06ad6921defb42..cac10b75ec3fac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -27,11 +27,15 @@
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.pattern.MatchingContext;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
@@ -47,6 +51,7 @@
 import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -61,10 +66,12 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -75,287 +82,325 @@ public class BindSink implements AnalysisRuleFactory {
     @Override
     public List<Rule> buildRules() {
         return ImmutableList.of(
-                RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(ctx -> {
-                    UnboundTableSink<Plan> sink = ctx.root;
-                    Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
-                    Database database = pair.first;
-                    OlapTable table = pair.second;
-                    boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
+                RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(this::bindOlapTableSink)),
+                RuleType.BINDING_INSERT_FILE.build(
+                        logicalFileSink().when(s -> s.getOutputExprs().isEmpty())
+                                .then(fileSink -> fileSink.withOutputExprs(
+                                        fileSink.child().getOutput().stream()
+                                                .map(NamedExpression.class::cast)
+                                                .collect(ImmutableList.toImmutableList())))
+                ),
+                // TODO: bind hive target table
+                RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink))
+        );
+    }
 
-                    LogicalPlan child = ((LogicalPlan) sink.child());
-                    boolean childHasSeqCol = child.getOutput().stream()
-                            .anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
-                    boolean
needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol() - && table.getSequenceMapCol() != null - && sink.getColNames().contains(table.getSequenceMapCol()); - Pair, Integer> bindColumnsResult = - bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol); - List bindColumns = bindColumnsResult.first; - int extraColumnsNum = bindColumnsResult.second; + private Plan bindOlapTableSink(MatchingContext> ctx) { + UnboundTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + Database database = pair.first; + OlapTable table = pair.second; + boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS; - LogicalOlapTableSink boundSink = new LogicalOlapTableSink<>( - database, - table, - bindColumns, - bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()), - child.getOutput().stream() - .map(NamedExpression.class::cast) - .collect(ImmutableList.toImmutableList()), - isPartialUpdate, - sink.getDMLCommandType(), - child); + LogicalPlan child = ((LogicalPlan) sink.child()); + boolean childHasSeqCol = child.getOutput().stream() + .anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL)); + boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol() + && table.getSequenceMapCol() != null + && sink.getColNames().contains(table.getSequenceMapCol()); + Pair, Integer> bindColumnsResult = + bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol); + List bindColumns = bindColumnsResult.first; + int extraColumnsNum = bindColumnsResult.second; - // we need to insert all the columns of the target table - // although some columns are not mentions. - // so we add a projects to supply the default value. - if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) { - throw new AnalysisException("insert into cols should be corresponding to the query output"); - } + LogicalOlapTableSink boundSink = new LogicalOlapTableSink<>( + database, + table, + bindColumns, + bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()), + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + isPartialUpdate, + sink.getDMLCommandType(), + child); - try { - // For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP), - // user MUST specify the sequence column while inserting data - // - // case1: create table by `function_column.sequence_col` - // a) insert with column list, must include the sequence map column - // b) insert without column list, already contains the column, don't need to check - // case2: create table by `function_column.sequence_type` - // a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__ - // b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__ - // by default, will fail. 
- if (table.hasSequenceCol()) { - boolean haveInputSeqCol = false; - Optional seqColInTable = Optional.empty(); - if (table.getSequenceMapCol() != null) { - if (!sink.getColNames().isEmpty()) { - if (sink.getColNames().stream() - .anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) { - haveInputSeqCol = true; // case1.a - } - } else { - haveInputSeqCol = true; // case1.b - } - seqColInTable = table.getFullSchema().stream() - .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) - .findFirst(); - } else { - if (!sink.getColNames().isEmpty()) { - if (sink.getColNames().stream() - .anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) { - haveInputSeqCol = true; // case2.a - } // else case2.b - } - } + // we need to insert all the columns of the target table + // although some columns are not mentions. + // so we add a projects to supply the default value. + if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } - // Don't require user to provide sequence column for partial updates, - // including the following cases: - // 1. it's a load job with `partial_columns=true` - // 2. UPDATE and DELETE, planner will automatically add these hidden columns - if (!haveInputSeqCol && !isPartialUpdate && ( - boundSink.getDmlCommandType() != DMLCommandType.UPDATE - && boundSink.getDmlCommandType() != DMLCommandType.DELETE)) { - if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null - || !seqColInTable.get().getDefaultValue() - .equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) { - throw new org.apache.doris.common.AnalysisException("Table " + table.getName() - + " has sequence column, need to specify the sequence column"); - } - } + try { + // For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP), + // user MUST specify the sequence column while inserting data + // + // case1: create table by `function_column.sequence_col` + // a) insert with column list, must include the sequence map column + // b) insert without column list, already contains the column, don't need to check + // case2: create table by `function_column.sequence_type` + // a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__ + // b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__ + // by default, will fail. + if (table.hasSequenceCol()) { + boolean haveInputSeqCol = false; + Optional seqColInTable = Optional.empty(); + if (table.getSequenceMapCol() != null) { + if (!sink.getColNames().isEmpty()) { + if (sink.getColNames().stream() + .anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) { + haveInputSeqCol = true; // case1.a } - } catch (Exception e) { - throw new AnalysisException(e.getMessage(), e.getCause()); + } else { + haveInputSeqCol = true; // case1.b } + seqColInTable = table.getFullSchema().stream() + .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol())) + .findFirst(); + } else { + if (!sink.getColNames().isEmpty()) { + if (sink.getColNames().stream() + .anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) { + haveInputSeqCol = true; // case2.a + } // else case2.b + } + } - // we need to insert all the columns of the target table - // although some columns are not mentions. - // so we add a projects to supply the default value. 
- - Map columnToChildOutput = Maps.newHashMap(); - for (int i = 0; i < child.getOutput().size(); ++i) { - columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); + // Don't require user to provide sequence column for partial updates, + // including the following cases: + // 1. it's a load job with `partial_columns=true` + // 2. UPDATE and DELETE, planner will automatically add these hidden columns + if (!haveInputSeqCol && !isPartialUpdate && ( + boundSink.getDmlCommandType() != DMLCommandType.UPDATE + && boundSink.getDmlCommandType() != DMLCommandType.DELETE)) { + if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null + || !seqColInTable.get().getDefaultValue() + .equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) { + throw new org.apache.doris.common.AnalysisException("Table " + table.getName() + + " has sequence column, need to specify the sequence column"); } + } + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); + } - Map columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - NereidsParser expressionParser = new NereidsParser(); + // we need to insert all the columns of the target table + // although some columns are not mentions. + // so we add a projects to supply the default value. - // generate slots not mentioned in sql, mv slots and shaded slots. - for (Column column : boundSink.getTargetTable().getFullSchema()) { - if (column.isMaterializedViewColumn()) { - List refs = column.getRefColumns(); - // now we have to replace the column to slots. - Preconditions.checkArgument(refs != null, - "mv column %s 's ref column cannot be null", column); - Expression parsedExpression = expressionParser.parseExpression( - column.getDefineExpr().toSqlWithoutTbl()); - Expression boundSlotExpression = SlotReplacer.INSTANCE - .replace(parsedExpression, columnToOutput); - // the boundSlotExpression is an expression whose slots are bound but function - // may not be bound, we have to bind it again. - // for example: to_bitmap. 
- Expression boundExpression = FunctionBinder.INSTANCE.rewrite( - boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext)); - if (boundExpression instanceof Alias) { - boundExpression = ((Alias) boundExpression).child(); - } - NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl()); - columnToOutput.put(column.getName(), slot); - } else if (columnToChildOutput.containsKey(column) - // do not process explicitly use DEFAULT value here: - // insert into table t values(DEFAULT) - && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) { - columnToOutput.put(column.getName(), columnToChildOutput.get(column)); - } else { - if (table.hasSequenceCol() - && column.getName().equals(Column.SEQUENCE_COL) - && table.getSequenceMapCol() != null) { - Optional seqCol = table.getFullSchema().stream() - .filter(col -> col.getName().equals(table.getSequenceMapCol())) - .findFirst(); - if (!seqCol.isPresent()) { - throw new AnalysisException("sequence column is not contained in" - + " target table " + table.getName()); - } - if (columnToOutput.get(seqCol.get().getName()) != null) { - // should generate diff exprId for seq column - NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName()); - if (seqColumn instanceof Alias) { - seqColumn = new Alias(((Alias) seqColumn).child(), column.getName()); - } else { - seqColumn = new Alias(seqColumn, column.getName()); - } - columnToOutput.put(column.getName(), seqColumn); - } - } else if (isPartialUpdate) { - // If the current load is a partial update, the values of unmentioned - // columns will be filled in SegmentWriter. And the output of sink node - // should not contain these unmentioned columns, so we just skip them. + Map columnToChildOutput = Maps.newHashMap(); + for (int i = 0; i < child.getOutput().size(); ++i) { + columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); + } - // But if the column has 'on update value', we should unconditionally - // update the value of the column to the current timestamp whenever there - // is an update on the row - if (column.hasOnUpdateDefaultValue()) { - Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite( - new NereidsParser().parseExpression( - column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()), - new ExpressionRewriteContext(ctx.cascadesContext)); - columnToOutput.put(column.getName(), - new Alias(defualtValueExpression, column.getName())); - } else { - continue; - } - } else if (column.getDefaultValue() == null) { - // throw exception if explicitly use Default value but no default value present - // insert into table t values(DEFAULT) - if (columnToChildOutput.get(column) instanceof DefaultValueSlot) { - throw new AnalysisException("Column has no default value," - + " column=" + column.getName()); - } - // Otherwise, the unmentioned columns should be filled with default values - // or null values - columnToOutput.put(column.getName(), new Alias( - new NullLiteral(DataType.fromCatalogType(column.getType())), - column.getName() - )); - } else { - try { - // it comes from the original planner, if default value expression is - // null, we use the literal string of the default value, or it may be - // default value function, like CURRENT_TIMESTAMP. 
- if (column.getDefaultValueExpr() == null) { - columnToOutput.put(column.getName(), - new Alias(Literal.of(column.getDefaultValue()) - .checkedCastTo(DataType.fromCatalogType(column.getType())), - column.getName())); - } else { - Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite( - new NereidsParser().parseExpression( - column.getDefaultValueExpr().toSqlWithoutTbl()), - new ExpressionRewriteContext(ctx.cascadesContext)); - if (defualtValueExpression instanceof Alias) { - defualtValueExpression = ((Alias) defualtValueExpression).child(); - } - columnToOutput.put(column.getName(), - new Alias(defualtValueExpression, column.getName())); - } - } catch (Exception e) { - throw new AnalysisException(e.getMessage(), e.getCause()); - } - } - } - } - List fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); - if (child instanceof LogicalOneRowRelation) { - // remove default value slot in one row relation - child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child) - .getProjects().stream() - .filter(p -> !(p instanceof DefaultValueSlot)) - .collect(ImmutableList.toImmutableList())); - } - LogicalProject fullOutputProject = new LogicalProject<>(fullOutputExprs, child); + Map columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + NereidsParser expressionParser = new NereidsParser(); - // add cast project - List castExprs = Lists.newArrayList(); - for (int i = 0; i < table.getFullSchema().size(); ++i) { - Column col = table.getFullSchema().get(i); - NamedExpression expr = columnToOutput.get(col.getName()); - if (expr == null) { - // If `expr` is null, it means that the current load is a partial update - // and `col` should not be contained in the output of the sink node so - // we skip it. - continue; - } - DataType inputType = expr.getDataType(); - DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType()); - Expression castExpr = expr; - // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType - if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) { - int sourceLength = ((CharacterType) inputType).getLen(); - int targetLength = ((CharacterType) targetType).getLen(); - if (sourceLength == targetLength) { - castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); - } else if (sourceLength > targetLength && targetLength >= 0) { - castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength)); - } else if (targetType.isStringType()) { - castExpr = new Cast(castExpr, StringType.INSTANCE); - } + // generate slots not mentioned in sql, mv slots and shaded slots. + for (Column column : boundSink.getTargetTable().getFullSchema()) { + if (column.isMaterializedViewColumn()) { + List refs = column.getRefColumns(); + // now we have to replace the column to slots. + Preconditions.checkArgument(refs != null, + "mv column %s 's ref column cannot be null", column); + Expression parsedExpression = expressionParser.parseExpression( + column.getDefineExpr().toSqlWithoutTbl()); + Expression boundSlotExpression = SlotReplacer.INSTANCE + .replace(parsedExpression, columnToOutput); + // the boundSlotExpression is an expression whose slots are bound but function + // may not be bound, we have to bind it again. + // for example: to_bitmap. 
+ Expression boundExpression = FunctionBinder.INSTANCE.rewrite( + boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext)); + if (boundExpression instanceof Alias) { + boundExpression = ((Alias) boundExpression).child(); + } + NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl()); + columnToOutput.put(column.getName(), slot); + } else if (columnToChildOutput.containsKey(column) + // do not process explicitly use DEFAULT value here: + // insert into table t values(DEFAULT) + && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) { + columnToOutput.put(column.getName(), columnToChildOutput.get(column)); + } else { + if (table.hasSequenceCol() + && column.getName().equals(Column.SEQUENCE_COL) + && table.getSequenceMapCol() != null) { + Optional seqCol = table.getFullSchema().stream() + .filter(col -> col.getName().equals(table.getSequenceMapCol())) + .findFirst(); + if (!seqCol.isPresent()) { + throw new AnalysisException("sequence column is not contained in" + + " target table " + table.getName()); + } + if (columnToOutput.get(seqCol.get().getName()) != null) { + // should generate diff exprId for seq column + NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName()); + if (seqColumn instanceof Alias) { + seqColumn = new Alias(((Alias) seqColumn).child(), column.getName()); } else { - castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + seqColumn = new Alias(seqColumn, column.getName()); } - if (castExpr instanceof NamedExpression) { - castExprs.add(((NamedExpression) castExpr)); + columnToOutput.put(column.getName(), seqColumn); + } + } else if (isPartialUpdate) { + // If the current load is a partial update, the values of unmentioned + // columns will be filled in SegmentWriter. And the output of sink node + // should not contain these unmentioned columns, so we just skip them. + + // But if the column has 'on update value', we should unconditionally + // update the value of the column to the current timestamp whenever there + // is an update on the row + if (column.hasOnUpdateDefaultValue()) { + Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite( + new NereidsParser().parseExpression( + column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()), + new ExpressionRewriteContext(ctx.cascadesContext)); + columnToOutput.put(column.getName(), + new Alias(defualtValueExpression, column.getName())); + } else { + continue; + } + } else if (column.getDefaultValue() == null) { + // throw exception if explicitly use Default value but no default value present + // insert into table t values(DEFAULT) + if (columnToChildOutput.get(column) instanceof DefaultValueSlot) { + throw new AnalysisException("Column has no default value," + + " column=" + column.getName()); + } + // Otherwise, the unmentioned columns should be filled with default values + // or null values + columnToOutput.put(column.getName(), new Alias( + new NullLiteral(DataType.fromCatalogType(column.getType())), + column.getName() + )); + } else { + try { + // it comes from the original planner, if default value expression is + // null, we use the literal string of the default value, or it may be + // default value function, like CURRENT_TIMESTAMP. 
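+                        // e.g. DEFAULT "2024-01-01" is kept as a plain literal below,
+                        // while DEFAULT CURRENT_TIMESTAMP is parsed and bound as a
+                        // function expression. (illustrative values, not from this patch)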
+ if (column.getDefaultValueExpr() == null) { + columnToOutput.put(column.getName(), + new Alias(Literal.of(column.getDefaultValue()) + .checkedCastTo(DataType.fromCatalogType(column.getType())), + column.getName())); } else { - castExprs.add(new Alias(castExpr)); + Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite( + new NereidsParser().parseExpression( + column.getDefaultValueExpr().toSqlWithoutTbl()), + new ExpressionRewriteContext(ctx.cascadesContext)); + if (defualtValueExpression instanceof Alias) { + defualtValueExpression = ((Alias) defualtValueExpression).child(); + } + columnToOutput.put(column.getName(), + new Alias(defualtValueExpression, column.getName())); } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); } - if (!castExprs.equals(fullOutputExprs)) { - fullOutputProject = new LogicalProject(castExprs, fullOutputProject); - } + } + } + } + List fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); + if (child instanceof LogicalOneRowRelation) { + // remove default value slot in one row relation + child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child) + .getProjects().stream() + .filter(p -> !(p instanceof DefaultValueSlot)) + .collect(ImmutableList.toImmutableList())); + } + LogicalProject fullOutputProject = new LogicalProject<>(fullOutputExprs, child); - return boundSink.withChildAndUpdateOutput(fullOutputProject); + // add cast project + List castExprs = Lists.newArrayList(); + for (int i = 0; i < table.getFullSchema().size(); ++i) { + Column col = table.getFullSchema().get(i); + NamedExpression expr = columnToOutput.get(col.getName()); + if (expr == null) { + // If `expr` is null, it means that the current load is a partial update + // and `col` should not be contained in the output of the sink node so + // we skip it. 
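+                // (e.g. a partial update that writes only (k, v1) produces no slot
+                // for v2 here; v2 is later back-filled by SegmentWriter as the
+                // comments above describe. illustrative column names only)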
+ continue; + } + DataType inputType = expr.getDataType(); + DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType()); + Expression castExpr = expr; + // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType + if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) { + int sourceLength = ((CharacterType) inputType).getLen(); + int targetLength = ((CharacterType) targetType).getLen(); + if (sourceLength == targetLength) { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } else if (sourceLength > targetLength && targetLength >= 0) { + castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength)); + } else if (targetType.isStringType()) { + castExpr = new Cast(castExpr, StringType.INSTANCE); + } + } else { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } + if (castExpr instanceof NamedExpression) { + castExprs.add(((NamedExpression) castExpr)); + } else { + castExprs.add(new Alias(castExpr)); + } + } + if (!castExprs.equals(fullOutputExprs)) { + fullOutputProject = new LogicalProject(castExprs, fullOutputProject); + } - })), - RuleType.BINDING_INSERT_FILE.build( - logicalFileSink().when(s -> s.getOutputExprs().isEmpty()) - .then(fileSink -> fileSink.withOutputExprs( - fileSink.child().getOutput().stream() - .map(NamedExpression.class::cast) - .collect(ImmutableList.toImmutableList()))) - ), - // TODO: bind hive taget table - RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build( - logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty()) - .then(hiveTableSink -> hiveTableSink.withOutputExprs( - hiveTableSink.child().getOutput().stream() - .map(NamedExpression.class::cast) - .collect(ImmutableList.toImmutableList()))) - ) - ); + return boundSink.withChildAndUpdateOutput(fullOutputProject); + } + + private Plan bindHiveTableSink(MatchingContext> ctx) { + UnboundHiveTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + HMSExternalDatabase database = pair.first; + HMSExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + Set hivePartitionKeys = table.getRemoteTable() + .getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toSet()); + LogicalHiveTableSink boundSink = new LogicalHiveTableSink<>( + database, + table, + bindColumns, + hivePartitionKeys, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + return boundSink; } private Pair bind(CascadesContext cascadesContext, UnboundTableSink sink) { List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), sink.getNameParts()); - Pair pair = 
RelationUtil.getDbAndTable(tableQualifier, + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, cascadesContext.getConnectContext().getEnv()); if (!(pair.second instanceof OlapTable)) { try { @@ -368,6 +413,18 @@ private Pair bind(CascadesContext cascadesContext, UnboundT return Pair.of(((Database) pair.first), (OlapTable) pair.second); } + private Pair bind(CascadesContext cascadesContext, + UnboundHiveTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof HMSExternalTable) { + return Pair.of(((HMSExternalDatabase) pair.first), (HMSExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not a Hive table"); + } + private List bindPartitionIds(OlapTable table, List partitions, boolean temp) { return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java index 13329a5d55e1d8..f2128658616070 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java @@ -37,13 +37,13 @@ public Rule build() { sink.getDatabase(), sink.getTargetTable(), sink.getCols(), - sink.getPartitionIds(), sink.getOutputExprs(), Optional.empty(), sink.getLogicalProperties(), null, null, - sink.child()); + sink.child(), + sink.getHivePartitionKeys()); }).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 11a6a7b5683de6..1c493deae033cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -46,8 +46,10 @@ public enum PlanType { // logical sinks LOGICAL_FILE_SINK, LOGICAL_OLAP_TABLE_SINK, + LOGICAL_HIVE_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, + LOGICAL_UNBOUND_HIVE_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, // logical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index 10dfdd2c2b2aec..1a6004e939a974 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -25,7 +25,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundResultSink; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.annotation.Developing; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.PhysicalProperties; @@ -153,8 +153,8 @@ public void run(ConnectContext 
ctx, StmtExecutor executor) throws Exception { throw new AnalysisException(e.getMessage(), e.getCause()); } - query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), query); + query = UnboundTableSinkCreator.createUnboundTableSink(createTableInfo.getTableNameParts(), + ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query); try { new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index 3791b47f1400ff..ff70c75558db0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; @@ -115,7 +115,7 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer && cols.size() < targetTable.getColumns().size(); // make UnboundTableSink - return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(), isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 01f2d0f8e85fd1..cea0efc6fe733e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -37,7 +37,7 @@ import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundStar; import org.apache.doris.nereids.analyzer.UnboundTVFRelation; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; @@ -236,7 +236,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists); boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); - return new UnboundTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 29d3b8d96f37d2..76143c0e80ff2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.analysis.SlotBinder; @@ -189,8 +189,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); } // make UnboundTableSink - return new UnboundTableSink<>(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), - ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(nameParts, + isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), ImmutableList.of(), false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 8b714b64edfcf4..b456be6e2600ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -24,9 +24,10 @@ import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.UserException; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; @@ -42,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.RelationUtil; @@ -76,7 +78,7 @@ private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) { * @return command */ public static UpdateMvByPartitionCommand from(MTMV mv, Set partitionIds, - Map tableWithPartKey) { + Map tableWithPartKey) throws UserException { NereidsParser parser = new NereidsParser(); Map> predicates = constructTableWithPredicates(mv, partitionIds, tableWithPartKey); @@ -86,9 +88,8 @@ public static UpdateMvByPartitionCommand from(MTMV mv, Set partitionIds, if (plan instanceof Sink) { plan = plan.child(0); } - 
UnboundTableSink sink = - new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(), - parts, plan); + LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSink(mv.getFullQualifiers(), + ImmutableList.of(), ImmutableList.of(), parts, plan); return new UpdateMvByPartitionCommand(sink); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java new file mode 100644 index 00000000000000..9e4c2bc92a3569 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For Hive Table + */ +public class HiveInsertCommandContext extends InsertCommandContext { + private boolean overwrite = true; + + public boolean isOverwrite() { + return overwrite; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index f8bc8f2db47924..fc6b7a776b0553 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -17,20 +17,24 @@ package org.apache.doris.nereids.trees.plans.commands.insert; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.HiveTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; 
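As a usage note for the new HiveInsertCommandContext introduced above, the sketch below shows how the overwrite flag is meant to travel from the command layer into the sink. Only HiveInsertCommandContext and the bindDataSink() forwarding are from this patch; the wrapper class and main method are illustrative assumptions.

    import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;

    // Illustrative sketch, not part of this patch.
    public class HiveOverwriteFlagSketch {
        public static void main(String[] args) {
            HiveInsertCommandContext insertCtx = new HiveInsertCommandContext();
            // a plain INSERT INTO keeps existing data, so the flag is lowered
            insertCtx.setOverwrite(false);
            // HiveTableSink.bindDataSink(insertCols, Optional.of(insertCtx)) later
            // forwards it to the BE thrift sink via tSink.setOverwrite(context.isOverwrite())
            System.out.println("overwrite = " + insertCtx.isOverwrite());
        }
    }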
@@ -43,6 +47,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class); private static final long INVALID_TXN_ID = -1L; private long txnId = INVALID_TXN_ID; + private TransactionStatus txnStatus = TransactionStatus.ABORTED; /** * constructor @@ -59,20 +64,15 @@ public long getTxnId() { @Override public void beginTransaction() { - + // TODO: use hive txn rather than internal txn } @Override protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { HiveTableSink hiveTableSink = (HiveTableSink) sink; - // PhysicalHiveTableSink physicalHiveTableSink = (PhysicalHiveTableSink) physicalSink; + PhysicalHiveTableSink physicalHiveSink = (PhysicalHiveTableSink) physicalSink; try { - hiveTableSink.init(); - hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); - TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); - if (state == null) { - throw new AnalysisException("txn does not exist: " + txnId); - } + hiveTableSink.bindDataSink(physicalHiveSink.getCols(), insertCtx); } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); } @@ -80,21 +80,36 @@ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink p @Override protected void beforeExec() { - + // check params } @Override protected void onComplete() throws UserException { - + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + txnStatus = TransactionStatus.COMMITTED; + } } @Override protected void onFail(Throwable t) { - + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". 
url: ").append(coordinator.getTrackingUrl()); + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); + } } @Override protected void afterExec(StmtExecutor executor) { - + // TODO: set THivePartitionUpdate } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index dd02c2c816c264..3e4ed3afd78d9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -146,9 +146,8 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor if (ctx.getMysqlChannel() != null) { ctx.getMysqlChannel().reset(); } - - Optional> plan = (planner.getPhysicalPlan() - .>>collect(PhysicalSink.class::isInstance)).stream() + Optional> plan = (planner.getPhysicalPlan() + .>>collect(PhysicalSink.class::isInstance)).stream() .findAny(); Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table"); PhysicalSink physicalSink = plan.get(); @@ -173,6 +172,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor } else if (physicalSink instanceof PhysicalHiveTableSink) { HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx); + // set hive query options } else { // TODO: support other table types throw new AnalysisException("insert into command only support olap table"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index fffdf06b54ed48..788871c744e384 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -24,10 +24,13 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.util.InternalDatabaseUtil; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.TreeNode; @@ -37,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -95,8 +99,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } 
         TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
-        if (!(targetTableIf instanceof OlapTable)) {
-            throw new AnalysisException("insert into overwrite only support OLAP table."
+        if (!(targetTableIf instanceof OlapTable || targetTableIf instanceof HMSExternalTable)) {
+            throw new AnalysisException("insert into overwrite only supports OLAP and HMS tables."
                     + " But current table type is " + targetTableIf.getType());
         }
         this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
@@ -156,20 +160,40 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
      */
     private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
             throws Exception {
-        UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
-        UnboundTableSink<?> copySink = new UnboundTableSink<>(
-                sink.getNameParts(),
-                sink.getColNames(),
-                sink.getHints(),
-                true,
-                tempPartitionNames,
-                sink.isPartialUpdate(),
-                sink.getDMLCommandType(),
-                (LogicalPlan) (sink.child(0)));
-        // for overwrite situation, we disable auto create partition.
-        OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
-        insertCtx.setAllowAutoPartition(false);
-        InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
+        UnboundLogicalSink<?> copySink;
+        InsertCommandContext insertCtx;
+        if (logicalQuery instanceof UnboundTableSink) {
+            UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
+            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
+                    sink.getNameParts(),
+                    sink.getColNames(),
+                    sink.getHints(),
+                    true,
+                    tempPartitionNames,
+                    sink.isPartialUpdate(),
+                    sink.getDMLCommandType(),
+                    (LogicalPlan) (sink.child(0)));
+            // for overwrite situation, we disable auto create partition.
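+            // (overwrite is implemented by swapping in pre-created temp partitions,
+            // and a partition auto-created mid-load would have no temp counterpart
+            // to swap. rationale inferred, not stated in the original patch)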
+ insertCtx = new OlapInsertCommandContext(); + ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false); + } else if (logicalQuery instanceof UnboundHiveTableSink) { + UnboundHiveTableSink sink = (UnboundHiveTableSink) logicalQuery; + copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( + sink.getNameParts(), + sink.getColNames(), + sink.getHints(), + false, + sink.getPartitions(), + false, + sink.getDMLCommandType(), + (LogicalPlan) (sink.child(0))); + insertCtx = new HiveInsertCommandContext(); + ((HiveInsertCommandContext) insertCtx).setOverwrite(false); + } else { + throw new RuntimeException("Current catalog does not support insert overwrite yet."); + } + InsertIntoTableCommand insertCommand = + new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx)); insertCommand.run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 2cef70629ccb17..f0e7fab736cdbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -27,6 +27,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -46,6 +47,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.nereids.util.TypeCoercionUtils; @@ -233,39 +235,39 @@ private static void beginBatchInsertTransaction(ConnectContext ctx, * normalize plan to let it could be process correctly by nereids */ public static Plan normalizePlan(Plan plan, TableIf table) { - UnboundTableSink unboundTableSink = (UnboundTableSink) plan; - - if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS - && unboundTableSink.isPartialUpdate()) { - // check the necessary conditions for partial updates - OlapTable olapTable = (OlapTable) table; - if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("Partial update is only allowed on " - + "unique table with merge-on-write enabled."); - } - if (unboundTableSink.getDMLCommandType() == DMLCommandType.INSERT) { - if (unboundTableSink.getColNames().isEmpty()) { - throw new AnalysisException("You must explicitly specify the columns to be updated when " - + "updating partial columns using the INSERT statement."); - } - for (Column col : olapTable.getFullSchema()) { - Optional insertCol = unboundTableSink.getColNames().stream() - .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); - if (col.isKey() && !insertCol.isPresent()) { - throw new AnalysisException("Partial 
update should include all key columns, missing: " - + col.getName()); - } - } - } - } + UnboundLogicalSink unboundLogicalSink = (UnboundLogicalSink) plan; if (table instanceof HMSExternalTable) { - // TODO: check HMSExternalTable HMSExternalTable hiveTable = (HMSExternalTable) table; if (hiveTable.isView()) { throw new AnalysisException("View is not support in hive external table."); } } - Plan query = unboundTableSink.child(); + if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS) { + if (unboundLogicalSink instanceof UnboundTableSink + && ((UnboundTableSink) unboundLogicalSink).isPartialUpdate()) { + // check the necessary conditions for partial updates + OlapTable olapTable = (OlapTable) table; + if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("Partial update is only allowed on " + + "unique table with merge-on-write enabled."); + } + if (unboundLogicalSink.getDMLCommandType() == DMLCommandType.INSERT) { + if (unboundLogicalSink.getColNames().isEmpty()) { + throw new AnalysisException("You must explicitly specify the columns to be updated when " + + "updating partial columns using the INSERT statement."); + } + for (Column col : olapTable.getFullSchema()) { + Optional insertCol = unboundLogicalSink.getColNames().stream() + .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); + if (col.isKey() && !insertCol.isPresent()) { + throw new AnalysisException("Partial update should include all key columns, missing: " + + col.getName()); + } + } + } + } + } + Plan query = unboundLogicalSink.child(); if (!(query instanceof LogicalInlineTable)) { return plan; } @@ -276,28 +278,28 @@ public static Plan normalizePlan(Plan plan, TableIf table) { for (List values : logicalInlineTable.getConstantExprsList()) { ImmutableList.Builder constantExprs = ImmutableList.builder(); if (values.isEmpty()) { - if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) { + if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { throw new AnalysisException("value list should not be empty if columns are specified"); } for (Column column : columns) { constantExprs.add(generateDefaultExpression(column)); } } else { - if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) { - if (values.size() != unboundTableSink.getColNames().size()) { + if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { + if (values.size() != unboundLogicalSink.getColNames().size()) { throw new AnalysisException("Column count doesn't match value count"); } for (int i = 0; i < values.size(); i++) { Column sameNameColumn = null; for (Column column : table.getBaseSchema(true)) { - if (unboundTableSink.getColNames().get(i).equalsIgnoreCase(column.getName())) { + if (unboundLogicalSink.getColNames().get(i).equalsIgnoreCase(column.getName())) { sameNameColumn = column; break; } } if (sameNameColumn == null) { throw new AnalysisException("Unknown column '" - + unboundTableSink.getColNames().get(i) + "' in target table."); + + unboundLogicalSink.getColNames().get(i) + "' in target table."); } if (values.get(i) instanceof DefaultValueSlot) { constantExprs.add(generateDefaultExpression(sameNameColumn)); @@ -345,11 +347,15 @@ private static Expression castValue(Expression value, DataType targetType) { * get target table from names. 
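+     * Accepts both UnboundTableSink (internal OLAP tables) and UnboundHiveTableSink.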
*/ public static TableIf getTargetTable(Plan plan, ConnectContext ctx) { - if (!(plan instanceof UnboundTableSink)) { - throw new AnalysisException("the root of plan should be UnboundTableSink" + UnboundLogicalSink unboundTableSink; + if (plan instanceof UnboundTableSink) { + unboundTableSink = (UnboundTableSink) plan; + } else if (plan instanceof UnboundHiveTableSink) { + unboundTableSink = (UnboundHiveTableSink) plan; + } else { + throw new AnalysisException("the root of plan should be UnboundTableSink or UnboundHiveTableSink" + " but it is " + plan.getType()); } - UnboundTableSink unboundTableSink = (UnboundTableSink) plan; List tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts()); return RelationUtil.getDbAndTable(tableQualifier, ctx.getEnv()).second; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java index f05790634bacce..7ae217f3fb4736 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; /** * logical hive table sink for insert command @@ -47,41 +48,46 @@ public class LogicalHiveTableSink extends LogicalTableS private final HMSExternalDatabase database; private final HMSExternalTable targetTable; private final List cols; - private final List partitionIds; + private final Set hivePartitionKeys; private final DMLCommandType dmlCommandType; /** * constructor */ - public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, - DMLCommandType dmlCommandType, Optional groupExpression, - Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); + public LogicalHiveTableSink(HMSExternalDatabase database, + HMSExternalTable targetTable, + List cols, + Set hivePartitionKeys, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink"); this.cols = Utils.copyRequiredList(cols); this.dmlCommandType = dmlCommandType; - this.partitionIds = Utils.copyRequiredList(partitionIds); + this.hivePartitionKeys = hivePartitionKeys; } public Plan withChildAndUpdateOutput(Plan child) { List output = child.getOutput().stream() .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()); - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, output, dmlCommandType, Optional.empty(), Optional.empty(), child); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child"); - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, 
targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); } public LogicalHiveTableSink withOutputExprs(List outputExprs) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, Optional.empty(), Optional.empty(), child()); } @@ -97,8 +103,8 @@ public List getCols() { return cols; } - public List getPartitionIds() { - return partitionIds; + public Set getHivePartitionKeys() { + return hivePartitionKeys; } public DMLCommandType getDmlCommandType() { @@ -119,13 +125,12 @@ public boolean equals(Object o) { LogicalHiveTableSink that = (LogicalHiveTableSink) o; return dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) - && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) - && Objects.equals(partitionIds, that.partitionIds); + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType); + return Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType); } @Override @@ -135,7 +140,7 @@ public String toString() { "database", database.getFullName(), "targetTable", targetTable.getName(), "cols", cols, - "partitionIds", partitionIds, + "hivePartitionKeys", hivePartitionKeys, "dmlCommandType", dmlCommandType ); } @@ -147,14 +152,14 @@ public R accept(PlanVisitor visitor, C context) { @Override public Plan withGroupExpression(Optional groupExpression) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java new file mode 100644 index 00000000000000..c02fa1bac872af --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.util.Utils; + +import java.util.List; +import java.util.Optional; + +/** abstract logical sink */ +public abstract class UnboundLogicalSink extends LogicalSink { + protected final List nameParts; + protected final List colNames; + protected final DMLCommandType dmlCommandType; + + public UnboundLogicalSink(List nameParts, + PlanType type, + List outputExprs, + Optional groupExpression, + Optional logicalProperties, + List colNames, + DMLCommandType dmlCommandType, + CHILD_TYPE child) { + super(type, outputExprs, groupExpression, logicalProperties, child); + this.colNames = Utils.copyRequiredList(colNames); + this.dmlCommandType = dmlCommandType; + this.nameParts = Utils.copyRequiredList(nameParts); + } + + public DMLCommandType getDMLCommandType() { + return dmlCommandType; + } + + public List getColNames() { + return colNames; + } + + public List getNameParts() { + return nameParts; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java index da59214aa09f4a..8a37bb71cc6d79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java @@ -21,8 +21,10 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; @@ -33,10 +35,14 @@ import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** abstract physical hive sink */ public class PhysicalHiveTableSink extends PhysicalTableSink implements Sink { @@ -44,7 +50,7 @@ public class PhysicalHiveTableSink extends PhysicalTabl private final HMSExternalDatabase database; private final HMSExternalTable targetTable; private final List cols; - private final List partitionIds; + private final Set hivePartitionKeys; /** * constructor @@ -52,13 +58,13 @@ public class PhysicalHiveTableSink extends PhysicalTabl public PhysicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, Optional groupExpression, 
LogicalProperties logicalProperties, - CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties, - PhysicalProperties.GATHER, null, child); + CHILD_TYPE child, + Set hivePartitionKeys) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child, hivePartitionKeys); } /** @@ -67,25 +73,37 @@ public PhysicalHiveTableSink(HMSExternalDatabase database, public PhysicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, - CHILD_TYPE child) { + CHILD_TYPE child, + Set hivePartitionKeys) { super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink"); this.cols = Utils.copyRequiredList(cols); - this.partitionIds = Utils.copyRequiredList(partitionIds); + this.hivePartitionKeys = hivePartitionKeys; + } + + public HMSExternalDatabase getDatabase() { + return database; + } + + public HMSExternalTable getTargetTable() { + return targetTable; + } + + public List getCols() { + return cols; } @Override public Plan withChildren(List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression, - getLogicalProperties(), physicalProperties, statistics, children.get(0)); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0), hivePartitionKeys); } @Override @@ -100,20 +118,48 @@ public List getExpressions() { @Override public Plan withGroupExpression(Optional groupExpression) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, getLogicalProperties(), child()); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child(), hivePartitionKeys); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, logicalProperties.get(), children.get(0)); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys); + } + + /** + * get output physical properties + */ + @Override + public PhysicalProperties getRequirePhysicalProperties() { + Set hivePartitionKeys = targetTable.getRemoteTable() + .getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toSet()); + if 
(!hivePartitionKeys.isEmpty()) { + List columnIdx = new ArrayList<>(); + List fullSchema = targetTable.getFullSchema(); + for (int i = 0; i < fullSchema.size(); i++) { + Column column = fullSchema.get(i); + if (hivePartitionKeys.contains(column.getName())) { + columnIdx.add(i); + } + } + List exprIds = columnIdx.stream() + .map(idx -> child().getOutput().get(idx).getExprId()) + .collect(Collectors.toList()); + DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned(); + shuffleInfo.setOutputColExprIds(exprIds); + return new PhysicalProperties(shuffleInfo); + } + return PhysicalProperties.SINK_RANDOM_PARTITIONED; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java index 17cf97106ea467..ed1d6a3a3a7abe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java @@ -39,11 +39,11 @@ public abstract class PhysicalSink extends PhysicalUnar protected final List outputExprs; public PhysicalSink(PlanType type, - List outputExprs, - Optional groupExpression, - LogicalProperties logicalProperties, - @Nullable PhysicalProperties physicalProperties, - Statistics statistics, CHILD_TYPE child) { + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + @Nullable PhysicalProperties physicalProperties, + Statistics statistics, CHILD_TYPE child) { super(type, groupExpression, logicalProperties, physicalProperties, statistics, child); this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java index 5932c4f099920d..7feb53e24b004b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java @@ -42,4 +42,6 @@ public PhysicalTableSink(PlanType type, List outputExprs, Statistics statistics, CHILD_TYPE child) { super(type, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); } + + public abstract PhysicalProperties getRequirePhysicalProperties(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index b88cd910a36722..26c9d526987966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.visitor; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; @@ -56,6 +57,10 @@ default R visitUnboundTableSink(UnboundTableSink unboundTableSin return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundHiveTableSink(UnboundHiveTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + 
+    default R visitUnboundHiveTableSink(UnboundHiveTableSink<? extends Plan> unboundTableSink, C context) {
+        return visitLogicalSink(unboundTableSink, context);
+    }
+
     default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) {
         return visitLogicalSink(unboundResultSink, context);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
index 4278f78ec55f23..a625e4490e12f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
@@ -77,7 +77,7 @@ public static TableIf getTable(List<String> qualifierName, Env env) {
     /**
      * get database and table
      */
-    public static Pair<DatabaseIf, TableIf> getDbAndTable(List<String> qualifierName, Env env) {
+    public static Pair<DatabaseIf<? extends TableIf>, TableIf> getDbAndTable(List<String> qualifierName, Env env) {
         String catalogName = qualifierName.get(0);
         String dbName = qualifierName.get(1);
         String tableName = qualifierName.get(2);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 9c5c375a35c06d..9c6ba83408ac00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -58,6 +58,7 @@ public DataPartition(TPartitionType type, List<Expr> exprs) {
         Preconditions.checkState(!exprs.isEmpty());
         Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
                 || type == TPartitionType.RANGE_PARTITIONED
+                || type == TPartitionType.TABLE_SINK_HASH_PARTITIONED
                 || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
         this.type = type;
         this.partitionExprs = ImmutableList.copyOf(exprs);
@@ -66,6 +67,7 @@ public DataPartition(TPartitionType type) {
         Preconditions.checkState(type == TPartitionType.UNPARTITIONED
                 || type == TPartitionType.RANDOM
+                || type == TPartitionType.TABLE_SINK_RANDOM_PARTITIONED
                 || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
         this.type = type;
         this.partitionExprs = ImmutableList.of();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 99d0c6b1b03b6a..5dce86b3330fc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -20,17 +20,44 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.THiveBucket;
+import org.apache.doris.thrift.THiveColumn;
+import org.apache.doris.thrift.THiveColumnType;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartition;
+import org.apache.doris.thrift.THiveTableSink;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class HiveTableSink extends DataSink {
 
+    private HMSExternalTable targetTable;
     protected TDataSink tDataSink;
 
-    public HiveTableSink(HMSExternalTable table) {
+    public HiveTableSink(HMSExternalTable targetTable) {
         super();
+        this.targetTable = targetTable;
     }
 
@@ -59,9 +86,146 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    /**
+     * Check the sink parameters and generate the thrift data sink that is sent to BE.
+     * @param insertCols target table columns
+     * @param insertCtx insert info context
+     * @throws AnalysisException if the source file format cannot be read
+     */
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx)
+            throws AnalysisException {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        // classify every insert column as either a partition key or a regular column
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        // data is first written to a hidden per-user staging directory under the table location
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
+    }
+
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
+        String compressType;
+        switch (formatType) {
+            // ORC records its codec in the "orc.compress" table property
+            case FORMAT_ORC:
+                compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
+                break;
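+            // Parquet records its codec in the "parquet.compression" table property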
+            case FORMAT_PARQUET:
+                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
+                break;
+            default:
+                compressType = "uncompressed";
+                break;
+        }
+
+        // map the Hive codec name to the BE compression type
+        if ("snappy".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
+        } else if ("lz4".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
+        } else if ("lzo".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZO);
+        } else if ("zlib".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZLIB);
+        } else if ("zstd".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZSTD);
+        } else if ("uncompressed".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.PLAIN);
+        } else {
+            // fall back to plain for unknown codecs so BE can still try to read parquet or orc files
+            tSink.setCompressionType(TFileCompressType.PLAIN);
+        }
+    }
+
+    private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
+        List<THivePartition> partitions = new ArrayList<>();
+        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
+                ((HMSExternalCatalog) targetTable.getCatalog())
+                        .getClient().listPartitions(targetTable.getDbName(), targetTable.getName());
+        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+            THivePartition hivePartition = new THivePartition();
+            StorageDescriptor sd = partition.getSd();
+            hivePartition.setFileFormat(getFileFormatType(sd));
+
+            hivePartition.setValues(partition.getValues());
+            THiveLocationParams locationParams = new THiveLocationParams();
+            String location = sd.getLocation();
+            // existing partitions use the same location for the write path and the target path
+            locationParams.setWritePath(location);
+            locationParams.setTargetPath(location);
+            locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+            hivePartition.setLocation(locationParams);
+            partitions.add(hivePartition);
+        }
+        tSink.setPartitions(partitions);
+    }
+
+    private TFileFormatType getFileFormatType(StorageDescriptor sd) throws AnalysisException {
+        TFileFormatType fileFormatType;
+        if (sd.getInputFormat().toLowerCase().contains("orc")) {
+            fileFormatType = TFileFormatType.FORMAT_ORC;
+        } else if (sd.getInputFormat().toLowerCase().contains("parquet")) {
+            fileFormatType = TFileFormatType.FORMAT_PARQUET;
+        } else if (sd.getInputFormat().toLowerCase().contains("text")) {
+            fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+        } else {
+            throw new AnalysisException("Unsupported input format type: " + sd.getInputFormat());
+        }
+        return fileFormatType;
     }
 
-    public void complete(Analyzer analyzer) {
+    protected TDataSinkType getDataSinkType() {
+        return TDataSinkType.HIVE_TABLE_SINK;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 5f1abf12e630e1..2316e65bf608a5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -54,9 +54,7 @@ public class HmsCommitTest {
     private static final String tbWithoutPartition = "test_tb_without_partition";
     private static Path warehousePath;
     static String dbLocation;
-    private String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    private String outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-    private String serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+    private String fileFormat = "orc";
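+    // a single logical file format replaces the former inputFormat/outputFormat/serde triple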
 
     @BeforeClass
     public static void beforeClass() throws Throwable {
@@ -103,11 +101,11 @@ public void before() {
         partitionKeys.add(new FieldSchema("c3", "string", "comment"));
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
                 dbName, tbWithPartition, columns, partitionKeys,
-                new HashMap<>(), inputFormat, outputFormat, serde);
+                new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata, true);
         HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
                 dbName, tbWithoutPartition, columns, new ArrayList<>(),
-                new HashMap<>(), inputFormat, outputFormat, serde);
+                new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata2, true);
     }